Yes, that makes sense, but why not just pass only the view of the global array the task needs, instead of passing in the whole array and indexing inside of each task? That achieves exactly the same thing, without task-local storage and without the need for accessing a hypothetical task id.
I guess it comes down to a difference in style: on the one side, just having a global array and indexing into it with whatever scheme you deem good; on the other, only giving each task access to exactly what it needs, and no more.
But maybe I'm missing something that breaks in your particular use - a concrete example would be appreciated.
The difference is that during execution in OpenMP, you only ever have up to the number of OpenMP threads different threads of execution. In Julia, this is not the case - we have tasks, which abstract over an OS/hardware thread and can in fact move between them. I.e., in one moment threadid() may give 1, and in the next it may give 23 (on a CPU that has e.g. 24 hardware threads and Julia spawned that many threads for its internal task queue). So when you have a global array and index into it with threadid(), you may end up writing to a different location than you did last time - and two tasks can end up writing to the same location concurrently, which is a data race, since you presumably didn't intend to write there. You can find some more information here
Our multithreading is decidedly NOT the same as that of OpenMP, and shouldn't be confused with it. You will run into issues otherwise.
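To make the failure mode concrete, here is a minimal sketch of the pattern being warned against (the accumulator and the loop body are purely illustrative):

```julia
using Base.Threads

# BAD: one slot per thread, indexed by threadid().
acc = zeros(nthreads())

@threads for i in 1:1_000_000
    tid = threadid()
    # If this task yields and resumes on another thread, `tid` is stale;
    # worse, two tasks can observe the same threadid() and then both
    # perform this read-modify-write on acc[tid] - a data race.
    acc[tid] += log(i)
end
```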
Ok, so here is a sketch of the problem as I perceive it:
Consider a function like this:
```julia
function f(data, a::T) where {T}
    # ... process data
    a(result) # and stash the result
    return a
end
```
There may be dozens of other functions like this, g, h, …
which may be called either like this
```julia
f(the_whole_data, a)
```
or like this
```julia
@sync begin
    Base.Threads.@threads for i in 1:N
        chunk = ... # i-th chunk of the whole data
        @spawn f(the_whole_data[chunk], a)
    end
end
```
The functions f, g, … do not care, and should not care, whether they are run in a serial fashion on the whole data or in a parallel fashion on chunks.
However, the callable object a will know what the form of execution is and needs to stash the results accordingly: it refers to an array which either can be populated in its entirety (serial execution), or in sections corresponding to each chunk (parallel execution).
I do not want to (cannot) redesign the interfaces of the functions f, g, … to pass in additional information regarding the chunks. So how will the object a know what chunk is being worked on?
Couldn't callable a store this information in its own state? (e.g. as a capture if a is a closure, or as an additional field if a is of a custom callable type)
Something along the lines of:
```julia
function f(input_data, stash::T) where {T}
    # ... process data
    stash(result) # and stash the result
    return stash
end

function stash!(output_data, result)
    output_data .= result
end

let
    the_whole_data = ...
    the_whole_output = ...
    Base.Threads.@threads for i in 1:N
        chunk = ... # i-th chunk of the whole data
        # create a specific callable that knows where to stash the results
        # (a closure in this case)
        stash(result) = stash!((@view the_whole_output[chunk]), result)
        # I don't think you need an explicit @spawn here
        f(the_whole_data[chunk], stash)
    end
end
```
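For what it's worth, here is a self-contained, runnable version of that sketch; the contiguous chunking scheme and the doubling "processing" step are placeholders:

```julia
using Base.Threads

function f(input_data, stash)
    result = 2 .* input_data  # stand-in for the real processing
    stash(result)             # and stash the result
    return stash
end

stash!(output_data, result) = output_data .= result

let N = nthreads()
    the_whole_data = rand(1000)
    the_whole_output = similar(the_whole_data)
    # split the indices into N contiguous chunks
    bounds = round.(Int, range(0, length(the_whole_data); length = N + 1))
    @threads for i in 1:N
        chunk = bounds[i]+1:bounds[i+1]
        # each task gets a closure that writes only to its own view of the output
        stash(result) = stash!(@view(the_whole_output[chunk]), result)
        f(@view(the_whole_data[chunk]), stash)
    end
    @assert the_whole_output ≈ 2 .* the_whole_data
end
```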
Here you are addressing the data by index i. That's fine because each i is handled by a separate task, so it's not racy. Indexing chunks by threadid() would be problematic.
The @spawn inside the loop looks strange and is not needed, though.
Usually one would make a copy of a for each chunk and have a reduction function later. Or, if a is an array of the same length as the data, pass the corresponding slice of it to the function as well.
That depends on what a is exactly. There isn't a single solution for all types of reductions.
IIUC, in @PetrKryslUCSD's design a is a callable object that performs the reduction. I.e. it is not an array (or slice) in which results are stored, but rather a function that stores the results in (a slice of) the destination array.
In that case, make a copy of a for each chunk and reduce later. Otherwise I don't see any way for a to know what to update. If a is too large to copy, I think one would need to create "chunk-sized" versions of a instead, i.e. a structure similar to a but containing views of the final output with the appropriate size, also to be reduced later.
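A minimal sketch of the copy-and-reduce variant (the Ref accumulator and the final sum are assumptions about what a might look like):

```julia
using Base.Threads

# One independent copy of the accumulator state per chunk, merged at the end.
function threaded_sum(data; ntasks = nthreads())
    accs = [Ref(0.0) for _ in 1:ntasks]   # a "copy of a" per chunk
    len = cld(length(data), ntasks)
    @sync for (t, lo) in enumerate(1:len:length(data))
        chunk = lo:min(lo + len - 1, length(data))
        @spawn (accs[t][] = sum(@view data[chunk]))
    end
    return sum(r[] for r in accs)         # the reduction step
end
```

Here threaded_sum(rand(10^6)) agrees with sum on the same data, up to floating-point reassociation.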
```julia
using ChunkSplitters

struct A{T}
    output_big_array::T
end

# some function that operates on the array contained in a, given
# a parameter x and an input array v
function (a::A)(x, v)
    a.output_big_array .= a.output_big_array .+ x .* v
    return a
end

# function that updates the array contained in A
function f!(input_big_array, a)
    a(2.0, input_big_array)
end

input_big_array = ones(1000)
output_big_array = zeros(1000)

function foo(input_big_array, output_big_array; nchunks = Threads.nthreads())
    # create a structure for each chunk, with a slice of the array; r is a range
    a_chunks = [A(@view(output_big_array[r])) for (r, _) in chunks(output_big_array, nchunks)]
    Threads.@threads for (r, ichunk) in chunks(input_big_array, nchunks)
        f!(@view(input_big_array[r]), a_chunks[ichunk])
    end
    # create the final structure
    a = A(output_big_array)
    return a
end
```
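If I read this right, a hypothetical call would then look like:

```julia
a = foo(input_big_array, output_big_array)
a.output_big_array ≈ 2 .* input_big_array  # true: each chunk wrote to its own view
```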
Would this arrangement:

```julia
@sync begin
    Base.Threads.@threads for i in 1:N
        chunk = ... # i-th chunk of the whole data
        f(the_whole_data[chunk], a)
    end
end
```
be more efficient than this one?
```julia
@sync begin
    for i in 1:N
        chunk = ... # i-th chunk of the whole data
        @spawn f(the_whole_data[chunk], a)
    end
end
```
since the runtime will do only the necessary spawns instead of N spawns if N happens to be very large?
I am assuming that the dynamic thread scheduling of Julia is influenced by, and does something similar to, the cilk_for of OpenCilk to schedule loop iterations. cilk_for loop iterations are recursively split until the number of iterations reaches the "grainsize", a metric of how much work should be in each parallel task. The grainsize can be declared by the programmer, or determined automatically by the Cilk runtime.
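As far as I know, Julia's @threads does not recursively split iterations like that, but the cilk_for strategy is easy to hand-roll with @spawn. A minimal sketch (the grainsize default is arbitrary):

```julia
using Base.Threads

# Recursively split the range, spawning one half as a task, until a
# chunk falls below the grainsize threshold - the cilk_for strategy.
function pfor!(body, r::AbstractUnitRange; grainsize = 1024)
    if length(r) <= grainsize
        foreach(body, r)
    else
        mid = (first(r) + last(r)) >>> 1
        t = @spawn pfor!(body, first(r):mid; grainsize)
        pfor!(body, mid+1:last(r); grainsize)
        wait(t)
    end
    return nothing
end

# e.g.: pfor!(i -> output[i] = 2 * input[i], eachindex(input))
```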
I think in the first code fragment @sync is not needed, actually?
I tried these ideas out in an application. The algorithm based on tasks does not work very well: some of the tasks tend to take quite a bit longer than others. Perhaps you are onto something with the spawning?
You can use ntasks >> nthreads to deal with that. And if there is any pattern that associates the task length with the index, using the :scatter chunking strategy may help as well.
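Something along these lines, I suppose - assuming the same chunks(array, nchunks, split) form as in the snippet above, where :scatter hands each task interleaved indices:

```julia
using ChunkSplitters, Base.Threads

input  = rand(10^6)
output = zeros(length(input))
work!(out, inp) = (out .= 2 .* inp)  # placeholder for the real kernel

ntasks = 8 * nthreads()  # many more tasks than threads, for load balancing
# :scatter interleaves the indices across chunks, which evens things out
# when the cost of an element grows with its position.
@sync for (r, _) in chunks(input, ntasks, :scatter)
    @spawn work!(@view(output[r]), @view(input[r]))
end
```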