Source code for dimepy.spectrum

# -*- coding: utf-8 -*-
# encoding: utf-8

"""
The class :py:class:`Spectrum` has been designed to load data
from a mzML file and to represetn the data as a python object.

.. note::
    This class is still being actively developed and will likely change over time.
"""
import numpy as np
from typing import Tuple, List
from scipy.stats import binned_statistic
from pymzml.run import Reader as pymzmlReader
from .scan import Scan
from .utils import terms, bin_masses_and_intensities
import itertools

# Copyright (c) 2017-2019 Keiron O'Shea
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301 USA

[docs]class Spectrum(object): """ Initialise Spectrum object for a given mzML file. """
[docs] def __init__(self, filepath: str, identifier: str, injection_order: int = None, stratification: str = None, snr_estimator: str = False, peak_type: str = "raw", MS1_precision: float = 5e-6, MSn_precision: float = 20e-6 ): """ Initialise a Spectrum object for a given mzML file. Arguments: filepath (str): Path to the mzML file to parse. identifier (str): Unique identifier for the Spectrum object. injection_order (int): The injection number of the Spectrum object. stratification (str): Class label of the Spectrum object. snr_estimator (str): Signal to noise method used to filter. Currently supported signal-to-noise estimation methods are: * 'median' (default) * 'mean' * 'mad' peak_type (raw): What peak type to load in. Currently supported peak types are: * raw (default) * centroided * reprofiled MS1_precision (float): Measured precision for the MS level 1. MSn_precision (float): Measured precision for the MS level n. """ self.filepath = filepath self.identifier = identifier self.injection_order = injection_order self.stratification = stratification self.snr_estimator = snr_estimator self.peak_type = peak_type self.MS1_precision = MS1_precision self.MSn_precision = MSn_precision self.read_scans = [] self._masses = False self._intensities = False self._scans, self._to_use = self._base_load()
def _base_load(self) -> Tuple[np.array, np.array]: extraAccessions = [ [[y, ["value"]] for y in terms[x]] for x in terms.keys() ] # Flatten the list of lists of lists into a list of lists. extraAccessions = list(itertools.chain.from_iterable(extraAccessions)) reader = pymzmlReader( self.filepath, extraAccessions=extraAccessions, MS1_Precision = self.MS1_precision, MSn_Precision = self.MSn_precision ) scans = [] to_use = [] for scan in reader: scans.append(scan) to_use.append(True) return np.array(scans), np.array(to_use)
[docs] def limit_polarity(self, polarity: str) -> None: """ Limit the Scans found within the mzML file to whatever polarity is given. This should only be called where fast-polarity switching is used. Arguments: polarity (str): polarity type of the scans required Supported polarity types are: * 'positive' * 'negative' """ def _determine_polarity(scan) -> str: scan_polarity = None for polarity_acc in terms["polarity"]: if scan.get(polarity_acc) != None: scan_polarity = terms["polarity"][polarity_acc] return scan_polarity for index, scan in enumerate(self._scans): if _determine_polarity(scan) != polarity.upper(): self._to_use[index] = False
[docs] def limit_infusion(self, threshold: int = 3) -> None: """ This method is a slight extension of Manfred Beckmann's (meb@aber.ac.uk) LCT/Q-ToF scan retrieval method in FIEMSpro in which we use the median absolute deviation of all TICs within a Spectrum to determine when the infusion has taken place. Consider the following Infusion Profile: :: _ / \ / \_ ____/ \_________________ 0 0.5 1 1.5 2 [min] |--------| Apex We are only interested in the scans in which the infusion takes place (20 - 50 seconds). Applying this method changes the to_use values to only be True where the TIC is >= TIC * mad_multiplier. Arguments: mad_multiplier (int): The multiplier for the median absolute deviation method to take the infusion profile from. """ def _calculate_mad(tics: np.array) -> float: return np.median(np.abs(tics - np.median(tics))) def _get_mask(tics: np.array, mad: float) -> np.array: tics = tics[:, None] median = np.median(tics, axis=0) diff = np.sum((tics - median)**2, axis=-1) diff = np.sqrt(diff) med_abs_deviation = np.median(diff) modified_z_score = 0.6745 * diff / med_abs_deviation return modified_z_score >= threshold tics = np.array([scan.TIC for scan in self._scans[self._to_use]]) mad = _calculate_mad(tics) apex_index = _get_mask(tics, mad) to_use = np.where(self._to_use == True)[0] for i, j in enumerate(to_use): self._to_use[j] = apex_index[i]
[docs] def reset(self) -> None: """ A method to reset the Spectrum object in its entirety. """ self._to_use = self._to_use == False self._masses = False self._intensities = False self.read_scans = []
[docs] def load_scans(self) -> None: """ This method loads the scans in accordance to whatever Scans are set to True in the to_use list. .. note:: If you want to actually make use of masses and intensities (you probably do), then ensure that you call this method. """ scans = [] for scan in self._scans[self._to_use]: scan = Scan(scan, snr_estimator=self.snr_estimator, peak_type=self.peak_type) scans.append(scan) self.read_scans = scans self._load_masses_and_ints_from_scans()
def _load_masses_and_ints_from_scans(self) -> None: masses = [] intensities = [] for scan in self.read_scans: masses.extend(scan.masses) intensities.extend(scan.intensities) masses = np.array(masses) intensities = np.array(intensities) sorted_idx = np.argsort(masses) self._masses = masses[sorted_idx] self._intensities = intensities[sorted_idx]
[docs] def bin(self, bin_width: float = 0.01, statistic: str = "mean"): """" Method to conduct mass binning to nominal mass and mass spectrum generation across a Spectrum. Arguments: bin_width (float): The mass-to-ion bin-widths to use for binning. statistic (str): The statistic to use to calculate bin values. Supported statistic types are: * 'mean' (default): compute the mean of intensities for points within each bin. Empty bins will be represented by NaN. * 'std': compute the standard deviation within each bin. This is implicitly calculated with ddof=0. * 'median': compute the median of values for points within each bin. Empty bins will be represented by NaN. * 'count': compute the count of points within each bin. This is identical to an unweighted histogram. values array is not referenced. * 'sum': compute the sum of values for points within each bin. This is identical to a weighted histogram. * 'min': compute the minimum of values for points within each bin. Empty bins will be represented by NaN. * 'max': compute the maximum of values for point within each bin. Empty bins will be represented by NaN. """ self._masses, self._intensities = bin_masses_and_intensities( self.masses, self.intensities, bin_width, statistic)
[docs] def remove_spurious_peaks(self, bin_width: float = 0.01, threshold: float = 0.25, scan_grouping: float = 50.0): """ Method that's highly influenced by Jasen Finch's (jsf9@aber.ac.uk) binneR, in which spurios peaks can be removed. At the time of writing, this method has serious performance issues and needs to be rectified. but should still work as intended (provided that you don't mind how long it takes to complete) Arguments: bin_width (float): The mass-to-ion bin-widths to use for binning. threshold (float): Percentage of scans in which a peak must be in in order for it to be considered. scan_grouping (float): Mass-to-ion scan groups, this splits the scans into groups to ease the processing somewhat. It is strongly recommended that you keep this at it's default value of of 50.0 .. note:: load_scans() must first be run in order for this to work. """ def _determine_scan_group(): medians = [np.mean(x.masses) for x in self.scans] bins = np.arange(np.min(medians), np.max(medians), scan_grouping) _, _, binnumber = binned_statistic(medians, medians, statistic="mean", bins=bins) scan_groups = {} for index, group in enumerate(np.unique(binnumber)): scan_groups[index] = np.array(self.scans)[binnumber == group] return scan_groups def _get_bins(scan_list: list) -> np.array: mass_ranges = np.array([x.mass_range for x in scan_list]) min_mass = np.min([x[0] for x in mass_ranges]) - bin_width max_mass = np.max([x[1] for x in mass_ranges]) + bin_width return np.arange(min_mass, max_mass, step=bin_width) def _calculate_bins(scan_list: list, bins: np.array) -> np.array: scan_index = [] for index, scan in enumerate(scan_list): _, _, binnumber = binned_statistic(scan.masses, scan.intensities, bins=bins) counts = [] for bin_index in range(len(bins)): number_bins = np.count_nonzero(binnumber == bin_index) counts.append(number_bins) scan_index.append(np.array(counts)) _tmp_si = np.array(scan_index) return bins[_tmp_si.sum(axis=0) >= len(scan_list) * threshold] def _remove_from_scans(scan_list: list, non_spurios_masses: np.array) -> None: for scan in scan_list: masses = [] intensities = [] for non_spurios_mass in non_spurios_masses: keep = np.where( np.logical_and( scan.masses >= non_spurios_mass, scan.masses <= non_spurios_mass + bin_width)) masses.extend(scan.masses[keep].tolist()) intensities.extend(scan.intensities[keep].tolist()) scan.masses = masses scan.intensities = intensities if scan_grouping: scan_groups = _determine_scan_group() else: scan_groups = {0: self.scans} for scan_group in scan_groups: scan_list = scan_groups[scan_group] bins = _get_bins(scan_list) non_spurios_masses = _calculate_bins(scan_list, bins) _remove_from_scans(scan_list, non_spurios_masses) # Load in new masses and intensities. self._load_masses_and_ints_from_scans()
@property def scans(self) -> List[Scan]: return self.read_scans @property def masses(self) -> np.array: if type(self._masses) != bool: return self._masses else: raise ValueError("No masses generated, run load_scans() first!") @property def intensities(self) -> np.array: if type(self._intensities) != bool: return self._intensities else: raise ValueError("No intensities generated, run load_scans() first!") @property def to_use(self) -> List[bool]: return self._to_use @property def mass_range(self) -> Tuple[float, float]: return [np.min(self.masses), np.max(self.masses)]