Experimenting with PyTorch Lightning and Catalyst, the high-level frameworks for PyTorch
Note: this article is a condensed version of my own experiments and experience. It does not systematically explain what the frameworks are or how to start using them; for that, explore them yourself and read the documentation and examples on the official sites, or other articles.
Note: this is also not a comparison of the frameworks. Rather, it describes an approach that worked for me when building something with each of them.
Why I am writing this and where I am coming from
I used to write my own training code and participated in creating a deep learning framework (just for the team to use).
Writing training code is always tedious. The standard code normally looks like this:
# prepare data and data loader
# ...
for epoch in range(num_epochs):
    for batch, target in data_loader:
        output = model(batch)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
# ...
The problem always comes from experimenting with small variations (e.g. logging tensor distributions, or custom code to average the loss per epoch). I ended up with 1000 versions of “template” or “past code” for starting a new project, and endlessly combined those versions to get a better starting point for the next one.
The purpose of using a framework (for me)
Speaking of frameworks, the first few my team and I explored were fast.ai and AllenNLP (and maybe Keras, when deciding which callbacks were needed). They provide ready-made code for the regular training/inference flow while still leaving room to adapt it.
Usually a framework encapsulates the complexity by providing an easy start for the common flow (like the one above), plus some configuration to add functionality. Handily, it normally also comes with callbacks like the following, so that the user can override a callback to do something else without modifying the training loop directly.
def train():
    on_batch_begin()
    process_batch()  # default to be self.model(batch)
    on_batch_end()
    on_loss_begin()
    process_loss_backward()  # the loss.backward() inside this
    on_loss_end()
    ...
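To make the callback idea concrete, here is a minimal, framework-free sketch of a loop that fires hooks around each step. Everything in it (the `Callback` base class, `LossHistory`, the `state` dictionary, and the toy "model" and "loss" lambdas) is a hypothetical illustration, not the API of any of the frameworks mentioned.

```python
class Callback:
    """Base class: every hook is a no-op unless a subclass overrides it."""

    def on_batch_begin(self, state):
        pass

    def on_batch_end(self, state):
        pass

    def on_loss_begin(self, state):
        pass

    def on_loss_end(self, state):
        pass


class LossHistory(Callback):
    """Hypothetical callback that records the loss of every batch."""

    def __init__(self):
        self.losses = []

    def on_loss_end(self, state):
        self.losses.append(state["loss"])


def train(batches, process_batch, compute_loss, callbacks):
    """Run one pass over `batches`, firing the hooks around each step."""
    state = {}
    for batch in batches:
        state["batch"] = batch
        for cb in callbacks:
            cb.on_batch_begin(state)
        state["output"] = process_batch(batch)  # stands in for model(batch)
        for cb in callbacks:
            cb.on_batch_end(state)
        for cb in callbacks:
            cb.on_loss_begin(state)
        # a real loop would also call loss.backward() here
        state["loss"] = compute_loss(state["output"], batch)
        for cb in callbacks:
            cb.on_loss_end(state)
    return state


history = LossHistory()
train(
    batches=[1.0, 2.0, 3.0],
    process_batch=lambda x: 2 * x,               # toy "model"
    compute_loss=lambda out, x: (out - x) ** 2,  # toy "loss"
    callbacks=[history],
)
print(history.losses)  # [1.0, 4.0, 9.0]
```

The point of the design is that `train()` never changes: new behavior is added by passing another `Callback` subclass.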
PyTorch Lightning
The first framework I personally started using seriously is PyTorch Lightning. I love it (or did, until I built my vanilla GAN). There are a lot of advantages to using it.
First of all, the documentation is very well written. As a beginner, it is super easy to learn how to convert ordinary PyTorch training code into PyTorch Lightning.
The core items are the Lightning Module and the Trainer.
The Lightning Module
The Lightning Module defines how training runs through a set of predefined callbacks (e.g. training_step, training_epoch_end…), so you can override any of them where you want behavior different from the default.
The smallest set of overrides needed, according to the documentation, is:
>>> import torch
>>> from torch.nn import functional as F
>>> import pytorch_lightning as pl
>>> class LitModel(pl.LightningModule):
...
...     def __init__(self):
...         super().__init__()
...         self.l1 = torch.nn.Linear(28 * 28, 10)
...
...     def forward(self, x):
...         return torch.relu(self.l1(x.view(x.size(0), -1)))
...
...     def training_step(self, batch, batch_idx):
...         x, y = batch
...         y_hat = self(x)
...         loss = F.cross_entropy(y_hat, y)
...         return loss
...
...     def configure_optimizers(self):
...         return torch.optim.Adam(self.parameters(), lr=0.02)
Think of it as a super torch.nn.Module: you define your layers (or sub-modules) in __init__(), and forward() connects those layers just like nn.Module's forward().
training_step() and configure_optimizers(), on the other hand, are code and objects that live outside nn.Module but inside the usual training loop.
My own “template” is a bit more complicated than their example:
import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from torchvision.datasets import MNIST
from pytorch_lightning import LightningModule, metrics

# MNISTCoreModel (the actual network, an nn.Module) is assumed to be defined elsewhere


class MNISTModel(LightningModule):
    def __init__(self, learning_rate=1e-3, batch_size=32):
        super().__init__()
        # data
        self.train_data = None
        self.val_data = None
        self.test_data = None

        # init for Trainer class
        self.learning_rate = learning_rate
        self.batch_size = batch_size

        # metrics
        self.f1 = metrics.F1(reduction='sum')  # using sum in steps and avg in epoch might be wrong
        self.accuracy = metrics.Accuracy(reduction='sum')  # using sum in steps and avg in epoch might be wrong

        # model
        self.model = MNISTCoreModel()

    def forward(self, input_tensor):
        # self.print("forward input_tensor:",input_tensor.shape)
        out = self.model(input_tensor)
        return out

    def custom_step(self, batch, batch_idx, mode):
        # shared by the train/val/test steps; `mode` is the key prefix
        x, y = batch
        y_hat = self(x)
        return {
            f'{mode}loss': F.cross_entropy(y_hat, y),
            f'{mode}f1': self.f1(y_hat, y),
            f'{mode}accuracy': self.accuracy(y_hat, y),
        }

    def custom_epoch_end(self, outputs, mode):
        # average the per-step values over the epoch
        avg_loss = torch.stack([x[f'{mode}loss'] for x in outputs]).mean()
        avg_f1 = torch.stack([x[f'{mode}f1'] for x in outputs]).mean()
        avg_accuracy = torch.stack([x[f'{mode}accuracy'] for x in outputs]).mean()
        tensorboard_logs = {
            f'{mode}loss': avg_loss,
            f'{mode}f1': avg_f1,
            f'{mode}accuracy': avg_accuracy,
        }
        self.print(tensorboard_logs)
        return {
            **tensorboard_logs,
            'log': tensorboard_logs
        }

    def training_step(self, batch, batch_idx):
        return self.custom_step(batch, batch_idx, "")

    def validation_step(self, batch, batch_idx):
        return self.custom_step(batch, batch_idx, "val_")

    def test_step(self, batch, batch_idx):
        return self.custom_step(batch, batch_idx, "test_")

    def training_epoch_end(self, outputs):
        return self.custom_epoch_end(outputs, "")

    def validation_epoch_end(self, outputs):
        return self.custom_epoch_end(outputs, "val_")

    def test_epoch_end(self, outputs):
        return self.custom_epoch_end(outputs, "test_")

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        lr_scheduler = {'scheduler': torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, 5, 2),
                        'name': 'CosineAnnealingWarmRestartsLR'}
        return [optimizer], [lr_scheduler]

    def prepare_data(self):
        # download only
        MNIST('data', train=True, download=True, transform=transforms.ToTensor())
        MNIST('data', train=False, download=True, transform=transforms.ToTensor())

    def setup(self, stage):
        self.summarize()

        # transform
        transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
        mnist_train = MNIST('data', train=True, download=False, transform=transform)
        mnist_test = MNIST('data', train=False, download=False, transform=transform)

        # train/val split
        mnist_train, mnist_val = random_split(mnist_train, [55000, 5000])

        # assign to use in dataloaders
        self.train_data = mnist_train
        self.val_data = mnist_val
        self.test_data = mnist_test

    def train_dataloader(self):
        return DataLoader(self.train_data, batch_size=self.batch_size, shuffle=True, num_workers=4)

    def val_dataloader(self):
        return DataLoader(self.val_data, batch_size=self.batch_size, shuffle=False, num_workers=4)

    def test_dataloader(self):
        return DataLoader(self.test_data, batch_size=self.batch_size, shuffle=False, num_workers=4)
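The worry noted in the metric comments (reducing per step, then averaging per epoch) can be illustrated with plain arithmetic. The sketch below is a general illustration, not a statement about what `reduction='sum'` does inside the metrics API: it shows that a plain mean of per-batch values differs from the sample-weighted value as soon as the last batch is smaller, which happens here because 55000 training samples are not divisible by a batch size of 32. The tiny batches and the helper `batch_accuracy` are made up for the example.

```python
def batch_accuracy(correct_flags):
    """Mean accuracy within a single batch (1 = correct, 0 = wrong)."""
    return sum(correct_flags) / len(correct_flags)


# Three batches; the last one is smaller, as happens at the end of an epoch.
batches = [
    [1, 1, 1, 1],
    [1, 1, 0, 0],
    [0],
]

# What a plain .mean() over per-step values computes.
mean_of_batch_means = sum(batch_accuracy(b) for b in batches) / len(batches)

# What is usually wanted: total correct over total samples.
total_correct = sum(sum(b) for b in batches)
total_samples = sum(len(b) for b in batches)
sample_weighted = total_correct / total_samples

print(mean_of_batch_means)  # 0.5
print(sample_weighted)      # 6 correct out of 9 samples
```

With equal-sized batches the two numbers coincide, so the difference only shows up through the final, shorter batch.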
The Trainer
The Trainer, to my understanding, provides all the common features needed during training, for example checkpointing, early stopping, and defining how many epochs to run.
It offers a lot of different features (flags); please refer to the documentation, which gives a full explanation of each of them.
A sample trainer setup that I mostly used:
from pytorch_lightning import Trainer, seed_everything
from pytorch_lightning.callbacks import LearningRateLogger

# RUNTIME_MODE, checkpoint_callback, early_stop_callback and model are defined elsewhere

seed_everything(1024)

tpu_gpu_options = {}
if RUNTIME_MODE == "TPU":
    tpu_gpu_options = {
        # "tpu_cores": NUM_TPU_CORES,
        "tpu_cores": 8,
    }
elif RUNTIME_MODE == "GPU":
    tpu_gpu_options = {
        "gpus": 1,
    }

# most basic trainer, uses good defaults
trainer = Trainer(
    **tpu_gpu_options,
    # num_nodes=1,
    min_epochs=1, max_epochs=300,
    val_check_interval=1.0,  # every 1 epoch (float for epoch and int for mini-batch)
    progress_bar_refresh_rate=1,
    row_log_interval=50,  # save metric
    log_save_interval=100,  # logger save (anticipate this should be larger than row_log_interval?)
    # accumulate_grad_batches=2,
    # gradient_clip_val=0.5,
    # auto_scale_batch_size=True, # None|'power'|'binsearch'
    # auto_lr_find=True,
    checkpoint_callback=checkpoint_callback,
    early_stop_callback=early_stop_callback,  # cannot make it work if using TPU
    # early_stop_callback=True,
    callbacks=[LearningRateLogger()],
    #### Debug #####
    # profiler=True,
    # track_grad_norm=2,
)
trainer.fit(model)
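Early stopping, one of the Trainer features mentioned above, boils down to a small piece of bookkeeping. The sketch below is a hypothetical, framework-free version of patience-based stopping on a value to minimize; the class `PatienceStopper`, its parameters, and the list of validation losses are all made up for illustration and are not Lightning's own callback.

```python
class PatienceStopper:
    """Hypothetical patience-based early stopping for a metric to minimize."""

    def __init__(self, patience=3, min_delta=0.0):
        self.patience = patience      # epochs to wait without improvement
        self.min_delta = min_delta    # minimum change that counts as improvement
        self.best = float("inf")
        self.bad_epochs = 0

    def should_stop(self, value):
        """Record this epoch's value and report whether training should stop."""
        if value < self.best - self.min_delta:
            self.best = value
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience


val_losses = [0.9, 0.7, 0.6, 0.61, 0.62, 0.63, 0.5]  # made-up numbers
stopper = PatienceStopper(patience=3)
stopped_at = None
for epoch, val_loss in enumerate(val_losses):
    if stopper.should_stop(val_loss):
        stopped_at = epoch
        break

print(stopped_at, stopper.best)  # 5 0.6
```

Note that the run stops at epoch 5 and never sees the better value at epoch 6, which is the usual trade-off when choosing the patience.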
The beginning hurdles for me
One of the early hurdles for me was the expected input/output of each callback function. For example, the training step seems to expect “loss” as the output key rather than an arbitrary key like “train_loss”, while validation and test also seem to expect “val_loss”.
The second hurdle was TPU usage. The sample in the documentation works for training (make sure you use 1 or 8 for tpu_cores in the Trainer config), but the early stopping callback throws an error every time it tries to collect data from the different cores.
The third one is the reason I wanted to try another framework (Catalyst): the way Lightning handles multiple optimizers (e.g. an encoder-decoder with different optimizers, or a GAN with a discriminator and a generator). The framework processes each optimizer per batch, meaning part of forward() is executed x times, where x is the number of optimizers. The GAN sample in the official documentation does work; I just “believe” there could be an implementation that fits the structure of the Lightning Module better. I would like to write another article on this in the future.
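The repeated forward pass can be counted with a small simulation. This is a framework-free sketch, assuming the loop calls training_step(batch, batch_idx, optimizer_idx) once per optimizer for every batch; `ToyGAN`, `generate` and `fit_one_epoch` are hypothetical names, and the "losses" are placeholders rather than real GAN objectives.

```python
class ToyGAN:
    """Hypothetical stand-in for a GAN module; counts generator forward passes."""

    def __init__(self):
        self.generator_forward_calls = 0

    def generate(self, noise):
        self.generator_forward_calls += 1
        return [n * 0.5 for n in noise]  # placeholder for generator(noise)

    def training_step(self, batch, batch_idx, optimizer_idx):
        fake = self.generate(batch)  # both branches need the generated samples
        if optimizer_idx == 0:
            return -sum(fake)  # placeholder generator loss
        return sum(fake)       # placeholder discriminator loss


def fit_one_epoch(module, batches, num_optimizers):
    """Call training_step once per optimizer for every batch."""
    for batch_idx, batch in enumerate(batches):
        for optimizer_idx in range(num_optimizers):
            module.training_step(batch, batch_idx, optimizer_idx)


gan = ToyGAN()
batches = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]
fit_one_epoch(gan, batches, num_optimizers=2)
print(gan.generator_forward_calls)  # 6: 3 batches x 2 optimizers
```

With two optimizers the generator runs twice per batch, even though a hand-written loop could generate once and reuse the output for both updates.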
Finally, for developers who want to learn the core flow by reading code
Read this file and you will get an idea of how the callbacks run and what their parameters are, along with code snippets.
Catalyst
Honestly, I am super new to Catalyst. Unlike PyTorch Lightning, it is a bit harder for me to understand, but I am excited to learn more about it.
First of all, the documentation is not yet complete, so my understanding comes from reading the source code and the example code they provide.
For another reference, you can read: https://medium.com/pytorch/catalyst-101-accelerated-pytorch-bd766a556d92
The core items in Catalyst, I believe, are the Runner and the different types of Callbacks.
Runner
The simplest sample provided in the documentation is as follows:
class CustomRunner(dl.Runner):
    def predict_batch(self, batch):
        # model inference step
        return self.model(batch[0].to(self.device).view(batch[0].size(0), -1))

    def _handle_batch(self, batch):
        # model train/valid step
        x, y = batch
        y_hat = self.model(x.view(x.size(0), -1))
        loss = F.cross_entropy(y_hat, y)
        accuracy01, accuracy03 = metrics.accuracy(y_hat, y, topk=(1, 3))
        self.batch_metrics.update(
            {"loss": loss, "accuracy01": accuracy01, "accuracy03": accuracy03}
        )
        if self.is_train_loader:
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()
So _handle_batch() looks like a combination of PyTorch Lightning’s training_step() and validation_step() (and maybe test_step()).
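The "one handler for every loader" idea can be sketched without any framework. In the toy below, `ToyRunner`, `handle_batch` and `run_epoch` are hypothetical names, and the "model" is a single weight fitted by hand-written gradient descent; the only point is that the same function serves training and validation, with a flag deciding whether the weight gets updated.

```python
class ToyRunner:
    """Hypothetical runner with one batch handler shared by all loaders."""

    def __init__(self, weight=0.0, lr=0.1):
        self.weight = weight
        self.lr = lr
        self.is_train_loader = False
        self.batch_metrics = {}

    def handle_batch(self, batch):
        x, y = batch
        error = self.weight * x - y
        self.batch_metrics = {"loss": error ** 2}
        if self.is_train_loader:
            # gradient of (w * x - y) ** 2 with respect to w is 2 * error * x
            self.weight -= self.lr * 2 * error * x

    def run_epoch(self, loaders):
        """Run every loader once and return the mean loss per loader."""
        epoch_metrics = {}
        for name, loader in loaders.items():
            self.is_train_loader = name.startswith("train")
            losses = []
            for batch in loader:
                self.handle_batch(batch)
                losses.append(self.batch_metrics["loss"])
            epoch_metrics[name] = sum(losses) / len(losses)
        return epoch_metrics


data = [(1.0, 2.0)] * 5  # made-up samples of y = 2 * x
runner = ToyRunner()
valid_only = runner.run_epoch({"valid": data})
weight_after_valid = runner.weight  # validation must not change the weight
both = runner.run_epoch({"train": data, "valid": data})
print(weight_after_valid, runner.weight, both)
```

Validation alone leaves the weight at 0.0, while the training loader moves it toward 2.0, so the validation loss in the second epoch is lower.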
The runner itself then drives the training (or run), and the configuration is passed in as follows:
runner = CustomRunner()
# model training
runner.train(
    model=model,
    optimizer=optimizer,
    loaders=loaders,
    logdir="./logs",
    num_epochs=5,
    verbose=True,
    load_best_on_end=True,
)
When I first looked at this, I was worried: it looked as if I had to manage loss.backward(), optimizer.step() and so on myself, and the optimizer, model and schedulers are not part of the Runner module.
Then I saw another example that uses the second important item: Callbacks.
Callbacks
The example from documentation (https://catalyst-team.github.io/catalyst/api/callbacks.html#catalyst.callbacks.batch_overfit.BatchOverfitCallback):
import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl
# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}
# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])
# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=8,
    verbose=True,
    callbacks=[dl.BatchOverfitCallback(train=10, valid=0.5)]
)
Note the difference: there is no overriding of Runner here. Instead, a subclass, SupervisedRunner, is used, and most of the standard procedure is already built in.
The callbacks are passed as a list (sometimes you will see a dictionary instead, which I want to explore more but cannot make work yet). It looks like they all run, and I did not know how their order is determined (until I read the source code), but they work magically.
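One plausible way for a runner to decide the sequence, independent of the position in the list, is to give every callback a numeric priority and sort by it. The sketch below only illustrates that mechanism; the class names, the `order` attribute and its values are hypothetical and should not be read as Catalyst's actual API.

```python
class ToyCallback:
    """Hypothetical base callback carrying a numeric priority."""

    order = 0

    def on_batch_end(self, log):
        pass


class ToyMetricCallback(ToyCallback):
    order = 20  # illustrative value: metrics first

    def on_batch_end(self, log):
        log.append("metric")


class ToyOptimizerCallback(ToyCallback):
    order = 60  # illustrative value: optimizer step after the metrics

    def on_batch_end(self, log):
        log.append("optimizer")


class ToyLoggingCallback(ToyCallback):
    order = 120  # illustrative value: logging last

    def on_batch_end(self, log):
        log.append("logging")


def run_callbacks(callbacks, log):
    """Fire on_batch_end in ascending `order`, regardless of list position."""
    for cb in sorted(callbacks, key=lambda cb: cb.order):
        cb.on_batch_end(log)


log = []
run_callbacks(
    [ToyLoggingCallback(), ToyOptimizerCallback(), ToyMetricCallback()],
    log,
)
print(log)  # ['metric', 'optimizer', 'logging']
```

With such a scheme the user can pass callbacks in any order and still get the loss computed before the optimizer step and the logging after it.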
A bit deeper in Callbacks (class and subclasses)
With my limited understanding, the callbacks are designed very differently compared to the other frameworks I have used.
Catalyst seems to treat the different components (e.g. the optimizer) and features (like logging) of training as Callback objects, and each Callback subclass has callback functions like on_batch_start()… (read the Callback class definition here: https://github.com/catalyst-team/catalyst/blob/master/catalyst/core/callback.py)
I admire the vision of implementing each component and feature as a Callback, which allows minimal overriding of callbacks and better code separation.
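A small sketch may help show why this separation is attractive. Here the loss computation, the optimizer step and the logging are three independent objects that communicate only through a shared state, so the loop itself contains no training logic. All names (`ToyState`, `ToyCriterion`, `ToyOptimizer`, `ToyLogger`, `run`) are hypothetical, and the "model" is a single hand-fitted weight rather than a real network.

```python
class ToyState:
    """Hypothetical shared state that every callback reads and writes."""

    def __init__(self, weight):
        self.weight = weight
        self.grad = 0.0
        self.loss = None
        self.history = []


class ToyCriterion:
    """Computes the squared error and its gradient for y = weight * x."""

    def on_batch_end(self, state, batch):
        x, y = batch
        error = state.weight * x - y
        state.loss = error ** 2
        state.grad = 2 * error * x


class ToyOptimizer:
    """Applies a plain gradient-descent update to the weight."""

    def __init__(self, lr):
        self.lr = lr

    def on_batch_end(self, state, batch):
        state.weight -= self.lr * state.grad


class ToyLogger:
    """Records the loss of every batch."""

    def on_batch_end(self, state, batch):
        state.history.append(state.loss)


def run(state, batches, callbacks):
    """The loop only dispatches; all training logic lives in the callbacks."""
    for batch in batches:
        for cb in callbacks:
            cb.on_batch_end(state, batch)
    return state


state = run(
    ToyState(weight=0.0),
    batches=[(1.0, 2.0)] * 10,  # made-up samples of y = 2 * x
    callbacks=[ToyCriterion(), ToyOptimizer(lr=0.1), ToyLogger()],
)
print(state.weight, state.history[0], state.history[-1])
```

Swapping the optimizer or adding another logger means changing one entry in the list, without touching `run()` or the other callbacks.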
Finally, for developers who want to learn the core flow by reading code
I believe this is where the core flow lives:
Final words
Thank you so much for surviving this article. I know how messily it is written, and that it is not beginner friendly (even though I consider myself a beginner with these frameworks).
I hope to explore more and write more about them as I keep working with them. Both are very good frameworks to start with; it is a matter of choice rather than one being better than the other.
Good Reference(s)
https://neptune.ai/blog/model-training-libraries-pytorch-ecosystem