from collections import defaultdict
import logging
from random import Random
from typing import Dict, List, Set, Tuple, Union
import warnings
import copy

from rdkit import Chem
from rdkit.Chem.Scaffolds import MurckoScaffold
from tqdm import tqdm
import numpy as np

from .data import MoleculeDataset, make_mol

[docs]def generate_scaffold(mol: Union[str, Chem.Mol, Tuple[Chem.Mol, Chem.Mol]], include_chirality: bool = False) -> str: """ Computes the Bemis-Murcko scaffold for a SMILES string. :param mol: A SMILES or an RDKit molecule. :param include_chirality: Whether to include chirality in the computed scaffold.. :return: The Bemis-Murcko scaffold for the molecule. """ if isinstance(mol, str): mol = make_mol(mol, keep_h = False, add_h = False, keep_atom_map = False) if isinstance(mol, tuple): mol = copy.deepcopy(mol[0]) for atom in mol.GetAtoms(): atom.SetAtomMapNum(0) scaffold = MurckoScaffold.MurckoScaffoldSmiles(mol = mol, includeChirality = include_chirality) return scaffold
[docs]def scaffold_to_smiles(mols: Union[List[str], List[Chem.Mol], List[Tuple[Chem.Mol, Chem.Mol]]], use_indices: bool = False) -> Dict[str, Union[Set[str], Set[int]]]: """ Computes the scaffold for each SMILES and returns a mapping from scaffolds to sets of smiles (or indices). :param mols: A list of SMILES or RDKit molecules. :param use_indices: Whether to map to the SMILES's index in :code:`mols` rather than mapping to the smiles string itself. This is necessary if there are duplicate smiles. :return: A dictionary mapping each unique scaffold to all SMILES (or indices) which have that scaffold. """ scaffolds = defaultdict(set) for i, mol in tqdm(enumerate(mols), total = len(mols)): scaffold = generate_scaffold(mol) if use_indices: scaffolds[scaffold].add(i) else: scaffolds[scaffold].add(mol) return scaffolds
[docs]def scaffold_split(data: MoleculeDataset, sizes: Tuple[float, float, float] = (0.8, 0.1, 0.1), balanced: bool = False, key_molecule_index: int = 0, seed: int = 0, logger: logging.Logger = None) -> Tuple[MoleculeDataset, MoleculeDataset, MoleculeDataset]: r""" Splits a :class:`` by scaffold so that no molecules sharing a scaffold are in different splits. :param data: A :class:`MoleculeDataset`. :param sizes: A length-3 tuple with the proportions of data in the train, validation, and test sets. :param balanced: Whether to balance the sizes of scaffolds in each set rather than putting the smallest in test set. :param key_molecule_index: For data with multiple molecules, this sets which molecule will be considered during splitting. :param seed: Random seed for shuffling when doing balanced splitting. :param logger: A logger for recording output. :return: A tuple of :class:``\ s containing the train, validation, and test splits of the data. """ if not (len(sizes) == 3 and np.isclose(sum(sizes), 1)): raise ValueError(f"Invalid train/val/test splits! got: {sizes}") # Split train_size, val_size, test_size = sizes[0] * len(data), sizes[1] * len(data), sizes[2] * len(data) train, val, test = [], [], [] train_scaffold_count, val_scaffold_count, test_scaffold_count = 0, 0, 0 # Map from scaffold to index in the data key_mols = [m[key_molecule_index] for m in data.mols(flatten=False)] scaffold_to_indices = scaffold_to_smiles(key_mols, use_indices=True) # Seed randomness random = Random(seed) if balanced: # Put stuff that's bigger than half the val/test size into train, rest just order randomly index_sets = list(scaffold_to_indices.values()) big_index_sets = [] small_index_sets = [] for index_set in index_sets: if len(index_set) > val_size / 2 or len(index_set) > test_size / 2: big_index_sets.append(index_set) else: small_index_sets.append(index_set) random.seed(seed) random.shuffle(big_index_sets) random.shuffle(small_index_sets) index_sets = big_index_sets + small_index_sets else: # Sort from largest to smallest scaffold sets index_sets = sorted(list(scaffold_to_indices.values()), key=lambda index_set: len(index_set), reverse=True) for index_set in index_sets: if len(train) + len(index_set) <= train_size: train += index_set train_scaffold_count += 1 elif len(val) + len(index_set) <= val_size: val += index_set val_scaffold_count += 1 else: test += index_set test_scaffold_count += 1 if logger is not None: logger.debug(f'Total scaffolds = {len(scaffold_to_indices):,} | ' f'train scaffolds = {train_scaffold_count:,} | ' f'val scaffolds = {val_scaffold_count:,} | ' f'test scaffolds = {test_scaffold_count:,}') if logger is not None and not data.is_atom_bond_targets: log_scaffold_stats(data, index_sets, logger=logger) # Map from indices to data train = [data[i] for i in train] val = [data[i] for i in val] test = [data[i] for i in test] return MoleculeDataset(train), MoleculeDataset(val), MoleculeDataset(test)
[docs]def log_scaffold_stats(data: MoleculeDataset, index_sets: List[Set[int]], num_scaffolds: int = 10, num_labels: int = 20, logger: logging.Logger = None) -> List[Tuple[List[float], List[int]]]: """ Logs and returns statistics about counts and average target values in molecular scaffolds. :param data: A :class:``. :param index_sets: A list of sets of indices representing splits of the data. :param num_scaffolds: The number of scaffolds about which to display statistics. :param num_labels: The number of labels about which to display statistics. :param logger: A logger for recording output. :return: A list of tuples where each tuple contains a list of average target values across the first :code:`num_labels` labels and a list of the number of non-zero values for the first :code:`num_scaffolds` scaffolds, sorted in decreasing order of scaffold frequency. """ if logger is not None: logger.debug('Label averages per scaffold, in decreasing order of scaffold frequency,' f'capped at {num_scaffolds} scaffolds and {num_labels} labels:') stats = [] index_sets = sorted(index_sets, key=lambda idx_set: len(idx_set), reverse=True) for scaffold_num, index_set in enumerate(index_sets[:num_scaffolds]): data_set = [data[i] for i in index_set] targets = np.array([d.targets for d in data_set], dtype=float) with warnings.catch_warnings(): # Likely warning of empty slice of target has no values besides NaN warnings.simplefilter('ignore', category=RuntimeWarning) target_avgs = np.nanmean(targets, axis=0)[:num_labels] counts = np.count_nonzero(~np.isnan(targets), axis=0)[:num_labels] stats.append((target_avgs, counts)) if logger is not None: logger.debug(f'Scaffold {scaffold_num}') for task_num, (target_avg, count) in enumerate(zip(target_avgs, counts)): logger.debug(f'Task {task_num}: count = {count:,} | target average = {target_avg:.6f}') logger.debug('\n') return stats