Skip to content

Commit 1495485

Browse files
authored
Merge pull request #327 from maxibor/strip_fastq
Speed up strip_input_fastq process
2 parents d3d923f + d04ab65 commit 1495485

4 files changed

Lines changed: 173 additions & 118 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
3333
* [#266](https://github.com/nf-core/eager/issues/266) - Added sanity checks for input filetypes (i.e. only BAM files can be supplied if `--bam`)
3434
* [#237](https://github.com/nf-core/eager/issues/237) - Fixed and Updated script scrape_software_versions
3535
* [#322](https://github.com/nf-core/eager/pull/322) - Move extract map reads fastq compression to pigz
36+
* [#327](https://github.com/nf-core/eager/pull/327) - Speed up strip_input_fastq process and make it more robust
3637

3738
### `Dependencies`
3839

bin/extract_map_reads.py

Lines changed: 167 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import argparse
44
import multiprocessing
55
import pysam
6+
from xopen import xopen
67
from functools import partial
7-
import gzip
88
import sys
9+
from Bio.SeqIO.QualityIO import FastqGeneralIterator
10+
from io import StringIO
911

1012

1113
def _get_args():
@@ -57,34 +59,36 @@ def _get_args():
5759
return(bam, in_fwd, in_rev, out_fwd, out_rev, mode, proc)
5860

5961

60-
def extract_mapped_chr(chr, bam):
62+
def extract_mapped_chr(chr):
6163
"""
6264
Get mapped reads per chromosome
63-
INPUT:
64-
- chr(str): chromosome
65-
- bam(str): bamfile path
66-
OUTPUT:
67-
- res(list): list of mapped reads (str) name per chromosome
65+
Args:
66+
chr(str): chromosome
67+
bam(str): bamfile path
68+
Returns:
69+
res(list): list of mapped reads (str) name per chromosome
6870
"""
6971
res = []
70-
bamfile = pysam.AlignmentFile(bam, "rb")
71-
reads = bamfile.fetch(chr, multiple_iterators=True)
72+
reads = BAMFILE.fetch(chr, multiple_iterators=True)
7273
for read in reads:
73-
res.append(read.query_name)
74+
if read.is_unmapped == False:
75+
if read.query_name.startswith("M_"):
76+
read_name = read.query_name.replace(
77+
"M_", "").split()[0].split("/")[0]
78+
else:
79+
read_name = read.query_name.split()[0].split("/")[0]
80+
res.append(read_name)
7481
return(res)
7582

7683

77-
def extract_mapped(bam, processes):
78-
"""
79-
Get mapped reads in parallel
80-
INPUT:
81-
- bam(str): bamfile path
82-
OUTPUT:
83-
- result(list) list of mapped reads name (str)
84+
def extract_mapped(proc):
85+
"""Get mapped reads in parallel
86+
Returns:
87+
bamfile(str): path to bam alignment file
88+
result(list): list of mapped reads name (str)
8489
"""
8590
try:
86-
bamfile = pysam.AlignmentFile(bam, "rb")
87-
chrs = bamfile.references
91+
chrs = BAMFILE.references
8892
except ValueError as e:
8993
print(e)
9094

@@ -93,50 +97,50 @@ def extract_mapped(bam, processes):
9397
return([])
9498

9599
# Checking that nb_process is not > nb_chromosomes
96-
elif len(chrs) < processes:
97-
print(
98-
f"""Requested {processes} processe(s),
99-
but can only be parallelized on {len(chrs)}
100-
processes with these data""")
101-
processes = len(chrs)
102-
103-
extract_mapped_chr_partial = partial(extract_mapped_chr, bam=bam)
104-
p = multiprocessing.Pool(processes)
105-
res = p.map(extract_mapped_chr_partial, chrs)
106-
p.close()
107-
p.join()
108-
result = [i for ares in res for i in ares]
100+
proc = min(proc, len(chrs))
101+
with multiprocessing.Pool(proc) as p:
102+
res = p.map(extract_mapped_chr, chrs)
103+
result = [i for ares in res for i in ares if len(i) > 0]
109104
return(result)
110105

111106

112107
def parse_fq(fq):
113-
"""
114-
Parse a FASTQ file
115-
INPUT:
116-
- fq(str): path to fastq file
117-
OUTPUT:
118-
- fqd(dict): dictionary with read names as keys, seq and quality as values
108+
"""Parse a FASTQ file
109+
Args:
110+
fq(str): path to fastq file
111+
Returns:
112+
fqd(dict): dictionary with read names as keys, seq and quality as values
119113
in a list
120114
"""
121-
122115
def get_fq_reads(allreads):
123-
fqd = {}
124-
myflag = True
125-
for line in allreads:
126-
line = line.decode('utf-8').rstrip()
127-
if myflag == True:
128-
instrument = line.split()[0].split(":")[0]
129-
myflag = False
130-
if line.startswith(instrument):
131-
seqname = line[1:].split()[0]
132-
fqd[seqname] = []
133-
continue
134-
else:
135-
fqd[seqname].append(line)
136-
return(fqd)
116+
read_dict = {}
117+
for title, seq, qual in FastqGeneralIterator(allreads):
118+
# NEED TO ONLY KEEP THE FIRST PART OF THE FASTQ READ NAME FOR CROSS
119+
# REFERENCING WITH BAM FILE: ONLY THIS INFORMATION IS KEPT WHEN
120+
# COLLAPSING READS WITH ADAPTERREMOVAL
121+
122+
# Until fastq format 1.8
123+
# Split after slash
124+
# @HWUSI-EAS100R:6:73:941:1973#0/1
125+
suf_title = ""
126+
title_space = title.split()
127+
if len(title_space) > 1:
128+
title = title_space[0]
129+
suf_title = f" {title_space[1]}"
130+
131+
# From fastq format 1.8
132+
# Split after space
133+
# @EAS139:136:FC706VJ:2:2104:15343:197393 1:Y:18:ATCACG
134+
title_slash = title.split("/")
135+
if len(title_slash) > 1:
136+
title = title_slash[0]
137+
suf_title = f"/{title_slash[1]}"
138+
139+
read_dict[title] = [suf_title, seq, "+", qual]
140+
return(read_dict)
137141

138142
if fq.endswith('.gz'):
139-
with gzip.open(fq, 'rb') as allreads:
143+
with xopen(fq, 'r') as allreads:
140144
fqd = get_fq_reads(allreads)
141145
else:
142146
with open(fq, 'r') as allreads:
@@ -145,83 +149,139 @@ def get_fq_reads(allreads):
145149
return(fqd)
146150

147151

148-
def sort_mapped(fq_dict, mapped_reads):
149-
"""
150-
Sort mapped reads from dictionary of fastq reads
151-
INPUT:
152-
- fq_dict(dict) dictionary with read names as keys, seq and quality as values
153-
in a list
154-
- mapped_reads(list) list of mapped reads
155-
OUTPUT:
156-
- mfqd(dict) dictionary with mapped read names as keys, seq and quality as values
152+
def get_mapped_reads(fq_dict, mapped_reads):
153+
"""Sort mapped reads from dictionary of fastq reads
154+
Args:
155+
fq_dict(dict) dictionary with read names as keys, seq and quality as values
157156
in a list
158-
- fqd(dict) dictionary with unmapped read names as key, unmapped/mapped (u|m),
157+
mapped_reads(list) list of mapped reads
158+
Returns:
159+
fqd(dict) dictionary with read names as key, unmapped/mapped (u|m),
159160
seq and quality as values in a list
160161
"""
162+
163+
def intersection(list1, list2):
164+
return(set(list1).intersection(list2))
165+
166+
def difference(list1, list2):
167+
return(set(list1).difference(list2))
168+
161169
fqd = {}
162-
unmapped = [i for i in list(fq_dict.keys()) if i not in mapped_reads]
163-
mapped = [i for i in list(fq_dict.keys()) if i in mapped_reads]
164-
# print(unmap)
165-
for r in unmapped:
166-
fqd[r] = ['u']+fq_dict[r]
167-
for r in mapped:
168-
fqd[r] = ['m']+fq_dict[r]
170+
all_reads = list(fq_dict.keys())
171+
mapped = intersection(all_reads, mapped_reads)
172+
unmapped = difference(all_reads, mapped_reads)
173+
174+
for rm in mapped:
175+
fqd[rm] = ['m']+fq_dict[rm]
176+
for ru in unmapped:
177+
fqd[ru] = ['u']+fq_dict[ru]
169178

170179
return(fqd)
171180

172181

173-
def write_fq(fq_dict, fname, mode):
174-
"""
175-
Write to fastq file
176-
INPUT:
177-
- fq_dict(dict) dictionary with unmapped read names as keys,
178-
unmapped/mapped (u|m), seq, and quality as values in a list
179-
- fname(string) Path to output fastq file
180-
- mode(string) strip (remove read) or replace (replace read sequence) by Ns
182+
def write_fq(fq_dict, fname, write_mode, strip_mode, proc):
183+
"""Write to fastq file
184+
Args:
185+
fq_dict(dict): fq_dict with unmapped read names as keys,
186+
unmapped/mapped (u|m), seq, and quality as values in a list
187+
fname(string) Path to output fastq file
188+
write_mode (str): 'rb' or 'r'
189+
strip_mode (str): strip (remove read) or replace (replace read sequence) by Ns
190+
proc(int) number of processes
181191
"""
182-
with open(fname, 'w') as f:
183-
for k in list(fq_dict.keys()):
184-
if mode == 'strip':
185-
if fq_dict[k][0] == 'u':
186-
f.write(f"@{k}\n")
187-
for i in fq_dict[k][1:]:
188-
f.write(f"{i}\n")
189-
elif fq_dict[k][0] == 'm':
190-
continue
191-
elif mode == 'replace':
192-
if fq_dict[k][0] == 'u':
193-
f.write(f"@{k}\n")
194-
for i in fq_dict[k][1:]:
195-
f.write(f"{i}\n")
196-
elif fq_dict[k][0] == 'm':
197-
f.write(f"@{k}\n")
198-
f.write(f"{'N'*len(fq_dict[k][1])}\n")
199-
for i in fq_dict[k][2:]:
200-
f.write(f"{i}\n")
192+
fq_dict_keys = list(fq_dict.keys())
193+
if write_mode == 'wb':
194+
with xopen(fname, mode='wb', threads=proc) as fw:
195+
for fq_dict_key in fq_dict_keys:
196+
wstring = ""
197+
if strip_mode == 'strip':
198+
if fq_dict[fq_dict_key][0] == 'u':
199+
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
200+
for i in fq_dict[fq_dict_key][2:]:
201+
wstring += f"{i}\n"
202+
elif strip_mode == 'replace':
203+
# if unmapped, write all the read lines
204+
if fq_dict[fq_dict_key][0] == 'u':
205+
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
206+
for i in fq_dict[fq_dict_key][2:]:
207+
wstring += f"{i}\n"
208+
# if mapped, write all the read lines, but replace sequence
209+
# by N*(len(sequence))
210+
elif fq_dict[fq_dict_key][0] == 'm':
211+
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
212+
wstring += f"{'N'*len(fq_dict[fq_dict_key][2])}\n"
213+
for i in fq_dict[fq_dict_key][3:]:
214+
wstring += f"{i}\n"
215+
fw.write(wstring.encode())
216+
else:
217+
with open(fname, 'w') as fw:
218+
for fq_dict_key in fq_dict_keys:
219+
wstring = ""
220+
if strip_mode == 'strip':
221+
if fq_dict[fq_dict_key][0] == 'u':
222+
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
223+
for i in fq_dict[fq_dict_key][2:]:
224+
wstring += f"{i}\n"
225+
elif strip_mode == 'replace':
226+
# if unmapped, write all the read lines
227+
if fq_dict[fq_dict_key][0] == 'u':
228+
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
229+
for i in fq_dict[fq_dict_key][2:]:
230+
wstring += f"{i}\n"
231+
# if mapped, write all the read lines, but replace sequence
232+
# by N*(len(sequence))
233+
elif fq_dict[fq_dict_key][0] == 'm':
234+
wstring += f"@{fq_dict_key+fq_dict[fq_dict_key][1]}\n"
235+
wstring += f"{'N'*len(fq_dict[fq_dict_key][2])}\n"
236+
for i in fq_dict[fq_dict_key][3:]:
237+
wstring += f"{i}\n"
238+
fw.write(wstring)
201239

202240

203241
def check_strip_mode(mode):
204242
if mode.lower() not in ['replace', 'strip']:
205243
print(f"Mode must be {' or '.join(mode)}")
244+
return(mode.lower())
206245

207246

208247
if __name__ == "__main__":
209248
BAM, IN_FWD, IN_REV, OUT_FWD, OUT_REV, MODE, PROC = _get_args()
210249

211250
if OUT_FWD == None:
212-
out_fwd = f"{IN_FWD.split('/')[-1].split('.')[0]}.r1.fq"
251+
out_fwd = f"{IN_FWD.split('/')[-1].split('.')[0]}.r1.fq.gz"
213252
else:
214253
out_fwd = OUT_FWD
215254

216-
mapped_reads = extract_mapped(BAM, PROC)
217-
fwd_dict = parse_fq(IN_FWD)
218-
fwd_reads = sort_mapped(fwd_dict, mapped_reads)
219-
write_fq(fwd_reads, out_fwd, MODE)
255+
if out_fwd.endswith(".gz"):
256+
write_mode = "wb"
257+
else:
258+
write_mode = "w"
259+
260+
strip_mode = check_strip_mode(MODE)
261+
BAMFILE = pysam.AlignmentFile(BAM, 'r')
262+
263+
# FORWARD OR SE FILE
264+
print(f"- Extracting mapped reads from {BAM}")
265+
mapped_reads = extract_mapped(proc=PROC)
266+
print(f"- Parsing forward fq file {IN_FWD}")
267+
fqd_fwd = parse_fq(IN_FWD)
268+
print("- Cross referencing mapped reads in forward fq")
269+
fq_dict_fwd = get_mapped_reads(fqd_fwd, mapped_reads)
270+
# print(fq_dict_fwd)
271+
print(f"- Writing forward fq to {out_fwd}")
272+
write_fq(fq_dict=fq_dict_fwd, fname=out_fwd,
273+
write_mode=write_mode, strip_mode=strip_mode, proc=PROC)
274+
275+
# REVERSE FILE
220276
if IN_REV:
221277
if OUT_REV == None:
222-
out_rev = f"{IN_REV.split('/')[-1].split('.')[0]}.r2.fq"
278+
out_rev = f"{IN_REV.split('/')[-1].split('.')[0]}.r2.fq.gz"
223279
else:
224280
out_rev = OUT_REV
225-
rev_dict = parse_fq(IN_REV)
226-
rev_reads = sort_mapped(rev_dict, mapped_reads)
227-
write_fq(rev_reads, out_rev, MODE)
281+
print(f"- Parsing reverse fq file {IN_REV}")
282+
fqd_rev = parse_fq(IN_REV)
283+
print("- Cross referencing mapped reads in reverse fq")
284+
fq_dict_rev = get_mapped_reads(fqd_rev, mapped_reads)
285+
print(f"- Writing reverse fq to {out_rev}")
286+
write_fq(fq_dict=fq_dict_rev, fname=out_rev,
287+
write_mode=write_mode, strip_mode=strip_mode, proc=PROC)

conf/base.config

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,7 @@ process {
6767
memory = { check_max( 2.GB, 'memory' ) }
6868
cache = false
6969
}
70-
71-
withName:strip_input_fastq {
72-
time = { check_max( 4.h * task.attempt, 'time' ) }
73-
}
70+
7471

7572
withName:qualimap{
7673
errorStrategy = 'ignore'

main.nf

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,7 +1298,7 @@ if (params.run_bam_filtering) {
12981298
process strip_input_fastq {
12991299
label 'mc_medium'
13001300
tag "${bam.baseName}"
1301-
publishDir "${params.outdir}/samtools/stripped_fastq", mode: 'copy'
1301+
publishDir "${params.outdir}/stripped_fastq", mode: 'copy'
13021302

13031303
when:
13041304
params.strip_input_fastq
@@ -1313,20 +1313,17 @@ process strip_input_fastq {
13131313

13141314
script:
13151315
if (params.singleEnd) {
1316-
out_fwd = bam.baseName+'.stripped.fq'
1316+
out_fwd = bam.baseName+'.stripped.fq.gz'
13171317
"""
13181318
samtools index $bam
13191319
extract_map_reads.py $bam ${fq[0]} -m ${params.strip_mode} -of $out_fwd -p ${task.cpus}
1320-
pigz -p ${task.cpus} $out_fwd
13211320
"""
13221321
} else {
1323-
out_fwd = bam.baseName+'.stripped.fwd.fq'
1324-
out_rev = bam.baseName+'.stripped.rev.fq'
1322+
out_fwd = bam.baseName+'.stripped.fwd.fq.gz'
1323+
out_rev = bam.baseName+'.stripped.rev.fq.gz'
13251324
"""
13261325
samtools index $bam
13271326
extract_map_reads.py $bam ${fq[0]} -rev ${fq[1]} -m ${params.strip_mode} -of $out_fwd -or $out_rev -p ${task.cpus}
1328-
pigz -p ${task.cpus} $out_fwd
1329-
pigz -p ${task.cpus} $out_rev
13301327
"""
13311328
}
13321329

0 commit comments

Comments
 (0)