for k0 = 1:nt:n
    k1 = min(k0 + nt - 1, n)
    @threads for k = k0:k1
        compute!(k, ...)
    end
    compress_results!(k0, k1, ...)
end
The result is correct, but execution is not faster. I’m guessing the main reason is that spawning new threads for each k is inefficient. I cannot move the parallelization to the outer loop, since different k represent subarrays of varying lengths on which we do computations and in the end, we move the results to the beginning of the chunk to save space.
Is there a way to spawn threads outside of the outer loop and then reuse them for each chunk k0:k1?
ChatGPT suggested a solution with Channels, which seems to give the correct result, but the execution is slower and @time reports lots of lock conflicts.
If the question is too vague, I can give more specifics: I'm trying to implement a multithreaded product Z=X*Y of SparseMatrixCSCs with sizes (l,m) and (m,n). My compute! is spcolmul!, which calculates the k-th column of Z; it requires the subvectors rowval[ip+1:ip+l] and nzval[ip+1:ip+l] as work space and then stores the result in rowval[ip+1:ip+s] and nzval[ip+1:ip+s], where s (the number of nonzeros in the column) is typically much smaller than l (e.g. s=10^2 vs n=10^6). In the non-parallel case, for the next k we do ip += s and continue on the next subvectors. In the parallel case, the threads execute compute! on subvectors with ranges ip+1:ip+l, ..., ip+(nt-1)*l+1:ip+nt*l, and once the chunk is done, the results are moved back to the start at ip+1.
There is some overhead in spawning tasks, a few microseconds. There is a package, Polyester.jl, which avoids this. It is also possible to start tasks with @spawn prior to any loops, and let them wait for work (this is, more or less, what Polyester.jl does). You must then communicate the work to them, e.g. indices or a vector. The easiest is to use a Channel, but it is possible to do it at a lower level, with Threads.Event or Threads.Condition, together with locks and/or @atomic struct fields or Atomic structs.
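For reference, a minimal toy sketch of what the @batch swap could look like, mirroring the chunked loop from the original post (the squaring kernel is just a stand-in for the real per-k work):

using Polyester                  # Polyester.jl must be installed
using Base.Threads: nthreads

# Toy stand-in mirroring the original loop structure: a cheap per-k kernel
# followed by a serial per-chunk step.
function chunked_batch!(x, nt)
    n = length(x)
    for k0 = 1:nt:n
        k1 = min(k0 + nt - 1, n)
        @batch for k = k0:k1     # lightweight: reuses Polyester's persistent threads
            x[k] = x[k]^2
        end
        # compress_results!(k0, k1, ...) would go here in the real code
    end
    return x
end

chunked_batch!(rand(10^6), nthreads())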
However, I side with @jling, make sure it is the actual task starts which slow you down, and not e.g. allocations or garbage collections. These things can be devastating for parallel computations.
Let me report some timings for n=10^6 and both matrices have 10 nonzeros per column:
original loop: for k=1:n:
6.467625 seconds (28 allocations: 1.248 GiB)
single thread processing the chunks: for k=k0:k1
6.929176 seconds (76 allocations: 1.250 GiB)
parallelism in each chunk: @threads for k=k0:k1
5.757460 seconds (11.42 M allocations: 1.936 GiB, 0.33% gc time)
parallelism in each chunk: @threads :static for k=k0:k1
15.217362 seconds (11.42 M allocations: 1.936 GiB, 0.36% gc time)
Julia has a lot of multithreading/task/channel functionality that I am not very familiar with. Could you please give some advice on how to use it in my case?
There are way too many allocations in the parallel versions.
I didn't understand this. Could you maybe elaborate a bit on why the non-threaded loop doesn't seem to suffer from this?
Maybe it would be faster to give each thread (Task would be the correct term here) its own work space. I.e., you chunk the outer loop and then each chunk gets its own task, which allocates a work area to operate on. Yes, this will use more memory overall, but if you can afford that it can be much faster. Multiple threads writing to the same area of memory can potentially kill performance.
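A minimal sketch of that idea, with a toy kernel standing in for the real per-column computation (names and sizes are illustrative, not your actual compute!):

using Base.Threads: @spawn, nthreads

# Toy stand-in for the per-column kernel: it uses `scratch` as its private work
# area and returns the few entries that survive compaction.
function toy_column!(scratch::Vector{Float64}, k::Int)
    fill!(scratch, 0.0)
    scratch[1:3] .= k                          # pretend only 3 nonzeros survive
    return scratch[1:3]
end

n, l = 1_000, 10_000
chunks = collect(Iterators.partition(1:n, cld(n, nthreads())))
results = Vector{Vector{Vector{Float64}}}(undef, length(chunks))

@sync for (ci, cols) in pairs(chunks)
    @spawn begin
        scratch = Vector{Float64}(undef, l)    # this task's private work area
        results[ci] = [toy_column!(scratch, k) for k in cols]
    end
end

# The extra memory is one scratch vector per task plus the per-chunk results,
# which are only concatenated at the end.
columns = reduce(vcat, results)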
@sgaure With Polyester.@batch, the timing is no better, but at least the allocations are minimal:
7.019002 seconds (76 allocations: 1.250 GiB, 0.33% gc time)
@abraemer Let’s say we have two n\times n matrices, each has s nonzero elements per column, with n=10^6 and s=10. When multiplying them, the product Z will be of size n\times n, each column will have at most s^2=100 entries. To compute those 10^2 entries with Gustavson’s algorithm (in my original post I included a link to SparseArrays.jl), I need a subvector of length n. After the computation is done, the results are compacted to use up only ≤s^2 entries in the vectors rowval and nzval. Hence when computing the next column, the space for computation (subvector of length n) is just slightly shifted (by at most s^2). However, if I want to do this in parallel, I need nt such subvectors where I do computations, and these subvectors are moving as k0 increases. So for each chunk I must do parallelization only in the inner loop, because I don’t want to allocate a vector of size n*n=10^12 but just nt*n=10^7 when using 10 threads.
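For context, here is a toy dense-accumulator sketch of the Gustavson column step I mean; it is illustrative only, not my actual in-buffer spcolmul!, but it shows why each column needs a full-length work area even though only a few entries survive:

using SparseArrays

# Toy dense-accumulator version of the Gustavson column step: column k of
# Z = X*Y needs a full length-l work area, but only s ≪ l entries survive the
# compaction at the end.
function gustavson_column!(x::Vector{Float64}, used::Vector{Bool}, X, Y, k)
    rows = Int[]
    for p in nzrange(Y, k)
        j, yv = rowvals(Y)[p], nonzeros(Y)[p]
        for q in nzrange(X, j)
            i, xv = rowvals(X)[q], nonzeros(X)[q]
            if used[i]
                x[i] += xv * yv
            else
                used[i] = true
                x[i] = xv * yv
                push!(rows, i)
            end
        end
    end
    sort!(rows)
    vals = x[rows]
    used[rows] .= false     # reset the work area so the next column can reuse it
    return rows, vals       # the compacted ≤ s^2 entries of column k
end

l = 10^3
X = sprand(l, l, 10/l); Y = sprand(l, l, 10/l)
rows, vals = gustavson_column!(zeros(l), fill(false, l), X, Y, 1)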
If you do a sparse matmul of two n \times n matrices, then I'd try to parallelize this by either chunking the first matrix along its rows (so having k matrices of size \frac{n}{k} \times n) or chunking the right-hand-side matrix along the columns. If you use the CSC format then the latter is trivial. Then you can keep using the same algorithm.
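A minimal sketch of the column-chunking route, using the stock sparse matmul per block instead of the hand-written kernel (threaded_spmatmul is just an illustrative name):

using SparseArrays
using Base.Threads: @spawn, nthreads

# Each task multiplies X by a private column block of Y; the blocks are
# concatenated at the end, which is where the extra memory goes.
function threaded_spmatmul(X::SparseMatrixCSC, Y::SparseMatrixCSC)
    n = size(Y, 2)
    chunks = collect(Iterators.partition(1:n, cld(n, nthreads())))
    parts = Vector{SparseMatrixCSC}(undef, length(chunks))
    @sync for (ci, cols) in pairs(chunks)
        @spawn begin
            parts[ci] = X * Y[:, cols]    # independent work, private output block
        end
    end
    return reduce(hcat, parts)
end

X = sprand(10^5, 10^5, 1e-4)
Y = sprand(10^5, 10^5, 1e-4)
threaded_spmatmul(X, Y) ≈ X * Y           # sanity check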
It is also possible to start tasks with @spawn prior to any loops, and let them wait for work (this is, more or less, what Polyester.jl does). You must then communicate the work to them, e.g. indices or a vector. The easiest is to use a Channel, but it is possible to do it at a lower level, with Threads.Event or Threads.Condition, together with locks and/or @atomic struct fields or Atomic structs.
Starting tasks outside of loops, which wait to be given work, sounds like what I’m looking for. Could you tell me a bit more how to do this?
That’s exactly what I’m doing. But the second chunk can be computed only when the first is finished, because the end of the first chunk is the beginning of the second. Unless I allocate more, do chunks separately, and in the end concatenate them.
Yes, that is what I am proposing. The memory overhead could very well be worth it because in turn the chunks have a decent size and are truly independent.
Could it be the compress_results! which takes most of the time? You can try to time them individually.
looptime = 0.0
compresstime = 0.0
for k0=1:nt:n
k1 = min(k0+nt-1,n);
looptime += @elapsed for k=k0:k1
compute!(k, ...)
end
compresstime += @elapsed compress_results!(k0, k1, ...)
end
@show looptime compresstime
If the individual times are very short, there is a nanosecond timer time_ns() which can be used as start = time_ns(); something(); timeused += time_ns()-start. Alternatively, you can figure out where the program spends its time with e.g. ProfileView.jl or PProf.jl.
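For example, a small profiling sketch (ProfileView.jl must be installed; run_chunked_matmul is a hypothetical wrapper around your loop):

using Profile
import ProfileView                    # ProfileView.jl must be installed

run_chunked_matmul(X, Y)              # hypothetical wrapper; run once to compile
Profile.clear()
@profile run_chunked_matmul(X, Y)
ProfileView.view()                    # flame graph: look for GC, locks, task scheduling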
No, compress_results! is negligible. It is used in the second benchmark that I posted, where I process chunks with a single thread, and it takes 6.92s instead of the original loop that needs 6.46s.
Could you just describe how it can be done with tasks that are waiting for work? I presume it can be done in no more than 10 or 20 lines of code.
P.S. Going to sleep now, will return tomorrow, hopefully this thread won’t fade by then.
Ok, will try the suggestion by abraemer, thank you! But my original question still stands: how to spawn tasks/workers outside the loop and give them jobs inside the inner loop.
using Base.Threads: @spawn, nthreads

tasks = Task[]                  # array for keeping task references
channel = Channel{Any}(Inf)
for _ in 1:nthreads()
    t = @spawn begin
        for indices in channel  # blocks until work arrives; exits cleanly when the channel is closed and drained
            # do the work on indices
            # store the result
        end
    end
    push!(tasks, t)
end
# put work into the channel
for ...
    put!(channel, indices)
end
# close the channel; this terminates the workers once it is empty
close(channel)
wait.(tasks)                    # wait until they're all finished
If you always put! things like k0:k1, the channel can be created with Channel{UnitRange{Int}}(Inf). Or 0 instead of Inf, i.e. an unbuffered channel. I doubt this can be done faster than Polyester.jl.
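Putting it together, a self-contained toy sketch of that pattern wired to the chunking from the original post; process_chunk! and the work it does are stand-ins, not the real spcolmul!:

using Base.Threads: @spawn, nthreads

# Toy stand-in for "run compute! over k0:k1 using a private work area, then compact".
process_chunk!(scratch, out, range) = (out[range] .= range .+ sum(scratch))

n, l, nt = 10^6, 10^4, nthreads()
out = zeros(n)

channel = Channel{UnitRange{Int}}(Inf)         # typed, buffered channel
tasks = map(1:nt) do _
    @spawn begin
        scratch = zeros(l)                     # worker-local work area, allocated once
        for range in channel                   # blocks for work; exits when the channel is closed
            process_chunk!(scratch, out, range)
        end
    end
end

for k0 = 1:nt:n                                # hand out the chunks
    put!(channel, k0:min(k0 + nt - 1, n))
end
close(channel)                                 # lets the workers drain the channel and exit
wait.(tasks)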