Experiment on PyTorch Lightning and Catalyst- the high level frameworks for PyTorch

Note: this article is like a condensed version of my own experiment and experience, it does not systematically give a detail for what are the frameworks and how to start using them, better for you to explore them and read the documentation or example from official sites or other articles.

Note: this is also not a comparison article between the frameworks, but much rather, provide a working approach that worked for me, for doing something with the frameworks.

Why I am writing this and where I am from

Writing training code is always tedious, the standard code normally would be like:

# prepare data and data loader
# ...
For each epoch:
For each batch:
output = model(batch)
loss = criterion(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
# ...

Problem is always about a bit experimentation of adding varieties (e.g. logging tensors distribution, doing some custom code to average loss per epoch…), ended up there are 1000 versions of “template” or “past code” to start a new project and endless combine of version to get a better starting point next time.

The purpose of using framework (for me)

Usually framework would encapsulate the complexity by providing a easy start for common flow (like above), and with some configuration to add functionalities, that comes handy, it would normally comes with callback like following so that user can override the callback to do something else without modifying the training loop directly.

def train():
on_batch_begin()
process_batch() # default to be self.model(batch)
on_batch_end()
on_loss_begin()
process_loss_backward() # the loss.backward() inside this
on_loss_end()
...

PyTorch Lightning

First of all, the documentation is very well written, as beginner, it’s super easy to know how to convert ordinary PyTorch training code into PyTorch Lightning.

The core item is Lightning Module and Trainer.

The Lightning Module

The smallest override needed as mentioned from documentation is:

>>> import pytorch_lightning as pl
>>> class LitModel(pl.LightningModule):
...
... def __init__(self):
... super().__init__()
... self.l1 = torch.nn.Linear(28 * 28, 10)
...
... def forward(self, x):
... return torch.relu(self.l1(x.view(x.size(0), -1)))
...
... def training_step(self, batch, batch_idx):
... x, y = batch
... y_hat = self(x)
... loss = F.cross_entropy(y_hat, y)
... return loss
...
... def configure_optimizers(self):
... return torch.optim.Adam(self.parameters(), lr=0.02)

Consider it as a super torch.nn.Module, so you have your layers (or sub module) defined in __init__() and your forward function is the connection of layers like nn.Module forward().

The training_step() and configure_optimizers(), on the other hand are code and objects that are outside of nn.Module but within the usual training loop.

While my “template” is a bit more complicated than their example:

class MNISTModel(LightningModule):
def __init__(self, learning_rate=1e-3, batch_size=32):
super().__init__()
# data
self.train_data = None
self.val_data = None
self.test_data = None
# init for Trainer class
self.learning_rate = learning_rate
self.batch_size = batch_size
# metrics
self.f1 = metrics.F1(reduction='sum') # using sum in steps and avg in epoch migtht be wrong
self.accuracy = metrics.Accuracy(reduction='sum') # using sum in steps and avg in epoch migtht be wrong
# model
self.model = MNISTCoreModel()
def forward(self, input_tensor):
# self.print("forward input_tensor:",input_tensor.shape)
out = self.model(input_tensor)
return out
def custom_step(self, batch, batch_idx, mode):
x, y = batch
y_hat = self(x)
return {
f'{mode}loss': F.cross_entropy(y_hat, y),
f'{mode}f1': self.f1(y_hat, y),
f'{mode}accuracy': self.accuracy(y_hat, y),
}
def custom_epoch_end(self, outputs, mode):
avg_loss = torch.stack([x[f'{mode}loss'] for x in outputs]).mean()
avg_f1 = torch.stack([x[f'{mode}f1'] for x in outputs]).mean()
avg_accuracy = torch.stack([x[f'{mode}accuracy'] for x in outputs]).mean()
tensorboard_logs = {
f'{mode}loss': avg_loss,
f'{mode}f1': avg_f1,
f'{mode}accuracy': avg_accuracy,
}
self.print(tensorboard_logs)
return {
**tensorboard_logs,
'log': tensorboard_logs
}
def training_step(self, batch, batch_idx):
return self.custom_step(batch,batch_idx,"")
def validation_step(self, batch, batch_idx):
return self.custom_step(batch,batch_idx,"val_")
def test_step(self, batch, batch_idx):
return self.custom_step(batch,batch_idx,"test_")
def training_epoch_end(self, outputs):
return self.custom_epoch_end(outputs,"")
def validation_epoch_end(self, outputs):
return self.custom_epoch_end(outputs,"val_")
def test_epoch_end(self, outputs):
return self.custom_epoch_end(outputs,"test_")
def configure_optimizers(self):
optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
lr_scheduler = {'scheduler': torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, 5, 2),
'name': 'CosineAnnealingWarmRestartsLR'}
return [optimizer], [lr_scheduler]
def prepare_data(self):
MNIST('data', train=True, download=True, transform=transforms.ToTensor())
MNIST('data', train=False, download=True, transform=transforms.ToTensor())
def setup(self, stage):
self.summarize()
# transform
transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
mnist_train = MNIST('data', train=True, download=False, transform=transform)
mnist_test = MNIST('data', train=False, download=False, transform=transform)
# train/val split
mnist_train, mnist_val = random_split(mnist_train, [55000, 5000])
# assign to use in dataloaders
self.train_data = mnist_train
self.val_data = mnist_val
self.test_data = mnist_test
def train_dataloader(self):
return DataLoader(self.train_data, batch_size=self.batch_size, shuffle=True, num_workers=4)
def val_dataloader(self):
return DataLoader(self.val_data, batch_size=self.batch_size, shuffle=False, num_workers=4)
def test_dataloader(self):
return DataLoader(self.test_data, batch_size=self.batch_size, shuffle=False, num_workers=4)

