I’m trying to use Threads.@spawn to execute a function in parallel. According to the docs and common sense, I expected it to run Threads.nthreads() spawned functions simultaneously, and in my case nthreads() == 4.
However, this is not what seems to actually happen: @spawn starts way more threads than 4. The case I originally encountered is the following:
for i in 1:50
    Threads.@spawn begin
        @info (i, "start")
        # some CLI program that performs long computations:
        run(`bash -c 'for i in {1..300000}; do echo \$i > /dev/null; done'`)
        @info (i, "end")
    end
end
This immediately overloads my computer with 50 busy processes, instead of keeping 4 of them running at any given time as I expected.
Basically the same happens in this pure-julia case:
for i in 1:50
    Threads.@spawn begin
        @info (i, "start")
        sleep(1)
        @info (i, "end")
    end
end
which prints all 50 "start"s immediately, waits for one second, and prints all "end"s.
Could you please clarify what’s the issue here? If that’s relevant, @threads for instead of @spawn uses 4 threads as it should, but my actual function calls may take very different times to complete - that’s why I need @spawn.
Threads.@spawn creates a task that will run on any “available” thread. So in the pure Julia case, as soon as the task calls sleep, that thread is available to run another task. In the run case the same thing happens: run executes the command as a child process and then waits on it, so Julia goes looking for the next operation to perform.
The only time a thread isn’t available is when it’s doing CPU work in Julia itself, e.g. math. As soon as it tries to access the disk/network/terminal, some sort of wait/sleep is invoked and another task can start using the CPU on that thread.
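You can see this yielding behavior even with plain @async on a single thread. This is just an illustrative sketch (the task names "A" and "B" and the timings are made up, not from your example): both tasks record their "start" before either records its "end", because each task gives up the thread as soon as it blocks in sleep.

```julia
# Sketch: two tasks sharing one thread interleave, because sleep
# (like any blocking operation) yields the thread to other tasks.
order = String[]
@sync begin
    @async begin
        push!(order, "A start"); sleep(0.1); push!(order, "A end")
    end
    @async begin
        push!(order, "B start"); sleep(0.1); push!(order, "B end")
    end
end
@show order  # both "start"s appear before either "end"
```

This is the same mechanism that lets @spawn start all 50 of your tasks at once.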
I’m not sure what your ultimate goal is but in this scenario you probably want to use:
Threads.@threads for i in 1:50
....
end
That will ensure that only nthreads() operations are executing at a time.
Thanks for the explanation of how @spawn works! I didn’t see it anywhere in the docs that only computational operations in pure Julia make a thread “unavailable”.
As I mentioned in the first post, @threads indeed uses 4 threads as expected. However, it doesn’t work effectively when iterations take very different times: @threads allocates iterations to threads up front.
If your loop does not have to respect the ordering, Threads.foreach is a good API. It will not be available until Julia 1.6 but it’s easy to copy-and-paste the definition to your project.
Threads.foreach is good especially if each iteration is somewhat long (say, more than a few hundred microseconds) and/or contains some I/O. If you have purely CPU-bound loops and you want load balancing, https://github.com/tkf/ThreadsX.jl may be useful (disclaimer: my package).
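For the record, on Julia ≥ 1.6 the Threads.foreach pattern looks roughly like this (a hedged sketch: the `sleep` is a stand-in for your run(...) call, and the `done` channel is only there to collect results):

```julia
using Base.Threads

# Feed the work items through a Channel, then let Threads.foreach
# process them with at most `ntasks` concurrent worker tasks.
ch = Channel{Int}(50)
foreach(i -> put!(ch, i), 1:50)
close(ch)  # no more items; workers stop once the channel is drained

done = Channel{Int}(50)
Threads.foreach(ch; ntasks = nthreads()) do i
    sleep(0.01)     # stand-in for the real (long-running) work
    put!(done, i)
end
close(done)
```

Because the workers pull items off the channel as they finish, iterations of very different lengths are load-balanced automatically, unlike @threads.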
using Base.Threads

tasks = [1, 2, 3, 4, 5, 6]
protect = ReentrantLock()
for i in 1:nthreads()
    Threads.@spawn begin
        lock(protect)
        while length(tasks) > 0
            task = pop!(tasks)  # take the next work item
            unlock(protect)
            run(`bash -c 'for i in {1..300000}; do echo \$i > /dev/null; done'`)
            lock(protect)
        end
        unlock(protect)
    end
end
That will ensure that only nthreads() operations are done at the same time. The lock/unlocks are kind of weird, but you need to have protect locked for the length(tasks) > 0 test.
I wrote that without testing, hopefully it’s basically correct.
Lock is a very low-level threading construct. I don’t think you need it most of the time (unless you are writing super performance-sensitive concurrent programs all the time). I believe it should be the last thing you recommend on Discourse. Channel is a better solution here.
Then again, if you’re spawning an entire shell in the middle, the lock has trivial overhead by comparison, so it hardly matters. If the concurrent work to be done is starting processes, you don’t need threads, you just need tasks (which are concurrent rather than actually parallel).
Just in case it’s not clear, I’m not recommending Channel because it’s faster than locks. Channel uses locks inside, after all. I’m recommending Channel because the user code would be easier to write, read, and maintain.
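To make the comparison concrete, the lock-based worker pool above could be sketched with a Channel instead (an assumed sketch, with `sleep` standing in for the shell command and a `results` channel added just so the outcome is observable):

```julia
using Base.Threads

# The Channel replaces the explicit lock: take-ing from a Channel
# is already safe from many tasks at once.
jobs = Channel{Int}(6)
foreach(i -> put!(jobs, i), 1:6)
close(jobs)

results = Channel{Int}(6)
@sync for _ in 1:nthreads()
    Threads.@spawn for job in jobs   # each item goes to exactly one worker
        sleep(0.01)                  # the actual work goes here
        put!(results, job)
    end
end
close(results)
```

There is no lock/unlock choreography to get wrong: each worker simply loops over the channel until it is closed and empty.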
This is exactly why I’m recommending Threads.foreach(f, channel) rather than Threads.@threads. Unfortunately, we have asyncmap but not asyncforeach.
This shouldn’t be using threads at all. Just use @sync and @async:
@sync for i in 1:50
    @async begin
        @info (i, "start")
        # some CLI program that performs long computations:
        run(`bash -c 'for i in {1..300000}; do echo \$i > /dev/null; done'`)
        @info (i, "end")
    end
end
Of course, that doesn’t solve the throttling issue, but there’s no reason for threads.
Indeed, I understand that run(...) can effectively be started in parallel with just @async from a single Julia thread. The main reason I wanted to use the multithreading machinery is composability. For example, there is a function performing several computations in parallel like this:
function f(x)
    @spawn ...
    @spawn ...
    for i in 1:10
        @spawn ...
    end
end
When I need to execute f for multiple parameter values, it can be done easily and efficiently:
for x in 1:10
    @spawn f(x)
end
This works fine if ellipses ... above are replaced with pure Julia computations: all nested parts run simultaneously as far as possible. However, if some of the ellipses mean run(`long computation`), suddenly I cannot easily use composable multithreading. As far as I understand, there is no similar composable way to use @async/@sync for this, so that no more than nthreads() computations are run simultaneously.
As I guess has been explained here, spawning tasks does not in any way limit concurrency. On the contrary, it allows expressing an arbitrary amount of potential concurrency, letting the scheduler decide how to map work onto actual threads, of which there are a fixed number.

When you start external processes, however, there’s no work being done in Julia, and there’s no way for Julia’s scheduler to know whether they’re going to be using up a CPU core or just sitting there waiting for I/O or sleeping or whatever. From Julia’s perspective, running an external process is just blocking and waiting for the kernel to say “this process is done”. You’d need some centralized system-wide work scheduler for that, like Grand Central Dispatch. Unfortunately, that’s neither portable nor compatible with Julia’s high-performance approach to threading.

Anyway, the short version is that there’s no way for Julia to know that you want to throttle these external tasks, so you have to express that explicitly somehow. The Threads.foreach construct is one way, but there are others. You don’t need threads for this at all, so one option is to write a work semaphore and spawn tasks that wait for a slot to be available.
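That last suggestion could be sketched with Base.Semaphore like this (an assumed sketch: `sleep` stands in for run(`long computation`), and the `started` channel just records which items ran):

```julia
using Base.Threads

# At most nthreads() "external commands" run at once; the remaining
# tasks block in acquire until a slot frees up.
sem = Base.Semaphore(nthreads())
started = Channel{Int}(50)
@sync for i in 1:50
    @async begin
        Base.acquire(sem)
        try
            put!(started, i)
            sleep(0.01)   # stand-in for run(`long computation`)
        finally
            Base.release(sem)   # free the slot even if the work errors
        end
    end
end
close(started)
```

Plain @async suffices here because waiting on an external process does not need a thread; the semaphore alone provides the throttling.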
Yes, thanks for these explanations, I now have a better mental model of how threads work in Julia! Basically, performing blocking operations like I/O doesn’t make the thread “unavailable”, and other tasks can still be scheduled there.
Using a stateful iterator, it is not difficult to spawn tasks in batches…
using Base.Threads

let
    item_list = 1:35
    n_tasks = 5
    # stateful iterator
    ch = Channel() do _ch
        for it in item_list
            put!(_ch, it)
        end
    end
    # spawn in batches
    while isopen(ch)
        spawned = 0
        println("new batch ---- ")
        @sync for it in ch
            @spawn begin
                @show it
                sleep(1)
            end
            spawned += 1
            spawned < n_tasks || break
        end
    end
end