How to write a master-subproblem iterative algorithm asynchronously?

My question is inspired by an optimization problem, e.g. the famous Benders decomposition. But I’ll introduce it here in an abstract easily-comprehended fashion.

Settings of the problem

:blue_circle: On one hand, there is a master problem keeping hold of a set of cuts (in optimization aka constraints). The status of the master problem is completely determined by that set of cuts. At a certain status, the master problem can produce a corresponding x:

function solve_master(model) # a JuMP Model at some certain status
    x::Vector{Float64} = _some_computation(model)
    return x # a trial solution
end

_some_computation here is solving an LP (linear programming). Therefore it is relatively easy—taking few time to finish.

:blue_circle: On the other hand, there is a vector of (j in 1:J) subproblems. Given a certain j, along with some x solved from solve_master, the jth subproblem can (always) produce a cut for the (master) model:

function solve_subproblem(j::Int, x::Vector{Float64})
    return cut = _some_computing_work(j, x)
end

Although a cut can surely be produced as such, it may fail to strictly enrich the (master) model (i.e. not helpful). If it is not helpful, we would discard it outright. To decide whether a cut is helpful, we would query the x_new = solve_master(model) in which the arg model should ideally be at its latest status. Note that the x that we used to generate the cut could have already become outdated, since solve_subproblem takes certain period of time (in practice, this is an MIP solving work, therefore it takes a relatively long time to finish). Therefore the odds are that model has received some helpful cuts from some j_other which isn’t j.


Now we assume the general case J > 1, say, J = 2.
Since the J subproblems can be solved independently, we can formulate the overall iterative algorithm as

model = _some_initialization()
const J = 2
const my_lock = Threads.ReentrantLock()
while true
    x = solve_master(model) # `model` should be at its present status
    is_globally_success = Threads.Atomic{Bool}(false)
    Threads.@threads for j = 1:J
        local cut = solve_subproblem(j, x)
        # if possible, we had best use `x_new` for the following judge function (but how?)
        # Nonetheless, using the existing `x` is still valid.
        local is_success::Bool = _some_judge(j, cut, x_new) 
        if is_success
            @lock my_lock add_cut!(model, cut) # strictly enrich the (master) `model`
            is_globally_success[] = true
        end
    end
    is_globally_success[] && continue
    break
end

Comment: Although the code above embodies the idea of multithreading and it is theoretically viable to generate the correct result, it lacks an asynchronous spirit.


For simplicity we let J = 2. Then there are 3 entities: Master, subproblem1, subproblem2.

The main work for Master is x = _some_computation(model)
The main work for subproblem1 is is_success_1 = _some_computing_work(1, x)
The main work for subproblem2 is is_success_2 = _some_computing_work(2, x)

(By “main work” I mean that it takes some cpu time to finish.)

I wonder if there is a way in julia, so that I can achieve the follows:

From the standpoint of subproblem1, it restarts its main work (not immediately, but schedule one) whenever an updated x is available. (same for subproblem 2)
From the standpoint of Master, it restarts its main work (not immediately, but schedule one) whenever there is any (i.e. any j in 1:J) update to the model. If there are more than 1 updates, then it takes the latest (i.e. all updates accepted) model as its arg and restarts its main work.

So the philosophy is “event-triggered”, where events include

  • there is a new x generated (and thus is available)
  • A successful update due to subproblem1 is available
  • A successful update due to subproblem2 is available

The termination criterion is simple: At some time upon a new x is generated, neither subproblem1 nor subproblem2 can come up with a successful update. Then we quit the overall algorithm.

You have not yet clarified why the @threads solution is problematic for you. (I would recommend @sync for j=1:J @spawn foo end, though – Threads.@threads is pretty flawed and makes it very hard to understand what’s happening)

Your threads solution waits between new generations of x until both subproblems have completed, either successfully or unsuccessfully.

You cannot do an event-triggered thingy with your 3 events: You MUST also have an event for “subproblem j has completed, no successful update is forthcoming”, otherwise you have no termination criterion.

edit: I think you ninja’ed me with an edit. If subproblem j generates a successful update, then you want to interrupt / discard all other subproblems and immediately compute a new x?

The Threads.@threads for j = 1:J # J = 2 algorithm I wrote above can yield theoretically correct result. I just think it is not cool enough. Because it still follows the traditional sequential programming logic, apart from the multi-threading technique.

