Coverage for annotate.py: 80%

1"""
2Greatly inspired/copied from:
3https://github.com/kusterlab/prosit/blob/master/prosit
5And released under an Apache 2.0 license
6"""
8import collections
9from collections import OrderedDict, defaultdict
10from typing import Callable, Dict, List, Tuple, Union, Iterator
11import warnings
13import numpy
14from numpy import bool_, float64, ndarray
16from elfragmentador import constants, encoding_decoding


def solve_alias(x):
    """Replaces the annotation of a parsed token with its canonical alias.

    Single-character tokens (plain residues) are returned unchanged; annotated
    tokens are looked up in constants.MOD_PEPTIDE_ALIASES.
    """
    try:
        x = x if len(x) == 1 else x[:1] + f"[{constants.MOD_PEPTIDE_ALIASES[x]}]"
    except KeyError:
        # Retry the lookup with an explicit "+" sign, so "M[16]" is searched
        # as "M[+16]".
        x = (
            x
            if len(x) == 1
            else x[:1] + f"[{constants.MOD_PEPTIDE_ALIASES[x.replace('[', '[+')]}]"
        )

    x = x if len(x) != 3 else x[:1]  # Takes care of C[] (an empty alias)

    return x


def peptide_parser(p: str, solve_aliases=False) -> Iterator[str]:
    """
    Parses MaxQuant-formatted peptide strings

    Examples
    ========
    >>> list(peptide_parser("AAACC"))
    ['n', 'A', 'A', 'A', 'C', 'C', 'c']
    >>> list(peptide_parser("AAAM(ox)CC"))
    ['n', 'A', 'A', 'A', 'M(ox)', 'C', 'C', 'c']
    >>> list(peptide_parser("AAAM[+16]CC"))
    ['n', 'A', 'A', 'A', 'M[+16]', 'C', 'C', 'c']
    """

    ANNOTATIONS = "[](){}"

    # This fixes a bug where passing a list would yield incorrect results
    p = "".join(p)

    if p[0] in ANNOTATIONS:
        raise ValueError(f"sequence starts with '{p[0]}'")
    n = len(p)
    i = 0

    # Yield the n terminus if it's not explicit in the sequence
    if p[0] != "n":
        yield "n"

    while i < n:
        if p[i] == "_":
            i += 1
            continue
        elif i + 1 < n and p[i + 1] in ANNOTATIONS:
            # The residue at i is annotated; consume up to and including the
            # closing annotation character.
            p_ = p[i + 2 :]
            annots = [x for x in ANNOTATIONS if x in p_]
            nexts = []
            for an in annots:
                nexts.append(p_.index(an))
            j = min(nexts)
            offset = i + j + 3
            out = p[i:offset]
            yield_value = solve_alias(out) if solve_aliases else out
            i = offset
        else:
            yield_value = p[i]
            i += 1

        yield yield_value

    # Yield the c terminus if it's not explicit in the sequence
    if yield_value != "c":
        yield "c"
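

# Editor's sketch (not part of the original module): one way a caller might use
# peptide_parser. Tokens longer than one character carry a modification
# annotation, while "n" and "c" are the termini the parser adds implicitly.
# The helper name is hypothetical.
def _sketch_count_modified_residues(seq: str) -> int:
    return sum(
        1
        for token in peptide_parser(seq)
        if len(token) > 1 and token not in ("n", "c")
    )


# For instance, _sketch_count_modified_residues("AAAM(ox)CC") == 1, since only
# the 'M(ox)' token is annotated (see the doctests above).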


def mass_diff_encode_seq(seq):
    """Encodes a sequence using mass-difference notation for its modifications."""
    tokens = peptide_parser(seq, solve_aliases=True)
    tokens = encoding_decoding.clip_explicit_terminus(list(tokens))
    # For some reason skyline detects T[80] but not T[+80] ...
    # And does not detect T[181] as a valid mod ...
    out = "".join([constants.MASS_DIFF_ALIASES_I[x].replace("+", "") for x in tokens])
    return out


def canonicalize_seq(seq: str, robust: bool = False) -> str:
    """Solves all modification aliases in a sequence.

    Given a sequence, converts all supported modification aliases to the
    "canonical" version of them and returns the new version.

    Parameters
    ----------
    seq : str
        Modified peptide sequence, for example "PEPTIDE[+23]TIDE"
    robust : bool, optional
        Whether errors should be silent and return None when they happen, by default False

    Returns
    -------
    str
        Same sequence as the input but with all mod aliases replaced by the
        primary one in this package

    Raises
    ------
    KeyError
        If the sequence contains an unsupported modification alias and `robust` is False
    """
    try:
        out = "".join(peptide_parser(seq, solve_aliases=True))
    except KeyError as e:
        out = None
        if not robust:
            warnings.warn(f"Non-supported sequence found in {seq}")
            raise e

    return out
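

# Editor's sketch (not part of the original module): robust=True turns
# unsupported sequences into None instead of raising, which makes it easy to
# filter a batch. The helper name is hypothetical.
def _sketch_keep_canonicalizable(seqs: List[str]) -> List[str]:
    canonical = [canonicalize_seq(s, robust=True) for s in seqs]
    return [c for c in canonical if c is not None]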


def get_theoretical_mass(peptide: str):
    """
    Calculates the theoretical mass of a peptide

    Example
    -------
    >>> get_theoretical_mass("MYPEPTIDE")
    1093.4637787
    """
    aas = peptide_parser(peptide)
    out = sum([constants.MOD_AA_MASSES[a] for a in aas])
    return out


def get_precursor_mz(peptide: str, charge: int):
    """
    Calculates the theoretical mass/charge of a precursor peptide
    (assumes positive mode)

    Example
    -------
    >>> get_precursor_mz("MYPEPTIDE", 1)
    1094.471055167
    >>> get_precursor_mz("MYPEPTIDE", 2)
    547.739165817
    """
    return get_mz(get_theoretical_mass(peptide), 0, charge)
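

# Editor's sketch (not part of the original module): the precursor m/z above is
# just the neutral peptide mass plus `charge` protons, divided by the charge,
# i.e. get_mz(mass, 0, charge). The helper name is hypothetical.
def _sketch_precursor_mz_by_hand(peptide: str, charge: int) -> float:
    neutral_mass = get_theoretical_mass(peptide)
    return (neutral_mass + charge * constants.PROTON) / charge


# _sketch_precursor_mz_by_hand("MYPEPTIDE", 2) should reproduce the doctest
# value of get_precursor_mz("MYPEPTIDE", 2), i.e. 547.739165817.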


def get_forward_backward(peptide: str) -> Tuple[ndarray, ndarray]:
    """
    Calculates masses forward and backwards from an amino acid
    sequence

    Examples
    ========
    >>> get_forward_backward("AMC")
    (array([ 1.00782503, 72.04493903, 203.08542403, 363.11607276,
    380.11881242]), array([ 17.00273967, 177.03338839, 308.07387339, 379.11098739,
    380.11881242]))
    >>> get_forward_backward("AM[147]C")
    (array([ 1.00782503, 72.04493903, 219.08033403, 379.11098276,
    396.11372242]), array([ 17.00273967, 177.03338839, 324.06878339, 395.10589739,
    396.11372242]))
    >>> get_forward_backward("n[+42]AM[147]C")
    (array([ 43.01839004, 114.05550404, 261.09089904, 421.12154776,
    438.12428742]), array([ 17.00273967, 177.03338839, 324.06878339, 395.10589739,
    438.12428742]))
    """
    amino_acids = peptide_parser(peptide)
    masses = [constants.MOD_AA_MASSES[a] for a in amino_acids]
    forward = numpy.cumsum(masses)
    backward = numpy.cumsum(masses[::-1])
    return forward, backward
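

# Editor's sketch (not part of the original module): both arrays are cumulative
# sums over the same residue masses (termini included), so their last entries
# coincide and equal the neutral peptide mass from get_theoretical_mass.
def _sketch_check_cumulative_masses(peptide: str) -> bool:
    fw, bw = get_forward_backward(peptide)
    total = get_theoretical_mass(peptide)
    return abs(fw[-1] - total) < 1e-9 and abs(bw[-1] - total) < 1e-9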


def get_mz(sum_: float64, ion_offset: float, charge: int) -> float64:
    # m/z = (cumulative mass + ion-type offset + charge * proton mass) / charge
    return (sum_ + ion_offset + charge * constants.PROTON) / charge


def get_mzs(cumsum: ndarray, ion_type: str, z: int) -> List[float64]:
    # return (cumsum[:-1] + constants.ION_OFFSET[ion_type] + (z * constants.PROTON))/z
    # TODO this can be vectorized if needed
    return [get_mz(s, constants.ION_OFFSET[ion_type], z) for s in cumsum[:-2]][1:]
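

# Editor's sketch (not part of the original module), following the TODO above:
# a vectorized numpy variant of get_mzs. The slicing mirrors the list version,
# which effectively applies get_mz to cumsum[1:-2].
def _sketch_get_mzs_vectorized(cumsum: ndarray, ion_type: str, z: int) -> ndarray:
    return (cumsum[1:-2] + constants.ION_OFFSET[ion_type] + z * constants.PROTON) / z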


def get_annotation(
    forward: ndarray, backward: ndarray, charge: int, ion_types: str
) -> OrderedDict:
    """
    Calculates the ion annotations based on the forward
    and backward cumulative masses

    Example
    =======
    >>> fw, bw = get_forward_backward("AMC")
    >>> fw
    array([ 1.00782503, 72.04493903, 203.08542403, 363.11607276, 380.11881242])
    >>> bw
    array([ 17.00273967, 177.03338839, 308.07387339, 379.11098739, 380.11881242])
    >>> get_annotation(fw, bw, 3, "y")
    OrderedDict([('y1', 60.354347608199994), ...])
    """
    tmp = "{}{}"
    tmp_nl = "{}{}-{}"
    all_ = {}
    for ion_type in ion_types:
        if ion_type in constants.FORWARD:
            cummass = forward
        elif ion_type in constants.BACKWARD:
            cummass = backward
        else:
            raise ValueError("unknown ion_type: {}".format(ion_type))
        masses = get_mzs(cummass, ion_type, charge)
        d = {tmp.format(ion_type, i + 1): m for i, m in enumerate(masses)}
        all_.update(d)

        # Neutral-loss annotations, currently disabled:
        # for nl, offset in constants.NEUTRAL_LOSS.items():
        #     nl_masses = get_mzs(cummass - offset, ion_type, charge)
        #     d = {tmp_nl.format(ion_type, i + 1, nl): m for i, m in enumerate(nl_masses)}
        #     all_.update(d)

    return collections.OrderedDict(sorted(all_.items(), key=lambda t: t[0]))


def get_peptide_ions(aa_seq: str) -> Dict[str, float64]:
    """Gets the theoretical masses of fragment ions

    Args:
        aa_seq (str): Amino acid sequence with modifications

    Returns:
        Dict[str, float64]: Keys are ion names and values are the masses

    Examples:
        >>> foo = get_peptide_ions("AA")
        >>> sorted(foo.keys())
        ['z1b1', 'z1y1', 'z2b1', 'z2y1', 'z3b1', 'z3y1']
        >>> foo['z1y1']  # ground truth from http://db.systemsbiology.net:8080/proteomicsToolkit/FragIonServlet.html
        90.054955167
        >>> foo['z1b1']
        72.044390467
    """
    out = _get_peptide_ions(
        aa_seq,
        charges=range(1, constants.MAX_FRAG_CHARGE + 1),
        ion_types=constants.ION_TYPES,
    )
    return out


def _get_peptide_ions(
    aa_seq: str,
    charges: Union[List[int], range] = range(1, 5),
    ion_types: Union[str, List[str]] = "yb",
) -> Dict[str, float64]:
    """

    Examples
    ========
    >>> foo = _get_peptide_ions("AA", [1,2])
    >>> foo
    {'z1y1': 90.054955167, ...}
    """
    fw, bw = get_forward_backward(aa_seq)
    out = {}

    for charge in charges:
        for ion in ion_types:
            ion_dict = get_annotation(fw, bw, charge, ion)
            ion_dict = {"z" + str(charge) + k: v for k, v in ion_dict.items()}
            out.update(ion_dict)

    return out
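

# Editor's sketch (not part of the original module): the keys produced above
# follow a "z{charge}{ion_type}{position}" pattern, e.g. "z2y1". A hypothetical
# helper to split such a key apart, assuming single-letter ion types as in the
# doctests above.
def _sketch_split_ion_key(key: str) -> Tuple[int, str, int]:
    import re

    charge, ion_type, position = re.match(r"z(\d+)([a-z])(\d+)$", key).groups()
    return int(charge), ion_type, int(position)


# _sketch_split_ion_key("z2y1") == (2, "y", 1)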


def get_tolerance(
    theoretical: float64, tolerance: int = 25, unit: str = "ppm"
) -> float64:
    if unit == "ppm":
        return theoretical * float(tolerance) / 10 ** 6
    elif unit == "da":
        return float(tolerance)
    else:
        raise ValueError("unit {} not implemented".format(unit))


def is_in_tolerance(
    theoretical: float64, observed: float, tolerance: int = 25, unit: str = "ppm"
) -> bool_:
    mz_tolerance = get_tolerance(theoretical, tolerance, unit)
    lower = observed - mz_tolerance
    upper = observed + mz_tolerance
    return theoretical >= lower and theoretical <= upper
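

# Editor's note (sketch, not part of the original module): at 25 ppm a
# theoretical m/z of 500.0 tolerates +/- 500.0 * 25 / 1e6 = 0.0125, so an
# observed peak at 500.01 is accepted while one at 500.02 is not.
def _sketch_tolerance_demo() -> bool:
    assert abs(get_tolerance(500.0, 25, "ppm") - 0.0125) < 1e-9
    return is_in_tolerance(500.0, 500.01) and not is_in_tolerance(500.0, 500.02)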


def annotate_peaks(theoretical_peaks, mzs, intensities, tolerance=25, unit="ppm"):
    raise DeprecationWarning(
        "Use `annotate_peaks2`, this version of the"
        " function is deprecated and will be removed in the future"
    )

    # Unreachable legacy implementation, kept for reference.
    annots = {}
    max_delta = tolerance if unit == "da" else max(mzs) * tolerance / 1e6

    for mz, inten in zip(mzs, intensities):
        matching = {
            k: inten
            for k, v in theoretical_peaks.items()
            if abs(mz - v) <= max_delta and is_in_tolerance(v, mz, tolerance, unit)
        }
        annots.update(matching)

    max_int = max([v for v in annots.values()] + [0])
    annots = {k: v / max_int for k, v in annots.items()}
    return annots


def is_sorted(
    lst: Union[
        List[List[Union[int, float]]],
        List[List[float]],
        List[List[Union[str, float64]]],
        List[List[Union[float64, int]]],
    ],
    key: Callable = lambda x: x,
) -> bool:
    for i, el in enumerate(lst[1:]):
        if key(el) < key(lst[i]):  # i is the index of the previous element
            return False
    return True


def sort_if_needed(
    lst: Union[
        List[List[Union[int, float]]],
        List[List[float]],
        List[List[Union[str, float64]]],
        List[List[Union[float64, int]]],
    ],
    key: Callable = lambda x: x,
) -> None:
    if not is_sorted(lst, key):
        lst.sort(key=key)
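

# Editor's sketch (not part of the original module): sort_if_needed mutates the
# list in place and only sorts when is_sorted says it has to.
def _sketch_sort_pairs_demo():
    pairs = [[3.0, "c"], [1.0, "a"], [2.0, "b"]]
    sort_if_needed(pairs, key=lambda x: x[0])
    return pairs  # [[1.0, 'a'], [2.0, 'b'], [3.0, 'c']]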


def annotate_peaks2(
    theoretical_peaks: Dict[str, float64],
    mzs: Union[List[float64], List[float]],
    intensities: Union[List[float], List[int]],
    tolerance: int = 25,
    unit: str = "ppm",
) -> Dict[str, float]:
    max_delta = tolerance if unit == "da" else max(mzs) * tolerance / 1e6

    mz_pairs = [[m, i] for m, i in zip(mzs, intensities)]
    theo_peaks = [[k, v] for k, v in theoretical_peaks.items()]

    sort_if_needed(mz_pairs, key=lambda x: x[0])
    sort_if_needed(theo_peaks, key=lambda x: x[1])

    theo_iter = iter(theo_peaks)
    curr_theo_key, curr_theo_val = next(theo_iter)

    annots = defaultdict(lambda: 0)
    for mz, inten in mz_pairs:
        deltamass = mz - curr_theo_val
        try:
            # Skip theoretical peaks that are more than max_delta below the
            # observed m/z.
            while deltamass >= max_delta:
                curr_theo_key, curr_theo_val = next(theo_iter)
                deltamass = mz - curr_theo_val
        except StopIteration:
            pass

        in_deltam = abs(deltamass) <= max_delta
        if in_deltam and abs(deltamass) <= get_tolerance(
            curr_theo_val, tolerance, unit
        ):
            annots[curr_theo_key] += inten
        else:
            try:
                # No match yet; keep advancing and collect any theoretical
                # peaks within tolerance of this observed m/z, stopping once
                # one lies more than max_delta above it.
                while True:
                    curr_theo_key, curr_theo_val = next(theo_iter)
                    deltamass = mz - curr_theo_val
                    if deltamass < -max_delta:
                        break
                    in_deltam = abs(deltamass) <= max_delta
                    if in_deltam and abs(deltamass) <= get_tolerance(
                        curr_theo_val, tolerance, unit
                    ):
                        annots[curr_theo_key] += inten
            except StopIteration:
                pass

    # Normalize intensities to the most intense matched peak.
    max_int = max([v for v in annots.values()] + [0])
    annots = {k: v / max_int for k, v in annots.items()}
    return annots
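

# Editor's sketch (not part of the original module): a minimal end-to-end use of
# this module, pairing get_peptide_ions with annotate_peaks2. The observed peaks
# are hypothetical; the first two sit within a few ppm of the z1b1/z1y1 ions
# from the "AA" doctest, while the third matches nothing.
def _sketch_annotate_spectrum() -> Dict[str, float]:
    theoretical = get_peptide_ions("AA")
    observed_mzs = [72.0444, 90.0550, 400.0]
    observed_intensities = [50.0, 100.0, 10.0]
    return annotate_peaks2(theoretical, observed_mzs, observed_intensities)


# The returned dict maps matched ion names to intensities normalized to the most
# intense matched peak, here roughly {'z1b1': 0.5, 'z1y1': 1.0}.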