Behavior of threads

Yes, that makes sense, but why not just pass each task only the view of the global array it needs, instead of passing in the whole array and indexing inside each task? That achieves exactly the same thing, without task-local storage and without the need to access a hypothetical task id.

I guess it comes down to a difference in style: on the one side, having a global array and indexing into it with whatever scheme you deem good; on the other, giving each task access to exactly what it needs, and no more.

But maybe I'm missing something that breaks in your particular use case - a concrete example would be appreciated.

The difference is that during execution in OpenMP, you only ever have up to the number of OpenMP threads distinct threads of execution. In Julia, this is not the case - we have tasks, which are abstractions over an OS/hardware thread and can in fact move between them. I.e., in one moment threadid() may return 1, and in the next it may return 23 (on a CPU that has e.g. 24 hardware threads, with Julia having spawned that many threads for its internal task queue). So when you have a global array and index into it with threadid(), you may end up writing to a different location than you did last time, which is a data race, since you presumably didn't intend to write there. You can find some more information here

Our multithreading is decidedly NOT the same as that of OpenMP, and shouldn't be treated as such. You will run into issues otherwise.
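
As an illustration, here is a minimal sketch of the racy pattern (hypothetical code; yield() stands in for any yield point, such as I/O):

using Base.Threads

# Accumulate into a buffer indexed by threadid(). A task may migrate to a
# different thread at any yield point, so the read and the write below can
# touch different slots, and two tasks can update the same slot concurrently.
function racy_sum(xs)
    buf = zeros(eltype(xs), nthreads())
    @sync for x in xs
        @spawn begin
            acc = buf[threadid()]      # read on whatever thread runs us now
            yield()                    # the task may be rescheduled elsewhere
            buf[threadid()] = acc + x  # possibly a different slot: data race
        end
    end
    return sum(buf)  # may silently lose updates
end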

Ok, so here is a sketch of the problem as I perceive it:

Consider a function like this:

function f(data, a::T) where {T}
    # ... process data into `result`
    a(result) # and stash the result
    return a
end

There may be dozens of other functions like this (g, h, …),
which may be called either like this:

f(the_whole_data, a)

or like this:

@sync begin
    Base.Threads.@threads for i in 1:N
        chunk = ... # i-th chunk of the whole data
        @spawn f(the_whole_data[chunk], a)
    end
end

The functions f, g, … do not care and should not care
whether they are run in a serial fashion on the whole data
or in a parallel fashion on chunks.
However, the callable object a will know what the form of execution is
and needs to stash the results accordingly: it refers
to an array which can be populated either in its entirety (serial execution)
or in sections corresponding to each chunk (parallel execution).
I do not want to (and cannot) redesign the interfaces of the
functions f, g, … to pass in additional information regarding
the chunks. So how will the object a know what chunk is being worked on?

Couldn't the callable a store this information in its own state? (E.g. as a capture if a is a closure, or as an additional field if a is a custom callable type.)

Something along the lines of:

function f(input_data, stash::T) where {T}
    # ... process data into `result`
    stash(result) # and stash the result
    return stash
end

function stash!(output_data, result)
    output_data .= result
end


let
    the_whole_data = ...
    the_whole_output = ...

    Base.Threads.@threads for i in 1:N
        chunk = ... # i-th chunk of the whole data

        # create a specific callable that knows where to stash the results
        # (a closure in this case)
        stash(result) = stash!((@view the_whole_output[chunk]), result)

        # I don't think you need an explicit @spawn here
        f(the_whole_data[chunk], stash)
    end
end
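
For completeness, a hedged sketch of the other option mentioned above, a custom callable type instead of a closure (Stash is a hypothetical name):

# A callable struct holding the view it should write to.
struct Stash{V}
    dest::V
end
(s::Stash)(result) = s.dest .= result

# Inside the loop, one would then do:
# stash = Stash(@view the_whole_output[chunk])
# f(the_whole_data[chunk], stash)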

Here you are addressing the data by index i. That's fine because each i is a separate task, so it's not racy. Indexing chunks by threadid() would be problematic.

The @spawn inside the loop looks strange and is not needed, though.


Usually one would make a copy of a for each chunk and apply a reduction function later. Or, if a is an array of the same length as the data, pass the corresponding slice of it to the function as well.

That depends on what a is exactly. There isn't a single solution for all types of reductions.
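
For a concrete picture, here is a minimal sketch of the copy-per-chunk-and-reduce pattern, with a plain sum standing in for whatever a accumulates (threaded_sum and ntasks are illustrative names):

using Base.Threads

function threaded_sum(xs; ntasks = nthreads())
    ranges = collect(Iterators.partition(eachindex(xs), cld(length(xs), ntasks)))
    parts = Vector{eltype(xs)}(undef, length(ranges))
    @sync for (i, r) in enumerate(ranges)
        # each task gets its own accumulator slot, so nothing is shared
        @spawn parts[i] = sum(@view xs[r])
    end
    return sum(parts)  # the reduction step
end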

IIUC, in @PetrKryslUCSD's design a is a callable object that performs the reduction. I.e., it is not an array (or slice) in which results are stored, but rather a function that stores the results in (a slice of) the destination array.

In that case, make a copy of a for each chunk and reduce later. I don't see any way for a to know what to update otherwise. If a is too large to copy, I think one would need to create "chunk-sized" versions of a instead, i.e. structures similar to a but containing views of the final output of the appropriate size, also to be reduced later.

Something like this:

using ChunkSplitters

struct A{T}
    output_big_array::T
end

# some function that operates on the array contained in a, given
# a parameter x and an input array v
function (a::A)(x, v)
    a.output_big_array .+= x .* v
    return a
end

# function that updates the array contained in A
function f!(input_big_array, a)
    a(2.0, input_big_array)
end

input_big_array = ones(1000)
output_big_array = zeros(1000)

function foo(input_big_array, output_big_array; nchunks = Threads.nthreads())
    # create a structure for each chunk, holding a view of the output; r is a range
    a_chunks = [A(@view(output_big_array[r])) for (r, _) in chunks(output_big_array, nchunks)]
    Threads.@threads for (r, ichunk) in chunks(input_big_array, nchunks)
        f!(@view(input_big_array[r]), a_chunks[ichunk])
    end
    # create the final structure, wrapping the whole output
    a = A(output_big_array)
    return a
end
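
A quick check of what this produces (with the values above, every output entry becomes 2.0):

a = foo(input_big_array, output_big_array)
a.output_big_array == 2 .* input_big_array  # true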


Sorry, copy-paste error. It should have looked like this:

@sync begin
    for i in 1:N
        chunk = ... # i-th chunk of the whole data
        @spawn f(the_whole_data[chunk], a)
    end
end

Wouldnā€™t this version

@sync begin
    Base.Threads.@threads for i in 1:N
        chunk = ... # i-th chunk of the whole data
        f(the_whole_data[chunk], a)
    end
end

be more efficient than this one?

@sync begin
    for i in 1:N
        chunk = ... # i-th chunk of the whole data
        @spawn f(the_whole_data[chunk], a)
    end
end

since the runtime will do only the necessary spawns instead of N spawns if N happens to be very large?

I am assuming that the dynamic task scheduling of Julia is influenced by, and does something similar to, the cilk_for of OpenCilk to schedule loop iterations. cilk_for loop iterations are recursively split until the number of iterations reaches the "grainsize", a metric of how much work should be in each parallel task. The grainsize can be declared by the programmer or determined automatically by the Cilk runtime.
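
For comparison, a hedged sketch of controlling the grain size by hand in Julia: cap the number of spawned tasks regardless of N by grouping iterations into coarser blocks (run_chunked and work are hypothetical names):

using Base.Threads

function run_chunked(work, N; ntasks = 2 * nthreads())
    # one task per block of iterations, not one task per iteration
    @sync for r in Iterators.partition(1:N, cld(N, ntasks))
        @spawn for i in r
            work(i)  # each task processes a contiguous block
        end
    end
end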


I think in the first code fragment @sync is not needed, actually? (@threads already waits for all iterations to finish.)

I elaborated on these ideas in an application. The algorithm based on tasks does not work very well: some of the tasks tend to take quite a bit longer than others. Perhaps you are onto something with the spawning?

The threaded version actually works quite well.

Some scaling data is provided in this thread: Parallel assembly of a finite element sparse matrix - #19 by PetrKryslUCSD

You can use ntasks >> nthreads to deal with that. And if there is any pattern that associates the task length with the index, using the :scatter chunking strategy may help as well.
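
A hedged sketch of that suggestion, assuming the same ChunkSplitters API as in the example above (foo_scatter and work! are hypothetical names). Many more chunks than threads evens out the load, and :scatter gives each chunk interleaved indices, which helps when the cost of an item correlates with its index:

using ChunkSplitters, Base.Threads

function foo_scatter(work!, xs; nchunks = 4 * nthreads())
    @sync for (r, ichunk) in chunks(xs, nchunks, :scatter)
        @spawn for i in r
            work!(xs, i)  # r holds interleaved indices for this chunk
        end
    end
end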


That is an interesting idea. I'll check it out.