How to correctly fetch an array of tasks created with Threads.@spawn?

Below is an MWE with a simple function, test_serial(). I tried to parallelize its calls to p1!() and p2!() in test_parallel(), but the results differ, so it seems I am not using fetch correctly. Could you please guide me here?

function p1!(x)
    x .+= 1;
end

function p2!(x)
    x .*= 2;
end

#
function test_serial()
    x =[zeros(10),zeros(10)]
    for i in 1:5
        p1!(x[1])
        p1!(x[2])
        x[1] .+= 1;
        p2!(x[1])
        p2!(x[2])
    end
    return x
end
x = test_serial()
#=
julia> x
2-element Vector{Vector{Float64}}:
 [124.0, 124.0, 124.0, 124.0, 124.0, 124.0, 124.0, 124.0, 124.0, 124.0]
 [62.0, 62.0, 62.0, 62.0, 62.0, 62.0, 62.0, 62.0, 62.0, 62.0]
=#
function test_parallel()
    x =[zeros(10),zeros(10)]
    for i in 1:5
        tasks = [Threads.@spawn begin
                            p1!(x[i])
                            end for i = 1:2];
        fetch(tasks);
        x[1] .+= 1;
        tasks = [Threads.@spawn begin
                            p2!(x[i])
                            end for i = 1:2];
        fetch(tasks);
    end
    return x
end
x = test_parallel()
#=
julia> x
2-element Vector{Vector{Float64}}:
 [218.0, 218.0, 218.0, 218.0, 218.0, 218.0, 218.0, 218.0, 218.0, 218.0]
 [74.0, 74.0, 74.0, 74.0, 74.0, 74.0, 74.0, 74.0, 74.0, 74.0]
=#

Any comment here please?

It seems that fetch only has meaningful methods for a single Task, Channel, Future, etc., plus a catch-all fallback for everything else, defined as

fetch(@nospecialize x) = x

in task.jl (try running @which fetch([1, 2]) in the REPL).

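Thus, the call fetch(tasks) dispatches to this fallback, returns the vector immediately, and does not wait on anything. The loop then carries on while the spawned tasks may still be running, which is why the result differs from the serial version. To actually wait for the tasks within the vector, broadcasting would work: fetch.(tasks).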
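For illustration, here is a minimal sketch of the question's function with that one change applied. The name test_parallel_fixed and the loop variable j are only placeholders chosen for this example; everything else follows the original MWE.

```julia
# Helper functions from the question, same behavior.
p1!(x) = (x .+= 1)
p2!(x) = (x .*= 2)

# Hypothetical name for the corrected version of test_parallel().
function test_parallel_fixed()
    x = [zeros(10), zeros(10)]
    for _ in 1:5
        # One task per inner vector; each task mutates a different x[j].
        tasks = [Threads.@spawn p1!(x[j]) for j in 1:2]
        fetch.(tasks)   # broadcast: blocks until every task has finished
        x[1] .+= 1
        tasks = [Threads.@spawn p2!(x[j]) for j in 1:2]
        fetch.(tasks)
    end
    return x
end

# The fallback method just hands its argument back without waiting.
v = [1, 2]
@assert fetch(v) === v

x = test_parallel_fixed()
# x[1] is all 124.0 and x[2] is all 62.0, matching test_serial()
```

Since the return values of the tasks are not needed here, foreach(wait, tasks) or wrapping the spawning loop in @sync should work as well; which one reads best is mostly a matter of taste.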

Yes, fetch.(tasks) works.
Thank you very much!