Why, I don’t think so. I think its logic is quite clear—just wait all parallel tasks to finish—do a local synchronization.

Yes, this is exactly why I want to improve on it. Maybe the subproblem1 takes 1 second to finish, but the subproblem2 takes 10 second to finish. If subproblem1 returns success, then Master can immediately go back to update x with the updated model. By the way, unsuccessful is actually favorable, as it means that the current x is of good quality—closer to the converging point.

In practice I can set a timer e.g. 1 hour. If no “improving” events occur during this period, I quit forcibly.

If there is any j coming up with a successful update, and Master is triggered by this event, then Master can re-do its main work and generate a new x to in turn trigger all J subproblems.

All updates are useful, even if it was done at an older x.

It sounds like you might be able to get the event-based behavior you are interested in by spawning a few tasks to do your computations and communicating x and j values to those tasks using channels. For a basic tutorial of a calculation of this flavor, see here

2 Likes

Note that this does not necessarily make your algorithm better! This depends a lot on your details. The way you wrote your code, it can discover and apply many independent j improvements on the same x without recomputing. This is awesome for some problems! (it also imposes a pretty stringent requirement on your problem – namely that model updates mostly commute for different j).

So what you’re really trying to do is only parallelize the search for one needle in the haystack?

What is your rough value of J and the expected runtime of _some_computing_work(j, x) and _modify!(model, j) and solve_master(model)?

(rough means: to an error of 100x. That makes a big difference in what approaches can work wrt overhead. A 1 us problem needs quite different handling than a 1s problem, because all constant overheads are magnified by a million)

1 Like

So … you may have several updates from sub-problem 1, compute a new x every time, and trigger sub-problem 2 for each of them? That is, several sub-problem-2 in flight (for different x’es) at the same time?

And you only terminate when all sub-problems have failed to find an improvement? (or possibly after some time if they don’t complete).

This one spawns subproblems when a new x is found. If a subproblem finds an improvement, a new x is computed, and subproblems are spawned for this x. This continues until all subproblems (for all x’es) are finished.

model = _some_initialization()
const J = 2
const my_lock = Threads.ReentrantLock()

subtasks = Set{Task}()
donetasks = Set{Task}()
subdone = Threads.Event(true)  # autoreset
found = true
x = solve_master(model)
while found
    for j in 1:2
        sub = @spawn begin
            local success = compute(j, $x)
            @lock my_lock begin
                 success && modify!(model, j)
                 push!(donetasks, current_task())
            end
            notify(subdone)
            return success
        end
        push!(subtasks, sub)
    end
    found = false
    while !isempty(subtasks) && !found
        wait(subdone)
        @lock my_lock begin
            for t in donetasks
                found |= fetch(t)::Bool
            end
            setdiff!(subtasks, donetasks)
            empty!(donetasks)
            found && (x = solve_master(model))
        end
    end
end
1 Like

Thanks, I’ll have a read.

J should be large to study the large-scale behavior. _modify! is adding a cut, it is very fast. For the rest I’ve revised my 1st post. I take your point—it is problem specific!

No, probably not, since it is relatively expensive to perform solving any of the subproblems (an MIP solve).

Yes. All iterative algorithms seek to make (strict) improvements. If no more improvements can be made, then we quit.

Thank you very much for your replies, I’ll take time to grasp your code.


Sorry for the vague concepts delivered, I’ve revised my first post considerably.

Thanks for updating the problem description!

Is the following a correctness problem?

x1 = solve_master(model)
cut1 = solve_subproblem(1, x1)
@assert _some_judge(1, cut, x1) #passes
cut2 = solve_subproblem(2, x1)
@assert _some_judge(2, cut2, x1) #passes
add_cut!(model, cut1)
x2 = solve_master(model)
@assert _some_judge(2, cut2, x2) #fails!
add_cut!(model, cut2) # we're adding the cut2 even though it's not an enrichment

I.e. you add cut2 even though it is no longer an enrichment because the model was updated in between the check and the modification. To me, this sounds icky, and you should double-check that you are really OK with using outdated judgements to make decisions.

This is unfortunately meaningless.

