How to write a master-subproblem iterative algorithm asynchronously?

The following should be a fix:

function taskbody(j, e)
    sleep(j)
    notify(e) # raise
end; function waitbody(e, t_vec)
    for _ = 1:J
        wait(e)
        println(istaskdone.(t_vec))
        reset(e) # lower it manually
    end
    wait.(t_vec)
end; function test_revised()
    e = Base.Event()
    t_vec = [@task taskbody(j, e) for j = 1:J]
    for t = t_vec t.sticky = false end # non-sticky, so the tasks may run on any thread
    t0 = Threads.@spawn waitbody(e, t_vec)
    schedule.(t_vec)
    wait(t0)
    return nothing
end

const J = 3
@time test_revised()
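
As a side note, on Julia ≥ 1.8 the event can be constructed as Base.Event(true), which auto-resets after each successful wait, so the manual reset(e) can be dropped. A sketch of that variant (the same caveat remains: notifications arriving while the event is already set are coalesced into one):

function waitbody_autoreset(e, t_vec) # assumes e = Base.Event(true), Julia ≥ 1.8
    for _ = 1:J
        wait(e) # the event is lowered automatically when this wait returns
        println(istaskdone.(t_vec))
    end
    wait.(t_vec)
end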

But I don’t know what the reported lock conflict implies:

julia> @time test_revised()
Bool[1, 0, 0]
Bool[1, 1, 0]
Bool[1, 1, 1]
  3.015793 seconds (132 allocations: 5.672 KiB, 1 lock conflict)
help?> @time
Any lock conflicts where a ReentrantLock had to wait are shown as a count. (added in Julia 1.11)
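
Reading that docstring, my understanding is that a conflict is counted whenever acquiring a ReentrantLock has to wait for another task. A tiny sketch (assuming Julia ≥ 1.11) that should report a nonzero lock-conflict count:

lk = ReentrantLock()
@time begin
    t = Threads.@spawn @lock lk sleep(0.1) # the spawned task grabs the lock and holds it
    sleep(0.01)                            # give the spawned task time to acquire it first
    @lock lk nothing                       # this acquisition has to wait, i.e. a lock conflict
    wait(t)
end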
help?> @async
  Values can be interpolated into @async via $
help?> Threads.@spawn
  Values can be interpolated into @spawn via $

I think this feature should also exist for @task. Why not? Here is a demo

julia> for _ = 1 # pass the name                                                                          
           x = [1, 2, 3]                                                                                  
           t = @task circshift(x, 1)                                                                      
           x = [4, 5, 6] # suppose x is reassigned                                                        
           schedule(t)                                                                                    
           println(fetch(t))                                                                              
       end
[6, 4, 5]

julia> for _ = 1 # pass the value                                                                         
           x = [1, 2, 3]                                                                                  
           t = let x = x                                                                                  
               @task circshift(x, 1)                                                                      
           end                                                                                            
           x = [4, 5, 6] # suppose x is reassigned                                                        
           schedule(t)                                                                                    
           println(fetch(t))                                                                          
       end
[3, 1, 2]

as suggested in the GitHub PR “Clarify interpolation in @async and Threads.@spawn” by ettersi (JuliaLang/julia#36654).
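
For comparison, Threads.@spawn (and @async) already snapshot the value when $ is used, which is what the let block above emulates for @task:

julia> for _ = 1 # pass the value via interpolation
           x = [1, 2, 3]
           t = Threads.@spawn circshift($x, 1) # $x is evaluated and stored at spawn time
           x = [4, 5, 6] # the reassignment does not affect the task
           println(fetch(t))
       end
[3, 1, 2]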

I’ve just finished one version of the algorithm, which converges to the correct result.
It is actually my very first realistic asynchronous program. Many thanks to @sgaure (e.g. post #8).

Well, I do find it tricky to implement a decent, concise, problem-based termination criterion! Therefore I adopt a pragmatic one for now.

function subproblemˈs_duty(j, snap, inbox) # the hasty version
    mj = inn[j]
    reset_mj_obj!(mj, snap.β, snap.μ, snap.ν)
    JuMP.optimize!(mj); JuMP.assert_is_solved_and_feasible(mj; allow_local = false)
    Δ = snap.θ[j] - JuMP.objective_value(mj)
    if (is_cut_off = Δ > COT)
        con = build_con(j, mj) # on the currently solved status of `mj`
        @lock model_lock JuMP.add_constraint(model, con) # add it in haste
    end
    lens = function(new_snap) # new_snap is ignored here, but this makes provision for the fussy version
        j, is_cut_off
    end
    @lock inbox_lock push!(inbox, lens)
end

function masterˈs_algorithm(Δt)
    function shot!()
        JuMP.optimize!(model); JuMP.assert_is_solved_and_feasible(model; allow_local = false, dual = true)
        println("▶ modelˈs ObjBound = $(JuMP.objective_bound(model))")
        local snap = (t = timestamp += 1, θ = ı.(θ), β = ı.(β), μ = ı.(μ), ν = ı(ν))
    end
    function masterˈs_loop()
        while true
            if isempty(inbox)
                yield()
                continue
            end
            js2reply, is_global_cut_off = Set{Int}(), false
            while !isempty(inbox)
                lens = @lock inbox_lock pop!(inbox)
                local j, is_cut_off = lens(snap)
                println("t = $(snap.t), j = $j, is_cut_off = $is_cut_off")
                push!(js2reply, j)
                is_global_cut_off |= is_cut_off
            end
            if is_global_cut_off
                @lock model_lock snap = shot!()
                t0 = time()
            elseif time() - t0 > Δt
                println("▶▶▶ Long time no improvements, quit!")
                return
            end
            for j = js2reply Threads.@spawn subproblemˈs_duty(j, snap, inbox) end
        end
    end
    t0 = time()
    timestamp, inbox = -1, Function[]
    snap = shot!()
    masterˈs_task = Threads.@spawn masterˈs_loop()
    for j = 1:J Threads.@spawn subproblemˈs_duty(j, snap, inbox) end
    wait(masterˈs_task)
end

masterˈs_algorithm(5)

This follows a hasty fashion: a cut is added to model as soon as it is proved to cut off the very trial point that generated it. This matches the convention for adding cuts in a serial program.

Since we are writing asynchronous programs, it is natural for the master’s loop to hold an updated trial point, motivating a fussy fashion: a cut is added to model only if it is proved to cut off the updated trial point, instead of the potentially older one that generated it.

To this end, we need to prepare a judging function can_cut_off inside subproblemˈs_duty:

function subproblemˈs_duty(j, snap, inbox) # the fussy version
    mj = inn[j]
    reset_mj_obj!(mj, snap.β, snap.μ, snap.ν)
    JuMP.optimize!(mj); JuMP.assert_is_solved_and_feasible(mj; allow_local = false)
    con = build_con(j, mj) # on the currently solved status of `mj`
    can_cut_off = function(new_snap) # the body has some details that you can skip
        Θ, Β, Μ, Ν = new_snap.θ, new_snap.β, new_snap.μ, new_snap.ν
        pair = haskey(mj, :bLent)
        pBus, q = ı.(mj[:pBus]), ı.(mj[:q])
        term_beta = pair ? sum((pBus[t,1] + pBus[t,2])Β[t] for t=1:T) : Β ⋅ pBus
        term_mu = pair ? sum((q[t,1] + q[t,2])Μ[t] for t=1:T) : Μ ⋅ q
        term_nu = sum(q) * Ν
        cut_term = +(ı(mj[:prim_obj]), term_beta, term_mu, term_nu)
        local is_cut_off = Θ[j] > cut_term + COT # return a Bool
    end
    lens = function(new_snap) # same shape as in the hasty version, but here new_snap is actually used
        j, can_cut_off(new_snap), con
    end
    @lock inbox_lock push!(inbox, lens)
end

The related part in masterˈs_loop is revised accordingly as

local j, is_cut_off, con = lens(snap)
is_cut_off && @lock model_lock JuMP.add_constraint(model, con)

That’s neat.

One comment: The isempty on a Vector is not documented to be thread safe. I believe it is in practice. isempty(x) boils down to x.size[1] == 0, which I think happens to be atomic on all architectures julia is supported on. In some future it might perhaps happen that the compiler is able to prove that the isempty(inbox) can’t change at the top of the outer while loop, and creates an infinite loop, but apparently it doesn’t do that now.

Channel is first-in first-out, but I prefer a last-in first-out counterpart. I don’t know if there is an established object, so I thought a Vector might suit as well: push! & pop! are exactly last-in first-out. But isempty is a reading operation, how can it be unsafe?
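
If the unlocked isempty ever becomes a concern, the emptiness test and the pop! can simply share one lock. A small sketch (try_pop! is my own helper name):

function try_pop!(inbox, inbox_lock) # returns `nothing` when the inbox is empty
    @lock inbox_lock (isempty(inbox) ? nothing : pop!(inbox))
end

Then masterˈs_loop would call try_pop! at the top of the loop instead of the unlocked isempty, yielding only when it returns nothing.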

How can this ever happen? The outer while loop has many control constructs in its body whose effects seem hard to prove negligible, especially in the context of multithreaded concurrency.

Some architectures can’t read and write in 64-bit chunks, but read/write 2 32-bit chunks, or 4 16-bit chunks instead, in some order. If a read and a write happen at the same time, the read can become corrupted with incomplete data. I don’t think this can happen on any architecture which julia runs on.

This is a bit more subtle. I was thinking of the top part:

while true
    if isempty(inbox)
        yield()
        continue
    end
...
end

Imagine a future where julia does pre-emptive scheduling, and yield() therefore has become a no-op. This top of the loop will just spin until there’s something in the inbox. In languages like C and java, you would have to mark inbox as volatile to tell the compiler it can change at any time, and every access must actually be performed. Otherwise, the compiler is free to hoist the test out of the spin-loop, since it can’t change inside.

Julia’s compiler can already do such things with bounds checking. The behaviour rests on assumptions about aliasing, i.e. whether there exists other references to the same memory location. In your case it certainly does, there’s one in each spawned task, and the compiler will know.

In general, Julia takes the opposite approach from C, assuming there is aliasing unless it can prove there’s not. And there are some tools for opting out (There’s a Const and @aliasscope in Base.Experimental to do such things). I don’t think the aliasing logic can change until julia 2.0 (a mythical future version of julia which nobody has started working on).
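
For reference, if one wants to be robust against such a future, the spin can be replaced by blocking on a Threads.Condition. A sketch with illustrative names that are not from the code above:

const inbox_cond = Threads.Condition() # doubles as the inbox lock

function push_inbox!(inbox, lens) # called by the subproblem tasks
    @lock inbox_cond begin
        push!(inbox, lens)
        notify(inbox_cond) # wake the master loop
    end
end

function wait_inbox(inbox) # called at the top of masterˈs_loop
    @lock inbox_cond begin
        while isempty(inbox)
            wait(inbox_cond) # releases the lock while sleeping, reacquires on wake-up
        end
    end
end

One wrinkle: the Δt-based termination above would then need a timed wait (or a separate timer task), so that the master loop does not block forever when no subproblem reports back.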


Any thoughts, @odow? (My ideas are mainly in my #23 post.) I’m unsure if I’m the first to try writing such an asynchronous cutting plane algorithm.

PS The runnable code is here. My setting:
map(x -> Threads.nthreads(x), (:default, :interactive)) == (7, 1).

I’m unsure if I’m the first to try writing such an asynchronous cutting plane algorithm

SDDP.jl supports an asynchronous multithreading algorithm. It’s pretty much what you’re trying to write.

Each thread does full iterations asynchronously. Each node has a lock so that no two threads access the same node at the same time. Iterations may overlap, so one thread may be doing a forward pass as another is doing the backward pass, etc. There’s a single Bool available to all threads that is used to control termination.
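
In outline, that pattern looks roughly like this (a paraphrase for illustration, not the actual SDDP.jl source):

struct Node
    lock::ReentrantLock
    data::Vector{Float64}
end

nodes = [Node(ReentrantLock(), zeros(3)) for _ in 1:4]
should_stop = Threads.Atomic{Bool}(false)
iterations = Threads.Atomic{Int}(0)

@sync for _ in 1:Threads.nthreads()
    Threads.@spawn begin
        while !should_stop[]
            for node in nodes
                @lock node.lock (node.data .+= 1) # stand-in for the forward/backward work on this node
            end
            # stand-in stopping rule: any task may flip the shared flag
            Threads.atomic_add!(iterations, 1) >= 99 && (should_stop[] = true)
        end
    end
end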

Take a look from here:

Well, to be honest, I found little resemblance.

Your SDDP.jl is scenario-tree based, IIRC? Hence you have the node concept.
Multistage stochastic programs (MSSP) are very hard to solve to optimality. (Actually my problem can be deemed a two-stage, one-scenario MIP with J blocks. My problem is already hard when J is large, let alone more than two stages with multiple scenarios.)

It seems you’re using the Distributed module; I currently have no experience with it.

Your algorithm:

@sync for _ in 1:max_threads
        Threads.@spawn begin

where max_threads is hardware based. By comparison, my algorithm is hardware-agnostic (as long as there are multiple logical processors). And you used @sync, which I didn’t use, nor did I wait on the subproblem tasks. You made use of Channels, but I didn’t (see my comment in the #25 post).

One thing I’m most curious about: how can you run multiple forward-backward passes? Is this meaningful? i.e. Can the SDDP training algorithm be run in parallel?

Well, actually I’m not that curious, since I’m not that interested in MSSP anymore. (Therefore you can opt to ignore my questions… I’m not very spirited about this.)


Generally speaking, I think JuMP is more flexible than SDDP.jl (or other extension packages). I find it relatively easy to write algorithms with JuMP, although it also took me years to get familiar with it. I was a noob with little experience back then, but happily I’ve gained quite a bit of proficiency now.

I’m unsure if I’m the first to try writing such an asynchronous cutting plane algorithm.

I’m also okay being the second.

There are two separate parallel implementations in SDDP.jl:

  • SDDP.Asynchronous, which uses Distributed and Channel
  • SDDP.Threaded, which uses shared memory multi-threading. This doesn’t use Channel.

You can ignore the SDDP.Asynchronous code.

If there are two stages and J subproblems in the second stage, then SDDP is equivalent to your problem.

I’m also okay being the second

I suggest you google asynchronous AND parallel AND (Benders OR "cutting plane"). There is a large literature on different approaches. Chances are, if you think you have a novel parallel scheme, someone has already published something about it.

As one example: https://www.cirrelt.ca/documentstravail/cirrelt-2021-41.pdf

There are options to parallelize the computation of the J subproblems but synchronize each first-stage solve. Or you could have a copy of the model on each thread that independently does iterations, and then every N iterations they share the relevant cuts between threads. Or you can go fully asynchronous, where the threads do iterations on top of one another. There are many possible variations.
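
For illustration, the first option (parallel subproblems, synchronized first stage) is only a few lines on top of a serial Benders loop. A sketch in which solve_master, solve_subproblem, and add_cut! are placeholders rather than a real API:

function synchronized_benders!(model, J; max_iters = 100)
    for _ in 1:max_iters
        x = solve_master(model)  # every first-stage solve is a synchronization point
        results = Vector{Any}(undef, J)
        @sync for j in 1:J
            Threads.@spawn results[j] = solve_subproblem(j, x) # the J subproblems run in parallel
        end
        any(r.violated for r in results) || return  # no violated cut found: stop
        for r in results
            r.violated && add_cut!(model, r.cut)    # cuts are added serially, so no lock is needed
        end
    end
end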