There are some deficiencies in that earlier code, which is not surprising since I was inexperienced.
It turns out that there is a decent termination criterion, as long as the subproblems can be solved in a reasonably short time. In my real-world application, each subproblem is a small-scale MIP, so the condition is fulfilled.
The following algorithm is an improved version. Although it might seem a bit long, I believe it is natural, clear and concise.
function bilin_expr(j, iˈı::Function, β) # pivot
    # Build the scalar expression β ⋅ iˈı.(pBus) for subproblem j.
    # `iˈı` is applied elementwise to pBus before the dot product — callers pass
    # `identity` or `ı`; NOTE(review): confirm what `ı` is (presumably a value
    # extractor such as `JuMP.value`) — it is defined outside this chunk.
    # Indices in `Rng1` aggregate two bus-power fields; the rest have a single one.
    pBus = j ∈ Rng1 ? X[j].pBus_1 + X[j].pBus_2 : X[j].pBus
    # NOTE(review): the anonymous @expression is built against the global `model`
    # even when the result becomes a subproblem (`inn[j]`) objective — confirm
    # this is intentional (anonymous expressions are not registered by name).
    JuMP.@expression(model, β ⋅ iˈı.(pBus))
end;
function subproblemˈs_duty(j, snap, inbox) # the fussy version
    # Solve subproblem j against the master snapshot `snap`, then post a closure
    # ("lens") into the shared `inbox`; the master later evaluates that lens
    # against a possibly *newer* snapshot to decide whether the cut is violated.
    t = let β = snap.β, mj = inn[j]
        # Re-aim subproblem j's objective at the snapshot values (this β is the
        # numeric snapshot copy, local to the `let`).
        JuMP.@objective(mj, Min, bilin_expr(j, identity, β))
        JuMP.optimize!(mj) # 🕰️ the potentially slow part — assumed reasonably short
        JuMP.termination_status(mj)
    end
    if t == JuMP.OPTIMAL
        # NOTE(review): this `β` is *outside* the `let`, so it resolves to the
        # global JuMP variable `β`, not `snap.β` — confirm this is intentional
        # (it appears to be: the cut must be expressed in master variables).
        con = JuMP.@build_constraint(θ[j] ≤ bilin_expr(j, ı, β)) # always a VI
        # Given a newer snapshot, report (is the cut violated?, by how much?).
        can_cut_off = function(new_snap)
            local β = new_snap.β
            vio_degree = new_snap.θ[j] - bilin_expr(j, ı, β)
            return vio_degree > COT, vio_degree
        end
        lens = function(new_snap)
            j, can_cut_off(new_snap), con
        end
    else # [abnormal] should be thrown in the master
        # Abnormal termination: ship the status back; `missing` con is the
        # sentinel the master turns into an error.
        lens = function(new_snap)
            j, (false, t), missing
        end
    end
    # Single shared mailbox; the lock serializes producer tasks against the
    # master consumer.
    @lock inbox_lock push!(inbox, lens)
end;
function wait_until_all_started(taskvec)
    # Block (cooperatively) until every task in `taskvec` has been scheduled
    # onto a worker, yielding to the scheduler while any task is still pending.
    pending = Set(eachindex(taskvec))
    while !isempty(pending)
        started = [k for k in pending if istaskstarted(taskvec[k])]
        if isempty(started)
            # No progress this pass — hand the thread to the scheduler so the
            # remaining tasks get a chance to start.
            yield()
        else
            setdiff!(pending, started)
        end
    end
    return
end
function shot!(timestamp)
    # Re-optimize the master model and capture a fresh immutable snapshot of
    # its state; returns the advanced timestamp together with the snapshot.
    JuMP.optimize!(model)
    status = JuMP.termination_status(model)
    status == JuMP.OPTIMAL || error("$status")
    timestamp += 1
    # NOTE(review): `ı` is presumably a value extractor applied to the master
    # variables `θ` and `β` — it is defined outside this chunk.
    snap = (t = timestamp, θ = ı.(θ), β = ı.(β), ub = JuMP.objective_bound(model))
    return timestamp, snap
end
function warm_up()
    # Bootstrap round: solve the master once, solve every subproblem once
    # against that snapshot, and install each subproblem's first cut in the
    # master. Errors out if any subproblem terminates abnormally.
    inbox, js_remains = Function[], Set(1:J)
    _, snap = shot!(0)
    sub_tasks = [Threads.@spawn subproblemˈs_duty(j, snap, inbox) for j = 1:J]
    wait_until_all_started(sub_tasks)
    while true
        if isempty(inbox)
            # NOTE(review): this emptiness probe runs outside `inbox_lock`;
            # with a single consumer the worst case looks like a spurious extra
            # spin — confirm that is acceptable.
            yield()
            continue
        end
        while !isempty(inbox)
            lens = @lock inbox_lock pop!(inbox)
            local j, (_, ø), con = lens(snap)
            ismissing(con) && error("Subproblem $j terminates with $ø")
            JuMP.add_constraint(model, con) # add the earliest cuts
            pop!(js_remains, j)
            # Exactly J lenses are ever pushed, one per j, so emptiness here
            # means every initial cut is in.
            isempty(js_remains) && return
        end
    end
