Multi-threading or multi-processing, how to know which to use and when?

I am wanting to run a simple for loop that is writing certain lines from a very large file to a new file based on a if check of pre computed criteria.

reader = FASTA.Reader(GzipDecompressorStream(open("/home/people/robmur/ku_00014/data/termite_metagenome/pre-post_fungus/sequences/final_assemblies/megahit_final_assembly_500bp_filterd.fasta.gz")))

    writer = open(FASTA.Writer, "/home/people/robmur/ku_00014/data/termite_metagenome/pre-post_fungus/sequences/final_assemblies/500bp_filter_samples/"*sample)

    println("writing to file")

    @threads for record in reader

        if FASTA.identifier(record) in passContig

            write(writer, record)

        end

    end

    close(reader)

I settled on using @threads however i am new the julia so I am entirely unsure if Distributed would be more optimal here.

My understanding of the difference is that distributed computing would run multiple processes on different cores whilst @threads will split the task up into subtasks and run on the same (or different cores)?

Does this then mean Distributed will run multiple iterations of the for loop at once?

Distributed will also run the processes on entirely different computers.

So using that notion in your mental model should help guide you in which to choose. It requires arranging for each to have access to the data it requires.

2 Likes

Generally speaking, multithreading is more light-weigt than multiprocessing. It’s cheaper to start a thread than a process and as a user you don’t have to get into the @everywhere business. So if you’re not concerned about distributed computing involving multiple machines, multithreading should likely be your first choice. However, what I like about Distributed is that processes are “very much separate” conceptually and actually. For this reason, one typically doesn’t run into race conditions that easily (they can still exist of course).

Unless you explicitly pin threads or processes, you shouldn’t make any assumptions about on which core a certain thread or process will run. The OS can move threads and process around. (In fact, on macOS you can’t even pin at all!)

Well, both multithreading as well as multiprocessing give you parallelism. I guess I don’t understand the question?

5 Likes

So it is my understanding that distributed is best for computing clusters normally? However I don’t wish my code to be only optimised for used on a HPC.

Well, given that a HPC cluster is a system with hundreds or thousands of separate nodes, distributed computing is a must there. Multithreading is limited to a single machine / node only. This is why people typically combine multithreading (e.g. OpenMP within a single node) with distributed (e.g. MPI for inter-node communication) techniques.

In Julia, we don’t have OpenMP but built-in (task-based) multithreading through @threads, Threads.@spawn. For MPI, we have MPI.jl but also the built-in “replacement” Distributed.

2 Likes

Well, both multithreading as well as multiprocessing give you parallelism. I guess I don’t understand the question?

Ah so I can use multi threading for parallelisation of a for loop. Is the default or something you have to ask it to do?

As a tangent to that, is it better to finish one iteration quicker or to have multiple iterations running at once?

For this reason, one typically doesn’t run into race conditions that easily (they can still exist of course).

What are race conditions?

Unless you explicitly pin threads or processes, you shouldn’t make any assumptions about on which core a certain thread or process will run.

I have no desire to specify threads or cores. I am however wanting to have the @threads dynamically increase with the number available. Such as on a HPC when I would specify 20 cores for a job, I want to macro to use all 20. Is there a way to do this?

This is why people typically combine multithreading (e.g. OpenMP within a single node) with distributed (e.g. MPI for inter-node communication) techniques.

Ohh that is cool! How would you combine those for this simple loop posted above? (sorry I am very new to Julia). Below I will also post the full function I am writing:

function resample(;cov::String, assembly::String, dep::String, upper_cov::Number=0.5, lower_cov::Number=0.1, min_dep::Int=5, dep_check::Int=10)

    0 <= lower-cov < upper_cov <= 1 || throw(ArgumentError("<upper_cov> must be between 0 and 1 and larger than <lower_cov>"))

    dep_check > min_dep || throw(ArgumentError("<dep_check> must be larger than <min_dep>"))

    splitext(assembly)[2] == ".gz" ||  splitext(assembly)[2] == ".fasta" || println("it is neither")

    # Getting the sample name from the input coverage file

    sample = splitext(basename(cov))[2]*".fasta"

    println("Reading depth")

    depth = CSV.File(dep, delim="\t")

    depDf = DataFrame(depth)

    minDepDf = @subset(depDf, :2 .> min_dep)

    lowCovDepDf = @subset(depDf, :2 .> dep_check)

    println("Reading Cov")

    coverage = CSV.File(cov, delim = "\t")

    covDf = DataFrame(coverage)

    lowCovDf = @subset(covDf, :2 .> lower_cov, :2 .< upper_cov)

    highCovDf = @subset(covDf, :2 .> 0.5)

    println("generating pass lists")

    passHighCov = innerjoin(minDepDf, highCovDf, on = :Contig)

    passLowCov = innerjoin(lowCovDepDf,lowCovDf, on = :Contig)

    println("recovering pass contigs")

    passContig = vcat([x for x in passHighCov[!, :Contig]], [z for z in passLowCov[!, :Contig]])

    if assembly[findlast(isequal('.'), assembly):end] == ".gz"

        reader = FASTA.Reader(GzipDecompressorStream(open(assembly)))

    else

        reader = FASTA.Reader(open(assembly))

    writer = open(FASTA.Writer, "/home/people/robmur/ku_00014/data/termite_metagenome/pre-post_fungus/sequences/final_assemblies/500bp_filter_samples/"*sample)

    println("writing to file")

    @threads for record in reader

        if FASTA.identifier(record) in passContig

            write(writer, record)

        end

    end

    close(reader)

end # module

test="myTestFile.txt"

if splitext(test)[2] == ".gz"

    println("it is a gz file")

else

    println("it is a fasta file")

end

It’s not just for HPCs. It really depends on the nature of your problem and how you want to divide your work.

For instance, I have two networked PCs at home and I can take advantage of Distributed and pmap across them both.

It can also depend on the shared state of your modules. PyPlot, for instance, will not allow multi-threaded access, so you cannot generate multiple plots in parallel using Threads, but you can using multiple Processes.

1 Like

You only need to start Julia with multiple threads, i.e. with -t N where N is the number of threads. By default, Julia starts in single threaded mode.

âžś  ~  julia -t 4
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.7.0-rc3 (2021-11-15)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> using Base.Threads: @threads, nthreads

julia> using BenchmarkTools

julia> function mysum(xs)
           s = zero(eltype(xs))
           for x in xs
               s += x
           end
           return s
       end
mysum (generic function with 1 method)

julia> function mysum_threads(xs)
           @assert length(xs)%nthreads() == 0
           L = length(xs)Ă·nthreads()
           subsums = zeros(eltype(xs), nthreads())

           Threads.@threads for i in 1:nthreads()
               range = 1+(i-1)*L:L+(i-1)*L
               subsums[i] = @views sum(xs[range])
           end
           return sum(subsums)
       end
mysum_threads (generic function with 1 method)

julia> xs = rand(1_000_000);

julia> @btime mysum($xs);
  1.544 ms (0 allocations: 0 bytes)

julia> @btime mysum_threads($xs);
  73.896 ÎĽs (21 allocations: 1.91 KiB)

As you can tell, the threaded function is much faster.

Note that I had to write mysum_threads in a different way. If I would have just mysum with an additional @threads in front of the loop I would get a race condition (different threads “fight” over trying to update the variable s at the same time). This just gives you wrong results:

julia> function mysum_threads_race(xs)
           s = zero(eltype(xs))
           Threads.@threads for x in xs
               s += x
           end
           return s
       end
mysum_threads_race (generic function with 1 method)

julia> mysum(xs)
499706.19076058315

julia> mysum_threads_race(xs)
125071.96748508354

julia> mysum_threads(xs)
499706.1907605851

In general, before (thinking about) going parallel, you should make the serial version reasonably fast. Using parallelization to “circumvent” serial optimization is a bad idea and almost never works. So, make sure that your serial code doesn’t unnecessarily allocate temporary arrays etc. before going parallel.