With “large” you could mean 20 (larger than 2!), 1000 (more than you have cpu cores!), 1e7 (please don’t start a Task for each individual j), or 1e10 (try not to make an array indexed over J), or 1e12 (cannot make an array indexed over J, time for a big cluster). Just give us the ballpark!

Your course of action is correct.

_some_judge(2, cut2, x1) == true technically means “the cut2 can cut off the trial point x1”, where cut off and trial are terms in optimization. In fact, a trial point is merely a hint for the subproblem—guiding them to generate a corresponding cut. As long as the cut is returned by solve_subproblem, regardless of which specific trial point (an older one or a younger one) serving as the input arg (i.e. the hint), the cut is valid for the master model and can be added to model (but we don’t have to because adding cuts will make model heavier—a modest side effect).

In your code, you re-solve_master to get the younger trial point x2, and you find that cut2 fails to cut x2 off. If this happens, we typically can discard cut2 outright. But your choice add_cut!(model, cut2) is not incorrect and have a chance to be meaningful since cut2 is valid after all.

Despite the circumstance of multiple trial points, the termination criterion is always based on the youngest x. Say, at a phase of the algorithm the algorithm generates x_99 (based on the latest status of model). And we iterate over 1:J, finding that none of the subproblems can generate a cut to cut off x_99, thus we quit.

Suppose we have a server with 104 logic processors. I think it is already large-scale for one thread to take care of 100 MIPs. Therefore J = 10000 might be a proper large-scale instance.


One more side note:
A subproblem is assumed to be not hard to complete. e.g. We can revise our physical problem repeatedly until we can make sure that for any j and x, solve_subproblem(j, x) will be finished within 1 second.

I cannot understand this usage stated as follows

Values can be interpolated into @spawn via $ , which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable’s value in the current task.

I find this behavior

julia> function mutate!(x)
           x .= -1
       end;

julia> for _ = 1
           x = [2, 3]
           tm = Threads.@spawn mutate!($x)
           sleep(0.01)
           tp = Threads.@spawn println(x)
           wait.((tm, tp))
       end
[-1, -1]

julia> for _ = 1
           x = [2, 3]
           tm = Threads.@spawn mutate!(copy(x))
           sleep(0.01)
           tp = Threads.@spawn println(x)
           wait.((tm, tp))
       end
[2, 3]

Seems $x and copy(x) are not alike.

Edit: Related issue Documentation for interpolation into `@spawn` · Issue #36647 · JuliaLang/julia · GitHub

It’s not about mutating x, but reassigning x:

function fun()
    x = [2,3]
    Threads.@spawn (sleep(0.1); x = [1,2])
    println(x)
    sleep(0.2)
    println(x)
    Threads.@spawn (sleep(0.1); $x = [0,1])
    sleep(0.2)
    println(x)
end

julia> fun()
[2, 3]
[1, 2]
[1, 2]

So @spawn ... $x ... is the same as let x=x; @spawn ... x ...; end. Reassignment to x will not affect the outside x when interpolated with $x. Technically, a new variable name is created for x, so reassignment outside of the @spawn will not affect it:

function fun2()
    x = [2,3]
    t = Threads.@spawn (sleep(0.1); println($x))
    x = [1,2]
    println("wait")
    wait(t)

    x = [2,3]
    t = Threads.@spawn (sleep(0.1); println(x))
    x = [1,2]
    println("wait")
    wait(t)
end

julia> fun2()
wait
[2, 3]
wait
[1, 2]


1 Like

Now, this is very different from e.g. fortran and C, where a variable can be thought of as the name of a fixed storage into which data is filled when the variable is assigned to. Reassigning then means to put something else into the storage.

This mental model is wrong for julia. In julia a variable is a name which refers to some object. Reassigning in julia means to let the name refer to some other object. The old object still exists, and if nothing refers to it anymore, it will eventually be buried by the garbage collector.

1 Like

I came to think of it. The concoction I made above, with spawning J submodels for every update to the model may create an inordinate amount of running tasks. Say J == 100 and it turns out that the submodel for j == 10 always completes in a few milliseconds, while the others require half an hour. You may quickly have thousands of tasks running. It won’t overbook your cpu, but it can be a bit chaotic and may result in less than optimal task-scheduling.

This process can be throttled with a Base.Semaphore. A semaphore is a resource counter. It counts down when doing an acquire, but waits if it reaches 0. It counts up when you do a release. Say you only want 50 tasks running at once:

