Leveling up Training: NVTabular and PyTorch Lightning

In my last post, I tried to run NVIDIA Merlin models using free instances on the web. That did not go so well, but using the GPU for the data engineering and data loading parts is still very attractive. In this post, I’m going to use NVTabular with PyTorch Lightning to train a wide and deep recommender model on MovieLens 25M.

For the code, you may check my Kaggle notebook. We have quite the chimera for this implementation as listed below:

  • Large chunks of the code are lifted from the NVTabular tutorial.
  • For the model, I am using some parts of James Le’s work.
  • Optuna is used for hyperparameter tuning.
  • Training is done via PyTorch Lightning.
  • I am also leveraging CometML for metric tracking.
  • The dataset I used is MovieLens 25M. It has 25 million ratings and one million tag applications applied to 62,000 movies by 162,000 users.

Cool? Let’s start!

Data Processing with NVTabular

There are several advantages to using NVTabular. You can use datasets that are larger than memory (it uses Dask), and all processing can be done on the GPU. Also, the framework uses DAGs, which are conceptually familiar to most engineers. Our operations will be defined using these DAGs.

We will first define our workflow. First, we’ll use implicit ratings: a rating of 4 or 5 becomes 1, and anything lower becomes 0. Second, we’ll convert the genres column into a multi-hot categorical feature. Third, we’ll join the ratings and the genres tables. Note that the >> operator is overloaded and behaves just like a pipe. If you run this cell, a DAG will appear.

import nvtabular as nvt

# movies: movieId, genres
# train and valid are ratings tables with the schema
# userId, movieId, rating (1-5); all are stored in parquet format
# join the genres onto the userId and movieId columns
# (the "train" and "valid" datasets are defined later)
joined = ["userId", "movieId"] >> nvt.ops.JoinExternal(movies, on=["movieId"])
# convert users, movies, and genres to categoricals
cat_features = joined >> nvt.ops.Categorify()
# convert explicit ratings (4 and 5) to implicit positives (1)
ratings = nvt.ColumnGroup(["rating"]) >> nvt.ops.LambdaOp(lambda col: (col > 3).astype("int8"))
output = cat_features + ratings
# a workflow is like a pipeline in sklearn
workflow = nvt.Workflow(output)

You may find the >> operations on lists strange, and they may take some getting used to. Note also that the actual datasets aren’t defined yet. We need to define a Dataset, which we will transform using the above Workflow. A Dataset is an abstraction that works on chunks of the data under the hood. The Workflow then computes statistics and other information from the Dataset.
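If you’re wondering how a plain Python list can sit on the left side of >>, here is a minimal sketch of the mechanism. This is not NVTabular’s actual implementation; Node and Op are hypothetical stand-ins. The trick is that a list does not know how to handle >>, so Python falls back to the right operand’s __rrshift__ method.

```python
class Node:
    """A step in a toy DAG: the input columns plus the ops applied so far."""

    def __init__(self, columns, ops=()):
        self.columns = list(columns)
        self.ops = list(ops)

    def __rshift__(self, op):
        # Node >> Op: return a new node with the op appended
        return Node(self.columns, self.ops + [op.name])


class Op:
    """A named operation (hypothetical stand-in for something like Categorify)."""

    def __init__(self, name):
        self.name = name

    def __rrshift__(self, columns):
        # list >> Op: lists have no >> of their own, so Python calls this instead
        return Node(columns) >> self


joined = ["userId", "movieId"] >> Op("JoinExternal")
cat_features = joined >> Op("Categorify")
print(cat_features.columns, cat_features.ops)
```

Nothing is computed while the chain is being built. The chain only records what should happen, which is why the real datasets can be supplied later.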

import os

import numpy as np

train_dataset = nvt.Dataset([os.path.join(WORKING_DIR, "train.parquet")])
valid_dataset = nvt.Dataset([os.path.join(WORKING_DIR, "valid.parquet")])
# fit the workflow with our dataset
workflow.fit(train_dataset)
# define the dtypes
CATEGORICAL_COLUMNS = ["userId", "movieId"]
LABEL_COLUMNS = ["rating"]
dict_dtypes = {}
for col in CATEGORICAL_COLUMNS:
    dict_dtypes[col] = np.int64
for col in LABEL_COLUMNS:
    dict_dtypes[col] = np.float32
# transform data
workflow.transform(train_dataset).to_parquet(
    output_path=os.path.join(WORKING_DIR, "train"),
    cats=["userId", "movieId", "genres"],
    labels=LABEL_COLUMNS,
    dtypes=dict_dtypes,
)
# save the workflow
workflow.save(os.path.join(WORKING_DIR, "workflow"))

If you run the above snippet, then you will have two resulting output directories. The first, train, will contain parquet files, the schema, and other metadata about your dataset. The second, workflow, will contain the computed statistics, categoricals, etc.
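To make the “compute statistics first, then transform” idea concrete, here is a rough sketch in plain Python of a two-pass categorical encoding over chunks. The names fit_vocab and encode_chunk are hypothetical, and reserving id 0 for unseen values is my assumption for the sketch. NVTabular does this on the GPU with Dask, and its encoding details may differ.

```python
def fit_vocab(chunks, column):
    """Pass 1: scan every chunk once and give each distinct value an integer id."""
    vocab = {}
    for chunk in chunks:
        for row in chunk:
            value = row[column]
            if value not in vocab:
                vocab[value] = len(vocab) + 1  # ids start at 1; 0 is kept for unseen values
    return vocab


def encode_chunk(chunk, column, vocab):
    """Pass 2: replace raw values with their ids, one chunk at a time."""
    return [{**row, column: vocab.get(row[column], 0)} for row in chunk]


# two small "chunks" standing in for partitions of a parquet file
chunks = [
    [{"userId": 42, "rating": 5}, {"userId": 7, "rating": 3}],
    [{"userId": 42, "rating": 4}, {"userId": 99, "rating": 2}],
]
vocab = fit_vocab(chunks, "userId")
encoded = [encode_chunk(chunk, "userId", vocab) for chunk in chunks]
unseen = encode_chunk([{"userId": 1234, "rating": 1}], "userId", vocab)
```

Only one chunk needs to be in memory at any time, and the fitted vocabulary is the kind of statistic that gets saved alongside the workflow so the same mapping can be reused at validation and inference time.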

To use the datasets and workflows in training the model, you will use iterators and data loaders. It looks like the following.

from nvtabular.loader.torch import TorchAsyncItr, DLDataLoader

# define your categoricals, continuous variables, and labels
train_iter = TorchAsyncItr(
    ...  # arguments not shown: the transformed dataset, the batch size, and the cats/conts/labels column names
)
train_loader = DLDataLoader(
    train_iter, batch_size=None, collate_fn=lambda x: x, pin_memory=False, num_workers=0
)
# you can also use the workflow to get info about your data
# for example, if you have categoricals, you can get the vocabulary and embedding sizes:
proc = nvt.Workflow.load(os.path.join(WORKING_DIR, "workflow"))
EMBEDDING_TABLE_SHAPES, MH_EMBEDDING_TABLE_SHAPES = nvt.ops.get_embedding_sizes(proc)
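The embedding shapes are (cardinality, embedding dimension) pairs per column. As an illustration of how such a dimension can be derived from the vocabulary size, here is a sketch using the rule of thumb popularised by fastai. I am not claiming this is the exact rule NVTabular applies; embedding_shape is a hypothetical helper, and the cardinalities are just the rounded user and movie counts of MovieLens 25M.

```python
def embedding_shape(cardinality, max_dim=600):
    """Return (cardinality, embedding_dim) using the fastai rule of thumb."""
    dim = min(max_dim, round(1.6 * cardinality ** 0.56))
    return cardinality, dim


# rounded counts from the dataset description
cardinalities = {"userId": 162_000, "movieId": 62_000}
shapes = {name: embedding_shape(n) for name, n in cardinalities.items()}

# the deep part of the network consumes all embeddings side by side
total_embedding_size = sum(dim for _, dim in shapes.values())
print(shapes, total_embedding_size)
```

The sum over the second element of each pair is the same computation the model uses later to size its first fully connected layer.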

Wide and Deep Networks with NVTabular, TorchFM, and PyTorch

The model we’re using is a wide and deep network, which was first used in Google Play. The wide features are the user and item embeddings. For the deep features, we pass the user, item, and item feature embeddings through successive fully connected layers. I’m modifying the genres variable to use multi-hot encodings, which, if you look under the hood, sum together the embeddings of the individual categorical values.
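Here is a tiny sketch of what “summing together embeddings” means for a multi-hot feature, in plain Python so it runs anywhere. The function name and the embedding values are made up for illustration; the real layer works on batched tensors and supports other pooling modes besides the sum.

```python
def multi_hot_embedding(genre_ids, table):
    """Sum the embedding rows of every genre attached to one movie."""
    dim = len(next(iter(table.values())))
    total = [0.0] * dim
    for genre_id in genre_ids:
        row = table[genre_id]
        total = [t + r for t, r in zip(total, row)]
    return total


# toy embedding table: genre id -> 2-dimensional vector (values are made up)
genre_table = {1: [1.0, 0.0], 2: [0.0, 2.0], 3: [0.5, 0.5]}

# a movie tagged with genres 1 and 3 gets one fixed-size vector
movie_vector = multi_hot_embedding([1, 3], genre_table)
print(movie_vector)
```

However many genres a movie has, the result has the same size, which is what lets it be concatenated with the single-hot user and movie embeddings.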

See Image 1 for a visual representation from the original authors.

Image 1. Wide and Deep Network.

import pytorch_lightning as pl
import torch
from nvtabular.framework_utils.torch.layers import ConcatenatedEmbeddings, MultiHotEmbeddings


class WideAndDeepMultihot(pl.LightningModule):
    def __init__(
        self,
        model_conf,  # assumed parameter; the original signature is truncated
        cat_names, cont_names, label_names,
        num_continuous,  # assumed parameter; used below
        batch_size=None,
    ):
        super().__init__()
        # configure from model_conf
        embedding_table_shapes = model_conf["embedding_table_shape"]
        # truncated for simplicity… emb_dropout, layer_hidden_dims, layer_dropout_rates
        # are from model_conf
        mh_shapes = None
        if isinstance(embedding_table_shapes, tuple):
            embedding_table_shapes, mh_shapes = embedding_table_shapes
        if embedding_table_shapes:
            self.initial_cat_layer = ConcatenatedEmbeddings(
                embedding_table_shapes, dropout=emb_dropout
            )
        if mh_shapes:
            self.mh_cat_layer = MultiHotEmbeddings(mh_shapes, dropout=emb_dropout, mode=bag_mode)
        self.initial_cont_layer = torch.nn.BatchNorm1d(num_continuous)
        embedding_size = sum(emb_size for _, emb_size in embedding_table_shapes.values())
        if mh_shapes is not None:
            embedding_size = embedding_size + sum(emb_size for _, emb_size in mh_shapes.values())
        # each hidden layer takes the previous layer's output as its input
        layer_input_sizes = [embedding_size + num_continuous] + layer_hidden_dims[:-1]
        layer_output_sizes = layer_hidden_dims
        self.layers = torch.nn.ModuleList(
            torch.nn.Sequential(
                torch.nn.Linear(input_size, output_size),
                # ReLU and dropout are assumed here, as in NVTabular's reference torch model
                torch.nn.ReLU(inplace=True),
                torch.nn.Dropout(dropout_rate),
            )
            for input_size, output_size, dropout_rate in zip(
                layer_input_sizes, layer_output_sizes, layer_dropout_rates
            )
        )
        # output layer receives wide (the input features) and deep (the last hidden layer)
        head_input_size = layer_input_sizes[0] + layer_output_sizes[-1]
        self.output_layer = torch.nn.Linear(head_input_size, 1)

One Training Step

To train our model, we define a single training step. This is required by PyTorch Lightning.

First, the data loader from NVTabular outputs a dictionary containing the batch of inputs. In this example, we are handling only categorical values, but this transform step can handle continuous values as well. The output is a tuple of categoricals and continuous variables, plus the label.

class DictTransform:
    def __init__(self, cat_names, cont_names, label_names=None):
        self.cats = cat_names
        self.conts = cont_names
        self.labels = label_names

    def transform_with_label(self, batch):
        cats = None
        conts = None
        batch, labels = batch
        # take apart the batch and put together into subsets
        if self.cats:
            # cats is a tuple: (single-hot tensor, dict of multi-hot columns)
            cats = self.create_stack(batch, self.cats)
        if self.conts:
            conts, _ = self.create_stack(batch, self.conts)
        return cats, conts, labels

    def create_stack(self, batch, target_columns):
        columns = []
        mh_s = {}
        for column_name in target_columns:
            target = batch[column_name]
            if isinstance(target, torch.Tensor):
                if target.is_sparse:
                    mh_s[column_name] = target
                else:
                    columns.append(target)
            else:
                # if not a tensor, must be tuple
                # multihot column type, appending tuple representation
                mh_s[column_name] = target
        if columns:
            if len(columns) > 1:
                # concatenate categoricals: converting the list above to a torch tensor
                # batch x n_categoricals (torch.cat assumed, as in NVTabular's own utility)
                columns = torch.cat(columns, 1)
            else:
                columns = columns[0].unsqueeze(1)
        return columns, mh_s

Second, we define the training and evaluation steps, which use the transform above.

# metric functions are assumed to come from torchmetrics.functional
from torchmetrics.functional import auroc, f1_score, precision, recall


class WideAndDeepMultihot(pl.LightningModule):
    # others go here…
    def training_step(self, batch, batch_idx):
        # unpack
        x_cat, x_cont, y = self.transform.transform_with_label(batch)
        # forward
        y_pred = self((x_cat, x_cont))
        # loss func
        loss = self.loss_func(y_pred, y)
        return loss

    def validation_step(self, batch, batch_idx):
        score_prefix = "val"
        return self.__shared_evaluation(batch, batch_idx, score_prefix)

    def test_step(self, batch, batch_idx):
        score_prefix = "test"
        return self.__shared_evaluation(batch, batch_idx, score_prefix)

    def __shared_evaluation(self, batch, batch_idx, prefix):
        with torch.no_grad():
            # unpack
            x_cat, x_cont, y = self.transform.transform_with_label(batch)
            y_pred = self((x_cat, x_cont))
            # loss func
            loss = self.loss_func(y_pred, y)
            # metrics for binary only: threshold the predicted probability
            y_pred_class = torch.ge(y_pred, self.decision_boundary)
            y_true_class = y.long()
            prec = precision(y_pred_class, y_true_class)
            rec = recall(y_pred_class, y_true_class)
            f1 = f1_score(y_pred_class, y_true_class)
            auc_val = auroc(y_pred, y_true_class)
            score_dict = {f"{prefix}_loss_epoch": loss, f"{prefix}_recall": rec,
                          f"{prefix}_precision": prec, f"{prefix}_f1_score": f1,
                          f"{prefix}_auc": auc_val}
            self.log_dict(score_dict, on_epoch=True, prog_bar=True, logger=True, batch_size=self.batch_size)
            return score_dict
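For readers who want to see what the thresholded metrics boil down to, here is a small plain-Python sketch of binary precision, recall, and F1 for a given decision boundary. The function binary_scores is a hypothetical helper for illustration, not the metric library’s implementation, and the default boundary of 0.5 is my assumption.

```python
def binary_scores(probabilities, labels, decision_boundary=0.5):
    """Threshold probabilities, then compute precision, recall, and F1 for class 1."""
    predictions = [1 if p >= decision_boundary else 0 for p in probabilities]
    pairs = list(zip(predictions, labels))
    tp = sum(1 for p, y in pairs if p == 1 and y == 1)  # true positives
    fp = sum(1 for p, y in pairs if p == 1 and y == 0)  # false positives
    fn = sum(1 for p, y in pairs if p == 0 and y == 1)  # false negatives
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    denominator = precision + recall
    f1 = 2 * precision * recall / denominator if denominator else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


# four predictions against implicit labels (1 = rated 4 or 5)
scores = binary_scores([0.9, 0.8, 0.3, 0.6], [1, 0, 1, 1])
print(scores)
```

Moving the decision boundary trades precision against recall, while ROC AUC is computed from the raw probabilities and does not depend on it.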

I’m omitting the forward step since it is simply a matter of inputting the categorical and continuous variables to the correct layers, concatenating the wide and deep components of the model, and adding a sigmoid head.
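To give a feel for that omitted step, here is a dependency-free sketch of the wide and deep combination on a single example, written with plain lists instead of PyTorch tensors. The function names, the ReLU activation, and the toy weights are all assumptions for illustration; this is not the forward method from the notebook.

```python
import math


def linear(inputs, weights, bias):
    """Dense layer: one output per row of `weights`."""
    return [sum(w * x for w, x in zip(row, inputs)) + b for row, b in zip(weights, bias)]


def forward(embeddings, hidden_layers, head_weights, head_bias):
    wide = embeddings  # concatenated embeddings, passed to the head as-is
    deep = embeddings
    for weights, bias in hidden_layers:  # successive fully connected layers
        deep = [max(0.0, v) for v in linear(deep, weights, bias)]  # ReLU (assumed)
    combined = wide + deep  # list concatenation: wide and deep side by side
    logit = linear(combined, [head_weights], [head_bias])[0]
    return 1.0 / (1.0 + math.exp(-logit))  # sigmoid head


# toy numbers: 2 embedding values and one hidden layer with 2 units
hidden = [([[1.0, 0.0], [0.0, 1.0]], [0.0, 0.0])]
probability = forward([0.5, -0.5], hidden, head_weights=[1.0, 1.0, 1.0, 1.0], head_bias=0.0)
print(probability)
```

The head sees len(wide) + len(deep) inputs, which mirrors how the output layer’s input size is the sum of the first layer’s input size and the last hidden layer’s output size.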

Training with Optuna, PyTorch Lightning, and CometML

With everything defined properly, it’s time to stitch it all together. Each of the functions here (create_loaders, create_model, and create_trainer) is user-defined. As the names suggest, they simply create these objects for training.

import gc

import optuna


def objective(trial):
    ### Dataset section
    train_loader, valid_loader = create_loaders(train_dataset, valid_dataset)
    ### Model section
    epochs = 1
    patience = 3
    model = create_model(trial, epochs, patience)
    ### trainer specific
    trainer = create_trainer(epochs, patience)
    trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=valid_loader)
    ## validation metrics
    val_metrics = trainer.test(dataloaders=valid_loader)
    # cleanup
    del model
    gc.collect()
    return val_metrics[0]["test_precision"]


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=6)
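As a rough mental model of what a maximizing study does with an objective, here is a stand-in written in plain Python. It samples uniformly at random, whereas Optuna’s default sampler is smarter (TPE), so treat it only as a sketch of the contract: the objective receives a trial, asks it for hyperparameters, and returns the score to maximize. ToyTrial, toy_objective, and optimize are hypothetical names, and the “best dropout is 0.3” score is made up.

```python
import random


class ToyTrial:
    """Hypothetical stand-in for a trial that samples uniformly at random."""

    def __init__(self, rng):
        self.rng = rng
        self.params = {}

    def suggest_float(self, name, low, high):
        value = self.rng.uniform(low, high)
        self.params[name] = value
        return value


def toy_objective(trial):
    # pretend the validation precision peaks at a dropout of 0.3
    dropout = trial.suggest_float("dropout", 0.0, 0.6)
    return 1.0 - abs(dropout - 0.3)


def optimize(objective, n_trials, seed=0):
    """Run the objective n_trials times and keep the highest score."""
    rng = random.Random(seed)
    best_value, best_params = float("-inf"), None
    for _ in range(n_trials):
        trial = ToyTrial(rng)
        value = objective(trial)
        if value > best_value:  # direction="maximize"
            best_value, best_params = value, trial.params
    return best_value, best_params


best_value, best_params = optimize(toy_objective, n_trials=6)
print(best_value, best_params)
```

In the real objective, create_model would be the place where the trial is asked for values such as dropout rates or hidden layer sizes.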

Evaluation and Conclusions

I’ve launched only 6 trials for this one. Decent, but more trials can yield better results.

Test metric    Value
Loss           0.1889
ROC AUC        0.7622
Precision      0.7189
Recall         0.7189
F1 Score       0.7189

As you can probably guess, there are a lot of components that had to be stitched together. Building this can be a pain, especially when multiple projects need the same setup. I recommend creating an in-house framework to distribute this template sustainably.

The next steps could be:

  • Deploy an inference service through TorchServe.
  • Create a training-deployment pipeline using Kedro.
  • Extract top-n user recommendations and store them in a cache server.

Thanks for reading!

By krsnewwave

I'm a software engineer and a data science guy on recommender systems, natural language processing, and computer vision.
