Get Started with Hugging Face Accelerate
The TorchTrainer can help you easily launch your Accelerate training across a distributed Ray cluster.
You only need to run your existing training code with a TorchTrainer. You can expect the final code to look like this:
from accelerate import Accelerator


def train_func(config):
    # Instantiate the accelerator
    accelerator = Accelerator(...)

    model = ...
    optimizer = ...
    train_dataloader = ...
    eval_dataloader = ...
    lr_scheduler = ...

    # Prepare everything for distributed training
    (
        model,
        optimizer,
        train_dataloader,
        eval_dataloader,
        lr_scheduler,
    ) = accelerator.prepare(
        model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
    )

    # Start training
    ...


from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    # ...
)
trainer.fit()
Tip
Model and data preparation for distributed training is completely handled by the Accelerator object and its Accelerator.prepare() method.
Unlike with native PyTorch, PyTorch Lightning, or Hugging Face Transformers, don’t call any additional Ray Train utilities
like prepare_model() or prepare_data_loader() in your training function.
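As a quick illustration, here is a minimal, self-contained sketch (with a toy model and random data that aren't part of the examples below) showing that the only distributed-specific calls in the loop are accelerator.prepare() and accelerator.backward(); the Accelerator takes care of the device placement and model wrapping that prepare_model() and prepare_data_loader() would otherwise handle:

import torch
from accelerate import Accelerator


def train_func(config):
    accelerator = Accelerator()

    # Toy model and data, for illustration only.
    model = torch.nn.Linear(8, 2)
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
    dataset = torch.utils.data.TensorDataset(
        torch.randn(64, 8), torch.randint(0, 2, (64,))
    )
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=8)

    # prepare() moves the model to the right device, wraps it for
    # distributed training, and shards the dataloader across processes.
    model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

    model.train()
    for features, labels in dataloader:
        loss = torch.nn.functional.cross_entropy(model(features), labels)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()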
Configure Accelerate
In Ray Train, set Accelerate configurations through the accelerate.Accelerator object in your training function. The starter examples below cover two common setups.
To run DeepSpeed with Accelerate, create a DeepSpeedPlugin from a dictionary:
from accelerate import Accelerator, DeepSpeedPlugin

DEEPSPEED_CONFIG = {
    "fp16": {
        "enabled": True
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": False
        },
        "overlap_comm": True,
        "contiguous_gradients": True,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "gather_16bit_weights_on_model_save": True,
        "round_robin_gradients": True
    },
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "steps_per_print": 10,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "wall_clock_breakdown": False
}


def train_func(config):
    # Create a DeepSpeedPlugin from the config dict
    ds_plugin = DeepSpeedPlugin(hf_ds_config=DEEPSPEED_CONFIG)

    # Initialize Accelerator
    accelerator = Accelerator(
        ...,
        deepspeed_plugin=ds_plugin,
    )

    # Start training
    ...


from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    # ...
)
trainer.fit()
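If you already maintain a DeepSpeed JSON config, for example one written for "accelerate launch" or the DeepSpeed CLI, you can point the plugin at that file instead of an inline dictionary. A minimal sketch, where the path is a placeholder and the file must be readable on every Ray worker node:

from accelerate import Accelerator, DeepSpeedPlugin


def train_func(config):
    # "/mnt/shared/ds_config.json" is a placeholder path; put the file on
    # shared storage or bake it into the worker environment.
    ds_plugin = DeepSpeedPlugin(hf_ds_config="/mnt/shared/ds_config.json")
    accelerator = Accelerator(deepspeed_plugin=ds_plugin)

    # Start training
    ...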
For PyTorch FSDP, create a FullyShardedDataParallelPlugin and pass it to the Accelerator.
from torch.distributed.fsdp.fully_sharded_data_parallel import FullOptimStateDictConfig, FullStateDictConfig
from accelerate import Accelerator, FullyShardedDataParallelPlugin


def train_func(config):
    fsdp_plugin = FullyShardedDataParallelPlugin(
        state_dict_config=FullStateDictConfig(
            offload_to_cpu=False,
            rank0_only=False,
        ),
        optim_state_dict_config=FullOptimStateDictConfig(
            offload_to_cpu=False,
            rank0_only=False,
        ),
    )

    # Initialize accelerator
    accelerator = Accelerator(
        ...,
        fsdp_plugin=fsdp_plugin,
    )

    # Start training
    ...


from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    # ...
)
trainer.fit()
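When the model is sharded with FSDP, gather a full state dict before saving so that the checkpoint can be loaded outside the sharded setup. A short sketch using Accelerate's get_state_dict() helper, which follows the FullStateDictConfig above; the file path is a placeholder:

    # Inside train_func, after accelerator.prepare(model, ...):
    # get_state_dict() consolidates the sharded FSDP parameters into a full
    # state dict, honoring the plugin's state_dict_config. Call it on all
    # processes; only the main process writes the file.
    state_dict = accelerator.get_state_dict(model)
    if accelerator.is_main_process:
        accelerator.save(state_dict, "/tmp/fsdp_model.bin")  # placeholder path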
Note that Accelerate also provides a CLI tool, "accelerate config", to generate a configuration file, and a launcher, "accelerate launch", to start your training job. Neither is necessary here, because Ray's TorchTrainer already sets up the Torch distributed environment and launches the training function on all workers.
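Because TorchTrainer initializes the process group before calling your function, you can confirm the distributed setup from inside train_func. The sketch below (illustration only) reads the topology from both Ray Train's context API and the Accelerator; the two should agree on every worker:

import ray.train
from accelerate import Accelerator


def train_func(config):
    accelerator = Accelerator()
    ctx = ray.train.get_context()
    # Accelerate picks up the torch.distributed environment that
    # TorchTrainer already set up on this worker.
    print(
        f"Ray rank {ctx.get_world_rank()}/{ctx.get_world_size()} | "
        f"Accelerate process {accelerator.process_index}/{accelerator.num_processes}"
    )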
For more details, see the two end-to-end examples below.
Example 1: Fine-tune BERT with Ray Train, Ray Data, and Accelerate
"""
Minimal Ray Train + Accelerate example adapted from
https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py
Fine-tune a BERT model with Hugging Face Accelerate and Ray Train and Ray Data
"""
import evaluate
import torch
from accelerate import Accelerator
from datasets import load_dataset
from tempfile import TemporaryDirectory
from torch.optim import AdamW
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
get_linear_schedule_with_warmup,
set_seed,
)
import ray
import ray.train
from ray.train import DataConfig, ScalingConfig, Checkpoint
from ray.train.torch import TorchTrainer
def train_func(config):
"""Your training function that will be launched on each worker."""
# Unpack training configs
lr = config["lr"]
seed = config["seed"]
num_epochs = config["num_epochs"]
train_batch_size = config["train_batch_size"]
eval_batch_size = config["eval_batch_size"]
train_ds_size = config["train_dataset_size"]
set_seed(seed)
# Initialize accelerator
accelerator = Accelerator()
# Load datasets and metrics
metric = evaluate.load("glue", "mrpc")
# Prepare Ray Data loaders
# ====================================================
train_ds = ray.train.get_dataset_shard("train")
eval_ds = ray.train.get_dataset_shard("validation")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
def collate_fn(batch):
outputs = tokenizer(
list(batch["sentence1"]),
list(batch["sentence2"]),
truncation=True,
padding="longest",
return_tensors="pt",
)
outputs["labels"] = torch.LongTensor(batch["label"])
outputs = {k: v.to(accelerator.device) for k, v in outputs.items()}
return outputs
train_dataloader = train_ds.iter_torch_batches(
batch_size=train_batch_size, collate_fn=collate_fn
)
eval_dataloader = eval_ds.iter_torch_batches(
batch_size=eval_batch_size, collate_fn=collate_fn
)
# ====================================================
# Instantiate the model, optimizer, lr_scheduler
model = AutoModelForSequenceClassification.from_pretrained(
"bert-base-cased", return_dict=True
)
optimizer = AdamW(params=model.parameters(), lr=lr)
steps_per_epoch = train_ds_size // (accelerator.num_processes * train_batch_size)
lr_scheduler = get_linear_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=100,
num_training_steps=(steps_per_epoch * num_epochs),
)
# Prepare everything with accelerator
model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)
for epoch in range(num_epochs):
# Training
model.train()
for batch in train_dataloader:
outputs = model(**batch)
loss = outputs.loss
accelerator.backward(loss)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
# Evaluation
model.eval()
for batch in eval_dataloader:
with torch.no_grad():
outputs = model(**batch)
predictions = outputs.logits.argmax(dim=-1)
predictions, references = accelerator.gather_for_metrics(
(predictions, batch["labels"])
)
metric.add_batch(
predictions=predictions,
references=references,
)
eval_metric = metric.compute()
accelerator.print(f"epoch {epoch}:", eval_metric)
# Report Checkpoint and metrics to Ray Train
# ==========================================
with TemporaryDirectory() as tmpdir:
if accelerator.is_main_process:
unwrapped_model = accelerator.unwrap_model(model)
accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
checkpoint = Checkpoint.from_directory(tmpdir)
else:
checkpoint = None
ray.train.report(metrics=eval_metric, checkpoint=checkpoint)
if __name__ == "__main__":
config = {
"lr": 2e-5,
"num_epochs": 3,
"seed": 42,
"train_batch_size": 16,
"eval_batch_size": 32,
}
# Prepare Ray Datasets
hf_datasets = load_dataset("glue", "mrpc")
ray_datasets = {
"train": ray.data.from_huggingface(hf_datasets["train"]),
"validation": ray.data.from_huggingface(hf_datasets["validation"]),
}
config["train_dataset_size"] = ray_datasets["train"].count()
trainer = TorchTrainer(
train_func,
train_loop_config=config,
datasets=ray_datasets,
dataset_config=DataConfig(datasets_to_split=["train", "validation"]),
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()
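After training completes, the Result returned by trainer.fit() references the last reported checkpoint. A short sketch of restoring the saved model from it; the ckpt_2.bin file name assumes the final epoch of the three-epoch run above:

import torch

# result.checkpoint comes from the last ray.train.report() call.
with result.checkpoint.as_directory() as ckpt_dir:
    # accelerator.save() stored the full unwrapped model object, so recent
    # PyTorch versions may require weights_only=False to unpickle it.
    model = torch.load(f"{ckpt_dir}/ckpt_2.bin", map_location="cpu")
model.eval()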
Example 2: Fine-tune BERT with Ray Train, Accelerate, and native PyTorch DataLoaders
"""
Minimal Ray Train + Accelerate example adapted from
https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py
Fine-tune a BERT model with Hugging Face Accelerate and Ray Train
"""
import evaluate
import torch
from accelerate import Accelerator
from datasets import load_dataset
from tempfile import TemporaryDirectory
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
get_linear_schedule_with_warmup,
set_seed,
)
import ray.train
from ray.train import ScalingConfig, Checkpoint
from ray.train.torch import TorchTrainer
def train_func(config):
"""Your training function that will be launched on each worker."""
# Unpack training configs
lr = config["lr"]
seed = config["seed"]
num_epochs = config["num_epochs"]
train_batch_size = config["train_batch_size"]
eval_batch_size = config["eval_batch_size"]
set_seed(seed)
# Initialize accelerator
accelerator = Accelerator()
# Load datasets and metrics
metric = evaluate.load("glue", "mrpc")
# Prepare PyTorch DataLoaders
# ====================================================
hf_datasets = load_dataset("glue", "mrpc")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
def collate_fn(batch):
outputs = tokenizer(
[sample["sentence1"] for sample in batch],
[sample["sentence2"] for sample in batch],
truncation=True,
padding="longest",
return_tensors="pt",
)
outputs["labels"] = torch.LongTensor([sample["label"] for sample in batch])
outputs = {k: v.to(accelerator.device) for k, v in outputs.items()}
return outputs
# Instantiate dataloaders.
train_dataloader = DataLoader(
hf_datasets["train"],
shuffle=True,
collate_fn=collate_fn,
batch_size=train_batch_size,
drop_last=True,
)
eval_dataloader = DataLoader(
hf_datasets["validation"],
shuffle=False,
collate_fn=collate_fn,
batch_size=eval_batch_size,
drop_last=True,
)
# ====================================================
# Instantiate the model, optimizer, lr_scheduler
model = AutoModelForSequenceClassification.from_pretrained(
"bert-base-cased", return_dict=True
)
optimizer = AdamW(params=model.parameters(), lr=lr)
steps_per_epoch = len(train_dataloader)
lr_scheduler = get_linear_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=100,
num_training_steps=(steps_per_epoch * num_epochs),
)
# Prepare everything with accelerator
(
model,
optimizer,
train_dataloader,
eval_dataloader,
lr_scheduler,
) = accelerator.prepare(
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
)
for epoch in range(num_epochs):
# Training
model.train()
for batch in train_dataloader:
outputs = model(**batch)
loss = outputs.loss
accelerator.backward(loss)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
# Evaluation
model.eval()
for batch in eval_dataloader:
with torch.no_grad():
outputs = model(**batch)
predictions = outputs.logits.argmax(dim=-1)
predictions, references = accelerator.gather_for_metrics(
(predictions, batch["labels"])
)
metric.add_batch(
predictions=predictions,
references=references,
)
eval_metric = metric.compute()
accelerator.print(f"epoch {epoch}:", eval_metric)
# Report Checkpoint and metrics to Ray Train
# ==========================================
with TemporaryDirectory() as tmpdir:
if accelerator.is_main_process:
unwrapped_model = accelerator.unwrap_model(model)
accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
checkpoint = Checkpoint.from_directory(tmpdir)
else:
checkpoint = None
ray.train.report(metrics=eval_metric, checkpoint=checkpoint)
if __name__ == "__main__":
config = {
"lr": 2e-5,
"num_epochs": 3,
"seed": 42,
"train_batch_size": 16,
"eval_batch_size": 32,
}
trainer = TorchTrainer(
train_func,
train_loop_config=config,
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()
See also
If you're looking for more advanced use cases, check out the Llama-2 fine-tuning example. You may also find the Ray Train user guides helpful.
AccelerateTrainer Migration Guide
Before Ray 2.7, Ray Train's AccelerateTrainer API was the recommended way to run Accelerate code. As a subclass of TorchTrainer, AccelerateTrainer takes in a configuration file generated by accelerate config and applies it to all workers; aside from that, its functionality is identical to TorchTrainer.
However, the separate API caused confusion about whether it was the only way to run Accelerate code. Because you can express the full Accelerate functionality with the Accelerator and TorchTrainer combination, the plan is to deprecate AccelerateTrainer in Ray 2.8, and the recommendation is to run your Accelerate code directly with TorchTrainer.
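The sketch below illustrates the migration. The commented-out "before" half assumes the pre-2.8 AccelerateTrainer signature with an accelerate_config argument, so verify it against your installed Ray version; the "after" half moves that configuration into the Accelerator (or its plugins) inside the training function:

# Before (assumed pre-Ray 2.8 API; check your version):
#
# from ray.train.huggingface import AccelerateTrainer
#
# trainer = AccelerateTrainer(
#     train_func,
#     accelerate_config="accelerate_config.yaml",  # from `accelerate config`
#     scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
# )

# After: a plain TorchTrainer; configure Accelerate inside train_func instead.
from accelerate import Accelerator
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    # Recreate the settings from your old accelerate config here, for
    # example mixed precision or a DeepSpeed/FSDP plugin.
    accelerator = Accelerator(mixed_precision="fp16")
    ...


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()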