Creating a macro to broadcast in parallel

Hey all,

I’m trying to create a macro to broadcast in parallel. I have a large dataframe and I frequently create new columns based on values from old columns.

Let’s take string concatenation for example.
The non-parallel broadcast version looks like this:

df.input .* df.target

and the parallel broadcast version looks like this:

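function divide_range_into_max_n_chunks(r::UnitRange{Int64}, n::Int64)
    # Use length(r) rather than r.stop - r.start, which is one too small and
    # can yield n + 1 chunks (e.g. 1:10 with n = 3 gave four chunks).
    chunk_size = max(cld(length(r), n), 1)  # at least 1 so the step is never zero
    return [i:min(i + chunk_size - 1, r.stop) for i in r.start:chunk_size:r.stop]
end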

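using DataFrames                          # provides nrow
using Base.Threads: @threads, nthreads

df_length = nrow(df)
# Assumes both columns hold Strings; a bare Vector(undef, n) would be a Vector{Any}
concatenated_col = Vector{String}(undef, df_length)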

@threads for subrange in divide_range_into_max_n_chunks(1:df_length, nthreads())
    concatenated_col[subrange] = view(df.input, subrange) .* view(df.target, subrange)
end

I would like to turn the code above into a macro that can be called like this:

@parallel_broadcast df.input .* df.target

but I’m very new to this and I’m not sure what I’m doing wrong. I piped the code above into Meta.show_sexpr and came up with this, but it doesn’t quite do the job. Here is what I have so far:

macro parallel_broadcast(col_a, broadcasted_operator, col_b)
    return (:macrocall, Symbol("@threads"), :(#= none:1 =#), (:for, (:(=), :subrange, (:call, :divide_range_into_max_n_chunks, (:call, :(:), 1, :df_length), (:call, :nthreads))), (:block,
      :(#= none:2 =#),
      (:(=), (:ref, :result_col, :subrange), (:call, broadcasted_operator, (:call, :view, col_a, :subrange), (:call, :view, col_b, :subrange)))
    )))
end

I cannot help, but I am curious: is there any reason to use a macro instead of wrapping that code into a function?
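
To make the function-versus-macro question concrete, here is a minimal sketch (not from the original posts) of how the chunked loop could live in an ordinary function, with a thin macro on top that only rewrites `a .* b` into a call to that function. The names `chunk_ranges` and `parallel_broadcast` are hypothetical, and the sketch assumes two 1-based vectors of equal length and a binary dotted operator; it relies on `a .* b` parsing as a `:call` expression whose first argument is the symbol `.*`.

```julia
using Base.Threads: @threads, nthreads

# Hypothetical helper: split `r` into at most `n` contiguous chunks.
function chunk_ranges(r::UnitRange{Int}, n::Int)
    isempty(r) && return UnitRange{Int}[]
    chunk_size = cld(length(r), n)
    return [i:min(i + chunk_size - 1, last(r)) for i in first(r):chunk_size:last(r)]
end

# Hypothetical function version: broadcast `f` over `a` and `b`, one chunk per task.
# Assumes 1-based vectors of equal length.
function parallel_broadcast(f, a::AbstractVector, b::AbstractVector)
    length(a) == length(b) || throw(DimensionMismatch("inputs must have equal length"))
    chunks = chunk_ranges(1:length(a), nthreads())
    isempty(chunks) && return f.(a, b)          # nothing to split
    parts = Vector{Any}(undef, length(chunks))  # one result vector per chunk
    @threads for k in eachindex(chunks)
        parts[k] = f.(view(a, chunks[k]), view(b, chunks[k]))
    end
    return reduce(vcat, parts)                  # stitch the chunks back together
end

# Thin macro: turn `a .* b` into `parallel_broadcast(*, a, b)`.
macro parallel_broadcast(ex)
    if ex isa Expr && ex.head === :call && length(ex.args) == 3
        op = ex.args[1]
        opname = string(op)
        if op isa Symbol && startswith(opname, ".") && length(opname) > 1
            f = Symbol(opname[2:end])           # ".*" becomes "*"
            return :(parallel_broadcast($(esc(f)), $(esc(ex.args[2])), $(esc(ex.args[3]))))
        end
    end
    error("@parallel_broadcast expects a binary dotted-operator expression such as `a .* b`")
end
```

With this in place, `@parallel_broadcast df.input .* df.target` would expand to `parallel_broadcast(*, df.input, df.target)`. The macro receives the whole expression as a single argument and must return an `Expr`, which is why all the real work can stay in the function and the macro only adds the dotted-operator syntax.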

How tall does your dataframe have to be for this to pay off?

If you have a function that does a lot of computation, I can understand, but if your macro is just targeting +, -, *, /, etc., I doubt it will be worthwhile.

For anything that can be done using map instead of broadcast, you can use the ThreadsX library:

using ThreadsX
ThreadsX.map(*, df.input, df.target)

I made a package, ExBroadcast.jl, for a similar purpose (unreleased, just for my own use). Any broadcast can be parallelized by prefixing it with @mtb.

using ExBroadcast
@mtb df.input .* df.target # dot expression is supported
@mtb broadcast(*, df.input,  df.target) # also the call version

I hope this helps.

You may be interested in Strided.jl which does pretty much what you are proposing.
https://github.com/Jutho/Strided.jl
