`break` with a threaded loop?

There are two problems in this code.

First, init = Int[] means that a single vector is shared across multiple tasks (and across threads if Threads.nthreads() > 1), so the same vector is mutated by tasks that can run on different threads. This code is therefore not thread-safe; in particular, it can have data races. You need to pass OnInit(() -> Int[]), EmptyVector{Int}(), or similar instead.
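
As a minimal Base-only sketch of the aliasing problem (this does not call Transducers.jl and does not reproduce its internals; make_init is a hypothetical name used only for illustration), compare handing out one vector object with calling a factory each time:

```julia
# One vector object handed to four workers: every slot aliases the same array.
shared = Int[]
accs_shared = [shared for _ in 1:4]
push!(accs_shared[1], 42)
@assert accs_shared[4] == [42]      # the write is visible through every alias

# A factory (the idea behind OnInit(() -> Int[])) builds a new array per call.
make_init = () -> Int[]             # hypothetical name, for illustration only
accs_fresh = [make_init() for _ in 1:4]
push!(accs_fresh[1], 42)
@assert isempty(accs_fresh[4])      # the other accumulators are untouched
```

With the shared vector, concurrent push! calls from different threads would all hit the same array, which is where the data race comes from.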

Second, the binary function passed to foldxt is not “good”, in the sense that it probably does not do what you want. The binary function passed to foldxt has to combine two intermediate results, but your function ignores its second argument. This means that only the first basecase is processed. You can observe this with something like

julia> foldxt(1:100, init=EmptyVector{Int}(), basesize=10) do r, _
           # sleep(0.01)
           r = append!!(r, SingletonVector(Threads.threadid()))
           if length(r) >= 100
               reduced(r)
           else
               r
           end
       end
10-element Vector{Int64}:
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1

You can see that the output has only 10 elements even though the input has 100 elements.
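
To illustrate what "combining two intermediate results" means without relying on any package, here is a minimal Base-only sketch. chunked_collect is a hypothetical helper, not part of Transducers.jl, and this is not how foldxt is implemented; it only mimics the overall shape, where each chunk is reduced into its own accumulator and a binary function then merges the accumulators.

```julia
# Hypothetical helper (not a Transducers.jl API), for illustration only.
function chunked_collect(xs; basesize = 10)
    # Reduce each chunk in its own task, with its own accumulator.
    tasks = map(Iterators.partition(xs, basesize)) do chunk
        Threads.@spawn begin
            acc = Int[]              # fresh accumulator owned by this task
            for x in chunk
                push!(acc, x)
            end
            acc
        end
    end
    # Binary combine step: it uses BOTH arguments.
    combine(a, b) = append!(a, b)
    return foldl(combine, map(fetch, tasks); init = Int[])
end

result = chunked_collect(1:100)
```

If combine returned only its first argument, the contents of every other chunk would be dropped, which is the same kind of loss seen in the 10-element output above.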

Note that I use init=EmptyVector{Int}() here. If you use init=Int[], it may look like the code does what you want. However, as I said, that is not a valid Julia program because of the data race.

Perhaps what you wanted to write was something like this:

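using Transducers, BangBang, MicroCollections  # foldxt/Map/OnInit/reduced, append!!, SingletonVector

1:1000 |>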
Map() do _
    sleep(0.01)
    SingletonVector(Threads.threadid())
end |>
foldxt(init = OnInit(() -> Int[]), basesize = 1) do a, b
    r = append!!(a, b)
    if length(r) >= 100
        reduced(r)
    else
        r
    end
end