Is there a @parallel macro to parallelize independent expressions in a block?

I’m looking for a macro, call it @parallel here, that transforms code like this:

@parallel begin
    res1 = expr1
    res2 = expr2
    …
    resN = exprN
end

into

tsk1 = Threads.@spawn expr1
tsk2 = Threads.@spawn expr2
…
tskN = Threads.@spawn exprN
res1 = fetch(tsk1); res2 = fetch(tsk2); … resN = fetch(tskN)

I looked in the usual threading packages, but they all seem to be only concerned with parallelizing for-loops.

Does anyone know a solution to this?

I’m not aware of a macro like that, but you can put the work into a container of functions, like:

julia> f(x) = x^2; g(x) = x^3; h(x) = x^4;

julia> exprs = [ f, g, h ]
3-element Vector{Function}:
 f (generic function with 1 method)
 g (generic function with 1 method)
 h (generic function with 1 method)

julia> Threads.@threads for func in exprs
           println(func(2.0))
       end
4.0
8.0
16.0
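Threads.@threads makes no guarantee about the order in which iterations run, so the printed lines above may appear in any order. If you need the results rather than printed output, one option is to write each result into its own slot of a preallocated vector. This is a minimal sketch; f, g and h are redefined so the snippet runs on its own:

```julia
f(x) = x^2; g(x) = x^3; h(x) = x^4

funcs = [f, g, h]
results = Vector{Float64}(undef, length(funcs))

# Each iteration writes only to its own index, so there is no data race,
# and results stay in the order of `funcs` regardless of scheduling.
Threads.@threads for i in eachindex(funcs)
    results[i] = funcs[i](2.0)
end

println(results)
```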

Even better, the OhMyThreads.jl tcollect function might work:

julia> using OhMyThreads

julia> results = tcollect(func(2.0) for func in exprs)
3-element Vector{Float64}:
  4.0
  8.0
 16.0
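If you’d rather avoid the extra dependency, plain Base gets close: spawn one task per function and fetch them all. A sketch with the same f, g and h (redefined here so it runs standalone):

```julia
f(x) = x^2; g(x) = x^3; h(x) = x^4
funcs = [f, g, h]

# One task per function; the comprehension gives each task its own `fn`.
tasks = [Threads.@spawn(fn(2.0)) for fn in funcs]

# fetch waits for each task and returns its value, in the order of `funcs`.
results = fetch.(tasks)
```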
1 Like

I don’t know if there is a macro for that, but it’s simply done like this:

using Base.Threads 

function f(x)
    sleep(x)
    return 2x
end 

result = fetch.([
    @spawn f(1);
    @spawn f(2);
    @spawn f(3);
    @spawn f(4)])

println(result)

# Prints [2, 4, 6, 8] after about 4 seconds (the longest sleep), since the tasks run concurrently.

Note: If you want to name the results, you can also destructure them:

result1, result2, result3, result4 = fetch.([
    @spawn f(1);
    @spawn f(2);
    @spawn f(3);
    @spawn f(4)])
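Broadcasting fetch over a tuple of tasks instead of a vector returns a tuple, so each result keeps its own type (a vector of mixed results falls back to a wide element type such as Any). That can help when the independent expressions return different kinds of values. A small sketch; the second workload is just an arbitrary example:

```julia
using Base.Threads

f(x) = (sleep(x); 2x)   # same toy workload as above, with a shorter sleep

t1 = @spawn f(0.1)
t2 = @spawn join(["a", "b"], "-")

# fetch. over a tuple returns a tuple, which destructures into named results.
result1, result2 = fetch.((t1, t2))

println(result1, " ", result2)
```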

The reason behind my question is that I was looking for a simple way to go through my code and annotate blocks of long-running but independent statements without obscuring the program logic with boilerplate code. At the moment I’m using @spawn and fetch in one form or another, but I was hoping some macro wizard would show up and help me de-uglify my code again…
I really have no experience writing macros, and my first attempts did not work…

I don’t know of one, and I doubt you’ll find such a macro in a package: it would have to require a compound expression made up strictly of single-line assignments, and, more importantly, Threads.@spawn would split one global or local scope into several separately scoped tasks (a problem you don’t have per loop iteration). But if you know your input code is safe for that, this could be done with a macro.

Share what you’ve tried and why it didn’t work.
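To make the scope concern concrete: the body of Threads.@spawn becomes a closure, so an assignment inside it only updates a variable that already exists in an enclosing local scope; otherwise it creates a new variable that is local to the task. A minimal sketch (the function name demo is only for illustration):

```julia
function demo()
    a = 0
    # `a` already exists in demo's scope, so the task assigns to that captured variable.
    t1 = Threads.@spawn begin
        a = 1
    end
    wait(t1)
    # `b` does not exist here, so it becomes a local of the task's closure only.
    t2 = Threads.@spawn begin
        b = 2
    end
    wait(t2)
    return a, @isdefined(b)
end

println(demo())   # (1, false)
```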

I tried the following (which ignores the assignments for now), but the expressions are not evaluated in the correct scope.

macro parallel(exp)
    if exp.head === :block
        ret = quote
            tasks = Task[]
        end
        exps = exp.args
        for i in eachindex(exps)
            ex = exps[i]
            if ex isa Expr # ignore line number nodes
                push!(ret.args, :(push!(tasks, Threads.@spawn($ex))))
            end
        end
        push!(ret.args, :(map(fetch, tasks)))
        return ret
    end
end

str = "Hello"

function testsp()
    str = "Not"
    tsk = Threads.@spawn begin
        println("startet on thread: ", Threads.threadid())
        str*" again (@spawn)"
    end
    fetch(tsk)
end

function test()
    str = "Not"
    @parallel begin
        begin
            println("startet on thread: ", Threads.threadid());
            str*" again (@parallel)"
        end
    end
end

println(testsp())
println("***************************")
println(test())

If you run it, the output is:

startet on thread: 3
Not again (@spawn)
***************************
startet on thread: 2
["Hello again (@parallel)"]

As you can see, using @spawn directly produces the correct output, using the local definition of str, but my macro uses the global definition…

1 Like

Your macro uses the global definition in the transformed code because of macro hygiene. The manual has the exact rules, but basically the macro resolves some symbols in its returned expression in the definition scope instead of the call’s scope, and there are various ways of opting out. In cases like yours you only need symbols to be resolved in the call scope, so you can opt out entirely with esc. Note that macro hygiene doesn’t apply when you transform an Expr anywhere else, but if you use a function call to generate the macro’s output expression, that expression will still be subject to macro hygiene:

julia> blah() = :(str)
blah (generic function with 1 method)

julia> blah() # exactly what you generate
:str

julia> macro blah() blah() end # macro just uses function
@blah (macro with 1 method)

julia> @macroexpand @blah # hygiene expanded in definition scope
:(Main.str)

julia> blah() = esc(:(str))
blah (generic function with 1 method)

julia> blah()
:($(Expr(:escape, :str)))

julia> @macroexpand @blah # escaped: resolved in call scope
:str

I don’t know if that alone fixes your macro, so instead I’ll transform the code directly, as in your first example. I’m not sure that pattern is safe for every input either, but at least this shows how to handle the mandatory assignments:

macro parallel(ex) # avoid exp, that's a Base function
    if !(ex isa Expr && ex.head === :block)
        throw(ArgumentError("@parallel requires 1 `begin` expression."))
    end
    # begin block in expression is a quote, and this is simple enough to mutate
    for assign in view(ex.args, 1:length(ex.args)) # don't work on lines we add
        if assign isa LineNumberNode continue end # just skip these
        if !(assign isa Expr && assign.head === :(=))
            throw(ArgumentError("@parallel requires all subexpressions to be assignments."))
        end
        var = assign.args[1]
        assign.args[1] = Symbol("tsk_", var) # generate task name, could do this another way
        assign.args[2] = :(Threads.@spawn $(assign.args[2]))
        push!(ex.args, :($var = fetch($(assign.args[1]))))
    end
    return esc(ex) # resolve all in call scope
end

julia> @macroexpand1 @parallel begin # don't expand recursively to see @spawn calls
         res1 = expr1
         res2 = expr2
         resN = exprN
       end
quote
    #= REPL[97]:2 =#
    tsk_res1 = #= REPL[96]:12 =# Threads.@spawn(expr1)
    #= REPL[97]:3 =#
    tsk_res2 = #= REPL[96]:12 =# Threads.@spawn(expr2)
    #= REPL[97]:4 =#
    tsk_resN = #= REPL[96]:12 =# Threads.@spawn(exprN)
    res1 = fetch(tsk_res1)
    res2 = fetch(tsk_res2)
    resN = fetch(tsk_resN)
end

julia> @macroexpand @parallel begin # we do expand recursively to check hygiene
           x = begin
                   println("startet on thread: ", Threads.threadid());
                   str*" again (@parallel)"
               end
       end
quote
    #= REPL[120]:2 =#
    tsk_x = begin
            #= threadingconstructs.jl:484 =#
            let
                #= threadingconstructs.jl:485 =#
                local var"#59#task" = Base.Threads.Task((()->begin
                                    #= REPL[118]:13 =#
                                    begin
                                        #= REPL[120]:3 =#
                                        println("startet on thread: ", Threads.threadid())
                                        #= REPL[120]:4 =#
                                        str * " again (@parallel)"
                                    end
                                end))
                #= threadingconstructs.jl:486 =#
                (var"#59#task").sticky = false
                #= threadingconstructs.jl:487 =#
                Base.Threads._spawn_set_thrpool(var"#59#task", :default)
                #= threadingconstructs.jl:488 =#
                if $(Expr(:islocal, Symbol("##sync#41")))
                    #= threadingconstructs.jl:489 =#
                    Base.Threads.put!(var"##sync#41", var"#59#task")
                end
                #= threadingconstructs.jl:491 =#
                Base.Threads.schedule(var"#59#task")
                #= threadingconstructs.jl:492 =#
                var"#59#task"
            end
        end
    x = fetch(tsk_x)
end
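For comparison, the esc fix can also be applied to the first attempt by escaping only the user’s statements instead of the whole output, so the names the macro itself introduces stay hygienic. This is a sketch; the name @parallel_fetchall is hypothetical, chosen so it doesn’t clash with the @parallel macros in this thread:

```julia
macro parallel_fetchall(block)
    if !(block isa Expr && block.head === :block)
        throw(ArgumentError("@parallel_fetchall requires a `begin` block"))
    end
    # Escape only the user's statements; `Threads`, `map` and `fetch`
    # stay hygienic and resolve in the macro's defining module.
    spawns = [:(Threads.@spawn $(esc(stmt))) for stmt in block.args if !(stmt isa LineNumberNode)]
    return :(map(fetch, Any[$(spawns...)]))
end

str = "Hello"   # global that must NOT be picked up

function test_escaped()
    str = "Not"
    @parallel_fetchall begin
        str * " again (@parallel_fetchall)"
        length(str)
    end
end

println(test_escaped())
```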

Thanks very much @Benny! That did the trick! Some day I will hopefully learn the escaping rules…
In the initial post I wrote the assignments as if they were mandatory, but you should actually be able to mix statements with and without assignments. So I extended the code as follows:

macro parallel(ex)
    if !(ex isa Expr && ex.head === :block)
        throw(ArgumentError("@parallel requires 1 `begin` expression."))
    end
    # begin block in expression is a quote, and this is simple enough to mutate
    for i in 1:length(ex.args) # don't work on lines we add
        statement = ex.args[i]
        statement isa LineNumberNode && continue # just skip these
        taskid = gensym("taskid") # generate task name
        if statement isa Expr 
            if statement.head === :(=)
                var = statement.args[1]
                statement.args[1] = taskid 
                statement.args[2] = :(Threads.@spawn $(statement.args[2]))
                push!(ex.args, :($var = fetch($(taskid))))
            else
                ex.args[i] = :($taskid = Threads.@spawn $(statement))
                push!(ex.args, :(wait($(taskid))))
            end
        else
            # no need to spawn an immediate value (e.g. a bare symbol or literal)
            # one could also throw an error here, since such a statement has no effect
        end
    end
    return esc(ex) # resolve all in call scope
end

So far it passes all my tests. What it currently does not support are cascaded assignments (like a = b = expr) or assignments nested more deeply inside a statement. Supporting those would make it much more difficult, if it is possible with a macro at all…

Thanks again!

What I just noticed is that cascaded assignments, and assignments within a statement, also work if the variables are already defined outside the @parallel block!
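That observation can be checked with a small function, where b is already a local of the enclosing function before the block runs. The sketch below repeats a condensed copy of the final macro so it runs on its own (same logic: skipped lines are LineNumberNodes and bare values); the function name check is only for illustration:

```julia
macro parallel(ex)
    if !(ex isa Expr && ex.head === :block)
        throw(ArgumentError("@parallel requires 1 `begin` expression."))
    end
    for i in 1:length(ex.args)           # only the original statements
        statement = ex.args[i]
        statement isa Expr || continue   # skip LineNumberNodes and bare values
        taskid = gensym("taskid")
        if statement.head === :(=)
            var = statement.args[1]
            statement.args[1] = taskid
            statement.args[2] = :(Threads.@spawn $(statement.args[2]))
            push!(ex.args, :($var = fetch($taskid)))
        else
            ex.args[i] = :($taskid = Threads.@spawn $statement)
            push!(ex.args, :(wait($taskid)))
        end
    end
    return esc(ex)   # resolve everything in the call scope
end

function check()
    b = 0             # already a local of check()
    seen = Symbol[]
    @parallel begin
        x = sum(1:10)         # plain assignment: spawned, then fetched
        push!(seen, :ran)     # no assignment: spawned, then waited on
        y = b = 2 * 21        # cascaded: the task assigns the captured local `b`
    end
    return x, y, b, seen
end

println(check())   # (55, 42, 42, [:ran])
```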