Asynchronous tasks vs multi-threading

Parallel is a section of Julia's documentation containing the following two subsections:

  1. Asynchronous “tasks”, or coroutines
  2. Multi-threading

I notice that setting JULIA_NUM_THREADS to a value > 1 (proper multi-threading) is only required in the 2nd subsection above. Does this indicate that setting JULIA_NUM_THREADS is not required if you use @spawn etc., following the spirit of the 1st subsection?

I also notice that setting JULIA_NUM_THREADS is not mentioned in this blog post.


Does this confirm my idea?

My second question

Given that asynchronous programming (using @spawn etc.) is listed as the 1st subsection, does this imply that this class of methods is more powerful than that of the 2nd subsection (using Threads.@threads for etc.)?

You can indeed use @spawn with JULIA_NUM_THREADS=1, but your Tasks will just not run in parallel, i.e. the processor will only be busy with one task at a time. Asynchronous (but not truly parallel) programming is mainly useful if the tasks stall to some extent, allowing us to work on another task in the meantime. That is, you won't see any benefit for compute-heavy tasks, as sketched below.
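A minimal sketch (with a made-up cpu_work helper, run with JULIA_NUM_THREADS=1): spawning a compute-heavy task buys you nothing, because neither task ever yields.

import Base.Threads.@spawn
cpu_work() = sum(sqrt, 1:10^9)  # compute-heavy, never yields
t0 = time()
t = @spawn cpu_work()  # scheduled, but the single thread is busy below
cpu_work()             # the main task runs to completion first
wait(t)                # only now does the spawned task get to run
time() - t0            # ≈ twice the time of a single cpu_work() call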

Well, the first line in the section How to use it is

To use Julia with multiple threads, set the JULIA_NUM_THREADS environment variable:

The blog post is just structured to first grab your attention with the nice example, and only later explains the 'boring' setup.
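For reference, you set the variable before starting Julia, or equivalently use the -t flag (sketch for a bash-like shell):

export JULIA_NUM_THREADS=4   # in the shell, before launching julia
julia
# or equivalently
julia -t 4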

@spawn is also used for proper parallelism. It's just that with a single thread this devolves to the asynchronous programming from the first section. But @spawn and using Tasks directly (probably best with sticky set to false, as in @spawn) are indeed more powerful (flexible) than @threads.
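For instance, a rough sketch: @spawn lets you launch heterogeneous jobs of different kinds and lengths and collect each result individually, which the uniform loop of @threads doesn't directly express.

using Base.Threads: @spawn
t1 = @spawn sum(sqrt, 1:10^8)    # one kind of work
t2 = @spawn maximum(rand(10^6))  # an unrelated, differently-sized job
r1, r2 = fetch(t1), fetch(t2)    # wait for and collect each result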

1 Like

My point was that this is a bit far away. According to my screenshot (especially the content I underlined in red), I thought it should be self-contained.

I can’t understand the following

julia> Threads.nthreads()
1

julia> import Base.Threads.@spawn

julia> function test()
           t0 = time() # start a timer
           T = @spawn sleep(1) # the minor work
           sleep(1.5) # The major work
           istaskdone(T) && println(" The minor work was executed ALONG WITH the major work")
           println("t_elapsed = $(time() - t0)")
       end
test (generic function with 1 method)

julia> test()
 The minor work was executed ALONG WITH the major work
t_elapsed = 1.5120000839233398

Given that there is only 1 thread that Julia can rely on, why can sleep(1) and sleep(1.5) be executed in parallel?

Because sleep does not involve any work by the CPU. The tasks here do not compete for a shared resource, so there is nothing preventing both of them from doing what they have to do (i.e. nothing, in this case) “at the same time”.

This is what @eldee explained here:

See also the distinction between concurrency and parallelism. You'll find numerous discussions about this on the web, for example:

3 Likes

I devised a new example:

julia> Threads.nthreads()
2

julia> import Base.Threads.@spawn

julia> function mysum(N) # N = 31 takes 2 s; N = 32 takes 4 s
           t0 = time()
           s = -1e16
           println("N = $N, s is reset!")
           for i = 1:(2^N)
               s += i
           end
           println("N = $N, After $(time() - t0) seconds, s = $s")
       end;

julia> function test()
           t0 = time() # start a timer
           T = @spawn mysum(31) # the minor work
           mysum(32) # The major work
           str = istaskdone(T) ? "finished" : "not finished yet"
           println("the minor task is $str")
           println("t_elapsed = $(time() - t0)")
       end;

julia> test()
N = 32, s is reset!
N = 31, s is reset!
N = 32, After 4.295000076293945 seconds, s = 9.213372036930064e18
the minor task is not finished yet
t_elapsed = 4.295000076293945

julia> N = 31, After 6.375 seconds, s = 2.2958430092889073e18
julia> 

Although I made use of 2 threads, the result is unexpected in that the minor work was not executed alongside the major work.

I suspect the unintuitive behaviour is due to the printlns (IO thread), though I’m not sure of the details:

julia> using .Threads; nthreads()
2

julia> function mysum_noprints(N)
           t0 = time()  # Note that you can also use @time or @timed
           s = -1e16
           for i = 1:(2^N)
               s += i
           end
           return s, time() - t0
       end;

julia> function test()
           t0 = time() # start a timer
           T = @spawn mysum_noprints(31) # the minor work
           s_major, t_major = mysum_noprints(32) # The major work
           s_minor, t_minor = fetch(T)
           println("Total elapsed time = $(time() - t0)")
           println("\t t_minor = $t_minor, t_major = $t_major")
           println("\t (s_minor = $s_minor, s_major = $s_major)")
       end;

julia> test()
Total elapsed time = 4.1540000438690186
         t_minor = 2.0870001316070557, t_major = 4.1540000438690186
         (s_minor = 2.2958430092889073e18, s_major = 9.213372036930064e18)

Edit: Note that if you run julia with two threads via julia -t 2 you get 0 interactive threads:

(...)>julia -t 2
(...)

julia> Threads.nthreads()
2

julia> Threads.nthreads(:interactive)
0

julia> Threads.nthreads(:default)
2

But also if I explicitly request a separate interactive thread via julia -t 2,1 I still observe the same strange behaviour for your original (println) code.

Edit2: @threads works fine, though, also for julia -t 2:

julia> function test()
           t0 = time() # start a timer
           Threads.@threads for N = (31, 32)
               mysum(N)
           end
           println("t_elapsed = $(time() - t0)")
       end;

julia> test()
N = 31, s is reset!
N = 32, s is reset!
N = 31, After 2.069999933242798 seconds, s = 2.2958430092889073e18
N = 32, After 4.13700008392334 seconds, s = 9.213372036930064e18
t_elapsed = 4.151000022888184

But the ‘equivalent’ @spawn code does not:

julia> function test()
           t0 = time() # start a timer
           tasks = map((31, 32)) do N
               Threads.@spawn mysum($N)
           end
           wait.(tasks)
           println("t_elapsed = $(time() - t0)")
       end;

julia> test()
N = 31, s is reset!
N = 32, s is reset!
N = 31, After 2.061000108718872 seconds, s = 2.2958430092889073e18
N = 32, After 6.184999942779541 seconds, s = 9.213372036930064e18
t_elapsed = 6.189000129699707

So I guess

might deserve more nuance.

2 Likes

All the asynchronous things in Julia use the same basic machinery. That is, they create a Task, set the sticky bit in the task to false (if any thread can be used) or to true (if the Task must run on the current thread), and deliver the task to the scheduler, which runs it when there are idle threads.

Threads.@spawn puts its argument into a Task, sets sticky = false and hands it over to the scheduler. You’ll have to manually collect the tasks when they’re finished, with a wait, fetch or @sync.
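Roughly, a simplified sketch of what that amounts to (the real macro also handles variable capture and error propagation):

t = Task(() -> sum(1:10^8))  # wrap the work in a Task without running it
t.sticky = false             # allow the scheduler to run it on any thread
schedule(t)                  # hand it over to the scheduler
fetch(t)                     # wait for completion and collect the result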

The Threads.@threads for i in 1:n macro just creates a number of Tasks and sets them up to run different parts of the interval 1:n once they are scheduled on a thread (with sticky = false). Waiting for the tasks is done automatically.

The @async macro sets sticky = true, which means the scheduler will only run it on the current thread, once that thread is idle (e.g. while it waits for input or similar). @async is discouraged because it also means that the task which uses it will be locked to the current thread.

All of this will work even with a single thread, but then only a single task will run at any point in time (though another task may start running if the running task goes to sleep, waits for io, triggers a garbage collection, etc.).

2 Likes

Very nice conclusion.

But I still find the behavior of @spawn nontrivial to understand. I execute the following in the REPL (I don't know whether there would be a big difference if @spawn were enclosed in functions, and I'm not aware of the underlying theory).

Threads.nthreads() # I start with 4 threads
import Base.Threads.@spawn
function mysum(N) # N = 32 takes ≈ 4 s
    t0 = time()
    s = -1e16
    for i = 1:(2^N)
        s += i
    end
    return N, s, time()-t0
end;

# [Step 1] copy-paste the entire following block to REPL and observe the t_elapsed
t0 = time(); 
task1 = @spawn mysum(32);
task2 = @spawn mysum(32);
task3 = @spawn mysum(32);
@info "t_elapsed is $(time() - t0)" # my inference: 0.13s ⇒ not congested here

# (Maybe there should be some "cooling-down" time---say 5s---between executing Steps? I'm unsure)

# [Step 2] copy-paste the entire following block to REPL and observe the t_elapsed
t0 = time(); 
task1 = @spawn mysum(32);
task2 = @spawn mysum(32);
task3 = @spawn mysum(32);
task4 = @spawn mysum(32);
@info "t_elapsed is $(time() - t0)" # my inference: 4.63s ⇒ congested here

# [Step 3] copy-paste the entire following block to REPL and observe the t_elapsed
N = 3;
task_vec = Vector{Task}(undef, N);
t0 = time(); for i = 1:N
    task_vec[i] = @spawn mysum(32)
end;
@info "t_elapsed for N = $N is $(time() - t0)" # my inference: 0.026s ⇒ not congested here

# [Step 4] copy-paste the entire following block to REPL and observe the t_elapsed
N = 4;
task_vec = Vector{Task}(undef, N);
t0 = time(); for i = 1:N
    task_vec[i] = @spawn mysum(32)
end;
@info "t_elapsed for N = $N is $(time() - t0)" # my inference: 4.58s ⇒ congested here

# [Step 5] copy-paste the entire following block to REPL and observe the t_elapsed
N = 4;
task_vec = Vector{Task}(undef, N);
t0 = time(); for i = 1:N
    task_vec[i] = @spawn mysum(32)
end; @info "t_elapsed for N = $N is $(time() - t0)" # my inference: 0.111s ⇒ not congested here

# [Step 6] copy-paste the entire following block to REPL and observe the t_elapsed
N = 100;
task_vec = Vector{Task}(undef, N);
t0 = time(); for i = 1:N
    task_vec[i] = @spawn mysum(32)
end; @info "t_elapsed for N = $N is $(time() - t0)" # my inference: 0.113s ⇒ purely @spawn is fast

The difference between Step 1 and Step 2 might be because @info also needs a thread (according to @eldee).

Edit: Yes, it makes some sense if @info needs a thread to be executed.
Edit: Yes, we need some “cooling-down” time when using @spawn, because some tasks may still be occupying CPU threads behind the scenes. This is very unlike our daily ordinary blocking-programming case.
Edit: If I instead start julia with 5 threads, then by stark contrast, I cannot observe any congestion from Step 1 to Step 5. My chip is an 11th-gen Intel Core CPU with 4 cores and 8 logical processors. I find it usable and sensible to set JULIA_NUM_THREADS = 8.

I did a test to compare the Threads.@threads for method and the @spawn method.
Although they have similar performance, the parallel performance is not very satisfactory (expected 10 seconds, got 50 seconds):

Threads.nthreads() # in this test I use 8 threads
import Base.Threads.@spawn
function CPU_intensive_work(N) 
    s = collect(1:99999)
    for i = 1:(10000N)
        s = circshift(s, -3)
    end
    s
end

CPU_intensive_work(10); # do a first trial run to facilitate testing

(t = time();
CPU_intensive_work(10);
time() - t) # 11.19 seconds

t = time(); Threads.@threads for i = 1:8
    CPU_intensive_work(10)
end; time() - t # 50.70 seconds

t = time();
(@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10));
time() - t # 50.28 seconds

The above result was run with JULIA_NUM_THREADS = 8. I reset it to 4, yielding the following result (repeated code omitted):

(t = time();
CPU_intensive_work(10);
time() - t) # 10 seconds

t = time(); Threads.@threads for i = 1:4
    CPU_intensive_work(10)
end; time() - t # 20.11 seconds

t = time();
(@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10);
@spawn CPU_intensive_work(10));
time() - t # 18.92 seconds

PS I recall that when I was shopping for a smartphone, the ads talked about single-core and multi-core performance. At the time I couldn't fully appreciate the meaning of “multi-core performance”. I guess this is a proper example that explains it :smiling_face_with_tear:: with 4 threads, the per-task performance is halved compared to single-core; with 8 threads, it shrinks to about 20% of single-core. (Right?)

There are several issues with this setup.

When using @spawn you should wait until the tasks are completed. I.e. a wait or fetch must be performed before you measure the time. Otherwise you’re only measuring the time it takes to @spawn.

The other thing is more subtle. When running interactively in this fashion it may happen that the spawned task is running on the same thread as the REPL. Thus, you will not get back the REPL prompt until the spawned task is complete, effectively running the tasks serially. This is the “congestion” you observe.

You can avoid this by starting julia with an “interactive” thread. I.e. start with julia -t 4,1 (JULIA_NUM_THREADS=4,1).

t0 = time(); 
task1 = @spawn mysum(32);
task2 = @spawn mysum(32);
task3 = @spawn mysum(32);
wait.((task1, task2, task3))
@info "t_elapsed is $(time() - t0)" 

# [Step 2] copy-paste the entire following block to REPL and observe the t_elapsed
t0 = time(); 
task1 = @spawn mysum(32);
task2 = @spawn mysum(32);
task3 = @spawn mysum(32);
task4 = @spawn mysum(32);
wait.((task1, task2, task3, task4))
@info "t_elapsed is $(time() - t0)"

This waiting can also be avoided by enclosing the block with the @spawns in a @sync block:

N = 3;
t0 = time();
@sync for i = 1:N
    @spawn mysum(32)
end;
@info "t_elapsed for N = $N is $(time() - t0)"

For your other experiment:

function CPU_intensive_work(N) 
    s = collect(1:99999)
    for i = 1:(10000N)
        s = circshift(s, -3)
    end
    s
end

This is not CPU-intensive. Rather, it’s heavy in allocations:

julia> @time CPU_intensive_work(10);
 10.083843 seconds (300.00 k allocations: 74.513 GiB, 14.06% gc time)

The reason is that circshift(s, -3) creates a new vector. Allocation in itself does not take much time, but the garbage collection does. It’s devastating for parallel tasks. Most of the allocations can be avoided by doing it in-place:

function CPU_intensive_work(N) 
    s = collect(1:99999)
    for i = 1:(10000N)
        circshift!(s, -3)
    end
end
julia> @time CPU_intensive_work(10);
  5.150725 seconds (3 allocations: 781.320 KiB)

In general it's better to use @time than doing the timing yourself: you then also get information on allocations and on time spent in gc, compilation, and locks. Better yet, use @btime or @benchmark from the package BenchmarkTools.jl, or @b or @be from the package Chairmarks.jl. These macros run the benchmark several times and report statistics (e.g. @btime reports the minimum time).
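For example (assuming BenchmarkTools.jl is installed):

using BenchmarkTools
@btime CPU_intensive_work(10)        # multiple samples; reports time + allocations
# @benchmark CPU_intensive_work(10)  # full statistics (min/median/mean/max)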

Overall, it's in general a good idea to enclose things in functions, for two reasons. First, Julia's unit of compilation is the function: functions are compiled to machine code, while top-level REPL commands may instead be run in an interpreter.
Second, variables you create in the REPL are global. When non-const global variables are used, the compiler can't make any assumptions about their type or value, resulting in slow code.
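A minimal sketch of the global-variable pitfall (hypothetical names):

glob = 0.5              # non-const global: its type may change at any time
function slow()
    s = 0.0
    for i = 1:10^7
        s += glob       # type of glob must be looked up on every iteration
    end
    return s
end
function fast(x)
    s = 0.0
    for i = 1:10^7
        s += x          # type of x is known, so this compiles to a tight loop
    end
    return s
end
fast(glob)              # pass the global in as an argument instead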

3 Likes

Thanks, very instructive and clear explanation!

I wonder how I can achieve the logic in the if-end block below:

function intensive_work(parameter)
    # some body depending on `parameter`
    # therefore the execution time of this body is unpredictable
    return some_value
end
function main()
    p = [1, 2, 3] # a vector of parameters, each indicating a subproblem
    task_vec = similar(p, Task)
    for (i, parameter) = enumerate(p)
        task_vec[i] = @spawn intensive_work(parameter)
    end
    if any_one_of_the_task_is_finished # [How to?]
        fetch_that_returned_value # [How to?]
        return # early break
    end
end
main()

This is one of the things I miss in Julia: a waitany([array of tasks]) followed by a loop with istaskdone, or a fetchany([array of tasks]).

What you can do is to use a Channel:

function intensive_work(parameter, channel, stopit)
    # some body depending on `parameter`
    # therefore the execution time of this body is unpredictable
    # do a regular stopit[] && return  to finish when told to.
    put!(channel, some_value)
    return
end

function main()
    p = [1, 2, 3] # a vector of parameters, each indicating a subproblem
    task_vec = similar(p, Task)
    channel = Channel{Int}()   # Int, or whatever is the return type, default is Any
    stopit = Threads.Atomic{Bool}(false)
    for (i, parameter) = enumerate(p)
        task_vec[i] = @spawn intensive_work(parameter, channel, stopit)
    end
    returned_value = take!(channel)
    stopit[] = true  # tell every task to stop, otherwise they'll run to completion
    foreach(wait, task_vec)  # wait for them
    return returned_value
end

There is no general safe way to stop tasks other than to tell them to finish. The stopit[] = true can instead be done inside intensive_work, e.g. as

Threads.atomic_or!(stopit, true) || put!(channel, some_value)

to ensure only a single task writes to the channel (atomic_or! returns the previous value).
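Putting it together, a hedged sketch of what the worker body could look like (the loop body is a stand-in for the real work):

function intensive_work(parameter, channel, stopit)
    s = 0
    for i = 1:10^9
        i % 2^20 == 0 && stopit[] && return  # check periodically; bail out when told
        s += i * parameter                   # stand-in for the real work
    end
    Threads.atomic_or!(stopit, true) || put!(channel, s)  # only the first finisher writes
    return
end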

The type of the Channel (the Int in the example) can be dropped, but then the compiler doesn't know what type returned_value is, leading to “type instability” and possibly bad performance.

2 Likes

While I think it's important to understand how Julia's tasks and threads work, along with synchronization primitives like lock/unlock/Base.Semaphore/Event/Condition/notify etc., there's also the package OhMyThreads.jl, which automates a lot of the common patterns in concurrent programming in Julia.

1 Like

waitany and waitall have been added in 1.12 FWIW.
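A hedged sketch of how that simplifies the earlier example, using the single-argument intensive_work from the question above (per the 1.12 docs, waitany returns the done tasks and the remaining ones):

tasks = [Threads.@spawn intensive_work(p) for p in [1, 2, 3]]
done, remaining = waitany(tasks)  # blocks until at least one task finishes
result = fetch(first(done))       # fetch the first available result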

2 Likes
Warning
This may scale poorly compared to writing code that uses multiple individual tasks that each runs serially, since this needs to scan the list of tasks each time and synchronize with each one every time this is called. Or consider using waitall(tasks; failfast=true) instead.

I think it just dissuades people from using it.


Well, I think maybe the more foolproof Threads.@threads for i = 1:I suits my application better. E.g. I can split a massive parallel workload into several batches, doing

for batch = 1:10
    Threads.@threads for i = 1:4 # match the number of available physical threads
        # some work
    end
    if some_demands_are_met_already
        break # early break
    end
end

instead of simply doing

Threads.@threads for i = 1:40
    # some work
end

Just to expand on this, all IO goes through the dedicated task running the libuv event loop, which is pinned to the main thread. This also includes some stuff that’s not strictly IO, such as sleep.

This is problematic in various contexts, for example if a long-running, non-yielding task happens to be scheduled on the main thread. If another task reaches a printing/logging/sleep call, it will have to wait until the non-yielding task finishes before it can make forward progress. The workaround is to make sure all long-running tasks yield on a regular basis, if necessary by adding explicit yield() calls.
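For example, a rough sketch of such an explicit yield:

function long_work()
    s = 0.0
    for i = 1:10^9
        s += sqrt(i)               # non-yielding computation
        i % 10^7 == 0 && yield()   # periodically let other tasks (e.g. IO) run
    end
    return s
end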

Relevant issue: weird interaction between `sleep` function and Threads · Issue #43952 · JuliaLang/julia · GitHub

2 Likes