end
function masterˈs_loop(snap, timestamp, inbox)
    # Master event loop: drain subproblem reports ("lenses") from `inbox`,
    # add every cut violated by the current snapshot, and re-solve the master
    # whenever at least one cut went in. Terminates once all J subproblems are
    # saturated (no violation above COT) against the current snapshot.
    js_caught, js_detained = Int[], Int[] # js with a violated cut / saturated js
    while true
        if isempty(inbox)
            yield() # nothing to consume yet — let subproblem tasks run
            continue
        end # ∀ j, we have j ∈ js_caught ∪ js_detained ∪ js_at_large
        while !isempty(inbox)
            lens = @lock inbox_lock pop!(inbox)
            # Evaluate j's report against the *current* master snapshot.
            local j, (cut_off_by_j, ø), con = lens(snap)
            ismissing(con) && error("Subproblem $j terminates with $ø")
            if cut_off_by_j
                JuMP.add_constraint(model, con) # the cut is violated: add it
                println("▶ ub = $(snap.ub) | t = $(snap.t), j = $j | vio = $ø")
                push!(js_caught, j)
            else
                push!(js_detained, j) # mark it as saturated
            end
        end
        if !isempty(js_caught)
            # Progress was made: refresh the master solution and re-dispatch
            # every finished subproblem against the new snapshot.
            timestamp, snap = shot!(timestamp)
            for j = js_detained Threads.@spawn subproblemˈs_duty(j, snap, inbox) end
            for j = js_caught Threads.@spawn subproblemˈs_duty(j, snap, inbox) end
            empty!(js_detained)
            empty!(js_caught)
        elseif length(js_detained) == J
            # Every subproblem reported back without a violated cut: converged.
            println("▶▶▶ No more progress can be made, quit!")
            return
        else
            @debug "awaiting js := $(setdiff(1:J, [js_caught; js_detained]))"
        end
    end
end
function masterˈs_algorithm()
    # Drive the full decomposition: one initial master solve, one subproblem
    # task per j, then hand control to the master's event loop and block until
    # it declares convergence.
    inbox = Function[] # shared mailbox of result-closures, guarded by inbox_lock
    timestamp, snap = shot!(-1) # -1 so the first snapshot carries t = 0
    sub_tasks = [Threads.@spawn subproblemˈs_duty(j, snap, inbox) for j in 1:J]
    wait_until_all_started(sub_tasks)
    wait(Threads.@spawn masterˈs_loop(snap, timestamp, inbox))
end
I’ve conducted some tests to verify its effectiveness.
And I believe it is efficient:
For a J = 40 * 255 test case (a legitimately large-scale one), the logging looks like
julia> @time begin
warm_up()
masterˈs_algorithm()
end
...omit lines...
▶ ub = 123114.13741671735 | t = 21, j = 677 | vio = 0.00012887796537341956
▶ ub = 123114.13678854181 | t = 22, j = 403 | vio = 1.0083634816027143e-5
▶ ub = 123114.13677849865 | t = 23, j = 2360 | vio = 3.5200807962709746e-5
▶ ub = 123114.13677849865 | t = 23, j = 1230 | vio = 1.4319802176032681e-5
▶ ub = 123114.13672897799 | t = 24, j = 1096 | vio = 0.0001275890287768533
▶▶▶ No more progress can be made, quit!
1585.385998 seconds (1.84 G allocations: 44.676 GiB, 1.38% gc time, 280 lock conflicts, 30.78% compilation time)
julia> versioninfo()
Julia Version 1.11.6
Commit 9615af0f269 (2025-07-09 12:58 UTC)
Build Info:
Official https://julialang.org/ release
Platform Info:
OS: Linux (x86_64-linux-gnu)
CPU: 256 × AMD EPYC 7763 64-Core Processor
WORD_SIZE: 64
LLVM: libLLVM-16.0.6 (ORCJIT, znver3)
Threads: 255 default, 1 interactive, 127 GC (on 256 virtual cores)
Note that 255 is my Threads.nthreads(:default).
Edit: It appears that an even better definition of the function masterˈs_loop(snap, timestamp, inbox) above exists, so I put it here:
an alternative definition of master's_loop
function masterˈs_loop(snap, timestamp, inbox)
    # Batched variant of the master event loop: buffer finished subproblem
    # indices in `v` and re-solve the master at most once per batch of up to
    # LOGIS reports, which adds fewer cuts overall than the per-round variant.
    v, i = fill(0, J), 0 # v[1:i] buffers the js collected since the last re-solve
    while true
        if isempty(inbox) # no event happens
            yield()
            continue
        end # ∀ j, we have j ∈ buffer ∪ inbox ∪ js_at_large
        up = false # set when at least one violated cut is added this round
        while true
            lens = @lock inbox_lock pop!(inbox)
            local j, (cut_off_by_j, ø), con = lens(snap)
            ismissing(con) && error("Subproblem $j terminates with $ø")
            if cut_off_by_j
                up = true
                JuMP.add_constraint(model, con)
                println("▶ ub = $(snap.ub) | t = $(snap.t), j = $j | vio = $ø")
            end
            v[i+=1] = j # write the buffer
            isempty(inbox) && break
            # NOTE(review): `LOGIS` is a global defined elsewhere (number of
            # logical processors per the surrounding prose) — confirm
            # LOGIS ≤ J, otherwise this early break can never fire.
            if up && i == LOGIS # LOGICAL PROCESSORS
                break # early break to update master model, i.e. snap
            end
        end
        if up
            # Progress was made: take a fresh snapshot and re-dispatch every
            # buffered subproblem against it, then reset the buffer.
            timestamp, snap = shot!(timestamp)
            for j = view(v, 1:i) Threads.@spawn subproblemˈs_duty(j, snap, inbox) end
            i = 0
        elseif i == J
            # All J subproblems are buffered with no violated cut: converged.
            println("▶▶▶ No more progress can be made, quit!")
            return
        end
    end
end
This new version embodies the idea of “batch” updating, in which the size of the batch is in accordance with the number of logical processors.
The key advantage of this new version is that it adds fewer cuts to the master.
Another advantage compared with the older version is that the logging under the new version reads more smoothly (as it uses batch updates and an early break).
The final convergent value is the same.