Source code for idpflex.cluster

import sys

import numpy as np
import pickle

import scipy
from scipy.spatial.distance import squareform
from scipy.stats import zscore
from scipy.cluster import hierarchy
from tqdm import tqdm
from collections import namedtuple

from idpflex.distances import (rmsd_matrix, extract_coordinates)
from idpflex.cnextend import Tree
from idpflex.properties import ScalarProperty, propagator_size_weighted_sum


[docs]class ClusterTrove(namedtuple('ClusterTrove', 'idx rmsd tree')): r"""A namedtuple with a `keys()` method for easy access of fields, which are described below under header `Parameters` Parameters ---------- idx : :class:`list` Frame indexes for the representative structures (indexes start at zero) rmsd : :class:`~numpy:numpy.ndarray` distance matrix between representative structures. tree : :class:`~idpflex.cnextend.Tree` Clustering of representative structures. Leaf nodes associated with each centroid contain property `iframe`, which is the frame index in the trajectory pointing to the atomic structure corresponding to the centroid. """
[docs] def keys(self): r"""Return the list of field names""" return self._fields
[docs] def save(self, filename): r"""Serialize the cluster trove and save to file Parameters ---------- filename: str File name """ with open(filename, 'wb') as outfile: pickle.dump(self, outfile)
[docs]def trajectory_centroids(a_universe, selection='not name H*', segment_length=1000, n_representatives=1000): r"""Cluster a set of consecutive trajectory segments into a set of representative structures via structural similarity (RMSD) The simulated trajectory is divided into consecutive segments, and hierarchical clustering is performed on each segment to yield a limited number of representative structures (centroids) per segment. Parameters ---------- a_universe : :class:`~MDAnalysis.core.universe.Universe` Topology and trajectory. selection : str atoms for which to calculate RMSD. See the `selections page <https://www.mdanalysis.org/docs/documentation_pages/selections.html>`_ for atom selection syntax. segment_length: int divide trajectory into segments of this length n_representatives : int Desired total number of representative structures. The final number may be close but not equal to the desired number. Returns ------- rep_ifr : list Frame indexes of representative structures (centroids) """ # noqa: E501 group = a_universe.select_atoms(selection) # Fragmentation of the trajectory n_frame = len(a_universe.trajectory) n_segments = int(n_frame / segment_length) nc = max(1, int(n_representatives / n_segments)) # clusters per segment rep_ifr = list() # frame indexes of representative structures info = """Clustering the trajectory: Creating {} representatives by partitioning {} frames into {} segments and retrieving {} representatives from each segment. """.format(nc * n_segments, n_frame, n_segments, nc) sys.stdout.write(info) sys.stdout.flush() # Hierarchical clustering on each trajectory fragment for i_segment in tqdm(range(n_segments)): indexes = range(i_segment * segment_length, (i_segment + 1) * segment_length) xyz = extract_coordinates(a_universe, group, indexes) rmsd = rmsd_matrix(xyz, condensed=True) z = hierarchy.linkage(rmsd, method='complete') for node in Tree(z=z).nodes_at_depth(nc-1): # Find the frame of each representative structure i_frame = i_segment * segment_length + node.representative(rmsd).id rep_ifr.append(i_frame) rep_ifr.sort() return rep_ifr
[docs]def cluster_with_properties(a_universe, pcls, p_names=None, selection='not name H*', segment_length=1000, n_representatives=1000): r"""Cluster a set of representative structures by structural similarity (RMSD) and by a set of properties The simulated trajectory is divided into segments, and hierarchical clustering is performed on each segment to yield a limited number of representative structures (the centroids). Properties are calculated for each centroid, thus each centroid is described by a property vector. The dimensionality of the vector is related to the number of properties and the dimensionality of each property. The distances between any two centroids is calculated as the Euclidean distance between their respective vector properties. The distance matrix containing distances between all possible centroid pairs is employed as the similarity measure to generate the hierarchical tree of centroids. The properties calculated for the centroids are stored in the leaf nodes of the hierarchical tree. Properties are then propagated up to the tree's root node. Parameters ---------- a_universe : :class:`~MDAnalysis.core.universe.Universe` Topology and trajectory. pcls : list Property classes, such as :class:`~idpflex.properties.Asphericity` of :class:`~idpflex.properties.SaSa` p_names : list Property names. If None, then default property names are used selection : str atoms for which to calculate RMSD. See the `selections page <https://www.mdanalysis.org/docs/documentation_pages/selections.html>`_ for atom selection syntax. segment_length: int divide trajectory into segments of this length n_representatives : int Desired total number of representative structures. The final number may be close but not equal to the desired number. Returns ------- :class:`~idpflex.cluster.ClusterTrove` Hierarchical clustering tree of the centroids """ # noqa: E501 rep_ifr = trajectory_centroids(a_universe, selection=selection, segment_length=segment_length, n_representatives=n_representatives) n_centroids = len(rep_ifr) # can be different than n_representatives # Create names if not passed if p_names is None: p_names = [Property.default_name for Property in pcls] # Calculate properties for each centroid l_prop = list() for p_name, Pcl in zip(p_names, pcls): l_prop.append([Pcl(name=p_name).from_universe(a_universe, index=i) for i in tqdm(rep_ifr)]) # Calculate distances between pair of centroids xyz = np.zeros((len(pcls), n_centroids)) for i_prop, prop in enumerate(l_prop): xyz[i_prop] = [p.y for p in prop] # zero mean and unity variance for each property xyz = np.transpose(zscore(xyz, axis=1)) distance_matrix = squareform(scipy.spatial.distance_matrix(xyz, xyz)) # Cluster the representative structures tree = Tree(z=hierarchy.linkage(distance_matrix, method='complete')) for i_leaf, leaf in enumerate(tree.leafs): leaf.add_property(ScalarProperty(name='iframe', y=rep_ifr[i_leaf])) # Propagate the properties up the tree [propagator_size_weighted_sum(prop, tree) for prop in l_prop] return ClusterTrove(rep_ifr, distance_matrix, tree)
[docs]def cluster_trajectory(a_universe, selection='not name H*', segment_length=1000, n_representatives=1000): r"""Cluster a set of representative structures by structural similarity (RMSD) The simulated trajectory is divided into segments, and hierarchical clustering is performed on each segment to yield a limited number of representative structures. These are then clustered into the final hierachical tree. Parameters ---------- a_universe : :class:`~MDAnalysis.core.universe.Universe` Topology and trajectory. selection : str atoms for which to calculate RMSD. See the `selections page <https://www.mdanalysis.org/docs/documentation_pages/selections.html>`_ for atom selection syntax. segment_length: int divide trajectory into segments of this length n_representatives : int Desired total number of representative structures. The final number may be close but not equal to the desired number. distance_matrix: :class:`~numpy:numpy.ndarray` Returns ------- :class:`~idpflex.cluster.ClusterTrove` clustering results for the representatives """ # noqa: E501 rep_ifr = trajectory_centroids(a_universe, selection=selection, segment_length=segment_length, n_representatives=n_representatives) group = a_universe.select_atoms(selection) xyz = extract_coordinates(a_universe, group, rep_ifr) distance_matrix = rmsd_matrix(xyz, condensed=True) # Cluster the representative structures tree = Tree(z=hierarchy.linkage(distance_matrix, method='complete')) for i_leaf, leaf in enumerate(tree.leafs): leaf.add_property(ScalarProperty(name='iframe', y=rep_ifr[i_leaf])) return ClusterTrove(rep_ifr, distance_matrix, tree)
[docs]def load_cluster_trove(filename): r"""Load a previously saved :class:`~idpflex.cluster.ClusterTrove` instance Parameters ---------- filename: str File name containing the serialized :class:`~idpflex.cluster.ClusterTrove` Returns ------- :class:`~idpflex.cluster.ClusterTrove` Cluster trove instance stored in file """ with open(filename, 'rb') as infile: t = pickle.load(infile) return t