Source code for msmbuilder.metrics.hybrid

import numpy as np
import logging
logger = logging.getLogger(__name__)
from baseclasses import AbstractDistanceMetric


[docs]class Hybrid(AbstractDistanceMetric): "A linear combination of other distance metrics" class HybridPreparedTrajectory(object): """Container to to hold the prepared trajectory. This container needs to support slice notation in a way that kind of passes through the indices to the 2nd dimension. So if you have a HybridPreparedTrajectory with 3 bases metrics, and you do metric[0:100], it needs to return a HybridPrepareTrajectory with the same three base metrics, but with each of the base prepared trajectories sliced 0:100. We don't want to slice out base_metrics and thus have metric[0] return only one of the three prepared_trajectories in its full length.""" def __init__(self, *args): self.num_base = len(args) self.length = len(args[0]) if not np.all((len(arg) == self.length for arg in args)): raise ValueError("Must all be equal length") self.datas = args def __getitem__(self, key): if isinstance(key, int): key = slice(key, key+1) return Hybrid.HybridPreparedTrajectory(*(d[key] for d in self.datas)) def __len__(self): return self.length def __setitem__(self, key, value): try: if self.num_base != value.num_base: raise ValueError("Must be prepared over the same metrics") except: raise ValueError("I can only set in something which is also a HybridPreparedTrajectory") for i in xrange(self.num_base): self.datas[i][key] = value.datas[i] def __init__(self, base_metrics, weights): """Create a hybrid linear combinatiin distance metric Parameters ---------- base_metrics : list of distance metric objects weights : list of floats list of scalars of equal length to `base_metrics` -- each base metric will be multiplied by that scalar when they get summed. """ self.base_metrics = base_metrics self.weights = weights self.num = len(self.base_metrics) if not len(self.weights) == self.num: raise ValueError() def prepare_trajectory(self, trajectory): """Preprocess trajectory for use with this metric Parameters ---------- trajectory : msmbuilder.Trajectory Trajectory to prepare Returns ------- prepared_trajectory : array_like The prepared trajectory is a special array like object called HybridPreparedTrajectory which is designed to pass through the slicing correctly so that if you ask for prepared_trajectory[5] you get the appropriate 5th frames dihedral angles, RMSD, etc (depending what base metrics you used) """ prepared = (m.prepare_trajectory(trajectory) for m in self.base_metrics) return self.HybridPreparedTrajectory(*prepared) def one_to_many(self, prepared_traj1, prepared_traj2, index1, indices2): """Calculate a vector of distances from one frame of the first trajectory to many frames of the second trajectory The distances calculated are from the `index1`th frame of `prepared_traj1` to the frames in `prepared_traj2` with indices `indices2` Parameters ---------- prepared_traj1 : ndarray First prepared trajectory prepared_traj2 : ndarray Second prepared trajectory index1 : int index in `prepared_trajectory` indices2 : ndarray list of indices in `prepared_traj2` to calculate the distances to Returns ------- Vector of distances of length len(indices2) """ distances = None for i in range(self.num): d = self.base_metrics[i].one_to_many(prepared_traj1.datas[i], prepared_traj2.datas[i], index1, indices2) if distances is None: distances = self.weights[i] * d else: distances += self.weights[i] * d return distances def one_to_all(self, prepared_traj1, prepared_traj2, index1): """Calculate the vector of distances from the index1th frame of prepared_traj1 to all of the frames in prepared_traj2. Parameters ---------- prepared_traj1 : prepared_trajectory First prepared trajectory prepared_traj2 : prepared_trajectory Second prepared trajectory index1 : int index in `prepared_trajectory` Returns ------- distances : ndarray vector of distances of length len(prepared_traj2) Notes ----- Although this might seem to be a special case of one_to_many(), it can often be implemented in a much more optimized way because it doesn't require construction of the indices2 array and array slicing in python is kindof slow. """ distances = None for i in range(self.num): d = self.base_metrics[i].one_to_all(prepared_traj1.datas[i], prepared_traj2.datas[i], index1) if distances is None: distances = self.weights[i] * d else: distances += self.weights[i] * d return distances def all_pairwise(self, prepared_traj): """Calculate condensed distance metric of all pairwise distances See `scipy.spatial.distance.squareform` for information on how to convert the condensed distance matrix to a redundant square matrix Parameters ---------- prepared_traj : array_like Prepared trajectory Returns ------- Y : ndarray A 1D array containing the distance from each frame to each other frame See Also -------- fast_pdist scipy.spatial.distance.squareform """ distances = None for i in range(self.num): d = self.base_metrics[i].all_pairwise(prepared_traj.datas[i]) distances = self.weights[i] * d if distances is None else distances + self.weights[i] * d return distances
[docs]class HybridPNorm(Hybrid): """A p-norm combination of other distance metrics. With p=2 for instance, this gives you the root mean square combination of the base metrics""" def __init__(self, base_metrics, weights, p=2): """Initialize the HybridPNorm distance metric. Parameters ---------- base_metrics : list of distance metric objects weights : list of floats list of scalars of equal length to `base_metrics` -- each base metric will be multiplied by that scalar. p : float p should be a scalar, greater than 0, which will be the exponent. If p=2, all the base metrics will be squared, then summed, then the square root will be taken. If p=3, the base metrics will be cubed, summed and cube rooted, etc. """ self.p = float(p) super(HybridPNorm, self).__init__(base_metrics, weights) def one_to_many(self, prepared_traj1, prepared_traj2, index1, indices2): """Calculate a vector of distances from one frame of the first trajectory to many frames of the second trajectory The distances calculated are from the `index1`th frame of `prepared_traj1` to the frames in `prepared_traj2` with indices `indices2` Parameters ---------- prepared_traj1 : ndarray First prepared trajectory prepared_traj2 : ndarray Second prepared trajectory index1 : int index in `prepared_trajectory` indices2 : ndarray list of indices in `prepared_traj2` to calculate the distances to Returns ------- Vector of distances of length len(indices2) """ distances = None for i in range(self.num): d = self.base_metrics[i].one_to_many(prepared_traj1.datas[i], prepared_traj2.datas[i], index1, indices2) if distances is None: distances = (self.weights[i]*d)**self.p else: distances += (self.weights[i]*d)**self.p return distances**(1.0 / self.p) def one_to_all(self, prepared_traj1, prepared_traj2, index1): """Calculate the vector of distances from the index1th frame of prepared_traj1 to all of the frames in prepared_traj2. Parameters ---------- prepared_traj1 : prepared_trajectory First prepared trajectory prepared_traj2 : prepared_trajectory Second prepared trajectory index1 : int index in `prepared_trajectory` Returns ------- distances : ndarray vector of distances of length len(prepared_traj2) Notes ----- Although this might seem to be a special case of one_to_many(), it can often be implemented in a much more optimized way because it doesn't require construction of the indices2 array and array slicing in python is kindof slow. """ distances = None for i in range(self.num): d = self.base_metrics[i].one_to_all(prepared_traj1.datas[i], prepared_traj2.datas[i], index1) if distances is None: distances = (self.weights[i]*d)**self.p else: distances += (self.weights[i]*d)**self.p return distances**(1.0 / self.p) def all_pairwise(self, prepared_traj): """Calculate condensed distance metric of all pairwise distances See `scipy.spatial.distance.squareform` for information on how to convert the condensed distance matrix to a redundant square matrix Parameters ---------- prepared_traj : array_like Prepared trajectory Returns ------- Y : ndarray A 1D array containing the distance from each frame to each other frame See Also -------- fast_pdist scipy.spatial.distance.squareform """ distances = None for i in range(self.num): d = self.base_metrics[i].all_pairwise(prepared_traj.datas[i]) d = (self.weights[i]*d)**self.p distances = d if distances is None else distances + (self.weights[i]*d) logger.info('got %s', i) return distances**(1.0 / self.p)