Julia: how to run embarrassingly parallel jobs with nested for loops?

Hey! I’d like to run 4 independent tasks in parallel, a certain number of times, using 4 cores. Here is the script, say test.jl, which I run with the command julia --threads 4 test.jl:

using Distributed

function remote_computation()
    thid = Threads.threadid()
    println("Remote computation with thread $thid")
    sleep(5)  # Heavy computation: to be parallelized
    # (usually I would put some real computation here but for reproducibility I used sleep)
    rand()
end

addprocs(Threads.nthreads())  # Add as many processes as available threads, 4 in my case

for t in 1:10  # Say we want to do the full parallelized process 10 times
    results_matrix = zeros(2,2)
    @sync for i in 1:2
        for j in 1:2
            @async Threads.@spawn results_matrix[i, j] = remote_computation()
        end
    end
    println("Completed parallelized process number: $t")
end

Notice that I have a nested for loop because, in my real application, remote_computation depends on i and j, hence the two levels of iteration. I removed this dependency for the sake of simplicity, as sketched below.
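For concreteness, in the real code the inner call looks roughly like this (the two-argument signature is hypothetical, just to show the dependency):

results_matrix[i, j] = remote_computation(i, j)  # hypothetical (i, j)-dependent version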

Expectation: My expectation is that, for each of the 10 outer iterations, the 4 processes run in parallel and finish at roughly the same time. Also, if I watch htop, I expect my 4 CPUs to be 100% busy during the whole execution (although in this MWE, using sleep may not actually make them busy, right?).
In my understanding, that would give the following output.

Remote computation with thread 2
Remote computation with thread 3
Remote computation with thread 1
Remote computation with thread 4
Completed parallelized process number: 1
Remote computation with thread 1
Remote computation with thread 3
Remote computation with thread 2
Remote computation with thread 4
Completed parallelized process number: 2
etc... (10 times in total)

With the 4 CPUs 100% busy all along.

Reality: However, my expectation is only met for the 1st of the 10 iterations; after that the jobs run almost sequentially (only 2 threads active in parallel) and my cores are ~30% busy, with one at 100%. You can see this from the fact that the same thread ID is repeated starting from iteration 2.

Remote computation with thread 2
Remote computation with thread 3
Remote computation with thread 1
Remote computation with thread 4
Completed parallelized process number: 1
Remote computation with thread 1  # thread 1 used for 3 jobs out of 4
Remote computation with thread 1
Remote computation with thread 1
Remote computation with thread 4
Completed parallelized process number: 2
etc... (10 times in total)

I tried to follow this discussion and use @sync to make my script wait until the 4 processes are finished, and @async to have them executed in parallel, but apparently I missed something here.

You’re mixing up threads and processes here. Are you planning to run multiple threads on multiple machines? If not, just stick to the threads and don’t spawn extra processes.

Here you are mixing up threads and tasks. Read the docs on threads. Instead of calling @async and @sync, you should just spawn the tasks and let them run. If you want to synchronize them, use a Channel or call wait or fetch.
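For instance, a minimal sketch of the fetch approach for the loop in the question (keeping the remote_computation name from above) could look like this:

tasks_matrix = [Threads.@spawn remote_computation() for i in 1:2, j in 1:2]  # 2x2 matrix of tasks
results_matrix = fetch.(tasks_matrix)  # fetch waits for each task and returns its value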


Thanks for clarifying! I plan to use multiple threads on a single machine, so I’ll stick to threads as you recommended. I tried using wait with a “tasks matrix” but still observed the same near-sequential execution from iteration 2 onward. Is this the right way to use wait?

for t in 1:10  # Say we want to do the full parallelized process 10 times
    results_matrix = zeros(2,2)
    tasks_matrix = Array{Task,2}(undef, 2, 2)
    for i in 1:2
        for j in 1:2
            tasks_matrix[i, j] = Threads.@spawn results_matrix[i, j] = remote_computation()
        end
    end
    for i in 1:2
        for j in 1:2
            wait(tasks_matrix[i, j])
        end
    end
    println("Completed parallelized process number: $t")
end

I think the fact that your remote_computation calls sleep is problematic for your results: sleep just suspends the task without using any CPU, so the cores will look idle in htop no matter how well the tasks are parallelized. Change it as follows and see how your latest example works:

function remote_computation()
    thid = Threads.threadid()
    println("Remote computation with thread $thid")
    start = rand()
    # Busy loop: actually burns CPU, unlike sleep
    for i in 1:10000000
        start += 1
    end
    return start
end
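(Most likely this also explains the repeated thread IDs: sleep yields, so a thread whose current task is sleeping is free to start the next spawned task, and the thread ID is printed at the moment each task starts.)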


I think I got close to what I wanted using a combination of Threads.@spawn and @sync in the outer loop to wait for the results of each thread before continuing.

for t in 1:10  # Say we want to do the full parallelized process 10 times
    results_matrix = zeros(2,2)
    @sync for i in 1:2
        for j in 1:2
            Threads.@spawn begin
                results_matrix[i, j] = remote_computation()
            end
        end
    end
end

I still do not observe the 4 CPUs at 100%, though, and some threads are still used several times within the same parallelized iteration.

PS: As you suggested, I used a real computation function that does not call sleep.

Yes, it looks like @sync handles spawned threads:

" Wait until all lexically-enclosed uses of @async , @spawn , @spawnat and @distributed are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException ."

So your solution should work as expected.

There is some overhead in spawning and scheduling the tasks, which can cause less-than-full utilization, particularly for tasks that take a small amount of time. If your task takes 5 seconds or more, you should see nearly full utilization; if it takes 50 ms or less, you probably won’t. Somewhere in between is the transition.
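If you want to see where that transition sits on your machine, here is a rough sketch (the busy helper and the chosen durations are just for illustration):

function busy(seconds)
    t0 = time()
    while time() - t0 < seconds  # spin so the core stays busy, unlike sleep
    end
end

for w in (0.05, 0.5, 5.0)
    elapsed = @elapsed @sync for _ in 1:4
        Threads.@spawn busy(w)
    end
    # With 4 threads the ideal elapsed time is about w;
    # anything beyond that is spawning/scheduling overhead.
    println("work = $(w)s, elapsed = $(round(elapsed, digits=3))s")
end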


Indeed, the overhead explanation checks out: with a 20 s task I get full utilization and an even distribution across the 4 threads on a remote machine dedicated to computing (no other processes running in the background), which is exactly what I was looking for.

On my own laptop I do not see that “perfect” distribution, but I guess that is because I am running other processes at the same time, such as an internet browser, an IDE, etc.

Thanks a lot for your help!