Questions about parallel computing

I want to compute a statistic (pdrs) for each human chromosome, process the chromosomes in parallel, and finally write the results to a CSV file. When I use the Threads.@threads macro I get an error. Without the macro, the same code runs fine.

using CSV,DataFrames
......
Threads.@threads for number in 1:chr.ngroups#representing each chromosome
               chromosome=chr[number]
               start=chromosome[1,3];final=chromosome[end,3]
               chromosomename=chromosome[1,1]
               println('[',Dates.format(now(), "YYYY-m-d HH:MM:SS"),']'," Start calculating ",chromosomename,"'s pdrs")
               chdf=generatechdf(reader,chromosomename,start,final)#the function was defined by myself
               chdf=lastchdf(chdf)
               pdrs=repeat([(NaN,0,0,0)],nrow(chromosome))
               pdrspos=calculatepospdrs(chdf,chromosome)#the function was defined by myself
               pdrsneg=calculatenegpdrs(chdf,chromosome)#the function was defined by myself
               pdrs[findall(==("C"),chromosome.Column2)]=pdrspos
               pdrs[findall(==("G"),chromosome.Column2)]=pdrsneg
               pd=vcat(pd,rename!(DataFrame(pdrs),[:pdr,:discordant,:sum,:allsum]))#this step can intergrate each chromosome
               println('[',Dates.format(now(), "YYYY-m-d HH:MM:SS"),']'," Finish calculating ",chromosomename,"'s pdrs")
               end

[2022-10-18 14:40:23] Start calculating chr16's pdrs
[2022-10-18 14:40:23] Start calculating chr21's pdrs
[2022-10-18 14:40:23] Start calculating chr7's pdrs
[2022-10-18 14:40:23] Start calculating chr1's pdrs
ERROR: TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:334 [inlined]
 [2] threading_run(func::Function)
   @ Base.Threads ./threadingconstructs.jl:38
 [3] top-level scope
   @ ./threadingconstructs.jl:97

    nested task error: BGZFStreams.BGZFDataError("invalid gzip identifier")
    Stacktrace:

I also have a second problem. Inside the for loop I vcat each chromosome's results, and I want the output ordered by chromosome (1, 2, 3, and so on). But the parallel loop does not process the chromosomes in that order (here it started with 16, 21, 7, 1), and the order will be different on the next run. And if two chromosomes finish at the same time and both call vcat, will the result end up out of order?

I would try not to mutate any shared state inside your loop (e.g. growing your DataFrame with vcat). Compute the results first and concatenate them afterwards. For example:

results = Vector{Any}(undef, N)
Threads.@threads for i = 1:N
    results[i] = mycalc(chr[i])
end
# combine the results here

If you know the size and shape of the output, you can give results a concrete element type instead of Any. It is also a good idea to break your code up into smaller functions.
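To make the pattern concrete, here is a minimal sketch using only Base Julia. The names mycalc and chr below are hypothetical stand-ins, not the functions or data from the question. Each iteration writes only to its own slot results[i], so the combined output stays in index order no matter which iteration finishes first.

```julia
# Hypothetical stand-in for the per-chromosome calculation.
mycalc(x::Vector{Int}) = [(value = v, squared = v^2) for v in x]

# Hypothetical stand-in for the grouped data (one inner vector per chromosome).
chr = [[1, 2], [3], [4, 5, 6]]
N = length(chr)

# Concrete element type instead of Any, since the output shape is known.
Row = NamedTuple{(:value, :squared), Tuple{Int, Int}}
results = Vector{Vector{Row}}(undef, N)

Threads.@threads for i in 1:N
    # Each iteration touches only its own slot, so nothing shared is mutated.
    results[i] = mycalc(chr[i])
end

# Combine after the loop; slot order (1, 2, 3, ...) is preserved.
combined = reduce(vcat, results)
```

If each slot held a DataFrame instead, reduce(vcat, results) should work the same way with DataFrames.jl loaded.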

I would also remove all the print statements. If you want some form of progress, instead of printing you can use something like ProgressBars.jl:

using ProgressBars

#...
Threads.@threads for i in ProgressBar(1:N)
#.. Etc

Thanks, I will try that.

I tested this by commenting out the vcat call, but I still get the same error.

Threads.@threads for number in 1:chr.ngroups
        chromosome=chr[number]
        start=chromosome[1,3];final=chromosome[end,3]
        chromosomename=chromosome[1,1]
        println('[',Dates.format(now(), "YYYY-m-d HH:MM:SS"),']'," Start calculating ",chromosomename,"'s pdrs")
        chdf=generatechdf(reader,chromosomename,start,final)
        chdf=lastchdf(chdf)
        pdrs=repeat([(NaN,0,0,0)],nrow(chromosome))
        pdrspos=calculatepospdrs(chdf,chromosome)
        pdrsneg=calculatenegpdrs(chdf,chromosome)
        pdrs[findall(==("C"),chromosome.Column2)]=pdrspos
        pdrs[findall(==("G"),chromosome.Column2)]=pdrsneg
        #pd=vcat(pd,rename!(DataFrame(pdrs),[:pdr,:discordant,:sum,:allsum]))
        println('[',Dates.format(now(), "YYYY-m-d HH:MM:SS"),']'," Finish calculating ",chromosomename,"'s pdrs")
        end
[2022-10-18 20:17:49] Start calculating chr21's pdrs
[2022-10-18 20:17:49] Start calculating chr1's pdrs
[2022-10-18 20:17:49] Start calculating chr7's pdrs
[2022-10-18 20:17:49] Start calculating chr16's pdrs
ERROR: TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:334 [inlined]
 [2] threading_run(func::Function)
   @ Base.Threads ./threadingconstructs.jl:38
 [3] top-level scope
   @ ./threadingconstructs.jl:97

    nested task error: BGZFStreams.BGZFDataError("invalid gzip identifier")
    Stacktrace:
      [1] bgzferror(message::String)
        @ BGZFStreams ~/anaconda3/envs/julia/share/julia/packages/BGZFStreams/bsx6S/src/bgzfstream.jl:350
      [2] read_bgzf_block!(input::IOStream, block::Vector{UInt8})
        @ BGZFStreams ~/anaconda3/envs/julia/share/julia/packages/BGZFStreams/bsx6S/src/bgzfstream.jl:415
      [3] read_blocks!(stream::BGZFStreams.BGZFStream{IOStream})

I don't know how to parallelize this. Should I wrap the intermediate steps in a function?

Normally, I would say solve your other problems first, and then address threading problems. But in this case, I’m actually more worried about the cause of the threading problem.

When you get an error like that, try to get some idea about where it’s coming from — even if you don’t entirely understand what the code is doing. In this case, the error complains about an “invalid gzip identifier”. This means that there’s some gzip file being used somewhere in your code, and presumably the name of that file is not changing during each iteration of the for loop. It works just fine when you only use one thread, because the separate iterations of the for loop are never running at the same time, so they can do whatever they want to the file. But with threads, two or more iterations are trying to do different things to the file at the same time. Obviously, this isn’t possible. You’ve discovered why thread safety matters.
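As a rough illustration of the point about concurrent access, the sketch below serializes access to one shared stream with a lock. The IOBuffer is a hypothetical stand-in for whatever file or reader the real code shares. This is not the poster's code, just one way to keep two iterations from touching the same stream at once.

```julia
# Hypothetical shared resource: an in-memory stream standing in for the file.
shared = IOBuffer("chr1\nchr2\nchr3\nchr4\n")
guard = ReentrantLock()

seen = Vector{String}(undef, 4)
Threads.@threads for i in 1:4
    # Only one task at a time may read from `shared`.
    seen[i] = lock(guard) do
        readline(shared)
    end
end
```

Every line is read exactly once, but which iteration gets which line is not deterministic, and the locked section runs serially, so this only pays off if most of the work happens outside the lock.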

Now, you’ve hidden some code from us, so we can’t know what’s going on, but my guess is that you’re writing this file in generatechdf, and then trying to use it in calculatepospdrs. But some other thread is doing the same, so the file will get corrupted between those two lines. In case there’s something worse happening, I would try to solve that problem first. If possible, try to work without files at all. Otherwise, use a different file name for each thread.
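Here is a minimal sketch of the "nothing shared between threads" idea, assuming the conflict comes from a single file or handle used by every iteration. The temporary file and the read_line helper are hypothetical. The real fix depends on what generatechdf and reader actually do.

```julia
# Hypothetical stand-in for the input file: one line per "chromosome".
path, io = mktemp()
for i in 1:8
    println(io, "chr", i)
end
close(io)

# Hypothetical helper: opens its own handle on every call,
# so no stream position or buffer is shared between iterations.
function read_line(path::AbstractString, n::Integer)
    open(path) do handle
        for _ in 1:(n - 1)
            readline(handle)   # skip the lines before the one we want
        end
        return readline(handle)
    end
end

lines_read = Vector{String}(undef, 8)
Threads.@threads for i in 1:8
    lines_read[i] = read_line(path, i)
end
```

Because each iteration works on a private handle and writes to its own slot, the result is the same with one thread or many.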
