Skip to content

Commit 4e93c55

Browse files
authored
Merge pull request #902 from nf-core/endorspy-resume-fix
Endorspy resume fix
2 parents f1e6be9 + 6779efc commit 4e93c55

2 files changed

Lines changed: 51 additions & 27 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1111

1212
- [#882](https://github.com/nf-core/eager/pull/882) Define DSL1 execution explicitly, as new versions Nextflow made DSL2 default (♥ to & fix from @Lehmann-Fabian)
1313
- [#879](https://github.com/nf-core/eager/issues/879) Add missing threads parameter for pre-clipping FastQC for single end data that caused insufficient memory in some cases (♥ to @marcel-keller for reporting)
14+
- [#880](https://github.com/nf-core/eager/issues/880) Fix failure of endorSpy to be cached or reexecuted on resume (♥ to @KathrinNaegele, @TCLamnidis, & @mahesh-panchal for reporting and debugging)
1415
- [#885](https://github.com/nf-core/eager/issues/885) Specify task memory for all tools in get_software_versions to account for incompatibilty of java with some SGE clusters causing hanging of the process (♥ to @maxibor for reporting)
1516
- [#887](https://github.com/nf-core/eager/issues/887) Clarify what is considered 'ultra-short' reads in the help text of clip_readlength, for when you may wish to turn of length filtering during AdapterRemoval (♥ to @TCLamnidis for reporting)
1617
- [#889](https://github.com/nf-core/eager/issues/889) Remove/updated parameters from benchmarking test profiles (♥ to @TCLamnidis for reporting)
@@ -19,6 +20,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1920
- [#899](https://github.com/nf-core/eager/issues/897) Fix pipeline crash for circulargenerator if reference file does not end in .fasta (♥ to @scarlhoff for reporting)
2021
- Fixed some missing default values in the nextflow parameter schema JSON
2122
- [#789](https://github.com/nf-core/eager/issues/789) Substantial speed and memory optimisation of the `extract_map_reads.py` script (♥ to @ivelsko for reporting, @maxibor for optimisation)
23+
- Fix staging of input bams for genotyping_pileupcaller process. Downstream changes from changes introduced when fixing endorspy caching.
2224

2325
### `Dependencies`
2426

main.nf

Lines changed: 49 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,7 @@ ch_input_for_fastp.fourcol
769769
[ samplename, libraryid, lane, seqtype, organism, strandedness, udg, r1, r2 ]
770770

771771
}
772-
.set { ch_skipfastp_for_merge }
772+
.set { ch_skipfastp_for_merge }
773773

774774
ch_output_from_fastp
775775
.map{
@@ -800,7 +800,7 @@ process adapter_removal {
800800

801801
input:
802802
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, file(r1), file(r2) from ch_fastp_for_adapterremoval
803-
path adapterlist from ch_adapterlist.collect().dump(tag: "Adapter list")
803+
path adapterlist from ch_adapterlist.collect()
804804

805805
output:
806806
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path("output/*{combined.fq,.se.truncated,pair1.truncated}.gz") into ch_output_from_adapterremoval_r1
@@ -968,13 +968,10 @@ if ( params.skip_collapse ){
968968
// AdapterRemoval bypass when not running it
969969
if (!params.skip_adapterremoval) {
970970
ch_output_from_adapterremoval.mix(ch_fastp_for_skipadapterremoval)
971-
.dump(tag: "post_ar_adapterremoval_decision_skipar")
972971
.filter { it =~/.*combined.fq.gz|.*truncated.gz/ }
973-
.dump(tag: "ar_bypass")
974972
.into { ch_adapterremoval_for_post_ar_trimming; ch_adapterremoval_for_skip_post_ar_trimming; }
975973
} else {
976974
ch_fastp_for_skipadapterremoval
977-
.dump(tag: "post_ar_adapterremoval_decision_withar")
978975
.into { ch_adapterremoval_for_post_ar_trimming; ch_adapterremoval_for_skip_post_ar_trimming; }
979976
}
980977

@@ -1076,7 +1073,6 @@ ch_branched_for_lanemerge = ch_inlinebarcoderemoval_for_lanemerge
10761073
[ samplename, libraryid, lane, seqtype, organism, strandedness, udg, r1, r2 ]
10771074

10781075
}
1079-
.dump(tag: "lanemerge_bypass_decision")
10801076
.branch {
10811077
skip_merge: it[7].size() == 1 // Can skip merging if only single lanes
10821078
merge_me: it[7].size() > 1
@@ -1097,7 +1093,6 @@ ch_branched_for_lanemerge_skipme = ch_branched_for_lanemerge.skip_merge
10971093

10981094
[ samplename, libraryid, lane, seqtype, organism, strandedness, udg, r1, r2 ]
10991095
}
1100-
.dump(tag: "lanemerge_reconfigure")
11011096

11021097

11031098
ch_branched_for_lanemerge_ready = ch_branched_for_lanemerge.merge_me
@@ -1125,15 +1120,15 @@ process lanemerge {
11251120
publishDir "${params.outdir}/lanemerging", mode: params.publish_dir_mode
11261121

11271122
input:
1128-
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path(r1), path(r2) from ch_branched_for_lanemerge_ready.dump(tag: "lange_merge_input")
1123+
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path(r1), path(r2) from ch_branched_for_lanemerge_ready
11291124

11301125
output:
11311126
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path("*_R1_lanemerged.fq.gz") into ch_lanemerge_for_mapping_r1
11321127
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path("*_R2_lanemerged.fq.gz") optional true into ch_lanemerge_for_mapping_r2
11331128

11341129
script:
11351130
if ( seqtype == 'PE' && ( params.skip_collapse || params.skip_adapterremoval ) ){
1136-
lane = 0
1131+
def lane = 0
11371132
"""
11381133
cat ${r1} > "${libraryid}"_R1_lanemerged.fq.gz
11391134
cat ${r2} > "${libraryid}"_R2_lanemerged.fq.gz
@@ -1149,7 +1144,6 @@ process lanemerge {
11491144
// Ensuring always valid R2 file even if doesn't exist for AWS
11501145
if ( ( params.skip_collapse || params.skip_adapterremoval ) ) {
11511146
ch_lanemerge_for_mapping_r1
1152-
.dump(tag: "post_lanemerge_reconfigure")
11531147
.mix(ch_lanemerge_for_mapping_r2)
11541148
.groupTuple(by: [0,1,2,3,4,5,6])
11551149
.map{
@@ -1264,8 +1258,8 @@ process bwa {
12641258
publishDir "${params.outdir}/mapping/bwa", mode: params.publish_dir_mode
12651259

12661260
input:
1267-
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path(r1), path(r2) from ch_lanemerge_for_bwa.dump(tag: "bwa_input_reads")
1268-
path index from bwa_index.collect().dump(tag: "input_index")
1261+
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path(r1), path(r2) from ch_lanemerge_for_bwa
1262+
path index from bwa_index.collect()
12691263

12701264
output:
12711265
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path("*.mapped.bam"), path("*.{bai,csi}") into ch_output_from_bwa
@@ -1564,17 +1558,21 @@ ch_branched_for_seqtypemerge = ch_mapping_for_seqtype_merging
15641558
it ->
15651559
def samplename = it[0]
15661560
def libraryid = it[1]
1567-
def lane = it[2]
1561+
def lane = 0
15681562
def seqtype = it[3].unique() // How to deal with this?
15691563
def organism = it[4]
15701564
def strandedness = it[5]
15711565
def udg = it[6]
15721566
def r1 = it[7]
15731567
def r2 = it[8]
15741568

1575-
// We will assume if mixing it is better to set as PE as this is informative
1569+
// 1. We will assume if mixing it is better to set as PE as this is informative
15761570
// for DeDup (and markduplicates doesn't care), but will throw a warning!
1577-
def seqtype_new = seqtype.flatten().size() > 1 ? 'PE' : seqtype
1571+
// 2. We will also flatten to a single value to address problems with 'unstable'
1572+
// Nextflow ArrayBag object types not allowing the .join to work between resumes
1573+
// See: https://github.com/nf-core/eager/issues/880
1574+
1575+
def seqtype_new = seqtype.flatten().size() > 1 ? 'PE' : seqtype.flatten()[0]
15781576

15791577
if ( seqtype.flatten().size() > 1 && params.dedupper == 'dedup' ) {
15801578
log.warn "[nf-core/eager] Warning: you are running DeDup on BAMs with a mixture of PE/SE data for library: ${libraryid}. DeDup is designed for PE data only, deduplication maybe suboptimal!"
@@ -1583,7 +1581,6 @@ ch_branched_for_seqtypemerge = ch_mapping_for_seqtype_merging
15831581
[ samplename, libraryid, lane, seqtype_new, organism, strandedness, udg, r1, r2 ]
15841582

15851583
}
1586-
.dump(tag: "pre_seqtype_decision")
15871584
.branch {
15881585
skip_merge: it[7].size() == 1 // Can skip merging if only single lanes
15891586
merge_me: it[7].size() > 1
@@ -1791,11 +1788,12 @@ if (params.run_bam_filtering) {
17911788
def seqtype = it[3]
17921789
def organism = it[4]
17931790
def strandedness = it[5]
1794-
def udg = it[6]
1791+
def udg = it[6]
17951792
def stats = file(it[7])
17961793
def poststats = file("$projectDir/assets/nf-core_eager_dummy.txt")
17971794

1798-
[samplename, libraryid, lane, seqtype, organism, strandedness, udg, stats, poststats ] }
1795+
[samplename, libraryid, lane, seqtype, organism, strandedness, udg, stats, poststats ]
1796+
}
17991797
.set{ ch_allflagstats_for_endorspy }
18001798
}
18011799

@@ -1956,7 +1954,6 @@ ch_input_for_librarymerging.merge_me
19561954

19571955
[it[0], libraryid, it[2], seqtype, it[4], it[5], it[6], bam, bai ]
19581956
}
1959-
.dump(tag: "input_for_lib_merging")
19601957
.set { ch_fixedinput_for_librarymerging }
19611958

19621959
process library_merge {
@@ -1965,7 +1962,7 @@ process library_merge {
19651962
publishDir "${params.outdir}/merged_bams/initial", mode: params.publish_dir_mode
19661963

19671964
input:
1968-
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, file(bam), file(bai) from ch_fixedinput_for_librarymerging.dump(tag: "library_merge_input")
1965+
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, file(bam), file(bai) from ch_fixedinput_for_librarymerging
19691966

19701967
output:
19711968
tuple samplename, val("${samplename}_libmerged"), lane, seqtype, organism, strandedness, udg, path("*_libmerged_rmdup.bam"), path("*_libmerged_rmdup.bam.{bai,csi}") into ch_output_from_librarymerging
@@ -2233,7 +2230,7 @@ process bam_trim {
22332230
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path(bam), path(bai) from ch_bamutils_decision.totrim
22342231

22352232
output:
2236-
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, file("*.trimmed.bam"), file("*.trimmed.bam.{bai,csi}") into ch_trimmed_from_bamutils
2233+
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path("*.trimmed.bam"), path("*.trimmed.bam.{bai,csi}") into ch_trimmed_from_bamutils
22372234

22382235
script:
22392236
def softclip = params.bamutils_softclip ? '-c' : ''
@@ -2265,7 +2262,7 @@ ch_trimmed_formerge = ch_bamutils_decision.notrim
22652262
def seqtype = it[3]
22662263
def organism = it[4]
22672264
def strandedness = it[5]
2268-
def udg = it[6]
2265+
def udg = it[6]
22692266
def bam = it[7].flatten()
22702267
def bai = it[8].flatten()
22712268

@@ -2491,10 +2488,36 @@ ch_damagemanipulation_for_genotyping_pileupcaller
24912488
// Create pileupcaller input tuples
24922489
ch_input_for_genotyping_pileupcaller.singleStranded
24932490
.groupTuple(by:[5])
2491+
.map{
2492+
def samplename = it[0]
2493+
def libraryid = it[1]
2494+
def lane = it[2]
2495+
def seqtype = it[3]
2496+
def organism = it[4]
2497+
def strandedness = it[5]
2498+
def udg = it[6]
2499+
def bam = it[7].flatten()
2500+
def bai = it[8].flatten()
2501+
2502+
[samplename, libraryid, lane, seqtype, organism, strandedness, udg, bam, bai ]
2503+
}
24942504
.set {ch_prepped_for_pileupcaller_single}
24952505

24962506
ch_input_for_genotyping_pileupcaller.doubleStranded
24972507
.groupTuple(by:[5])
2508+
.map{
2509+
def samplename = it[0]
2510+
def libraryid = it[1]
2511+
def lane = it[2]
2512+
def seqtype = it[3]
2513+
def organism = it[4]
2514+
def strandedness = it[5]
2515+
def udg = it[6]
2516+
def bam = it[7].flatten()
2517+
def bai = it[8].flatten()
2518+
2519+
[samplename, libraryid, lane, seqtype, organism, strandedness, udg, bam, bai ]
2520+
}
24982521
.set {ch_prepped_for_pileupcaller_double}
24992522

25002523
process genotyping_pileupcaller {
@@ -2506,12 +2529,12 @@ process genotyping_pileupcaller {
25062529
params.run_genotyping && params.genotyping_tool == 'pileupcaller'
25072530

25082531
input:
2509-
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, bam, bai from ch_prepped_for_pileupcaller_double.mix(ch_prepped_for_pileupcaller_single)
2532+
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path(bam), path(bai) from ch_prepped_for_pileupcaller_double.mix(ch_prepped_for_pileupcaller_single)
25102533
file fasta from ch_fasta_for_genotyping_pileupcaller.collect()
25112534
file fai from ch_fai_for_pileupcaller.collect()
25122535
file dict from ch_dict_for_pileupcaller.collect()
25132536
path(bed) from ch_bed_for_pileupcaller.collect()
2514-
path(snp) from ch_snp_for_pileupcaller.collect().dump(tag: "pileupcaller_snp_file")
2537+
path(snp) from ch_snp_for_pileupcaller.collect()
25152538

25162539
output:
25172540
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path("pileupcaller.${strandedness}.*") into ch_for_eigenstrat_snp_coverage
@@ -2542,7 +2565,7 @@ process eigenstrat_snp_coverage {
25422565
params.run_genotyping && params.genotyping_tool == 'pileupcaller'
25432566

25442567
input:
2545-
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path("*") from ch_for_eigenstrat_snp_coverage.dump(tag:'eigenstrat_input')
2568+
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path("*") from ch_for_eigenstrat_snp_coverage
25462569

25472570
output:
25482571
tuple samplename, libraryid, lane, seqtype, organism, strandedness, udg, path("*.json") into ch_eigenstrat_snp_cov_for_multiqc
@@ -2673,7 +2696,7 @@ process vcf2genome {
26732696
if (!params.additional_vcf_files) {
26742697
ch_vcfs_for_multivcfanalyzer = ch_ug_for_multivcfanalyzer.map{ it[-1] }.collect()
26752698
} else {
2676-
ch_vcfs_for_multivcfanalyzer = ch_ug_for_multivcfanalyzer.map{ it[-1] }.mix(ch_extravcfs_for_multivcfanalyzer).collect().dump(tag: "postmix")
2699+
ch_vcfs_for_multivcfanalyzer = ch_ug_for_multivcfanalyzer.map{ it[-1] }.mix(ch_extravcfs_for_multivcfanalyzer).collect()
26772700
}
26782701

26792702
process multivcfanalyzer {
@@ -3342,7 +3365,6 @@ workflow.onError {
33423365
def extract_data(tsvFile) {
33433366
Channel.fromPath(tsvFile)
33443367
.splitCsv(header: true, sep: '\t')
3345-
.dump(tag:'tsv_extract')
33463368
.map { row ->
33473369

33483370
def expected_keys = ['Sample_Name', 'Library_ID', 'Lane', 'Colour_Chemistry', 'SeqType', 'Organism', 'Strandedness', 'UDG_Treatment', 'R1', 'R2', 'BAM']

0 commit comments

Comments
 (0)