The Trainer

There are a lot of different features (flags) they provide (please refer to their documentation which provide a full explanation of each of them)

a sample trainer I used mostly:

seed_everything(1024)tpu_gpu_options = {}
if RUNTIME_MODE == "TPU":
tpu_gpu_options = {
# "tpu_cores": NUM_TPU_CORES,
"tpu_cores": 8,
}
elif RUNTIME_MODE == "GPU":
tpu_gpu_options = {
"gpus":1,
}
# most basic trainer, uses good defaults
trainer = Trainer(
**tpu_gpu_options,
# num_nodes=1,

min_epochs=1, max_epochs=300,
val_check_interval=1.0, # every 1 epoch (float for epoch and int for mini-batch)
progress_bar_refresh_rate=1,
row_log_interval=50, # save metric
log_save_interval=100, # logger save (anticipate this should be larger than row_log _interval?)
# accumulate_grad_batches=2,
# gradient_clip_val=0.5,

# auto_scale_batch_size=True, # None|'power'|'binsearch'
# auto_lr_find=True,
checkpoint_callback=checkpoint_callback,
early_stop_callback=early_stop_callback, # cannot make it work if using TPU
# early_stop_callback=True,
callbacks=[LearningRateLogger()],
#### Debug #####
# profiler=True,
# track_grad_norm=2,
)
trainer.fit(model)

The beginning hurdles for me

The second hurdle is TPU usage, the sample on documentation work for training (and make sure you use 1 or 8 for tpu_cores in Trainer config), but the early stopping callback throw error everytime it try to collect the data from different core.

The third one is the reason why I want to try other framework (Catalyst), which is the way it implement handling of multiple optimizers (e.g. encoder decoder with different optimizer or GAN with discriminator and generator), in while the framework decided to process each optimizer per batch (meaning part of the forward() is executed x times with x = num of optimizer), the official documentation GAN sample does work, just I “believe” there could be an implementation that fit better on the structure of the Lightning Module, I would like to write another article on this part in future.

Finally for developer want to know the core flow by reading code

Catalyst

First of all, the documentation is not yet completed, so my understanding is through source code reading on and example code they provided.

For another reference, you can read: https://medium.com/pytorch/catalyst-101-accelerated-pytorch-bd766a556d92

The core items for Catalyst, I believe is Runner with the different types of Callbacks

Runner

class CustomRunner(dl.Runner):

def predict_batch(self, batch):
# model inference step
return self.model(batch[0].to(self.device).view(batch[0].size(0), -1))

def _handle_batch(self, batch):
# model train/valid step
x, y = batch
y_hat = self.model(x.view(x.size(0), -1))

loss = F.cross_entropy(y_hat, y)
accuracy01, accuracy03 = metrics.accuracy(y_hat, y, topk=(1, 3))
self.batch_metrics.update(
{"loss": loss, "accuracy01": accuracy01, "accuracy03": accuracy03}
)

if self.is_train_loader:
loss.backward()
self.optimizer.step()
self.optimizer.zero_grad()

So this _handle_batch() look alike PyTorch Lightning’s combination of training_step(), validation_step() (and maybe test_step()).

And the runner itself process train (or run) and add config with following (which already see :

runner = CustomRunner()
# model training
runner.train(
model=model,
optimizer=optimizer,
loaders=loaders,
logdir="./logs",
num_epochs=5,
verbose=True,
load_best_on_end=True,
)

First I look and this, I am worried, as it looked to me that I have to manage the loss.backward(), optimizer.step()…as well, the optimizer, model and schedulers are not within the Runner module

Then I see another example of using the 2nd important items — Callbacks

Callbacks

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
model=model,
criterion=criterion,
optimizer=optimizer,
scheduler=scheduler,
loaders=loaders,
logdir="./logdir",
num_epochs=8,
verbose=True,
callbacks=[dl.BatchOverfitCallback(train=10, valid=0.5)]
)

Note that the difference is there is no overriding of Runner (but instead using a subclass SupervisedRunner(), and most standard procedure are already there)

And the callback system is a list (sometimes you would see a dictionary object, which I want to explore more but I cannot make it work yet), and looks like they would all run, and I don’t know how they determine the sequence (until I read the source code), but they work magically.

A bit deeper in Callbacks (class and subclasses)

Catalysts seems to put different component (E.g. Optimizer) or features (like Logging) of training as Callbacks object, and each Callbacks subclass object have callback functions like on_batch_start()… (read Callback class definition from this: https://github.com/catalyst-team/catalyst/blob/master/catalyst/core/callback.py)

I admire the vision of how each component and feature is implemented as Callback and this allow minimum overriding of callbacks and better code separation.

Finally for developer want to know the core flow by reading code

Final words

I wish I can explore more and write more about these when I work on them more. They are both very good framework to start with, it’s a matter of choice instead of which one is better than the other.

Good Reference(s)

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store