Multi-GPU-Training mit Pytorch
Um die Rechenleistung weiter zu steigern, lassen sich AIME Server mit bis zu acht GPUs ausrüsten und kürzestmögliche Durchlaufzeiten erreichen. Um die volle Leistung von AIME-Maschinen zu nutzen, ist es wichtig sicherzustellen, dass alle installierten GPUs am Training effektiv teilnehmen.
Der folgende Artikel erklärt, wie man ein Modell mittels PyTorch effektiv mit mehreren GPUs trainiert. Der erste Teil befasst sich mit dem einfacheren DataParallel
-Ansatz, der wenig effektiv nur einen einzigen Prozess nutzt. Der zweite Teil erklärt den effektiveren Lösungsweg, der unter Verwendung von DistributedDataParallel
mittels Nutzung mehrerer paralleler Prozesse eine bessere Leistung bietet.
Multi-GPU-Training mittels DataParallel
in einem einzigen Prozess
Der einfachste Weg, alle installierten GPUs mit PyTorch zu nutzen, ist die Verwendung der Funktion DataParallel
aus dem PyTorch-Modul torch.nn.parallel
. Dies kann fast auf die gleiche Weise wie bei einem Ein-GPU-Training erfolgen. Nachdem das Modell initialisiert wurde, passen Sie es wie in der folgenden Zeile dargestellt an:
model = torch.nn.parallel.DataParallel(model, device_ids=list(range(<num_gpus>)), dim=0)
wobei <num_gpus>
die Anzahl der zu nutzenden GPUs darstellt.
Beachten Sie, dass die im Dataloader verwendete Batchgröße der globalen Batchgröße aller GPUs entspricht. Wenn Sie also die lokale Batchgröße jeder GPU verwenden möchten, müssen Sie sie mit der Anzahl der GPUs multiplizieren.
Im Folgenden wird ein voll funktionsfähiges Beispiel für ein Multi-GPU-Training mit einem resnet50-Modell aus der Torchvision-Bibliothek unter Verwendung von DataParallel
gelistet:
#!/usr/bin/env python3
from pathlib import Path
import torch
import torchvision
def load_data(num_gpus):
transforms = torchvision.transforms.Compose([
torchvision.transforms.Resize(256),
torchvision.transforms.CenterCrop(224),
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
dataset = torchvision.datasets.ImageFolder(root=, transform=transforms)
dataloader = torch.utils.data.DataLoader(
dataset=dataset,
batch_size=64,
shuffle=False,
num_workers=4*num_gpus
)
return dataloader
def save_model(epoch, model, optimizer):
"""Saves model checkpoint on given epoch with given data name.
"""
checkpoint_folder = Path.cwd() / 'model_checkpoints'
if not checkpoint_folder.is_dir():
checkpoint_folder.mkdir()
file = checkpoint_folder / f'epoch_{epoch}.pt'
if not file.is_file():
file.touch()
torch.save(
{
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
},
file
)
return True
def load_model(epoch, model, optimizer):
"""Loads model state from file.
"""
file = Path.cwd() / 'model_checkpoints' / f'epoch_{epoch}.pt'
checkpoint = torch.load(file)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
return model, optimizer
def run_training(num_gpus):
model = torchvision.models.resnet50(pretrained=False)
model = model.cuda()
model = torch.nn.parallel.DataParallel(model, device_ids=list(range(num_gpus)), dim=0)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
criterion = torch.nn.CrossEntropyLoss()
criterion.cuda()
model.train()
num_epochs = 30
dataloader = load_data(num_gpus)
total_steps = len(dataloader)
for epoch in range(1, num_epochs):
print(f'\nEpoch {epoch}\n')
if epoch > 1:
model, optimizer = load_model(epoch-1, model, optimizer)
for step, (images, labels) in enumerate(dataloader, 1):
images, labels = images.cuda(), labels.cuda()
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
if step % 10 == 0:
print(f'Epoch [{epoch} / {num_epochs}], Step [{step} / {total_steps}], Loss: {loss.item():.4f}')
save_model(epoch, model, optimizer)
if __name__ == "__main__":
num_gpus = torch.cuda.device_count()
print('num_gpus: ', num_gpus)
run_training(num_gpus)
Die Nutzung von DataParallel
ist zwar recht unkompliziert ist und die hierdurch erreichte Leistungssteigerung im Vergleich zu einer einzelnen GPU bereits sichtbar, jedoch lässt sich die Performance noch weiter verbessern, da alle Berechnungen im selben Prozess ablaufen und die verfügbaren GPUs nicht voll ausgelastet werden. Für eine höhere Leistungsausbeute benötigt man mehrere parallele Prozesse, idealerweise jeweils einen eigenen Prozess pro GPU. Dies lässt sich unter Nutzung der Funktion DistributedDataParallel
erreichen, die im folgenden Abschnitt detailliert erklärt wird. Die folgende Tabelle zeigt einen Vergleich der Trainings-Performance beider Methoden gemessen mit und unserem Benchmark-Tool https://github.com/aime-team/pytorch-benchmarks. Hier konnte die Trainings-Performance von 'DistributedDataParallel' im Vergleich zu 'DataParallel' um bis zu 17% gesteigert werden.
Anzahl der GPUs | Bilder pro Sekunde mit Data Parallel | Bilder pro Sekunde mit Distributed Data Parallel |
---|---|---|
1x NVIDIA RTX 3090 | 473 | - |
2x NVIDIA RTX 3090 | 883 | 944 |
4x NVIDIA RTX 3090 | 1526 | 1788 |
Multi-GPU-Training mit mehreren Prozessen (DistributedDataParallel
)
Die in PyTorch verfügbare Funktion DistributedDataParallel
aus dem Modul torch.nn.parallel
ist in der Lage, das Training auf alle GPUs zu verteilen, wobei ein Subprozess pro GPU jeweils ihre volle Kapazität ausnutzt. Im Vergleich zu DataParallel
sind jedoch recht viele zusätzliche Arbeitsschritte erforderlich. Als erstes müssen die Umgebungsvariablen master address und master port mit den folgenden Zeilen gesetzt werden:
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
Der Master-Port kann auf eine beliebige Nummer geändert werden.
Dann wird mit der spawn
-Methode aus dem Modul torch.multiprocessing für jede GPU ein eigener Prozess erzeugt:
torch.multiprocessing.spawn(
run_training_process_on_given_gpu,
args=(args, ),
nprocs=<num_gpus>,
join=True)
Die Funktion run_training_process_on_given_gpu
muss den gesamten Trainingscode für jede GPU mit ihren Argumenten args
(ohne Rang) als Tupel enthalten. nprocs
ist die Anzahl der Prozesse, die gespawnt werden sollen (z.B. die Anzahl der GPUs). Bei der Implementierung der Funktion run_training_process_on_given_gpu
muss das erste Positionsargument dem Rang des Prozesses entsprechen. Die Spawn-Methode initialisiert nun nprocs
-Prozesse. Das Positionsargument rank
wird von den nprocs
-Prozessen automatisch ab Rang 0 aufsteigend gesetzt. In jedem Prozess ist nun eine Prozessgruppe mit der Methode init_process_group
aus dem Modul torch.distributed
zu initialisieren:
torch.distributed.init_process_group(backend=<backend>, rank=rank, world_size=<num_gpus>, init_method='env://')
Übliche Werte für <backend>
sind 'gloo'
und 'nccl'
, wobei 'nccl'
für Multi-GPU-Training empfohlen wird. Weitere Details zu Backends für verteiltes Training finden Sie unter https://pytorch.org/docs/stable/distributed.html. Die world_size
entspricht hier der Anzahl der GPUs. Die Initialisierungsmethode 'env://'
zieht alle benötigten Informationen aus der Umgebung. Der Rang meint hier den Rang der GPU, der durch die spawn
-Methode weiter oben für jede GPU festgelegt wurde.
Jetzt muss das Modell mit der folgenden Zeile für das verteilte Training vorbereitet werden. Die folgenden Schritte müssen in jedem Prozess mit dem angegebenen Rang durchgeführt werden:
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
Anschließend wird Ihr Modell und Ihre LossFunction (criterion) in den Speicher jeder einzelnen GPU geladen:
torch.cuda.set_device(rank)
model.cuda(rank)
criterion = torch.nn.CrossEntropyLoss()
criterion.cuda(rank)
Der nächste Schritt besteht darin, den Dataloader für das verteilte Training vorzubereiten. Stellen Sie zunächst die Parameter für die Transformation der Daten in das Modell ein:
transforms = torchvision.transforms.Compose([
torchvision.transforms.Resize(256),
torchvision.transforms.CenterCrop(224),
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),])
Initialisieren Sie dann den Datensatz, z.B. mit der Methode ImageFolder()
aus dem Modul torchvision.datasets
:
dataset = torchvision.datasets.ImageFolder(root=<image_destination>, transform=transforms)
Dann müssen wir den DistributedSampler
aus dem Modul torch.utils.data.distributed
initialisieren.
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
Nun können wir den Dataloader aus dem Modul torch.utils.data
mit dem angegebenen Datensatz und Sampler initialisieren:
dataloader = torch.utils.data.DataLoader(dataset=dataset, batch_size=<batch_size>, shuffle=False, num_workers=4*<num_gpus>, pin_memory=True, sampler=sampler)
Um die höchste Leistung zu erzielen, wird empfohlen, die Anzahl der Worker als vierfachen Wert der GPU-Anzahl festzulegen. Eine weitere Leistungssteigerung kann erreicht werden, indem im Dataloader pin_memory=True
in Kombination mit non_blocking=True
gesetzt wird, während die Daten mit dem cuda()-Aufruf in den GPU-Speicher verschoben werden (siehe unten). Im Gegensatz zu einem Einzel-GPU- oder DataParallel
-Training muss das shuffle
-Argument auf False
gesetzt werden, da der Sampler bereits das Mischen der Daten übernimmt. Hier entspricht die batch_size der lokalen Batchgröße jeder GPU und nicht der globalen Batchgröße, wie in einer DataParallel
-Anwendung.
Jetzt sind das Modell und der Dataloader bereit für das verteilte Training.
Die letzte Änderung für das Multi-GPU-Training erfolgt in der Trainingsschleife. Die Daten müssen nach jedem Schritt mit dem Befehl cuda()
in den Speicher der GPUs verschoben werden:
for step, (data, label) in enumerate(dataloader):
data, label = data.cuda(rank, non_blocking=True), label.cuda(rank, non_blocking=True)
Wenn das Argument non_blocking
auf True
gesetzt wird, wartet der Dataloader mit dem nächsten Befehl nicht bis die Daten in den Speicher der GPU geladen wurden. pin_memory=True
bedeutet der Datentransport geschieht in einem fest definierten Bereich, der für andere Aufgaben blockiert wird. Die Kombination aus beiden sorgt für einen weiteren Leistungsschub.
Modell speichern
Das trainierte Modell speichert man mit folgendem Befehl auf dem Festspeicher:
torch.save(
{
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
},
file)
mit file
als String oder PosixPath, der den Dateispeicherort der Checkpoint-Datei enthält. Es reicht aus, den Checkpoint nur von einer GPU (hier mit Rang 0) zu speichern, da die Informationen aller anderen GPUs bereits enthalten sind. Die Checkpoints der restlichen GPUs ebenfalls zu speichern und dabei aufeinander warten zu lassen ist also überflüssig.
Modell laden
Um einen gespeicherten Checkpoint Ihres Modells in alle beteiligten Prozesse zu laden, verwenden Sie die folgende Befehle:
torch.distributed.barrier()
map_location = {'cuda:0': f'cuda:{rank}'}
checkpoint = torch.load(file, map_location=map_location)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
mit file
als String oder PosixPath, der den Dateispeicherort der Checkpoint-Datei enthält. Der Befehl torch.distributed.barrier()
sorgt dafür, dass die Prozesse synchronisiert werden und map_location
kümmert sich um die Verteilung über alle Prozesse.
Code-Zusammenfassung
Als Zusammenfassung wird im Folgenden ein Beispiel für ein voll funktionsfähiges Multi-GPU-Training eines resnet50-Modells aus der Torchvision-Bibliothek mittels der DistributedDataParallel
-Methode gelistet:
#!/usr/bin/env python3
import sys
import os
from pathlib import Path
import torch
import torchvision
def save_model(epoch, model, optimizer):
"""Saves model checkpoint on given epoch with given data name.
"""
checkpoint_folder = Path.cwd() / 'model_checkpoints'
if not checkpoint_folder.is_dir():
checkpoint_folder.mkdir()
file = checkpoint_folder / f'epoch_{epoch}.pt'
if not file.is_file():
file.touch()
torch.save(
{
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
},
file
)
return True
def load_model(rank, epoch, model, optimizer):
"""Loads model state from file to the GPU with given rank.
"""
torch.distributed.barrier()
map_location = {'cuda:0': f'cuda:{rank}'}
file = Path.cwd() / 'model_checkpoints' / f'epoch_{epoch}.pt'
checkpoint = torch.load(file, map_location=map_location)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
return model, optimizer
def load_data(num_gpus):
transforms = torchvision.transforms.Compose([
torchvision.transforms.Resize(256),
torchvision.transforms.CenterCrop(224),
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
dataset = torchvision.datasets.ImageFolder(root=, transform=transforms)
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
dataloader = torch.utils.data.DataLoader(
dataset=dataset,
batch_size=64,
shuffle=False,
num_workers=4*num_gpus,
pin_memory=True,
sampler=sampler
)
return dataloader
def run_training_process_on_given_gpu(rank, num_gpus):
torch.cuda.set_device(rank)
torch.distributed.init_process_group(backend='nccl', rank=rank,
world_size=num_gpus, init_method='env://')
model = torchvision.models.resnet50(pretrained=False)
model = model.cuda(rank)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
criterion = torch.nn.CrossEntropyLoss()
criterion.cuda(rank)
model.train()
num_epochs = 30
dataloader = load_data(num_gpus)
total_steps = len(dataloader)
for epoch in range(1, num_epochs):
if rank == 0:
print(f'\nEpoch {epoch}\n')
if epoch > 1:
model, optimizer = load_model(rank, epoch-1, model, optimizer)
for step, (images, labels) in enumerate(dataloader, 1):
images, labels = images.cuda(rank, non_blocking=True), labels.cuda(rank, non_blocking=True)
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
if step % 10 == 0:
if rank == 0:
print(f'Epoch [{epoch} / {num_epochs}], Step [{step} / {total_steps}], Loss: {loss.item():.4f}')
if rank == 0:
save_model(epoch, model, optimizer)
torch.distributed.destroy_process_group()
if __name__ == "__main__":
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
num_gpus = torch.cuda.device_count()
print('num_gpus: ', num_gpus)
torch.multiprocessing.spawn(run_training_process_on_given_gpu, args=(num_gpus, ), nprocs=num_gpus, join=True)
Um die volle Kapazität aller beteiligten GPUs zu nutzen, muss also, wie gezeigt, das Modul DistributedDataParallel
verwendet werden. Mit unserem Bechmark-Tool https://github.com/aime-team/pytorch-benchmarks, der den Code beider Methoden enthält, lassen sich die Performance-Unterschiede von DistributedDataParallel
und DataParallel
vergleichen.