[ANN] ThreadPools.jl v1.0.0 - Improved thread management for background and nonuniform tasks

All,

I’m happy to announce a (breaking) API upgrade to ThreadPools.jl that better separates the choices around handling background tasks from those around handling nonuniform tasks. Many thanks to @tkf, @ianshmean, and the many others who contributed ideas.

Overview

ThreadPools.jl is a simple package for thread parallelization that exposes a few macros and functions that mimic Base.Threads.@threads, Base.map, and Base.foreach. These macros/functions handle cases that the built-in functions are not always well-suited for:

  • Groups of tasks that the user wants to keep off of the primary thread
  • Groups of tasks that are very nonuniform in duration

For the first case, ThreadPools exposes a @bthreads (“background threads”) macro that behaves identically to Threads.@threads, but keeps the primary thread job-free. For the second case, the package exposes a @qthreads (“queued threads”) macro. This macro queues up the jobs before thread assignment, only starting a new job when a job on any previous thread has finished. The background version of the same is the @qbthreads (“queued background threads”) macro. Versions of map and foreach with the same prefixes are also available (a short usage sketch follows the table below):

Uniform tasks
  • Foreground (primary allowed): Base.Threads.@threads, ThreadPools.pmap(fn, itrs), ThreadPools.pforeach(fn, itrs)
  • Background (primary forbidden): ThreadPools.@bthreads, ThreadPools.bmap(fn, itrs), ThreadPools.bforeach(fn, itrs)

Nonuniform tasks
  • Foreground (primary allowed): ThreadPools.@qthreads, ThreadPools.qmap(fn, itrs), ThreadPools.qforeach(fn, itrs)
  • Background (primary forbidden): ThreadPools.@qbthreads, ThreadPools.qbmap(fn, itrs), ThreadPools.qbforeach(fn, itrs)
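As a quick illustration, here is a minimal sketch of the uniform and nonuniform background variants (the sleeps are just stand-ins for real work, and this assumes julia was started with more than one thread):

    using ThreadPools

    # Uniform jobs, pre-assigned to threads but kept off the primary thread:
    @bthreads for i in 1:16
        sleep(0.01)            # stand-in for roughly equal-length work
    end

    # Nonuniform jobs, queued so a new job starts only when a thread frees up:
    squares = qbmap(1:16) do i
        sleep(0.01 * i)        # stand-in for work of varying duration
        i^2
    end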

The package also exposes a lower-level @pspawnat macro that mimics the Base.Threads.@spawn macro, but allows direct thread assignment for users who want to develop their own scheduling.
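For example, a minimal sketch (assuming julia was started with at least 3 threads; the summed workload is just a placeholder):

    using ThreadPools

    t2 = @pspawnat 2 sum(rand(10^6))   # run this job on thread 2
    t3 = @pspawnat 3 sum(rand(10^6))   # run this job on thread 3
    fetch(t2) + fetch(t3)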

Logging

There are also logging versions of each of the above:

julia> using ThreadPools, Plots

julia> pool = logpforeach(x -> sleep(0.01*x), 1:64);

julia> plot(pool)

[Figure 1: plot of the logpforeach pool activity]

julia> pool = logqforeach(x -> sleep(0.01*x), 1:64);

julia> plot(pool)

[Figure 2: plot of the logqforeach pool activity]

Note that the above examples demonstrate the two scheduling strategies far better than I can describe. :wink:

Composing

There may be times when the default macros don’t quite do what you want. For those cases, a composable API is exposed. For example, perhaps we want to limit usage to only two threads off of the primary:

julia> pwith(ThreadPools.QueuePool(2,2)) do pool
         pforeach(pool, x -> println((x,Threads.threadid())), 1:8)
       end;
(2, 2)
(1, 3)
(3, 2)
(4, 3)
(5, 2)
(6, 3)
(7, 2)
(8, 3)

Here, only threads 2 and 3 are used (in this case, on an 8-thread machine).
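The map variants compose the same way. A hedged sketch (assuming pmap accepts the pool as its first argument, just as pforeach does above):

    julia> pwith(ThreadPools.QueuePool(2,2)) do pool
             pmap(pool, x -> (x, Threads.threadid()), 1:8)
           end

This should again confine the work to threads 2 and 3 while collecting the results.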

Anyway, thanks for all the early-adopter inputs, and I hope that some find the package useful. Feel free to submit issues at ThreadPools.jl if you find problems or have requests. Cheers, all!


This is great. I had a need for a thread-specific spawn-like function, so I really appreciate the addition of @pspawnat.

julia> using ThreadPools
julia> t = @pspawnat 4 Threads.threadid()
Task (runnable) @0x0000000010743c70

julia> fetch(t)
4

@oxinabox just pointed out (correctly) that pmap overloads the standard lib Distributed.pmap, which I’d really prefer not to do. Sigh. I may have to change those functions to tmap and tforeach.

Congratulations on the v1 release. The logging plots are beautiful!

Also, I can’t help thinking about supporting a Distributed+Base.Threads pool on top of this interface. It would be amazing if we could use multiple threads on multiple machines just by calling pmap(fn, DistributedThreadPool(), itr).

I think it may be better to remove the ::Function restriction from pforeach(fn::Function, pool, itr) and to deprecate pforeach(pool, fn::Function, itr). Please see Remove ::Function restrictions? · Issue #6 · tro3/ThreadPools.jl · GitHub. (I’m sorry to start nitpicking the API right off the bat…)
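For anyone curious why the restriction matters: callable objects that are not subtypes of Function get rejected by an fn::Function signature even though they are perfectly callable. A hypothetical sketch:

    struct Scaler              # a callable struct; not a subtype of Function
        a::Float64
    end
    (s::Scaler)(x) = s.a * x

    # With a pforeach(fn::Function, pool, itr) method this would throw a
    # MethodError, even though Scaler(2.0) can be called like a function:
    # pforeach(Scaler(2.0), pool, 1:4)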

@tkf, no problem. The point is to make this thing useful, and you all have way more context on how to do that than I do. :slight_smile:

(v1.3) pkg> add ThreadPools
 Resolving package versions...
 Installed ThreadPools ─ v0.2.0
  Updating `~/.julia/environments/v1.3/Project.toml`
  [b189fb0b] + ThreadPools v0.2.0
  Updating `~/.julia/environments/v1.3/Manifest.toml`
  [b189fb0b] + ThreadPools v0.2.0

Well, I guess 1.0.0 is not out yet.

I spoke too soon!

(v1.3) pkg> up
  Resolving package versions...
 Installed ThreadPools ─ v1.0.0
  Updating `~/.julia/environments/v1.3/Project.toml`
  [b189fb0b] ↑ ThreadPools v0.2.0 ⇒ v1.0.0
  Updating `~/.julia/environments/v1.3/Manifest.toml`
  [b189fb0b] ↑ ThreadPools v0.2.0 ⇒ v1.0.0

@StevenSiew is it working okay for you now?

IIRC, Distributed.pmap does load balancing while your ThreadPools.pmap does not. Another reason why it should perhaps have a different name.

Otherwise, great work! Some of this, like for example @pspawnat, should go into Base at some point, IMHO.


Completely agreed. I was going to go with the “use tmap and deprecate pmap” suggestion in 1.0.1. Then the question becomes whether to propagate the prefix to tforeach and @tspawnat as well, for consistency.

Thanks, and agreed (in that order). :slight_smile:


@tro3, is it recommended to use ThreadPools for I/O-heavy tasks, as in something like the code below?

using HTTP, ThreadPools

function simulate_api_call()
    println("API call started on thread $(Threads.threadid())")

    # Simulate a long-running API call
    # response_task = @async HTTP.get("https://randomuser.me/api/?results=50000")
    response_task = Threads.@spawn HTTP.get("https://randomuser.me/api/?results=50000")

    # Note: the request is still running at this point; only the task has been spawned
    println("API call task spawned on thread $(Threads.threadid())")

    return response_task
end

function test_spawn_3()
    results = Vector{Task}(undef, Threads.nthreads() - 1)

    # Queue one job per non-primary thread; each job returns the Task wrapping its HTTP request
    ThreadPools.logqbforeach(1:Threads.nthreads() - 1) do i
        results[i] = simulate_api_call()
    end

    # Wait for every spawned request and collect the responses
    responses = fetch.(results)

    return responses
end