How to write a master-subproblem iterative algorithm asynchronously?

There are some deficiencies in that earlier code, which is not surprising since I was inexperienced.

It turns out that there is a decent termination criterion, as long as the subproblems can be solved in a reasonable short time. In my real-world application, the subproblem is a small-scaled MIP, therefore the condition is fulfilled.

The following algorithm is an improved version. Although it might seem a bit long, I believe it is natural, clear and concise.

function bilin_expr(j, iˈı::Function, β) # pivot
    pBus = j ∈ Rng1 ? X[j].pBus_1 + X[j].pBus_2 : X[j].pBus
    JuMP.@expression(model, β ⋅ iˈı.(pBus))
end;
function subproblemˈs_duty(j, snap, inbox) # the fussy version
    t = let β = snap.β, mj = inn[j]
        JuMP.@objective(mj, Min, bilin_expr(j, identity, β))
        JuMP.optimize!(mj) # 🕰️
        JuMP.termination_status(mj)
    end
    if t == JuMP.OPTIMAL
        con = JuMP.@build_constraint(θ[j] ≤ bilin_expr(j, ı, β)) # always a VI
        can_cut_off = function(new_snap)
            local β = new_snap.β
            vio_degree = new_snap.θ[j] - bilin_expr(j, ı, β)
            return vio_degree > COT, vio_degree
        end
        lens = function(new_snap)
            j, can_cut_off(new_snap), con
        end
    else # [abnormal] should be thrown in the master
        lens = function(new_snap)
            j, (false, t), missing
        end
    end
    @lock inbox_lock push!(inbox, lens)
end;
function wait_until_all_started(taskvec)
    in_pool = Set(eachindex(taskvec))
    while true
        isempty(in_pool) && return
        progress = false
        for j = in_pool
            if istaskstarted(taskvec[j])
                pop!(in_pool, j)
                progress = true
                break
            end
        end
        progress && continue
        yield()
    end
end
function shot!(timestamp)
    JuMP.optimize!(model)
    JuMP.termination_status(model) == JuMP.OPTIMAL || error("$(JuMP.termination_status(model))")
    snap = (t = timestamp += 1, θ = ı.(θ), β = ı.(β), ub = JuMP.objective_bound(model))
    timestamp, snap
end
function warm_up()
    inbox, js_remains = Function[], Set(1:J)
    _, snap = shot!(0)
    sub_tasks = [Threads.@spawn subproblemˈs_duty(j, snap, inbox) for j = 1:J]
    wait_until_all_started(sub_tasks)
    while true
        if isempty(inbox)
            yield()
            continue
        end
        while !isempty(inbox)
            lens = @lock inbox_lock pop!(inbox)
            local j, (_, ø), con = lens(snap)
            ismissing(con) && error("Subproblem $j terminates with $ø")
            JuMP.add_constraint(model, con) # add the earliest cuts
            pop!(js_remains, j)
            isempty(js_remains) && return
        end
    end
end
function masterˈs_loop(snap, timestamp, inbox)
    js_caught, js_detained = Int[], Int[]
    while true
        if isempty(inbox)
            yield()
            continue
        end # ∀ j, we have j ∈ js_caught ∪ js_detained ∪ js_at_large
        while !isempty(inbox)
            lens = @lock inbox_lock pop!(inbox)
            local j, (cut_off_by_j, ø), con = lens(snap)
            ismissing(con) && error("Subproblem $j terminates with $ø")
            if cut_off_by_j
                JuMP.add_constraint(model, con)
                println("▶ ub = $(snap.ub) | t = $(snap.t), j = $j | vio = $ø")
                push!(js_caught, j)
            else
                push!(js_detained, j) # mark it as saturated
            end
        end
        if !isempty(js_caught)
            timestamp, snap = shot!(timestamp)
            for j = js_detained Threads.@spawn subproblemˈs_duty(j, snap, inbox) end
            for j = js_caught   Threads.@spawn subproblemˈs_duty(j, snap, inbox) end
            empty!(js_detained)
            empty!(js_caught)
        elseif length(js_detained) == J
            println("▶▶▶ No more progress can be made, quit!")
            return
        else
            @debug "awaiting js := $(setdiff(1:J, [js_caught; js_detained]))"
        end
    end
end
function masterˈs_algorithm()
    timestamp, inbox = -1, Function[]
    timestamp, snap = shot!(timestamp)
    sub_tasks = [Threads.@spawn subproblemˈs_duty(j, snap, inbox) for j = 1:J]
    wait_until_all_started(sub_tasks)
    wait(Threads.@spawn masterˈs_loop(snap, timestamp, inbox))
end

I’ve conducted some tests to prove its effectiveness.

And I believe it is efficient:
For a J = 40 * 255 test case (legitimately large scale), the loggings are like

julia> @time begin
    warm_up()
    masterˈs_algorithm()
end

...omit lines...
▶ ub = 123114.13741671735 | t = 21, j = 677 | vio = 0.00012887796537341956
▶ ub = 123114.13678854181 | t = 22, j = 403 | vio = 1.0083634816027143e-5
▶ ub = 123114.13677849865 | t = 23, j = 2360 | vio = 3.5200807962709746e-5
▶ ub = 123114.13677849865 | t = 23, j = 1230 | vio = 1.4319802176032681e-5
▶ ub = 123114.13672897799 | t = 24, j = 1096 | vio = 0.0001275890287768533
▶▶▶ No more progress can be made, quit!
1585.385998 seconds (1.84 G allocations: 44.676 GiB, 1.38% gc time, 280 lock conflicts, 30.78% compilation time)

julia> versioninfo()
Julia Version 1.11.6
Commit 9615af0f269 (2025-07-09 12:58 UTC)
Build Info:
  Official https://julialang.org/ release
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: 256 × AMD EPYC 7763 64-Core Processor
  WORD_SIZE: 64
  LLVM: libLLVM-16.0.6 (ORCJIT, znver3)
Threads: 255 default, 1 interactive, 127 GC (on 256 virtual cores)

Note that 255 is my Threads.nthreads(:default).


Edit: It appears that an even better definition to the function masterˈs_loop(snap, timestamp, inbox) above exists, therefore I put it here

an alternative definition of master's_loop
function masterˈs_loop(snap, timestamp, inbox)
    v, i = fill(0, J), 0
    while true
        if isempty(inbox) # no event happens
            yield()
            continue
        end # ∀ j, we have j ∈ buffer ∪ inbox ∪ js_at_large
        up = false
        while true
            lens = @lock inbox_lock pop!(inbox)
            local j, (cut_off_by_j, ø), con = lens(snap)
            ismissing(con) && error("Subproblem $j terminates with $ø")
            if cut_off_by_j
                up = true
                JuMP.add_constraint(model, con)
                println("▶ ub = $(snap.ub) | t = $(snap.t), j = $j | vio = $ø")
            end
            v[i+=1] = j # write the buffer
            isempty(inbox) && break
            if up && i == LOGIS # LOGICAL PROCESSORS
                break # early break to update master model, i.e. snap
            end
        end
        if up
            timestamp, snap = shot!(timestamp)
            for j = view(v, 1:i) Threads.@spawn subproblemˈs_duty(j, snap, inbox) end
            i = 0
        elseif i == J
            println("▶▶▶ No more progress can be made, quit!")
            return
        end
    end
end

This new version embodies the idea of “batch” updating, in which the size of the batch is in accordance with the number of logical processors.
The key advantage of this new version is that it adds fewer cuts to the master.
An other advantage compared with the older version is that the Logging under the new version reads smoother (as it uses batch update and early break).
The final convergent value is the same.