Limit number of @spawn'ed threads

I’m trying to use Threads.@spawn to execute a function in parallel. According to the docs and common sense, I expected it to run Threads.nthreads() spawned functions simultaneously, and in my case nthreads() == 4.

However, this is not what seems to actually happen: @spawn starts way more threads than 4. The case I originally encountered is the following:

for i in 1:50
    Threads.@spawn begin
        @info (i, "start")
        # some CLI program that performs long computations:
        run(`bash -c 'for i in {1..300000}; do echo \$i > /dev/null; done'`)
        @info (i, "end")
    end
end

This immediately overloads my computer with 50 busy processes, instead of keeping 4 of them running at any given time as I expected.

Basically the same happens in this pure-Julia case:

for i in 1:50
    Threads.@spawn begin
        @info (i, "start")
        sleep(1)
        @info (i, "end")
    end
end

which prints all 50 "start"s immediately, waits for one second, and prints all "end"s.

Could you please clarify what the issue is here? In case it’s relevant: using @threads for instead of @spawn does use 4 threads, as it should, but my actual function calls can take very different amounts of time to complete, which is why I need @spawn.


Threads.@spawn creates a task that will run on any “available” thread. So in the pure Julia case, as soon as the task calls sleep, that thread is available to run another task. The run case is the same situation: run has started a child process, and while the task waits for it, Julia goes looking for the next operation to perform.

The only time a thread isn’t available is when it’s doing CPU operations, i.e. math. As soon as the task tries to access the disk/network/terminal, some sort of wait/sleep is invoked and another task can start using the CPU on that thread.
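A minimal way to see this is to spawn far more sleeping tasks than there are threads and time them. The helper time_sleepy_tasks is hypothetical, and sleep stands in for any call that waits instead of computing. If each task held its thread for the whole sleep, 20 tasks of 0.5 s would need several seconds on a small number of threads; because sleep yields, they all overlap and the total stays close to 0.5 s.

```julia
# Spawn many more tasks than there are threads; each one only sleeps.
# sleep() yields its thread, so the scheduler starts other tasks meanwhile
# and all the sleeps overlap, whatever Threads.nthreads() is.
function time_sleepy_tasks(n::Int, secs::Real)
    t0 = time()
    @sync for _ in 1:n
        Threads.@spawn sleep(secs)
    end
    return time() - t0
end

elapsed = time_sleepy_tasks(20, 0.5)
println("nthreads = ", Threads.nthreads(), ", elapsed ≈ ", round(elapsed; digits = 2), " s")
```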

I’m not sure what your ultimate goal is but in this scenario you probably want to use:

Threads.@threads for i in 1:50
    # ... loop body as in the example above ...
end

That will ensure that at most nthreads() operations are executing at a time.

Thanks for the explanation of how @spawn works! I didn’t see anywhere in the docs that only pure-Julia computation makes a thread “unavailable”.

As I mentioned in the first post, @threads does indeed use 4 threads as expected. However, it doesn’t work well when iterations take very different amounts of time: @threads assigns iterations to threads at the very beginning.

If your loop does not have to respect the ordering, Threads.foreach is a good API. It will not be available until Julia 1.6, but it’s easy to copy-and-paste its definition into your project.
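A minimal sketch of the Threads.foreach(f, channel) pattern, assuming Julia 1.6 or later (where it accepts an ntasks keyword). The work items go into a Channel, and each of the ntasks workers takes the next item only after finishing its previous one, so slow and fast items balance out. sleep stands in for run(`...`), and the in_flight/peak counters are only there to show the limit holds.

```julia
# Threads.foreach starts `ntasks` worker tasks; each one take!s the next
# item from the Channel when it is done with the previous one, so at most
# `ntasks` items are being processed at any moment.
jobs = Channel{Int}(Inf)
for i in 1:50
    put!(jobs, i)
end
close(jobs)   # workers exit once the channel is drained

in_flight = Threads.Atomic{Int}(0)
peak = Threads.Atomic{Int}(0)

Threads.foreach(jobs; ntasks = 4) do i
    n = Threads.atomic_add!(in_flight, 1) + 1   # atomic_add! returns the old value
    Threads.atomic_max!(peak, n)
    sleep(0.05)   # stand-in for run(`...`) or any other blocking call
    Threads.atomic_sub!(in_flight, 1)
end

println("peak concurrency: ", peak[])
```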

Threads.foreach is especially good if each iteration is somewhat long (say, more than a few hundred microseconds) and/or contains some I/O. If you have purely CPU-bound loops and want load balancing, https://github.com/tkf/ThreadsX.jl may be useful (disclaimer: my package).


You could do something like:

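tasks = [1, 2, 3, 4, 5, 6]          # work items still to be processed
protect = ReentrantLock()           # guards every access to `tasks`
@sync for i in 1:Threads.nthreads() # @sync waits for all workers to finish
    Threads.@spawn begin
        lock(protect)
        while length(tasks) > 0
            task = pop!(tasks)      # take the next item while holding the lock
            unlock(protect)         # release it during the long-running command
            run(`bash -c 'for i in {1..300000}; do echo \$i > /dev/null; done'`)
            lock(protect)           # re-acquire before checking `tasks` again
        end
        unlock(protect)
    end
end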

That will ensure that at most nthreads() operations run at the same time. The lock/unlock calls are a bit awkward, but protect has to be held for the length(tasks) > 0 test.

I wrote that without testing; hopefully it’s basically correct.

Lock is a very low-level threading construct. I don’t think you need it most of the time (unless you are writing super performance-sensitive concurrent programs all the time). I believe it should be the last thing you recommend on Discourse. Channel is a better solution here.
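A sketch of what the Channel version of the lock-based worker loop could look like. The function name process_all and its nworkers keyword are hypothetical. Taking items from a Channel is already thread-safe, so the workers simply iterate over it and no explicit lock appears in user code; sleep stands in for the run(`...`) call.

```julia
# Channel-based worker pool: `nworkers` tasks pull items from one shared
# Channel until it is empty and closed.
function process_all(f, items; nworkers::Int = Threads.nthreads())
    jobs = Channel{eltype(items)}(length(items))
    for it in items
        put!(jobs, it)
    end
    close(jobs)                         # iteration ends once the channel is drained
    @sync for _ in 1:nworkers
        Threads.@spawn for it in jobs   # each item is handed to exactly one worker
            f(it)
        end
    end
end

finished_ch = Channel{Int}(Inf)         # thread-safe place to record finished items
process_all(1:6; nworkers = 2) do i
    sleep(0.1)                          # stand-in for run(`...`)
    put!(finished_ch, i)
end
close(finished_ch)
finished = sort(collect(finished_ch))
```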

Then again, if you’re spawning an entire shell in the middle, the lock has trivial overhead by comparison, so 🤷. If the concurrent work to be done is starting processes, you don’t need threads; you just need tasks (which are concurrent rather than actually parallel).


Just in case it’s not clear, I’m not recommending Channel because it’s faster than locks. Channel uses locks inside, after all. I’m recommending Channel because the user code would be easier to write, read, and maintain.

This is exactly why I’m recommending Threads.foreach(f, channel) rather than Threads.@threads. Unfortunately, we have asyncmap but not asyncforeach.
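For tasks-only throttling, asyncmap’s ntasks keyword already caps how many items are in flight; when only the side effects matter, the returned array can just be ignored. A minimal sketch, with sleep standing in for run(`...`) and the in_flight/peak counters added only to show the cap.

```julia
# asyncmap(f, c; ntasks = n) runs f over c with at most n concurrent
# @async tasks (all on the current thread) and returns results in order.
in_flight = Ref(0)
peak = Ref(0)
squares = asyncmap(1:20; ntasks = 4) do i
    in_flight[] += 1                   # no atomics needed: the tasks share one thread
    peak[] = max(peak[], in_flight[])
    sleep(0.05)                        # stand-in for run(`...`), which also yields
    in_flight[] -= 1
    i^2
end
```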

This shouldn’t be using threads at all. Just use @sync and @async:

@sync for i in 1:50
    @async begin
        @info (i, "start")
        # some CLI program that performs long computations:
        run(`bash -c 'for i in {1..300000}; do echo \$i > /dev/null; done'`)
        @info (i, "end")
    end
end

Of course, that doesn’t solve the throttling issue, but there’s no reason for threads.

That’s true. But is there a reason to not use threads? Why avoid threads when you already have an API that solves the throttling issue?

Indeed, I understand that run(...) can effectively be started in parallel with just @async from a single Julia thread. The main reason I wanted to use the multithreading machinery is composability. For example, there is a function performing several computations in parallel like this:

function f(x)
    @spawn ...
    @spawn ...
    for i in 1:10
        @spawn ...
    end
end

When I need to execute f() for multiple parameter values, it can be done easily and efficiently:

for x in 1:10
    @spawn f(x)
end

This works fine if the ellipses (...) above are replaced with pure Julia computations: all nested parts run simultaneously as far as possible. However, if some of the ellipses stand for run(`long computation`), I suddenly can’t easily use composable multithreading. As far as I understand, there is no similarly composable way to use @async/@sync here that ensures no more than nthreads() computations run simultaneously.

As has been explained here, spawning tasks does not in any way limit concurrency; on the contrary, it lets you express an arbitrary amount of potential concurrency and leaves it to the scheduler to map that work onto the actual threads, of which there is a fixed number. When you start external processes, however, no work is being done in Julia, and Julia’s scheduler has no way of knowing whether they are using up a CPU core or just sitting there waiting for I/O, sleeping, or whatever. From Julia’s perspective, running an external process is just blocking and waiting for the kernel to say “this process is done”. You’d need some centralized, system-wide work scheduler for that, like Grand Central Dispatch. Unfortunately, that’s neither portable nor compatible with Julia’s high-performance approach to threading.

Anyway, the short version is that Julia has no way of knowing that you want to throttle these external tasks, so you have to express that explicitly somehow. The Threads.foreach construct is one way, but there are others. You don’t need threads for this at all, so one option is to write a work semaphore and spawn tasks that wait for a slot to become available.
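A minimal sketch of the semaphore approach using Base.Semaphore (not exported, hence the Base. prefix). The helper throttled_foreach and its limit keyword are hypothetical. Every spawned task must acquire one of limit slots before doing its work, so at most limit items run at once, while waiting tasks yield their threads instead of blocking them. sleep stands in for run(`...`).

```julia
# Counting-semaphore throttle: at most `limit` calls to f run at once.
function throttled_foreach(f, items; limit::Int = Threads.nthreads())
    sem = Base.Semaphore(limit)
    @sync for it in items
        Threads.@spawn begin
            Base.acquire(sem)          # waits (yielding the thread) until a slot is free
            try
                f(it)
            finally
                Base.release(sem)      # give the slot back even if f throws
            end
        end
    end
end

in_flight = Threads.Atomic{Int}(0)
peak = Threads.Atomic{Int}(0)
throttled_foreach(1:30; limit = 3) do i
    n = Threads.atomic_add!(in_flight, 1) + 1
    Threads.atomic_max!(peak, n)
    sleep(0.05)                        # stand-in for run(`...`)
    Threads.atomic_sub!(in_flight, 1)
end
println("peak concurrency: ", peak[])
```

For the composability case above, one option (an assumption, not something proposed in this thread) is to share a single semaphore and acquire it only around each run call, so the surrounding @spawn structure of f(x) stays unchanged while the number of simultaneous external processes is capped.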


Yes, thanks to these explanations I have a better mental model of how threads work in Julia! Basically, performing blocking operations like I/O doesn’t make the thread “unavailable”, and other tasks can still be scheduled on it.


Using a stateful iterator, it is not difficult to spawn tasks in batches…

using Base.Threads
let
    item_list = 1:35
    n_tasks = 5

    # stateful iterator
    ch = Channel() do _ch
        for it in item_list
            put!(_ch, it)
        end
    end
    # spawn in batches
    while isopen(ch)
        spawned = 0
        println("new batch ---- ")
        @sync for it in ch
            @spawn begin
                @show it
                sleep(1)
            end
            spawned += 1
            spawned < n_tasks || break
        end
    end
end
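When the items are an ordinary indexable collection, the same batch-at-a-time pattern can be sketched without a Channel by using Iterators.partition. This is an alternative formulation, not the poster’s code; sleep stands in for the real work, and the atomic counters only check that at most n_tasks items run at once and that every item is processed.

```julia
# Batches of n_tasks items; @sync waits for a whole batch before the next starts.
item_list = 1:35
n_tasks = 5
in_flight = Threads.Atomic{Int}(0)
peak = Threads.Atomic{Int}(0)
seen = Threads.Atomic{Int}(0)

for batch in Iterators.partition(item_list, n_tasks)
    @sync for it in batch
        Threads.@spawn begin
            n = Threads.atomic_add!(in_flight, 1) + 1
            Threads.atomic_max!(peak, n)
            sleep(0.05)                # stand-in for the real work on `it`
            Threads.atomic_sub!(in_flight, 1)
            Threads.atomic_add!(seen, 1)
        end
    end
end
```

As with the Channel version, each batch takes as long as its slowest item, so this suits items of similar duration better than the Threads.foreach or semaphore approaches.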