I am processing batches of data in 10-minute time steps. For each step, my script calls an external binary about 10 times (as a sort of Monte Carlo variation); the binary produces data from raw files and writes output, which is then combined in Julia and written to an output file.
My Linux machine has 30 cores. Currently, I use OhMyThreads @tasks in :greedy mode to handle the next time step, but each time step does 10 @spawns of Linux processes, some of which finish in 5 minutes while others take half an hour. I can control this somewhat using the nice command.
The issue is that the @tasks scheduler does not see that the finished @spawn tasks have freed up the CPU, so it just waits until all tasks of the time step have completed before launching new ones.
Is there a way to have @tasks or other scheduler know about the entire thread pool?
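For reference, a minimal sketch of the pattern described above (`run_binary` and `combine_outputs` are placeholder names, not the actual script):

using OhMyThreads

# Per-time-step pattern: @tasks waits for all 10 variations before the
# next time step can start, even if most of them finished long ago.
function process_step(t)
    @tasks for i in 1:10              # 10 Monte Carlo variations
        run_binary(t, i)              # external binary, 5 min to half an hour
    end                               # implicit barrier: waits for all 10
    combine_outputs(t)                # combine results in Julia
end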
One way to solve this is with a Base.Semaphore. Say you want 16 tasks to run at the same time: call Base.acquire before launching the command line, and release the semaphore when the command finishes.
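Sketched out, with `run_command` and `jobs` standing in for the actual binary call and work items:

sem = Base.Semaphore(16)              # at most 16 commands in flight

for job in jobs
    Base.acquire(sem)                 # blocks until one of the 16 slots frees up
    Threads.@spawn try
        run_command(job)
    finally
        Base.release(sem)             # free the slot as soon as this job finishes
    end
end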
Another option is to not process your data in 10-minute batches, but to launch them all at once, setting @set ntasks = 16 inside the @tasks macro.
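A rough sketch of that option, with `all_jobs` standing in for all (time step, variation) pairs flattened into one collection:

using OhMyThreads

@tasks for job in all_jobs
    @set ntasks = 16                  # at most 16 tasks in flight at once
    run_command(job)                  # placeholder for the external binary call
end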
In either case, you need to solve this at the task level. Query the number of available threads once to determine the maximum number of concurrent tasks to run, and then control the number of running tasks either with a Semaphore, with an atomic counter guarding when tasks are launched, or with @tasks + @set ntasks.
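The atomic-counter variant could look roughly like this (a Semaphore is usually cleaner; this is just to illustrate the idea, again with `run_command` and `jobs` as placeholders):

running   = Threads.Atomic{Int}(0)    # number of tasks currently running
max_tasks = 16

for job in jobs
    while running[] >= max_tasks      # only this loop launches tasks,
        sleep(0.1)                    # so polling here is race-free
    end
    Threads.atomic_add!(running, 1)
    Threads.@spawn try
        run_command(job)
    finally
        Threads.atomic_sub!(running, 1)
    end
end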
It works perfectly with Base.Semaphore, and it was very convenient to implement. Thanks for your suggestion, Jacob!
I tested it as follows:
function worker(p, t)
    for i in 1:3
        println()
        print(t, ":", p, ":", i)
        sleep(Base.rand() * 2)
    end
end
using Base.Threads: @spawn

function sem_processing_pipeline(t, s::Base.Semaphore)
    for p in 1:5
        Base.acquire(s)
        @spawn try
            println()
            println("Starting pipeline: ", p, " of time step ", t)
            worker(p, t)
        finally
            Base.release(s)
        end
    end
end
function sem_time_stepper()
    sem = Base.Semaphore(15)
    for t in 1:10
        println()
        println("Processing time step: " * string(t))
        sem_processing_pipeline(t, sem)
    end
end
(updated to reflect the correct form indicated by @foobar_lv2)
The typical pattern has the acquire outside of the task. So
for item in iter
    Base.acquire(sem)
    Threads.@spawn try fun(item) finally Base.release(sem) end
end
You acquire the semaphore before spawning your task, and the spawned task itself is responsible for releasing the semaphore.
Otherwise you risk a situation where you start a million tasks immediately that then all wait on the semaphore. There is no need to hold all that memory at the same time and stress the scheduler so much.
(This way, only one single task is ever waiting on the semaphore.)
Thanks, I added the try-finally.
I currently use acquire for each task that spawns. Perhaps it should only be the tasks in my sem_processing_pipeline function, and not the ones in sem_time_stepper?
update 1: It needs the acquire also in sem_time_stepper; without it, I see the CPU usage already go down as the next 3 time steps are not spawned.
update 2: Probably ntasks needs to be small, otherwise I will be wasting threads on just launching other tasks which don't do any significant work.
update 3: Neither of the above updates was right. When ntasks is small, the threads die out. One can just disable it, and the acquires in the processing pipeline function keep the maximum number of threads at 100% CPU.
update 4: I have updated the example to use the correct form as indicated by @foobar_lv2. Done poorly, it can take 40 seconds; done right, it takes 8.5 seconds by @time, with the tasks actually lasting up to 2 seconds beyond that.
I was running the Base.Semaphore with @tasks yesterday, and it kept the thread pool active.
@tasks for c in 1:length(combinations[1])
    Base.acquire(sem)
    try
        # the work, calling bash
    finally
        Base.release(sem)
    end
end
However, today I rewrote it with @spawn try, and the tasks immediately end before the output is produced by the worker (bash command). Can this be fixed somehow?
@tasks for c in 1:length(combinations[1])
    Base.acquire(sem)
    @spawn try
        # the work, calling bash
        # but it skips over it
    finally
        Base.release(sem)
    end
end
I tried going back, and it looks like it works only with @tasks in both the time step and the processing pipeline function; otherwise the threads die out. But that is probably the method @foobar_lv2 warned is stressing the scheduler more. Well, it's not a million tasks, but potentially 2880.
When you run @spawn x, it will spawn a task running x and return the running task immediately. So your latter example uses @tasks (which itself spawns tasks) to run the loop, and the loop then spawns tasks using @spawn. The @tasks macro has no understanding that it needs to wait for any tasks other than the ones it spawned itself.
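The immediate return is easy to see at the REPL (a small illustration, not from the original setup):

julia> t = Threads.@spawn (sleep(2); :done);

julia> istaskdone(t)    # @spawn already returned, work still running
false

julia> fetch(t)         # fetch blocks until the task finishes
:done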
Here are two examples of how to do it:
Setup:
julia> using OhMyThreads
julia> run_shell() = (run(`sleep 2`); run(`echo hello`))
- Option 1: Using @set ntasks
julia> @tasks for i in 1:8
           @set ntasks = 4
           run_shell() # note that this command waits
       end
- Option 2: Using the semaphore and @spawn, no @tasks
julia> sem = Base.Semaphore(4)

julia> for i in 1:8
           Base.acquire(sem)
           Threads.@spawn begin
               try
                   run_shell()
               finally
                   Base.release(sem)
               end
           end
       end
The latter has two problems:
- The loop ends once the last task is spawned, not finished, and that’s probably not what you want
- If the shell command errors, the error is not handled well.
If you have only tens of thousands of tasks (or fewer) in total to spawn, and not millions, I would recommend wrapping the above in a @sync to handle the errors and wait for all the spawned tasks to finish:
julia> @sync begin
           sem = Base.Semaphore(4)
           for i in 1:8
               Base.acquire(sem)
               Threads.@spawn begin
                   try
                       run_shell()
                   finally
                       Base.release(sem)
                   end
               end
           end
       end
In order to wait until all work is done, you can simply do
function worker(p, t)
    for i in 1:3
        println()
        print(t, ":", p, ":", i)
        sleep(Base.rand() * 2)
    end
end

function sem_processing_pipeline(t, s::Base.Semaphore)
    for p in 1:5
        Base.acquire(s)
        @spawn try
            println()
            println("Starting pipeline: ", p, " of time step ", t)
            worker(p, t)
        finally
            Base.release(s)
        end
    end
end

function sem_time_stepper()
    num_parallel = 15
    sem = Base.Semaphore(num_parallel)
    for t in 1:10
        println()
        println("Processing time step: " * string(t))
        sem_processing_pipeline(t, sem)
    end
    # drain all permits: this only succeeds once every task has released
    for i in 1:num_parallel
        Base.acquire(sem)
    end
end
For this pattern to work, it is essential that you do

Base.acquire(s)
@spawn try
    ...

instead of

@spawn try
    Base.acquire(s)
    ...

because otherwise the final drain of all permits can succeed before the spawned tasks have acquired the semaphore, so you may miss some jobs / produce a race.
But that is probably the method @foobar_lv2 warned is stressing the scheduler more. Well, it’s not a million tasks, but potentially, 2880.
No, 3k tasks are nothing to worry about; 1M tasks work perfectly fine but ain’t cheap to schedule. Consider
julia> function foo(n)
           @sync for i in 1:n
               Threads.@spawn 1 + 1
           end
       end
julia> foo(1);
julia> @timev foo(1000_000)
0.909401 seconds (4.00 M allocations: 322.595 MiB, 14.74% gc time)
elapsed time (ns): 9.09400887e8
gc time (ns): 134035862
bytes allocated: 338264992
pool allocs: 4000025
non-pool GC allocs: 0
malloc() calls: 10
free() calls: 194
minor collections: 4
full collections: 2
Thanks, I forgot the @sync, and forgot to remove the @tasks which was left over after copy & paste.
This is the structure I am using.
function worker(p, t)
    for i in 1:3
        sleep(Base.rand() * 2)
    end
end

function sem_processing_pipeline(t, s::Base.Semaphore)
    @sync for p in 1:5
        Base.acquire(s)
        @spawn begin # best
            try
                println("Starting pipeline: ", p, " of time step ", t)
                worker(p, t)
            finally
                Base.release(s)
            end
        end
    end
end

function sem_time_stepper()
    sem = Base.Semaphore(15)
    @tasks for t in 1:10
        @set scheduler = :greedy # optional
        println("Processing time step: " * string(t))
        sem_processing_pipeline(t, sem)
    end
end
There needs to be @tasks in the time step loop function. Multi-threading this makes sure that there is a fresh supply of tasks when those within one time step are running out, and the ntasks and :greedy options control how sequentially it proceeds.
The docs also have another suggestion, Base.acquire(sem) do. When I tried it briefly, it gave me an error that the number of acquires and releases was mismatched, and it does not seem to be needed, as the above works.
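For completeness, that do-block form is Base.acquire(f, sem) (available in recent Julia versions): it acquires, runs the function, and releases in a finally block, so acquires and releases always stay paired. A minimal sketch:

sem = Base.Semaphore(4)

@sync for i in 1:8
    Threads.@spawn Base.acquire(sem) do
        run(`sleep 1`)                # released even if this throws
    end
end

Note that here the acquire happens inside the spawned task, which the earlier advice cautions against when spawning very many tasks at once.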