but all it does is add overhead: it is Thread(1) waiting for Thread(2) to complete, rather than Thread(1) simply waiting for fun() to complete without Thread(2) needing to exist.
If fun() spawns new tasks itself, the @sync in Thread(1) will not wait for them with this code. @sync operates lexically, i.e. it only knows about tasks spawned directly inside its own block.
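i.e. something like this (a minimal sketch; fun() just stands in for any function that spawns its own task):
function fun()
    Threads.@spawn sleep(1)   # inner task: the outer @sync never sees it
end

@sync begin
    Threads.@spawn fun()      # @sync waits for this task only
end
# we get here once fun() has returned, but possibly before
# the task fun() spawned has finished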
That’s why I have nested @syncs
Sorry, I think I am misunderstanding something then. I only have one function inside the loop, which is the write(), so I am unsure how this would result in parallelisation. As I understand it, @sync waits for the task spawned for write() to finish each time, which is essentially what happens by default?
Not rude at all! I am new to Julia and learning a lot, so am bound to make silly choices!
So this for loop is part of a larger function, and it was where the vast majority of the program's time was spent because the object being iterated over is huge. So I thought if I could parallelise the loop (that is, have multiple iterations of it running at once), I could speed up the process.
I guess another option would be to split the object up and iterate over it with multiple loops using your @sync method, so the loops could run at the same time?
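Something like this, I guess (a sketch, where items and process! stand in for my object and loop body):
# one chunk of indices per thread, each chunk processed by its own task
chunks = Iterators.partition(eachindex(items), cld(length(items), Threads.nthreads()))
@sync for chunk in chunks
    Threads.@spawn for i in chunk
        process!(items[i])   # the loop body; must not race on shared state
    end
end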
What you need to do is work out where the time is spent.
Maybe it’s the @subset or maybe the innerjoin - I don’t know.
You need to keep an eye on the data dependencies, but it’s this kind of idea:
using CSV, DataFrames

coverage = CSV.File(cov, delim = "\t")
covDf = DataFrame(coverage)
# add this for scoping inside the blocks
local lowCovDf, highCovDf, passHighCov, passLowCov
# the two subsets depend only on covDf, so they can run at the same time;
# subset(df, 2 => ...) filters on the second column by position (the plain
# function is used because :2 is not a valid column reference, and @subset's
# $ column syntax clashes with Threads.@spawn's own $ interpolation)
@sync begin
    Threads.@spawn lowCovDf = subset(covDf, 2 => ByRow(x -> lower_cov < x < upper_cov))
    Threads.@spawn highCovDf = subset(covDf, 2 => ByRow(>(0.5)))
end
# each join depends on one subset each, not on the other join
@sync begin
    Threads.@spawn passHighCov = innerjoin(minDepDf, highCovDf, on = :Contig)
    Threads.@spawn passLowCov = innerjoin(lowCovDepDf, lowCovDf, on = :Contig)
end
passContig = vcat([x for x in passHighCov[!, :Contig]], [z for z in passLowCov[!, :Contig]])
The local was to make the scoping rules work when I tested it
I’m not seriously suggesting this, although I have used something similar myself
but to push things to the absolute limit … if you call fetch() on a task multiple times, it returns the value each time, so you can thread your data dependencies by creating tasks and using fetch …
passContig = begin
    t = Dict()
    t[:depth] = Threads.@spawn CSV.File(dep, delim = "\t")
    t[:depDf] = Threads.@spawn DataFrame(fetch(t[:depth]))
    t[:minDepDf] = Threads.@spawn subset(fetch(t[:depDf]), 2 => ByRow(>(min_dep)))
    t[:lowCovDepDf] = Threads.@spawn subset(fetch(t[:depDf]), 2 => ByRow(>(dep_check)))
    t[:coverage] = Threads.@spawn CSV.File(cov, delim = "\t")
    t[:covDf] = Threads.@spawn DataFrame(fetch(t[:coverage]))
    t[:lowCovDf] = Threads.@spawn subset(fetch(t[:covDf]), 2 => ByRow(x -> lower_cov < x < upper_cov))
    t[:highCovDf] = Threads.@spawn subset(fetch(t[:covDf]), 2 => ByRow(>(0.5)))
    t[:passHighCov] = Threads.@spawn innerjoin(fetch(t[:minDepDf]), fetch(t[:highCovDf]), on = :Contig)
    t[:passLowCov] = Threads.@spawn innerjoin(fetch(t[:lowCovDepDf]), fetch(t[:lowCovDf]), on = :Contig)
    t[:xs] = Threads.@spawn [x for x in fetch(t[:passHighCov])[!, :Contig]]
    t[:zs] = Threads.@spawn [z for z in fetch(t[:passLowCov])[!, :Contig]]
    fetch(Threads.@spawn vcat(fetch(t[:xs]), fetch(t[:zs])))
end
# create a channel with 3 empty slots in the buffer, and run its body as a new task
wchan = Channel(3; spawn = true) do ch
    # open the writer
    open(FASTA.Writer, "/path/"*sample) do writer
        # take! a record from the channel, until the record is nothing
        # (nothing is a sentinel value, it just needs to be something a record never is)
        while (record = take!(ch)) !== nothing
            # write the record as normal
            write(writer, record)
        end
    end
end
# all the wchan stuff is now running in another task, waiting for records
# broadcast(record->put!(wchan, record), filter(record -> FASTA.identifier(record) in passContig, reader))
# broadcast(fn, collection) # apply fn to everything in collection, it's like map
# filter(fn, collection) # every member of the collection which returns true for fn is returned
# let's unroll all that back into a loop with an if
for record in reader
    if FASTA.identifier(record) in passContig
        # put! the record in the channel; it buffers up to 3 records
        # (the number of empty slots) before put! blocks
        put!(wchan, record)
    end
end
# put the sentinel in the channel, so the other Thread stops and closes the file
put!(wchan, nothing)
This is called “Communicating Sequential Processes” in the literature
I found a function that allows me to preallocate the memory for a record and just overwrite it each time. I was unaware that the plain loop reads each record into a different place in memory, so lots of time was spent cleaning up (garbage collection)!
I am now trying:
using FASTX, CodecZlib

reader = FASTA.Reader(GzipDecompressorStream(open("some/other/path/megahit_final_assembly.fa.gz")))
writer = open(FASTA.Writer, "some/path/"*sample)
record = FASTA.Record()
while !eof(reader)
    read!(reader, record)
    if FASTA.identifier(record) in passContig
        write(writer, record)
    end
end
I will see if this speeds things up! I still think splitting it into multiple sub-loops may be best, though. Would that cause issues with them all trying to write to the same location?
So the lookup in this vector could be a serious bottleneck. Maybe building a Set would help? Should be as easy as Set(passContig) instead of passContig. But you should hoist it outside the reader loop:
setPassContig = Set(passContig)
while !eof(reader)
    read!(reader, record)
    if FASTA.identifier(record) in setPassContig
        write(writer, record)
    end
end
BTW, did you try using a profiler on your problem?
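E.g. the stdlib Profile is a sampling profiler; a minimal sketch, assuming the whole workload is wrapped in a function (main() here is a placeholder):
using Profile

@time main()      # rough first look: total time and allocations
@profile main()   # run the workload under the sampling profiler
Profile.print()   # tree of where the samples (i.e. the time) landed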