Multi-threading or multi-processing, how to know which to use and when?

This would work, yes:

@sync begin
   Threads.@spawn fun()
end

but all it does is add overhead: Thread 1 is waiting for Thread 2 to complete, rather than Thread 1 simply waiting for fun() to complete without Thread 2 needing to exist.

If fun() spawns new threads itself, the @sync in Thread(1) will not wait for them with this code. @sync operates lexically, i.e. it only knows about threads spawned inside its text block.

That’s why I have nested @syncs
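For example (do_part_one and do_part_two are hypothetical, just to illustrate the nesting): if fun() itself wraps the tasks it spawns in its own @sync, the outer @sync ends up waiting for that inner work too, because the spawned call to fun() does not return until its inner @sync has finished.

function fun()
    @sync begin
        # the inner @sync waits for the tasks spawned lexically inside it
        Threads.@spawn do_part_one()
        Threads.@spawn do_part_two()
    end
end

@sync begin
    # the outer @sync waits for this task, which in turn waits on fun()'s inner @sync
    Threads.@spawn fun()
end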

Sorry, I think I am misunderstanding something then. I only have one function inside the loop, which is the write(), so I am unsure how this would result in parallelisation. As I understand it, @sync waits for the thread spawned by write() to finish each time, which is essentially what happens by default?

You’re right. I’m not trying to parallelise your code. I’m trying to help you understand Julia Threading :slight_smile:

Without being rude, how did you imagine using threads would help with what is essentially a serial process?

The best I could manage was the Channel version, with one thread reading and one thread writing.

I imagine your problem is I/O bound so all you’re doing is adding more things waiting for the disk.

Ahh okay, that makes more sense now then :slight_smile:

Not rude at all! I am new to Julia and learning a lot, so am bound to make silly choices!

So this for loop is part of a larger function, and it is where the vast majority of the program's time is spent because the object being iterated over is huge. So I thought that if I could parallelise the loop (that is, have multiple iterations of it running at once) I could speed up the process.

I guess another option would be to split the object up and iterate over it using multiple loops with your @sync method, so the loops could run at the same time?
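For what I mean, a rough sketch (chunk_work and data are hypothetical names, and it assumes the iterations are independent of each other):

data = collect(1:1_000_000)   # stand-in for the huge object being iterated over
nchunks = Threads.nthreads()
chunks = Iterators.partition(eachindex(data), cld(length(data), nchunks))

@sync for r in chunks
    # one task per chunk; the @sync waits for all of them
    Threads.@spawn for i in r
        chunk_work(data[i])   # hypothetical per-element work
    end
end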

What you need to do is work out where the time is spent.

maybe it’s the @subset or maybe the innerjoin - I don’t know :slight_smile:

You need to keep an eye on the data dependencies, but it's this kind of idea:

    # assumes CSV, DataFrames and DataFramesMeta are loaded
    coverage = CSV.File(cov, delim = "\t")
    covDf = DataFrame(coverage)

    # add this for scoping inside the blocks
    local lowCovDf, highCovDf, passHighCov, passLowCov

    # the two @subset calls are independent of each other, so they can run at the same time
    @sync begin
        Threads.@spawn lowCovDf = @subset(covDf, :2 .> lower_cov, :2 .< upper_cov)
        Threads.@spawn highCovDf = @subset(covDf, :2 .> 0.5)
    end
    # both joins depend on results from the first block, so they go in a second @sync
    @sync begin
        Threads.@spawn passHighCov = innerjoin(minDepDf, highCovDf, on = :Contig)
        Threads.@spawn passLowCov = innerjoin(lowCovDepDf, lowCovDf, on = :Contig)
    end
    passContig = vcat([x for x in passHighCov[!, :Contig]], [z for z in passLowCov[!, :Contig]])

This is what I have started to do :slight_smile: But unfortunately it is the for loop in which most of the time is spent.

# add this for scoping inside the blocks
    local lowCovDf , highCovDf, passHighCov, passLowCov 

Are variables passed to a function not global within that function?

Would you mind going through that this does a bit more please? I am not fully understanding it.

The local was to make the scoping rules work when I tested it

I’m not seriously suggesting this, although I have used something similar myself

but to push things to the absolute limit … if you call fetch() on a task multiple times, it returns the value, so you can thread your data dependencies by creating tasks and using fetch …

passContig = begin
    t = Dict()
    t[:depth] = Threads.@spawn CSV.File(dep, delim="\t")
    t[:depDf] = Threads.@spawn DataFrame(fetch(t[:depth]))
    t[:minDepDf] = Threads.@spawn @subset(fetch(t[:depDf]), :2 .> min_dep)
    t[:lowCovDepDf] = Threads.@spawn @subset(fetch(t[:depDf]), :2 .> dep_check)
    t[:coverage] = Threads.@spawn CSV.File(cov, delim = "\t")
    t[:covDf] = Threads.@spawn DataFrame(fetch(t[:coverage]))
    t[:lowCovDf] = Threads.@spawn @subset(fetch(t[:covDf]), :2 .> lower_cov, :2 .< upper_cov)
    t[:highCovDf] = Threads.@spawn @subset(fetch(t[:covDf]), :2 .> 0.5)
    t[:passHighCov] = Threads.@spawn innerjoin(fetch(t[:minDepDf]), fetch(t[:highCovDf]), on = :Contig)
    t[:passLowCov] = Threads.@spawn innerjoin(fetch(t[:lowCovDepDf]), fetch(t[:lowCovDf]), on = :Contig)
    t[:xs] = Threads.@spawn [x for x in fetch(t[:passHighCov])[!, :Contig]]
    t[:zs] = Threads.@spawn [z for z in fetch(t[:passLowCov])[!, :Contig]]
    fetch(Threads.@spawn vcat(fetch(t[:xs]), fetch(t[:zs])))
end
    # create a channel with 3 empty slots in the buffer, and run the do-block as a new task
    wchan = Channel(3; spawn=true) do ch
        # open the writer
        open(FASTA.Writer, "/path/"*sample) do writer
            # take! a record from the channel, until the record is nothing
            # (nothing is a sentinel value, just needs to be something a record never is)
            while (record = take!(ch)) !== nothing
                # write the record as normal
                write(writer, record)
            end
        end
    end

    # all the wchan stuff is now waiting on another Thread

    # broadcast(record->put!(wchan, record), filter(record -> FASTA.identifier(record) in passContig, reader))
    # broadcast(fn, collection) # apply fn to everything in collection, it's like map
    # filter(fn, collection) # every member of the collection which returns true for fn is returned
    # let's unroll all that back into a loop with an if

    for record in reader
        if FASTA.identifier(record) in passContig
            # put the record in the channel, we can write 3 to it (the number of empty slots) until it blocks
            put!(wchan, record)
        end
    end
    # put the sentinel in the channel, so the other Thread stops and closes the file
    put!(wchan, nothing)

This is called “Communicating Sequential Processes” in the literature

http://www.usingcsp.com/

I found a function that allows me to preallocate the memory for a record and just overwrite it. I was unaware that looping over the reader results in each record being read into memory in a different place, meaning a lot of time is spent cleaning up!

I am now trying:


    # assumes FASTX and CodecZlib are loaded
    reader = FASTA.Reader(GzipDecompressorStream(open("some/other/path/megahit_final_assembly.fa.gz")))
    writer = open(FASTA.Writer, "some/path/"*sample)

    # preallocate a single record and overwrite it on every iteration
    record = FASTA.Record()

    while !eof(reader)
        read!(reader, record)
        if FASTA.identifier(record) in passContig
            write(writer, record)
        end
    end

    close(reader)
    close(writer)

I will see if this speeds things up! I still think splitting it into multiple sub-loops may be best, though. Would that cause issues with them all trying to write to the same location?

What type has passContig here?

It is a Vector{String15}, around 1-2 million elements long depending on the sample.

So the lookup in this vector could be a serious bottleneck. Maybe building a Set would help? It should be as easy as using Set(passContig) instead of passContig, but you should hoist it outside the reader loop:

setPassContig = Set(passContig)
while !eof(reader)
    read!(reader, record)
    if FASTA.identifier(record) in setPassContig
        write(writer, record)
    end
end
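For a rough sense of why that matters (a hypothetical micro-benchmark, names made up): x in vec scans the whole vector, while x in set is a hash lookup, so with 1-2 million elements the per-record difference is large.

using Random

ids = [randstring(12) for _ in 1:1_000_000]   # stand-in for passContig
vec = ids
set = Set(ids)

probe = ids[end]         # worst case for the Vector: the last element
@time probe in vec       # linear scan, O(length(vec))
@time probe in set       # hash lookup, roughly O(1)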

BTW, did you try to use a Profiler on your problem?
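If not, a minimal sketch of the built-in profiler (process_sample is just a placeholder for your function):

using Profile

process_sample()            # run once first so compilation is not included in the samples
@profile process_sample()   # collect samples while it runs
Profile.print()             # show where the time was spent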