3 Likes

You have what might be described as a kind of “race condition” here in this code because the write order of the records is non-deterministic.

writer = open(FASTA.Writer, "/path/"*sample)
    @threads for record in reader
        if FASTA.identifier(record) in passContig
            write(writer, record)
        end
    end

as an aside the following is more idiomatic Julia. Your code doesn’t close(writer) whereas this will close it at the end of the do writer … end block.

open(FASTA.Writer, "/path/"*sample) do writer
    @threads for record in reader
        if FASTA.identifier(record) in passContig
            write(writer, record)
        end
    end
end

and just to muddy the waters even more …
You could split it into two threads using channels, one for read I/O and one for write I/O, and use broadcast … filter … instead of for … if …
This will also preserve the read/write order

wchan = Channel(size=3, spawn=true) do record
    open(FASTA.Writer, "/path/"*sample) do writer
        while (record = take!(wchan)) !== nothing
            write(writer, record)
        end
    end
end

broadcast(record->put!(wchan, record), filter(record -> FASTA.identifier(record) in passContig, reader))

put!(wchan, nothing)
1 Like

But the individual writes are guaranteed to be atomic due to the nature of Julia threading?

Yes, I did go look too. As far as I can tell, the calls are made with all the data at once. The Linux syscall for write seems to have no limit on datasize. Obviously I can’t check Windows is the same.

1 Like

Stackoverflow disagrees;)

Hmm how best to speed this code up then!

To be on the safe side you should be using some locking strategy?

So if I am understanding (completely new to this) I need a write lock to ensure that multiple threads don’t try write to the same file at the same time? OR is it to avoid racing?

and here is another post which agreees with that but in a different way

Great. I love learning things :slight_smile:
The way to deal with that is to use a ReentrantLock

This is from some code of mine, and shows a way to Threads without using @threads

    lk = ReentrantLock()
    open(joinpath(dir, "$(dataname).data.mjs"), "w") do io
        @sync begin
           Threads.@spawn report_data_json(emps, branches, io, lk)
           Threads.@spawn chart_data_json(leads, period, io, lk)
        end
    end

The @sync block waits until the @spawn’ed tasks complete. I use the lk in the spawned threads so that the multilple JSON objects they create do not interleave.

The functions themselves spawn futher threads - the top level @sync cannot see the spawns inside the function - they only have lexical scope - so another @sync is needed

function report_data_json(emps, branches, io, lk)
    @sync begin
        Threads.@spawn branches_summed_json(branches, io, lk)
        Threads.@spawn employees_summed_json(emps, io, lk)
        Threads.@spawn employees_split_json(emps, io, lk)
    end
end

Eventually they get to do the actual write which uses the lock like this

    lock(lk)
    try
        to_js_value(io, df, ["BranchId", "Id", "What"], "export const employees_split")        
    finally
        unlock(lk)
    end
2 Likes

:slight_smile: you were looking while I was typing

1 Like

True race conditions are different, the values can change underneath you

a = 0
printab() = begin global a; b = a; sleep(2rand()); println("a:$a, b:$b"); end
inca() = begin global a; sleep(2rand()); a = a + 1; end
@sync begin
  Threads.@spawn inca()
  Threads.@spawn inca()
  Threads.@spawn printab()
  Threads.@spawn inca()
  Threads.@spawn inca()  
  Threads.@spawn printab()
end

example output

a:2, b:0
a:2, b:0
Task (done) @0x00007fd33901ab60

So you need to manage operations. The @atomic macros can help too

Some new versions of which are in the 1.7 release

Many thanks for all the info! It is a lot to absorb and try to understand.

Your example of using @sync is interesting! Would it work in this instance where there is only one task occurring inside the loop?

The preferred goal if for my to spawn multiple iterations of the loop at once all writing to the same writer and if I understand the @sync + @spawn combo allows for multiple tasks within a loop to occur at once?