import math
from typing import List, Union
import numpy as np
import torch
import torch.nn as nn
from torch.optim import Optimizer
from torch.optim.lr_scheduler import _LRScheduler
def compute_pnorm(model: nn.Module) -> float:
    """
    Computes the norm of the parameters of a model.

    :param model: A PyTorch model.
    :return: The norm of the parameters of the model.
    """
    return math.sqrt(sum(p.norm().item() ** 2 for p in model.parameters()))
def compute_gnorm(model: nn.Module) -> float:
    """
    Computes the norm of the gradients of a model.

    :param model: A PyTorch model.
    :return: The norm of the gradients of the model.
    """
    return math.sqrt(sum(p.grad.norm().item() ** 2 for p in model.parameters() if p.grad is not None))
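# Illustrative usage sketch (not part of the library API; the training-loop objects
# are hypothetical): these two norms are typically logged together after the
# backward pass to monitor parameter and gradient magnitudes during training.
def _log_norms_example(model: nn.Module, loss: torch.Tensor) -> None:
    loss.backward()
    print(f'PNorm: {compute_pnorm(model):.4f}, GNorm: {compute_gnorm(model):.4f}')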
def param_count(model: nn.Module) -> int:
    """
    Determines the number of trainable parameters in a model.

    :param model: A PyTorch model.
    :return: The number of trainable parameters in the model.
    """
    return sum(param.numel() for param in model.parameters() if param.requires_grad)
def param_count_all(model: nn.Module) -> int:
    """
    Determines the total number of parameters in a model, trainable or not.

    :param model: A PyTorch model.
    :return: The total number of parameters in the model.
    """
    return sum(param.numel() for param in model.parameters())
def index_select_ND(source: torch.Tensor, index: torch.Tensor) -> torch.Tensor:
    """
    Selects the message features from :code:`source` corresponding to the atom or bond indices in :code:`index`.

    :param source: A tensor of shape :code:`(num_bonds, hidden_size)` containing message features.
    :param index: A tensor of shape :code:`(num_atoms/num_bonds, max_num_bonds)` containing the atom or bond
                  indices to select from :code:`source`.
    :return: A tensor of shape :code:`(num_atoms/num_bonds, max_num_bonds, hidden_size)` containing the message
             features corresponding to the atoms/bonds specified in :code:`index`.
    """
    index_size = index.size()  # (num_atoms/num_bonds, max_num_bonds)
    suffix_dim = source.size()[1:]  # (hidden_size,)
    final_size = index_size + suffix_dim  # (num_atoms/num_bonds, max_num_bonds, hidden_size)

    target = source.index_select(dim=0, index=index.view(-1))  # (num_atoms/num_bonds * max_num_bonds, hidden_size)
    target = target.view(final_size)  # (num_atoms/num_bonds, max_num_bonds, hidden_size)

    return target
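# Illustrative sketch with assumed toy shapes (not part of the library API):
# gathering per-atom neighbor messages from a flat message tensor. In chemprop's
# message passing, row 0 of the source tensor typically serves as zero padding.
def _index_select_ND_example() -> torch.Tensor:
    source = torch.randn(10, 300)  # (num_bonds, hidden_size)
    index = torch.tensor([[1, 2, 0],
                          [3, 4, 0]])  # (num_atoms, max_num_bonds)
    selected = index_select_ND(source, index)  # (num_atoms, max_num_bonds, hidden_size)
    assert selected.shape == (2, 3, 300)
    return selected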
def get_activation_function(activation: str) -> nn.Module:
    """
    Gets an activation function module given the name of the activation.

    Supports:

    * :code:`ReLU`
    * :code:`LeakyReLU`
    * :code:`PReLU`
    * :code:`tanh`
    * :code:`SELU`
    * :code:`ELU`

    :param activation: The name of the activation function.
    :return: The activation function module.
    """
    if activation == 'ReLU':
        return nn.ReLU()
    elif activation == 'LeakyReLU':
        return nn.LeakyReLU(0.1)
    elif activation == 'PReLU':
        return nn.PReLU()
    elif activation == 'tanh':
        return nn.Tanh()
    elif activation == 'SELU':
        return nn.SELU()
    elif activation == 'ELU':
        return nn.ELU()
    else:
        raise ValueError(f'Activation "{activation}" not supported.')
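# Illustrative sketch (layer sizes are assumed values): the returned activation
# module can be dropped directly into an nn.Sequential when building a feed-forward network.
def _ffn_example(hidden_size: int = 300) -> nn.Module:
    return nn.Sequential(
        nn.Linear(hidden_size, hidden_size),
        get_activation_function('LeakyReLU'),
        nn.Linear(hidden_size, 1),
    )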
def initialize_weights(model: nn.Module) -> None:
    """
    Initializes the weights of a model in place.

    :param model: A PyTorch model.
    """
    for param in model.parameters():
        if param.dim() == 1:
            nn.init.constant_(param, 0)
        else:
            nn.init.xavier_normal_(param)
class NoamLR(_LRScheduler):
    """
    Noam learning rate scheduler with piecewise linear increase and exponential decay.

    The learning rate increases linearly from :code:`init_lr` to :code:`max_lr` over the course of
    the first warmup steps (where :code:`warmup_steps = warmup_epochs * steps_per_epoch`).
    Then the learning rate decreases exponentially from :code:`max_lr` to :code:`final_lr` over the
    course of the remaining :code:`total_steps - warmup_steps` (where :code:`total_steps =
    total_epochs * steps_per_epoch`). This is roughly based on the learning rate
    schedule from `Attention is All You Need <https://arxiv.org/abs/1706.03762>`_, section 5.3.
    """

    def __init__(self,
                 optimizer: Optimizer,
                 warmup_epochs: List[Union[float, int]],
                 total_epochs: List[int],
                 steps_per_epoch: int,
                 init_lr: List[float],
                 max_lr: List[float],
                 final_lr: List[float]):
        """
        :param optimizer: A PyTorch optimizer.
        :param warmup_epochs: The number of epochs during which to linearly increase the learning rate.
        :param total_epochs: The total number of epochs.
        :param steps_per_epoch: The number of steps (batches) per epoch.
        :param init_lr: The initial learning rate.
        :param max_lr: The maximum learning rate (achieved after :code:`warmup_epochs`).
        :param final_lr: The final learning rate (achieved after :code:`total_epochs`).
        """
        if not (
            len(optimizer.param_groups) == len(warmup_epochs) == len(total_epochs)
            == len(init_lr) == len(max_lr) == len(final_lr)
        ):
            raise ValueError(
                "Number of param groups must match the number of epochs and learning rates! "
                f"got: len(optimizer.param_groups)={len(optimizer.param_groups)}, "
                f"len(warmup_epochs)={len(warmup_epochs)}, "
                f"len(total_epochs)={len(total_epochs)}, "
                f"len(init_lr)={len(init_lr)}, "
                f"len(max_lr)={len(max_lr)}, "
                f"len(final_lr)={len(final_lr)}"
            )

        self.num_lrs = len(optimizer.param_groups)

        self.optimizer = optimizer
        self.warmup_epochs = np.array(warmup_epochs)
        self.total_epochs = np.array(total_epochs)
        self.steps_per_epoch = steps_per_epoch
        self.init_lr = np.array(init_lr)
        self.max_lr = np.array(max_lr)
        self.final_lr = np.array(final_lr)

        self.current_step = 0
        self.lr = init_lr
        self.warmup_steps = (self.warmup_epochs * self.steps_per_epoch).astype(int)
        self.total_steps = self.total_epochs * self.steps_per_epoch
        self.linear_increment = (self.max_lr - self.init_lr) / self.warmup_steps
        self.exponential_gamma = (self.final_lr / self.max_lr) ** (1 / (self.total_steps - self.warmup_steps))

        super(NoamLR, self).__init__(optimizer)
    def get_lr(self) -> List[float]:
        """
        Gets a list of the current learning rates.

        :return: A list of the current learning rates.
        """
        return list(self.lr)
    def step(self, current_step: int = None):
        """
        Updates the learning rate by taking a step.

        :param current_step: Optionally specify what step to set the learning rate to.
                             If None, :code:`current_step = self.current_step + 1`.
        """
        if current_step is not None:
            self.current_step = current_step
        else:
            self.current_step += 1

        for i in range(self.num_lrs):
            if self.current_step <= self.warmup_steps[i]:
                self.lr[i] = self.init_lr[i] + self.current_step * self.linear_increment[i]
            elif self.current_step <= self.total_steps[i]:
                self.lr[i] = self.max_lr[i] * (self.exponential_gamma[i] ** (self.current_step - self.warmup_steps[i]))
            else:  # theoretically this case should never be reached since training should stop at total_steps
                self.lr[i] = self.final_lr[i]

            self.optimizer.param_groups[i]['lr'] = self.lr[i]
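# Illustrative usage sketch (hyperparameter values are assumptions for demonstration,
# not necessarily chemprop defaults): one list entry per optimizer param group,
# with scheduler.step() called once per batch rather than once per epoch.
def _noam_lr_example(model: nn.Module, steps_per_epoch: int) -> NoamLR:
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    return NoamLR(
        optimizer=optimizer,
        warmup_epochs=[2.0],
        total_epochs=[30],
        steps_per_epoch=steps_per_epoch,
        init_lr=[1e-4],
        max_lr=[1e-3],
        final_lr=[1e-4],
    )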
def activate_dropout(module: nn.Module, dropout_prob: float) -> None:
    """
    Sets the dropout probability of a dropout layer and keeps it in train mode during inference,
    for dropout-based uncertainty estimation.

    :param module: A PyTorch module (only :class:`~torch.nn.Dropout` modules are modified).
    :param dropout_prob: A float in (0, 1) indicating the dropout probability.
    """
    if isinstance(module, nn.Dropout):
        module.p = dropout_prob
        module.train()
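# Illustrative usage sketch (the dropout probability is an assumed value): for
# MC-dropout style uncertainty estimation, this function is meant to be applied
# to every submodule via nn.Module.apply, so dropout layers stay active at
# inference time even though the rest of the model is in eval mode.
def _enable_mc_dropout_example(model: nn.Module, dropout_prob: float = 0.1) -> None:
    model.eval()
    model.apply(lambda module: activate_dropout(module, dropout_prob))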