Tutorial: Concurrency patterns for controlled parallelisms

Hi, I just wrote a quick tutorial on a couple of patterns using Channel that are handy for some parallel processing: Concurrency patterns for controlled parallelisms.

High-level data parallelism is the best starting point for writing parallel programs. However, it is sometimes necessary to control the parallelism in your program so that, e.g., the use of bounded resources like memory can be managed. This is where you need to deal with concurrency. Although there are a lot of concurrency primitives, Channel is the most versatile tool that Julia provides out-of-the-box. In this tutorial, we look at how to implement simple and useful patterns based on Channel. Some of these patterns are known as task-parallel algorithmic skeletons (or parallel skeletons).

  1. Worker pool
    1. Re-distribution hacks
  2. Task farm
  3. Pipeline
  4. Promise (request-response)
    1. Improving the API
    2. Wrapping single-thread API
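To give a flavor before you click through: the first of these patterns boils down to a bounded number of tasks pulling items from a shared Channel. Here is a minimal sketch (placeholder names like run_pool and process_item, not the tutorial’s exact code):

using Base.Threads: @spawn

function run_pool(process_item, work_items; ntasks = Threads.nthreads())
    # Feed all items into a channel; it closes automatically when the feeder is done.
    channel = Channel{eltype(work_items)}(ntasks) do ch
        foreach(item -> put!(ch, item), work_items)
    end
    # `ntasks` workers keep taking items until the channel is drained and closed.
    @sync for _ in 1:ntasks
        @spawn for item in channel
            process_item(item)
        end
    end
end

# Example: 100 items, but at most `Threads.nthreads()` of them in flight at once.
run_pool(i -> println(i, " => ", sum(rand(1000))), 1:100)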

I wrote this mainly because I couldn’t find a place where this kind of usage is summarized (other than JuliaActors and Nathan Daly’s CspExamples.jl from the “CSP examples in Julia?” thread; let me know if you know other related resources). I think these are very common patterns in the Go community (minus select), and I learned them by looking into their discussions.


Thank you for your tutorial, it is really great.

But it is somewhat difficult to read; I’ll try to describe my issues, and maybe it will help to improve it.

  1. In the first example, an argument of the allocation function is body. This is very confusing, because it is a function, which is usually denoted with a verb. Maybe it’s better to use a word like process or something similar?
    function allocate(process)
        open("/dev/urandom") do file
            buffer = Vector{UInt8}(undef, buffer_length)
            process((file, buffer))
        end
    end
  2. To be honest, the worker pool construction, which consists of three intertwined closures, is very hard to understand. It took me some time to realize that the first example can be rewritten as follows (I hope I am not mistaken and this is a correct representation of the original idea):
using Base.Threads: @spawn

let buffer_length = 2^10
    # `works` (an iterable of Refs to fill) and `results` are defined in the tutorial.
    ntasks = Threads.nthreads()
    @sync for _ in 1:ntasks
        @spawn begin
            open("/dev/urandom") do file
                buffer = Vector{UInt8}(undef, buffer_length)
                for input in works
                    read!(file, buffer)
                    input[] = sum(buffer; init = 0.0)
                end
            end
        end
    end

    sum(results) / (length(results) * buffer_length)
end

Maybe it makes sense to add something like an Explanation subsection, where the worker pool pattern can first be shown as in the snippet above and then transformed step by step into the three-function version.

  3. Am I correct that if you do not need to allocate any resources, you can just use identity instead of allocate, i.e.
    workerpool(identity, works) do ref
        ref[] = sum(rand(100))
    end

(as I have said, it’s not easy to correctly trace all variables and functions).

  4. I’ve tried to trace thread usage with the following modification:
 function workerpool(work!, allocate, request; ntasks = Threads.nthreads())
    @sync for _ in 1:ntasks
        @spawn allocate() do resource
            cnt = 0
            for input in request
                cnt += 1
                work!(input, resource)
            end
            @info Threads.threadid() cnt
        end
    end
end

and the result was

┌ Info: 4
└   cnt = 1
┌ Info: 3
└   cnt = 1
┌ Info: 2
└   cnt = 1
┌ Info: 1
└   cnt = 29

so it looks like the work was done mostly by one thread. Is this a problem with the scheduler, or was the execution time in this particular example so small that Julia just didn’t have time to parallelize the tasks? Or is it the problem discussed in the next section, “Re-distribution hacks”?

  5. In “Re-distribution hacks” it is written: “use of @async impedes migration of the tasks across OS threads (which is not implemented as of Julia 1.6 but is likely to be implemented in the future Julia versions)”. What does that mean? That using @async is not recommended because it schedules everything on one thread? And that this behaviour may change in future Julia versions, which is what the part in brackets says?

Thanks for the feedback!

Yes, process seems to be a good name here.

(Though I’m not entirely sure if this principle always applies to argument names for callbacks; e.g., is f in open(f, name) a verb?)

Ah, yes, thanks for pointing it out. I know that it’s always better to start with concrete examples (especially in a tutorial), but generalization is always attractive… I agree that starting with concrete code like you posted would be a good idea.
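For example, the concrete snippet maps onto the three functions roughly like this (just a sketch of the correspondence; works and buffer_length are defined in the tutorial):

function allocate(process)
    open("/dev/urandom") do file
        buffer = Vector{UInt8}(undef, buffer_length)
        process((file, buffer))
    end
end

function work!(input, (file, buffer))
    read!(file, buffer)
    input[] = sum(buffer; init = 0.0)
end

workerpool(work!, allocate, works)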

You’d need

workerpool(f -> f(nothing), works) do ref, _
    ref[] = sum(rand(100))
end

This probably should exist as an arity overload workerpool(work!, request).
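Something along these lines could serve as that overload (just a sketch, not something that exists in the tutorial yet):

# Convenience method: no resource to allocate, so hand the worker `nothing`
# and drop the unused resource argument from the user's function.
workerpool(work!, request; kwargs...) =
    workerpool((input, _) -> work!(input), f -> f(nothing), request; kwargs...)

# With it, the example above could be written as
# workerpool(works) do ref
#     ref[] = sum(rand(100))
# end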

It may be the case, but I’d say the scheduler is doing the right thing here, because we are mostly(?) doing I/O and so it’s not very meaningful to use multiple CPUs. Maybe we need more CPU-intensive work to demonstrate parallelization.
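For example, something like the following (a sketch; it reuses the works-of-Refs setup from the tutorial but replaces the I/O with computation) should give the scheduler a reason to keep several threads busy:

results = zeros(32)
works = Channel(Inf)
foreach(i -> put!(works, Ref(results, i)), eachindex(results))
close(works)

workerpool(f -> f(nothing), works) do ref, _
    acc = 0.0
    for i in 1:10^7      # busy the CPU instead of reading /dev/urandom
        acc += sin(i)
    end
    ref[] = acc
end

With julia -t 4, the instrumented version above should then report a more even cnt per thread.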

Using @async is not recommended because Julia does not move tasks across threads. If you start them from different OS threads, multiple @async tasks can use multiple OS threads. The problem is that @async tasks will “stick” to one OS thread (note: this is an implementation detail AFAICT, but my hunch is that this behavior will not change at least in the 1.x time frame).

As of Julia 1.6, @spawn tasks are not migrated either, so mentioning this is somewhat unnecessary. But if you play with Julia 1.8-DEV, it now does migrate @spawn tasks.
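A quick way to see the stickiness (a sketch; start Julia with several threads, e.g. julia -t 4):

using Base.Threads: @spawn, threadid

# @async tasks run on the thread that started them (here the main thread),
# whereas Threads.@spawn lets the scheduler place each task on any thread.
async_ids = [fetch(@async threadid()) for _ in 1:8]   # expected: all 1
spawn_ids = [fetch(@spawn threadid()) for _ in 1:8]   # can be any of the threads
@show async_ids spawn_ids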


this looks like x7 is piped into the worker f and result f(x4) comes out at the other end.

I’m guessing this is meant to say x7 is waiting to be processed next while f(x4) is on its way out?

Yes, the latter is what I meant. Maybe it’s clearer if I don’t put xs before fs.
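For anyone reading this without the figure in front of them, the shape under discussion is a worker task sitting between an inlet Channel and an outlet Channel, roughly like this sketch (not the tutorial’s code):

# The producer feeds xs; the worker reads x from xs and puts f(x) into ys.
# With buffered channels, a new x can already be waiting at the inlet while an
# earlier f(x) is still sitting in the outlet buffer.
xs = Channel{Int}(2) do ch
    foreach(i -> put!(ch, i), 1:10)
end
ys = Channel{Int}(2; spawn = true) do ch
    for x in xs
        put!(ch, x^2)   # the worker f
    end
end
collect(ys)   # consume the outlet: [1, 4, 9, …, 100]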
