How to parallelize list comprehension and map with multithreading?

The macro Threads.@threads parallelizes for loops. Is there something similar for list comprehensions and map? I can trivially rewrite them as for loops, but would prefer not to.
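For reference, the kind of rewrite I'd rather avoid looks roughly like this (f here is just a stand-in for whatever the comprehension applies):

using Base.Threads

xs = rand(1000)
out = similar(xs)            # assumes f returns the same element type as xs
@threads for i in eachindex(xs)
    out[i] = f(xs[i])
end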

For map there is pmap. I don’t think you can parallelize comprehensions as-is.
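A minimal pmap sketch, for reference (it's multi-process rather than multi-threaded, and the mapped function must be defined on every worker, hence the @everywhere; f here is just an example):

using Distributed
addprocs(4)                                  # start 4 worker processes

@everywhere f(x) = x * sum(sqrt(i) for i in 1:1_000)

xs = rand(1000)
ys = pmap(f, xs)                             # like map(f, xs), spread over the workers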

There is a package ThreadsX, which can parallelize map and do much more.

And it can handle comprehensions as well:

using ThreadsX

xs = rand(1000)
function f(x)
    r = 0.0
    for i in 1:5_000_000
        r += sqrt(i)
    end
    return x*r
end

julia> Threads.nthreads()
4

julia> @time ThreadsX.collect(f(x) for x in xs);
2.241343 seconds (2.86 k allocations: 216.922 KiB)

julia> @time ThreadsX.map(f, xs);
2.233077 seconds (2.86 k allocations: 217.016 KiB)

julia> @time [f(x) for x in xs];
8.352408 seconds (2 allocations: 7.953 KiB)
  1. How can I use ThreadsX with a comprehension like the one below?
my_function(x,y) = x+y
xs=[1, 2, 3, 4, 5]
ys=[10, 20, 30, 40, 50]
MyOutput = [my_function(x,y) for x in xs, y in ys]

  2. I also ask because I have a problem with ThreadsX.map and ThreadsX.collect. That is, adding ThreadsX works for simple code like the one below:
using ThreadsX
using BenchmarkTools

xs = rand(1000)
function f(x)
    r = 0.0
    for i in 1:5_000_000
        r += sqrt(i)
    end
    return x*r
end
@btime [f(x) for x in xs] 
@btime collect(f(x) for x in xs) 
@btime ThreadsX.collect(f(x) for x in xs) 

Indeed, for the collect, there are significant speed gains:

11.575 s (2 allocations: 7.95 KiB)
11.582 s (2 allocations: 7.95 KiB)
1.992 s (771 allocations: 105.72 KiB)

However, for a somewhat more complicated application with map, ThreadsX actually slows the code down:

@btime mysum1 = ThreadsX.map(((x,y) for x in rand(10000), y in rand(10000))) do (x,y)
       2x+3y
end

@btime mysum2 = map(((x,y) for x in rand(10000), y in rand(10000))) do (x,y)
       2x+3y
end
1.875 s (4318 allocations: 4.06 GiB)
531.316 ms (6 allocations: 763.09 MiB)

The worst thing, however, is that in a more complex application, adding ThreadsX to the collect or map calls in my (big) program, as in

myOutputs = ThreadsX.map(((x,y) for x in Xs, y in Ys)) do (x,y)
       myFunction(x,y)
end

or

myOutputs = ThreadsX.collect(myFunction(x,y) for x in Xs, y in Ys)

breaks the execution and quits Julia altogether, which I’ve never experienced before, spitting out the message below (paths of the executed file shortened for brevity):

signal (7): Bus error
in expression starting at /home/pin expression starting at /home/.../myfile.jl:180
in expression starting at /home/.../myfile.jl:180
double free or corruption (out)
free(): double free detected in tcache 2

signal (11): Segmentation fault

signal (6): Aborted
in expression starting at /home/.../myfile.jl:180
in expression starting at /home/.../myfile.jl:180
in expres11): Segmentation fault
in expression starting at /home/.../myfile.jl:180

signal (6): Aborted
in expression starting at /home/.../myfile.jl:180
H5FL_fac_free at /home/piotrek/.julia/artifacts/997813d46a8a06e6e9871a2a01483f91ce954eca/lib/libhdf5.so (unknown line)
unknown function (ip: 0x254908f)
Allocations: 105048431 (Pool: 105003795; Big: 44636); GC: 291

Any ideas?

This is almost as good as getting “unreachable reached” :slight_smile:

I need to look into the performance penalty of ThreadsX.

The segfault could be due to threaded access to libhdf5. Does Julia’s HDF5 API allow concurrent access? If not, and if the input and output of myFunction have a small serialization overhead, you can use Folds.map with DistributedEx to get multi-process (but single-threaded) parallelization.
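Roughly something like this (untested sketch; Xs, Ys, and myFunction stand in for your actual definitions, and they need to exist on every worker):

using Distributed
addprocs(4)                             # e.g. one process per core
@everywhere using Folds
@everywhere myFunction(x, y) = x + y    # placeholder for the real function

Xs = rand(100)
Ys = rand(100)

# Passing DistributedEx() as the last argument selects multi-process execution
out = Folds.map(xy -> myFunction(xy...), Iterators.product(Xs, Ys), DistributedEx())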

In the meantime, ThreadsX.map seems to work just fine with Iterators.product(Xs, Ys).

@tkf, I cannot answer your question.

I tried using Folds.sum with DistributedEx, as in the official example. That works:

using Folds
using ThreadsX
using BenchmarkTools

@btime sum(1:10000)
@btime Folds.sum(1:10000)
@btime Folds.sum(1:10000, ThreadedEx())
@btime Folds.sum(1:10000, DistributedEx())

though with no performance gain:

0.024 ns (0 allocations: 0 bytes)
6.198 μs (57 allocations: 3.53 KiB)
6.281 μs (58 allocations: 3.55 KiB)
309.233 μs (243 allocations: 13.42 KiB)

Actually I don’t know how to use Folds.map with DistributedEx for the map example that I supplied in my previous comment. I can do just this:

@btime map(((x,y) for x in rand(10000), y in rand(10000))) do (x,y)
       2x+3y
end

@btime ThreadsX.map(((x,y) for x in rand(10000), y in rand(10000))) do (x,y)
       2x+3y
end

@btime Folds.map(((x,y) for x in rand(10000), y in rand(10000))) do (x,y)
       2x+3y
end

but again with no performance gain:

406.065 ms (6 allocations: 763.09 MiB)
1.924 s (4317 allocations: 3.19 GiB)
1.529 s (530 allocations: 2.51 GiB)

I don’t know how to use it properly. I get MethodError: no method matching my_function(::Tuple{Int64, Int64}) when I try

my_function(x,y) = x+y
xs=[1, 2, 3, 4, 5]
ys=[10, 20, 30, 40, 50]
XsYs=Iterators.product(xs, ys)
MyOutput = map(my_function,XsYs)

Apologies, I should have given a complete example. The issue you’re running into is that the elements of Iterators.product(...) are tuples. You could splat them, with something like

ThreadsX.map(z->my_function(z...), Iterators.product(Xs, Ys))

or you could make a new method for my_function and then call as you were before:

my_function(xy) = my_function(xy[1], xy[2])
ThreadsX.map(my_function, Iterators.product(Xs, Ys))

I haven’t benchmarked these things and so maybe using 2D generators is better for performance, but I would guess the performance should be similar. ThreadsX/the whole transducer ecosystem seems (very impressively) capable of handling pretty complicated iterators and still making good use of however many threads you give it.


Thanks!

I managed to run the code:

using ThreadsX
using BenchmarkTools
my_function(x,y) = x+y
xs=rand(1000)
ys=rand(1000)
my_Function(xy) = my_function(xy[1], xy[2])

@btime map(my_Function, Iterators.product(xs, ys))
@btime ThreadsX.map(my_Function, Iterators.product(xs, ys))
@btime ThreadsX.map(z->my_function(z...), Iterators.product(xs, ys))
@btime map(z->my_function(z...), Iterators.product(xs, ys))

but ThreadsX doesn’t seem to improve performance:

517.969 μs (6 allocations: 7.63 MiB)
4.365 ms (3862 allocations: 52.20 MiB)
4.834 ms (3863 allocations: 52.20 MiB)
517.939 μs (6 allocations: 7.63 MiB)

If your end goal is to use functions that invoke the HDF5 API, you need to figure out whether it allows concurrent access. There is no other way to make it work with threading. Otherwise, use DistributedEx() by passing it as the last argument of the Folds API.

If you are trying to understand when and how to use threading, see the docstring examples in Folds.jl · Folds. These docstrings “typically” contain two examples. The first one demonstrates the functionality on a small problem, so there is no benefit in using threading (e.g., sum(1:10000)). The second one is complicated enough to show some speedup with multiple threads (at least on my machine when I tried). These examples may give you some idea of when and how to use threading. Typically, you need a serial computation that takes at least about 10 μs.
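A rough, machine-dependent illustration of that threshold (the functions here are just stand-ins, and the numbers will vary):

using ThreadsX, BenchmarkTools

expensive(x) = x * sum(sqrt(i) for i in 1:1_000_000)   # well above 10 μs per call
cheap(x) = 2x + 3                                       # well below it

xs = rand(100)
@btime map(expensive, $xs)            # serial baseline
@btime ThreadsX.map(expensive, $xs)   # should scale with Threads.nthreads()

ys = rand(100_000)
@btime map(cheap, $ys)
@btime ThreadsX.map(cheap, $ys)       # scheduling overhead likely dominates here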

(But your last example with ThreadsX.map and Folds.map should produce some speedups. It’s simply a performance bug that I need to fix.)

Also, it’s a better idea to share the value of Threads.nthreads() (and nprocs() if using Distributed) when sharing parallel benchmark results.


I have only a limited understanding of your post, but I guess that load from GitHub - JuliaIO/JLD.jl: Saving and loading julia variables while preserving native types invokes HDF5. I managed to take it out of the myFunction(x,y) that I put into the map call, and to pass the loaded variable to the function as an extra argument instead:

z = load("mypath/myfile.jld")
myOutputs = map(((x,y) for x in Xs, y in Ys)) do (x,y)
       myFunction(x,y,z)
end

This works, but when I add ThreadsX. I get a weird crash.

After I execute my code (in Atom), it spits out “Internal error” boxes, though it keeps running for a while and prints the results I ask for. What is weird is that at some point Atom stops printing the results; it apparently keeps running, but it never prints any error in the REPL, as it normally does. The internal error box that I see is the following:

[Screenshot from 2021-05-12 16-44-10: Atom “Internal error” dialog]

My Julia Client is up-to-date: 0.12.6.

I should chime back in and say that my earlier suggestion about using ThreadsX was probably more of a distraction than actual help. I just didn’t know that Folds.jl existed when I wrote that. Probably a good idea to switch over to that one.

I wouldn’t guess that that would solve the issue you’re seeing, but I’m just a little worried I’ve made an extra distraction by suggesting a different package. Sorry about that.

Hmm… Are you using display or logging like @info? Maybe Atom is not thread-aware. Since the error comes from Atom’s internals, I think it’s better to try this outside Atom.

If you want to do this inside Atom, I think it’d be better to come up with an MWE and report a bug.
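If the printing itself turns out to be the problem, one possible workaround (untested sketch, not specific to Atom) is to keep the threaded body print-free or serialize any output with a lock:

using ThreadsX

print_lock = ReentrantLock()

results = ThreadsX.map(1:10) do i
    r = i^2                           # stand-in for the real per-item work
    lock(print_lock) do
        println("finished item $i")   # only one task prints at a time
    end
    r
end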

ThreadsX.jl still contains some functions that do not exist in Folds.jl (e.g., sorting). So using it is still totally fine, though I imagine it’s a bit confusing that they have some overlapping functionality.


Thanks @tkf, after removing displays it finally worked!

Is this expected, or is it just an Atom bug?

Yeah, it seems that Atom’s internals are not thread-aware.


Although I also see the performance gain in my list-comprehension use case, the significant increase in memory allocation puzzles me; it seems counter-intuitive. Usually I would expect less memory usage to indicate better performance. Do the allocations and memory usage shown here make sense?

In my understanding, these allocations come from Julia’s internal scheduler implementation. So yes, multithreading adds some overhead, and it is inevitable. But at the same time, parallelization reduces the overall time significantly, so it’s still a win in the end.

From my experience (if we are not talking about super-optimized things like LoopVectorization.jl), there is a lower bound below which this overhead makes multithreading slower than a single-threaded version. But if we are talking about milliseconds or seconds, as in this example, you can safely ignore these allocations.
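For a rough sense of where those allocations come from, spawning even a single trivial task allocates a bit (untested sketch; numbers are machine-dependent):

using BenchmarkTools

work() = sum(sqrt(i) for i in 1:1_000)

@btime work()                          # serial: essentially no allocations
@btime fetch(Threads.@spawn work())    # one task: a few hundred bytes of scheduling overhead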


In my case, the size of the list varies a lot, so I am not sure whether I should pursue this ThreadsX direction…