import os
import numpy as np
import pandas as pd
from alphabase.io.hdf import HDF_File
from pyteomics import mzml
from peptdeep.utils import logging
from inspect import currentframe, getframeinfo
try:
from alpharaw.raw_access.pythermorawfilereader import RawFileReader
except (ImportError, AttributeError, RuntimeError) as e:
frameinfo = getframeinfo(currentframe())
logging.warn(
f"{frameinfo.filename}#L{frameinfo.lineno}: Cannot import `RawFileReader`, check if PythonNet is installed. See https://github.com/MannLabs/alphapeptdeep#pip"
)
RawFileReader = None
[docs]
class MSReaderBase:
[docs]
def __init__(self):
self.spectrum_df: pd.DataFrame = pd.DataFrame()
self.peak_df: pd.DataFrame = pd.DataFrame()
# self.mzs: np.ndarray = np.array([])
# self.intensities: np.ndarray = np.array([])
[docs]
def load(self, file_path):
raise NotImplementedError("load()")
[docs]
def build_spectrum_df(
self,
scan_list: list,
scan_indices: np.ndarray,
rt_list: list,
mobility_list: list = None,
nce_list: list = None,
):
"""Build spectrum_df by the given information
Parameters
----------
scan_list : list
scan number list
scan_indices : np.array
starts and end positions of ms2
peaks for each scan
rt_list : list
retention time (minutes) for each scan
mobility_list : list, optional
mobility for each scan. Defaults to None.
"""
def set_col(col, indexes, values, dtype, na_value):
self.spectrum_df.loc[indexes, col] = values
self.spectrum_df[col].fillna(na_value, inplace=True)
self.spectrum_df[col] = self.spectrum_df[col].astype(dtype)
scan_list = np.array(scan_list, dtype=np.int32)
if scan_list.min() > 0:
# thermo scan >= 1
scan_list -= 1
idx_len = np.max(scan_list) + 1
self.spectrum_df = pd.DataFrame(index=np.arange(idx_len, dtype=np.int64))
self.spectrum_df["spec_idx"] = self.spectrum_df.index.values
set_col("peak_start_idx", scan_list, scan_indices[:-1], np.int64, -1)
set_col("peak_end_idx", scan_list, scan_indices[1:], np.int64, -1)
set_col("rt", scan_list, rt_list, np.float64, np.nan)
if mobility_list is not None:
set_col("mobility", scan_list, mobility_list, np.float64, np.nan)
if nce_list is not None:
set_col("nce", scan_list, nce_list, np.float64, np.nan)
[docs]
def get_peaks(self, spec_idx: int):
"""Get peak (mz and intensity) values by `spec_idx`
Parameters
----------
spec_idx : int
indicator for a spectrum,
could be scan_num-1 for thermo data.
Returns
-------
np.array
mz values for the given spec_idx
np.array
intensity values for the given spec_idx
"""
if spec_idx not in self.spectrum_df.index:
return None, None
start_idx, end_idx = self.spectrum_df.loc[
spec_idx, ["peak_start_idx", "peak_end_idx"]
].values.astype(np.int64)
return (
self.peak_df.mz.values[start_idx:end_idx],
self.peak_df.intensity.values[start_idx:end_idx],
)
[docs]
def get_peaks_by_scan_num(self, scan_num: int):
"""Get peak (mz and intensity) values by `spec_idx`
Parameters
----------
scan_num : int
scan_num of thermodata
Returns
-------
np.array
mz values for the given spec_idx (scan)
np.array
intensity values for the given spec_idx
"""
return self.get_peaks(scan_num - 1)
[docs]
class AlphaPept_HDF_MS1_Reader(MSReaderBase):
"""MS1 from AlphaPept HDF"""
[docs]
def load(self, file_path):
hdf = HDF_File(file_path)
self.peak_df["mz"] = hdf.Raw.MS1_scans.mass_list_ms1.values
self.peak_df["intensity"] = hdf.Raw.MS1_scans.int_list_ms1.values
self.build_spectrum_df(
scan_list=hdf.Raw.MS1_scans.scan_list_ms1.values,
scan_indices=hdf.Raw.MS1_scans.indices_ms1.values,
rt_list=hdf.Raw.MS1_scans.rt_list_ms1.values,
mobility_list=hdf.Raw.MS1_scans.mobility.values
if hasattr(hdf.Raw.MS1_scans, "mobility")
else None,
)
[docs]
class AlphaPept_HDF_MS2_Reader(MSReaderBase):
"""MS2 from AlphaPept HDF"""
[docs]
def load(self, file_path):
hdf = HDF_File(file_path)
self.peak_df["mz"] = hdf.Raw.MS2_scans.mass_list_ms2.values
self.peak_df["intensity"] = hdf.Raw.MS2_scans.int_list_ms2.values
if hasattr(hdf.Raw.MS2_scans, "mobility2"):
scan_list = np.arange(len(hdf.Raw.MS2_scans.rt_list_ms2))
else:
scan_list = hdf.Raw.MS2_scans.scan_list_ms2.values
self.build_spectrum_df(
scan_list=scan_list,
scan_indices=hdf.Raw.MS2_scans.indices_ms2.values,
rt_list=hdf.Raw.MS2_scans.rt_list_ms2.values,
mobility_list=hdf.Raw.MS2_scans.mobility2.values
if hasattr(hdf.Raw.MS2_scans, "mobility2")
else None,
)
[docs]
class MZMLReader(MSReaderBase):
[docs]
def load(self, mzmlF):
if isinstance(mzmlF, str):
f = mzml.read(mzmlF)
else:
f = mzmlF
scanset = set()
masses_list = []
intens_list = []
scan_list = []
rt_list = []
nce_list = []
for entry in f:
if entry["ms level"] != 2: # only care about MS2 scans
continue
scan = int(entry["id"].split("scan=")[1])
if scan in scanset:
continue
# accept only hcd and cid
filter_string = entry["scanList"]["scan"][0]["filter string"]
if "@hcd" in filter_string:
nce_list.append(filter_string.split("@hcd")[1].split(" ")[0])
elif "@cid" in filter_string:
nce_list.append(filter_string.split("@cid")[1].split(" ")[0])
else:
continue
scanset.add(scan)
scan_list.append(scan)
masses_list.append(entry["m/z array"])
intens_list.append(entry["intensity array"])
rt_list.append(entry["scanList"]["scan"][0]["scan start time"])
if isinstance(mzmlF, str):
f.close()
self.build_spectrum_df(
scan_list, index_ragged_list(masses_list), rt_list, nce_list=nce_list
)
self.peak_df["mz"] = np.concatenate(masses_list)
self.peak_df["intensity"] = np.concatenate(intens_list)
[docs]
def read_until(file, until):
lines = []
while True:
line = file.readline().strip()
if line == "":
break
elif line.startswith(until):
break
else:
lines.append(line)
return lines
[docs]
def find_line(lines, start):
for line in lines:
if line.startswith(start):
return line
return None
[docs]
def parse_pfind_scan_from_TITLE(pfind_title):
return int(pfind_title.split(".")[-4])
[docs]
def is_pfind_mgf(mgf):
return mgf.upper().endswith("_HCDFT.MGF")
[docs]
def index_ragged_list(ragged_list: list) -> np.ndarray:
"""Create lookup indices for a list of arrays for concatenation.
Parameters
----------
value : list
Input list of arrays.
Returns
-------
indices
A numpy array with indices.
"""
indices = np.zeros(len(ragged_list) + 1, np.int64)
indices[1:] = [len(i) for i in ragged_list]
indices = np.cumsum(indices)
return indices
[docs]
class MGFReader(MSReaderBase):
"""MGF Reader (MS2)"""
[docs]
def load(self, mgf):
if isinstance(mgf, str):
f = open(mgf)
else:
f = mgf
scanset = set()
masses_list = []
intens_list = []
scan_list = []
rt_list = []
while True:
line = f.readline()
if not line:
break
if line.startswith("BEGIN IONS"):
lines = read_until(f, "END IONS")
masses = []
intens = []
scan = None
RT = 0
for line in lines:
if line[0].isdigit():
mass, inten = [float(i) for i in line.strip().split()]
masses.append(mass)
intens.append(inten)
elif line.startswith("SCAN="):
scan = int(line.split("=")[1])
elif line.startswith("RTINSECOND"):
RT = float(line.split("=")[1]) / 60
if not scan:
title = find_line(lines, "TITLE=")
scan = parse_pfind_scan_from_TITLE(title)
if scan in scanset:
continue
scanset.add(scan)
scan_list.append(scan)
rt_list.append(RT)
masses_list.append(np.array(masses))
intens_list.append(np.array(intens))
if isinstance(mgf, str):
f.close()
self.build_spectrum_df(scan_list, index_ragged_list(masses_list), rt_list)
self.peak_df["mz"] = np.concatenate(masses_list)
self.peak_df["intensity"] = np.concatenate(intens_list)
[docs]
class MSReaderProvider:
"""Factory class to register and get MS Readers"""
[docs]
def __init__(self):
self.reader_dict = {}
[docs]
def register_reader(self, ms2_type, reader_class):
self.reader_dict[ms2_type.lower()] = reader_class
[docs]
def get_reader(self, file_type) -> MSReaderBase:
if file_type not in self.reader_dict:
frameinfo = getframeinfo(currentframe())
logging.warn(
f'{frameinfo.filename}#L{frameinfo.lineno}: "{file_type}" is not registered in `MSReaderProvider` yet.'
)
return None
else:
return self.reader_dict[file_type.lower()]()
ms2_reader_provider = MSReaderProvider()
ms2_reader_provider.register_reader("mgf", MGFReader)
ms2_reader_provider.register_reader("alphapept", AlphaPept_HDF_MS2_Reader)
ms2_reader_provider.register_reader("alphapept_hdf", AlphaPept_HDF_MS2_Reader)
ms2_reader_provider.register_reader("mzml", MZMLReader)
ms1_reader_provider = MSReaderProvider()
ms1_reader_provider.register_reader("alphapept", AlphaPept_HDF_MS1_Reader)
ms1_reader_provider.register_reader("alphapept_hdf", AlphaPept_HDF_MS1_Reader)
if RawFileReader is None:
class ThermoRawMS1Reader:
def __init__(self):
raise NotImplementedError("RawFileReader is not available")
class ThermoRawMS2Reader:
def __init__(self):
raise NotImplementedError("RawFileReader is not available")
else:
[docs]
class ThermoRawMS1Reader(MSReaderBase):
"""Thermo Raw MS1 Reader"""
[docs]
def __init__(self):
super().__init__()
self.profile_mode = False
[docs]
def load(self, raw_path):
rawfile = RawFileReader(raw_path)
spec_indices = np.array(
range(rawfile.FirstSpectrumNumber, rawfile.LastSpectrumNumber + 1)
)
scan_list = []
rt_list = []
masses_list = []
intens_list = []
for i in spec_indices:
try:
ms_order = rawfile.GetMSOrderForScanNum(i)
if ms_order == 1:
if self.profile_mode:
masses, intens = rawfile.GetProfileMassListFromScanNum(i)
else:
masses, intens = rawfile.GetCentroidMassListFromScanNum(i)
scan_list.append(i)
rt_list.append(rawfile.RTInSecondsFromScanNum(i))
masses_list.append(masses)
intens_list.append(intens)
except KeyboardInterrupt as e:
raise e
except SystemExit as e:
raise e
except Exception as e:
print(f"Bad scan={i} in raw file '{raw_path}'")
self.build_spectrum_df(
scan_list,
index_ragged_list(masses_list),
rt_list,
)
self.peak_df["mz"] = np.concatenate(masses_list)
self.peak_df["intensity"] = np.concatenate(intens_list)
rawfile.Close()
[docs]
class ThermoRawMS2Reader(MSReaderBase):
"""Thermo RAW MS2 Reader"""
[docs]
def __init__(self):
super().__init__()
self.profile_mode = False
[docs]
def load(self, raw_path):
rawfile = RawFileReader(raw_path)
spec_indices = np.array(
range(rawfile.FirstSpectrumNumber, rawfile.LastSpectrumNumber + 1)
)
scan_list = []
rt_list = []
masses_list = []
intens_list = []
for i in spec_indices:
try:
ms_order = rawfile.GetMSOrderForScanNum(i)
if ms_order == 2:
if self.profile_mode:
masses, intens = rawfile.GetProfileMassListFromScanNum(i)
else:
masses, intens = rawfile.GetCentroidMassListFromScanNum(i)
scan_list.append(i)
rt_list.append(rawfile.RTFromScanNum(i))
masses_list.append(masses)
intens_list.append(intens)
except KeyboardInterrupt as e:
raise e
except SystemExit as e:
raise e
# except Exception as e:
# print(f"Bad scan={i} in raw file '{raw_path}'")
self.build_spectrum_df(
scan_list,
index_ragged_list(masses_list),
rt_list,
)
self.peak_df["mz"] = np.concatenate(masses_list)
self.peak_df["intensity"] = np.concatenate(intens_list)
rawfile.Close()
ms2_reader_provider.register_reader("thermo", ThermoRawMS2Reader)
ms2_reader_provider.register_reader("thermo_raw", ThermoRawMS2Reader)
ms1_reader_provider.register_reader("thermo", ThermoRawMS1Reader)
ms1_reader_provider.register_reader("thermo_raw", ThermoRawMS1Reader)