How to run two tasks on parallel?

As an example, showing the difference between the thread/non-threaded version:

tmin=1;
tmax=10;
D1 = [1.0 2 3 4 5 6 7 8 9 10];
D2 = [10.0 9 8 7 6 5 4 3 2 1];
X = zeros(10);
Y = zeros(10);

function calculate!(X, Y, D1, D2, tmin,tmax)
    for n in tmin+1:tmax
        X[n] = D1[n]*D2[n] + Y[n-1];
        Y[n] = D1[n-1]*D2[n-1] + X[n-1];
    end
    nothing
end

function calculate_threaded!(X, Y, D1, D2, tmin, tmax)
    for n in tmin+1:tmax
        Xn = Threads.@spawn D1[n]*D2[n] + Y[n-1]; 
        Yn = Threads.@spawn D1[n-1]*D2[n-1] + X[n-1];
        X[n] = fetch(Xn)
        Y[n] = fetch(Yn)
    end
    nothing
end

We benchmark to see:

using BenchmarkTools
@btime calculate!(X, Y, D1, D2, tmin, tmax)
# 31.539 ns (0 allocations: 0 bytes)
@btime calculate_threaded!(X, Y, D1, D2, tmin, tmax)
# 41.249 μs (125 allocations: 9.53 KiB)

Using locks will likely only make this worse. You will only see a performance benefit if the calculations in “Threads.@spawn” is significant (like a big matrix calculation).

1 Like

The issue is that I do not want to create and turn it off the spawn at each time-step. So, I want it to keep running over time-step. But, it seems, in my case above, when t1 starts working it does not change the new value of flag[] with t2. Can you guide me only how to update its value with t2`. I mean in general how can I make a parameter seen for threads mean while they are running

Here is the code with each thread communicating with a Channel (Asynchronous Programming · The Julia Language)

function thread1(X, Y, D1, D2, tmin, tmax, x_channel::Channel, y_channel::Channel)
    put!(x_channel, 0.0)
    for n in tmin+1:tmax
        Y_prev = take!(y_channel)
        X[n] =  D1[n]*D2[n] + Y_prev;
        put!(x_channel, X[n])
    end
    nothing
end
function thread2(X, Y, D1, D2, tmin, tmax, x_channel::Channel, y_channel::Channel)
    put!(y_channel, 0.0)
    for n in tmin+1:tmax
        X_prev = take!(x_channel)
        Y[n] = D1[n-1]*D2[n-1] + X_prev;
        put!(y_channel, Y[n])
    end
    nothing
end

function calculate_only_two_threads!(X, Y, D1, D2, tmin, tmax)
    x_channel = Channel{Float64}(1)
    y_channel = Channel{Float64}(1)

    task_x = Threads.@spawn thread1(X, Y, D1, D2, tmin, tmax, x_channel, y_channel)
    task_y = Threads.@spawn thread2(X, Y, D1, D2, tmin, tmax, x_channel, y_channel)

    wait(task_x)
    wait(task_y)
end

This has the benefit of only spawning a single thread. The benchmarks below:

using BenchmarkTools
@btime calculate!(X, Y, D1, D2, tmin, tmax)
# 31.539 ns (0 allocations: 0 bytes)
@btime calculate_threaded!(X, Y, D1, D2, tmin, tmax)
# 41.249 μs (125 allocations: 9.53 KiB)
@btime calculate_only_two_threads!(X, Y, D1, D2, tmin, tmax)
# 10.109 μs (29 allocations: 1.94 KiB)

Notice that this threaded version is over 300 times slower than the sequential version. Having to synchronise between threads is usually a performance killer. Someone could likely write some better parallel code to do this, but it would always likely be at least 10-100x slower, unless the workload done by each thread was increased by orders of magnitude.

1 Like

Thank you very much for your helpful explanations and your proposed code.

Does executing take!(x_channel)/take!(y_channel) is conditioned of having a new value (modified value) for x_channel/y_channel? If not, then thread1 could finish executing its for loop before thread2 even starts, right?

Such as having ::Channel between the main thread and the spawned one, as below right?

function thread1(X, Y, D1, D2, tmin, tmax, x_channel::Channel, y_channel::Channel)
    put!(x_channel, 0.0)
    for n in tmin+1:tmax
        Y_prev = take!(y_channel)
        X[n] =  D1[n]*D2[n] + Y_prev;
        put!(x_channel, X[n])
    end
    nothing
end

function calculate_only_two_threads!(X, Y, D1, D2, tmin, tmax)
    x_channel = Channel{Float64}(1)
    y_channel = Channel{Float64}(1)

    task_x = Threads.@spawn thread1(X, Y, D1, D2, tmin, tmax, x_channel, y_channel)
    
    put!(y_channel, 0.0)
    for n in tmin+1:tmax
        X_prev = take!(x_channel)
        Y[n] = D1[n-1]*D2[n-1] + X_prev;
        put!(y_channel, Y[n])
    end
    wait(task_x)
end

Sorry there are two spawned threads, for the x and y. And 3 in total, including the main one, but that spends most of the time waiting for the other two to finish.

take! will wait until there is at least one item in the channel, but the channel has a size of 1, and there is only at most a single item in the channel. put! adds one item to the channel. There is probably a much more elegant way of doing this, but this is the way that made sense when I wrote the code. The channel is designed for asynchronous use specifically, with all the locks and safety you need when doing things in parallel, so essentially the following:

will wait until the corresponding put! from thread1.

You could theoretically put the second work in the main thread, but I don’t think it would make much difference, as I don’t think threads are pinned to CPU cores. It will just sleep the main thread until the others are finished. You can benchmark both versions to see if there is a significant difference, but I personally would prefer to move the work to two separate threads and let the main thread wait.

1 Like

I got it. Thank you!

What does it mean?

Yes, as you think, the benchmark shows a small difference.

  3.700 μs (21 allocations: 1.45 KiB)
  5.300 μs (28 allocations: 2.16 KiB)

Great! if it’s better then keep it for sure. This difference is likely just spawning the second thread. This should go away when you’re doing more work (longer times).

Pinning usually refers to having a single thread running on a particular core. So a pinned thread would only execute on a single core (like Core 0). But without pinning, threads will just execute on any threads, and long running ones may even switch between threads. This can also be called Processor/Thread Affinity and you can read more here - Processor affinity - Wikipedia. Usually, having a task pinned to a single thread helps performance because the L1 and L2 cache (the really really fast memory for the CPU) is local to that core, so keeping the execution there will reduce cache misses in your program.

1 Like

But without pinning, threads will just execute on any cores, right?
Can we pin in Julia?

Yes i meant cores. I’m pretty sure there’s probably a way, but I have never had to do it.

1 Like

Is this an unfortunately chosen example? If you unroll the loop once the x decouples from the y.
Also, this is too little work to run in parallel.

1 Like

Exactly, I just introduced this example for knowing the mechanism of parallelization.