const sem = Base.Semaphore(50)

while something do
    for j in 1:J
         Threads.@spawn begin
             try
                 Base.acquire(sem)
                 ... do work ...
             finally
                 Base.release(sem)
             end
         end
    end
    ... do some work ...
end

It is also possible to acquire before the @spawn, to avoid spawning more at all. Then you’ll be waiting in the j-loop, without being able to ... do some work ... until all the the js are spawned.

It’s safest to acquire/release in a try ... finally construction, since the release then will happen no matter how the work terminates (normal return, error, ctrl-c, or otherwise). There’s also an acquire(sem) do; ...; end which does the release for you.

1 Like

There is an established macro

I guess the local keyword here is redundant, am I right? A macro cannot be defined in a local scope, therefore temp has no chance to hack a same-name quantity somewhere else.

Absolutely. But, the local will be inserted where the macro is used, so it may end up as a local. However, variables created in macros are “sanitized” anyway, that is, made into unique names which can’t be reached by accident. This is done by making variable names which are syntactically incorrect, with var"##something". That is, var"#536#temp" is just a variable name. All of the user’s variables must be enclosed in esc, otherwise they’re prefixed with the module name in which the macro is defined.

 julia> macro acquire(s, expr)
               quote
                   temp = $(esc(s))
                   Base.acquire(temp)
                   try
                       $(esc(expr))
                   finally
                       Base.release(temp)
                   end
               end
           end
@acquire (macro with 1 method)

julia> @macroexpand @acquire a b
quote
    var"#536#temp" = a
    (Main.Base).acquire(var"#536#temp")
    try
        b
    finally
        (Main.Base).release(var"#536#temp")
    end
end

This works also for your own variables:

julia> a = 2
2

julia> var"#b" = 3
3

julia> var"a"
2

julia> var"#b" + a
5
1 Like

Actually, a macro is very close to being an ordinary function, which receives something called an Expr (or Symbol), and returns an Expr, usually made by a quote statement. A macro can call ordinary functions to manipulate the Expr it works on. The returned Expr is then inserted where the macro call is done.

An Expr is a lisp-like data-structure. (It’s said that julia is an abbreviation for Jeff’s Unusual Lisp is Automated). You can create your own Expr with Expr, quote or :( ... ):

julia> dump( :(a + b + c) )
Expr
  head: Symbol call
  args: Array{Any}((4,))
    1: Symbol +
    2: Symbol a
    3: Symbol b
    4: Symbol c

julia> dump(Expr(:call, :+, :a, :b, :c))
Expr
  head: Symbol call
  args: Array{Any}((4,))
    1: Symbol +
    2: Symbol a
    3: Symbol b
    4: Symbol c
1 Like

I’m trying to grasp the autoreset option.

To this end I devise a test which I’m not sure I fully get its point

function test(autoreset::Bool)
    e = Base.Event(autoreset);
    t_vec = Vector{Task}(undef, J) 
    for j = 1:J
        t_vec[j] = Threads.@spawn begin
            sleep(j)
            notify(e) # send
        end
    end
    for _ = 1:J
        wait(e) # receive
        println(istaskdone.(t_vec))
    end
    wait.(t_vec)
    return nothing
end

const J = 3
test(false) # All J lines are seen simultaneously
test(true)  # delays can be noticed

The Event works as “flag”, notify raises the flag. Those waiting (there’s only one here) sees it and continues. The flag remains raised until someone calls reset on the event. However, with autoreset, the flag is lowered immediately when one of the waiters sees it.

By the way, your example would risk hanging with autoreset. If, for some reason, all the spawned tasks finish before the waiting loop starts (due to idiosyncracies in the task scheduling, a garbage collection occurs and things continue in a different order, or … whatever), the waiting loop would wake up only once. The next iteration it would call wait, but nobody will notify, they have all done it already. There is no counting of notifies before the wait is called. It is quite difficult to get such synchronization right. I’m not absolutely sure my example is foolproof, even though I’ve programmed such things for 30 years.

In my original example, it’s possible to remove the notify and wait, and things would still work, but the master loop would spin-wait until there’s something in the donetasks set. So the Event is there mainly to calm down the master loop and not waste a cpu-core by spin-waiting.

1 Like