import random
from contextlib import closing
from functools import partial
import multiprocessing
import numpy as np
import scipy
from scipy.stats import zscore
from scipy.spatial.distance import squareform
from MDAnalysis.analysis.rms import rmsd as find_rmsd
def _rmsd_rows(i_chunk, coords):
r"""RMDS values for a set of row indexes
This function is only used inside rmsd_matrix() but must
be defined at the module's namespace so that it can be
pickled and passed to multiprocessing.pool.imap_unordered
Parameters
----------
i_chunk : :py:class:`list`
list of row indexes
coords : :class:`~numpy:numpy.ndarray`
Atomic coordinates
Returns
-------
i_chunk : :py:class:`list`
Input list of row indexes
rmsd_chunk : :py:class:`list`
list of lists where each list item is a list of RMSD values
"""
n = len(coords)
rmsd_chunk = list()
for i in i_chunk:
ri = coords[i]
rmsd_values = [find_rmsd(ri, coords[j], superposition=True)
for j in range(i+1, n)]
rmsd_chunk.append(rmsd_values)
return i_chunk, rmsd_chunk
[docs]def rmsd_matrix(xyz, condensed=False):
r"""RMSD matrix between coordinate frames.
Parameters
----------
xyz : :class:`~numpy:numpy.ndarray`
Bare coordinates shape=(N, M, 3) with N: number of frames,
M: number of atoms
condensed: bool
Flag return matrix as square or condensed
Returns
-------
:class:`~numpy:numpy.ndarray`
Square NxN matrix, or condensed N*(N+1)/2 matrix
"""
n = len(xyz)
rmsd = np.zeros(n * n).reshape(n, n)
# rmsd is a symmetric matrix with zeros in the diagonal. Thus, we only
# calculate the upper diagonal; Distribute rows of RMSD among cores
indexes = list(range(0, n-1)) # row indexes
random.shuffle(indexes) # to balance load among cores
n_cores = multiprocessing.cpu_count()
m = int(n / n_cores) # number of rows per core
# One chunk of rows is assigned to one core
i_chunks = [indexes[i*m: (i+1)*m] for i in range(n_cores)]
# left over rows assigned to last chunk
i_chunks[-1].extend(indexes[m * n_cores:])
rr = partial(_rmsd_rows, coords=xyz)
# multiprocessing.Pool is a context manager only for python >= 3.3
with closing(multiprocessing.Pool(processes=n_cores)) as pool:
for i_chunk, dists in pool.imap_unordered(rr, i_chunks):
for k, i in enumerate(i_chunk):
rmsd[i][i+1:] = dists[k]
pool.terminate()
rmsd += rmsd.transpose()
if condensed is True:
rmsd = squareform(rmsd)
return rmsd
[docs]def distance_submatrix(dist_mat, indexes):
r"""Extract matrix of distances for a subset of indexes
If matrix is in condensed format, then the submatrix is returned in
condensed format too.
Parameters
----------
dist_mat: :class:`~numpy:numpy.ndarray`
NxN distance matrix
indexes: sequence of int
sequence of indexes from which a submatrix is extracted.
Returns
-------
:class:`~numpy:numpy.ndarray`
"""
m = dist_mat
if dist_mat.ndim == 1:
m = squareform(dist_mat)
subm = m[indexes][:, indexes]
if dist_mat.ndim == 1:
subm = squareform(subm)
return subm
[docs]def generate_distance_matrix(feature_vectors, weights=None,
func1d=zscore, func1d_args=None,
func1d_kwargs=None):
r"""
Calculate a distance matrix between measurements, based on
features of the measurements
Each measurement is characterized by a set of features, here
implemented as a vector. Thus, we sample each feature vector-item
a number of times equal to the number of measurements.
Distance `d` between two feature vectors `f` and `g` given weights `w`
`d**2 = sum_i w_i * (f_i - g_i)**2
Parameters
----------
feature_vectors: list
List of feature vectors, one vector for each measurement
weights: list
List of feature weight vectors, one for each measurement
func1d: function
Apply this function to the set of values obtained for each
feature vector-item. The size of the set is number of
measurements. The default function transforms the
values in each feature vector-item set such that the set
has zero mean and unit standard deviation.
func1d_args: list
Positional arguments to func1D
func1d_kwargs: dict
Optional arguments for func1D
Returns
-------
numpy.ndarray
Distance matrix in vector-form distance
"""
if len(set([len(fv) for fv in feature_vectors])) > 1:
raise RuntimeError('features vectors have different sizes')
xyz = np.array(feature_vectors) # shape=(#vectors, #features)
# Apply func1D to each feature vector-item set (axis 0 of array xyz)
fargs = [] if func1d_args is None else list(func1d_args)
fkwargs = {} if func1d_kwargs is None else dict(func1d_kwargs)
xyz = np.apply_along_axis(func1d, 0, xyz, *fargs, **fkwargs)
# weight each feature vector
if weights is not None:
if len(weights) != len(feature_vectors):
raise RuntimeError('Number of weight vectors different than '
'number of feature vectors')
xyz *= np.array(weights)
return squareform(scipy.spatial.distance_matrix(xyz, xyz))