Parallel worlds' ages don't match

I’m currently developing multithreading in Simulate.jl. With the current development version, I create a master clock and set up three parallel clocks:

using Simulate
clk = Clock()
multiply!(clk)

got 3 threads parallel to master!

I then create a test variable and an event with an anonymous function to increment it:

a = 1
ev = Simulate.SimEvent(SF(()->global a+=1),Main,1.0,0.0)
Simulate.SimEvent{SimFunction}(SimFunction(Main, var"#7#8"(), nothing, nothing), Main, 1.0, 0.0)

I can execute this, and it does what it should. If I send this event over the event channel to the first parallel clock (on thread 2) and tell it to run, I get an exception telling me that it doesn’t know my newly created method var"#7#8":

put!(clk.ac[1].ch, Simulate.Register(ev))   # I register the event to the clock on thread 2
take!(clk.ac[1].ch)                         # I get the response
put!(clk.ac[1].ch, Simulate.Run(10.0))      # I tell the clock to run
Simulate.Run(10.0)
clock 2 exception: MethodError(var"#7#8"(), (), 0x000000000000689e)

If I query the task on thread 2, I get the following funny message:

clk.ac[1].ref[]
Task (failed) @0x000000010de02f50
MethodError: no method matching (::var"#7#8")()
The applicable method may be too new: running in world age 26782, while current world is 26783.
Closest candidates are:
  #7() at In[3]:2 (method too new to be called from this world context.)

If I change the order, defining my function first and then calling multiply! to start the parallel clocks, everything works fine:

clk = Clock();
a = 1
ev = Simulate.SimEvent(SF(()->global a+=1),Main,1.0,0.0)
multiply!(clk)   # now I start everything on the other threads
put!(clk.ac[1].ch, Simulate.Register(ev))
take!(clk.ac[1].ch)
put!(clk.ac[1].ch, Simulate.Run(10.0))
take!(clk.ac[1].ch)
got 3 threads parallel to master!
Simulate.Response(10.0)

I query the task as before:

clk.ac[1].ref[]

Task (runnable) @0x000000013adc0fd0

Is the above failure expected? Is there a way to sync the “parallel worlds” on the other threads with my world (on thread 1), so that newly created functions can be exchanged between threads?

Funny problem, isn’t it? :wink:


After some thinking about this problem, I guess each task gets its own “world” when it starts and keeps it for its lifetime. Is that so?

In that case, I would have to start new tasks on each thread to get the parallel worlds synced. I don’t like that solution since it is expensive. Does anybody know how to start a task on each thread in Julia without @spawning several of them and keeping only the ones that land on the desired threads?

Or is there an alternative way to do it?
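A minimal sketch in plain Julia (no Simulate.jl; ch_old, old_task and new_task are just illustrative names) seems to confirm that guess: a task that is already waiting when a new anonymous function is defined cannot call it, while a task started afterwards can. It assumes the lines are run at top level (REPL or script), since a closure written inside a function body would be defined together with that function, before the task starts.

```julia
# Sketch (plain Julia, not Simulate.jl): a task keeps the world age it had when
# it started. Run at top level; ch_old, old_task and new_task are illustrative.

ch_old = Channel{Any}(1)

# This task starts and blocks in take! before the anonymous function below
# exists, so its world age is older than that function's method.
old_task = @async begin
    f = take!(ch_old)
    try
        f()
    catch err
        err                      # hand the exception back instead of failing
    end
end
yield()                          # let old_task start and block in take!

g = () -> 42                     # new method, the world counter advances

put!(ch_old, g)
old_result = fetch(old_task)     # a MethodError ("method too new")

# A task started after g was defined runs in a newer world and sees the method.
new_task = @async g()
new_result = fetch(new_task)     # 42
```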

I’m a bit of a novice in parallel computing, but I got this error once. I fixed it by making sure that all workers running the code had the same package state.

One trick I’ve found for doing this is to put a run(`scp -r ... ...`) statement in my code to remotely copy the latest version to the client address.

Not sure if that’ll help you.

The analogue for tasks and multithreading would be to send the code over as an expression and evaluate it at execution time within the task. It works, but it is much slower:

julia> using BenchmarkTools

julia> ev = Simulate.SimEvent(SF(()->global a+=1), Main, 1.0,0.0)
Simulate.SimEvent{SimFunction}(SimFunction(Main, var"#28#29"(), nothing, nothing), Main, 1.0, 0.0)

julia> @btime Simulate.evExec(ev.ex, ev.scope)
  114.546 ns (1 allocation: 16 bytes)
500503

julia> ev = Simulate.SimEvent(:(global a+=1), Main, 1.0,0.0)
Simulate.SimEvent{Expr}(:(global a += 1), Main, 1.0, 0.0)

julia> @btime Simulate.evExec(ev.ex, ev.scope)
┌ Warning: Evaluating expressions is slow, use `SimFunction` instead
└ @ Simulate ~/.julia/dev/Simulate/src/events.jl:35
  84.708 μs (45 allocations: 2.70 KiB)
54949
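For comparison, here is a rough sketch of the expression route outside Simulate.jl (exprs, worker and newer_fun are illustrative names, not Simulate's API). It relies on top-level evaluation with Core.eval running in the latest world, which is why an already-running task can call a function defined after it started; the price is the eval overhead the benchmark above shows.

```julia
# Sketch (plain Julia, not Simulate.jl): an already-running task evaluates a
# quoted expression. Top-level evaluation via Core.eval happens in the latest
# world, so the expression may call functions defined after the task started.
# Run at top level; exprs, worker and newer_fun are illustrative names.

exprs = Channel{Expr}(1)

# evaluate one received expression in the current module
worker = @async Core.eval(@__MODULE__, take!(exprs))
yield()                          # worker starts and blocks in take!

newer_fun(x) = 2x                # defined after the worker started

put!(exprs, :(newer_fun(21)))
result = fetch(worker)           # 42
```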

Alternatively, I can use only predefined methods. But that restriction rules out passing anonymous functions and macros to tasks that were started earlier.

I think you can wrap the code that may receive functions with Base.invokelatest. Something like

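new_function = take!(channel)
# the do-block is called in the latest world age, so methods defined
# after this task started are visible inside it
Base.invokelatest(new_function) do new_function
    new_function()   # ... use new_function here ...
end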

There is a similar unsolved problem with the Logging module: https://github.com/JuliaLang/julia/issues/33865
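Applied to a long-running loop like a parallel clock's event loop, that suggestion might look roughly like this sketch. jobs, results, worker and counter are made-up names, and the loop is a plain Channel loop, not Simulate's clock. Each received function is called through Base.invokelatest directly, which has the same effect as the do-block form above without the extra closure.

```julia
# Sketch (plain Julia, not Simulate.jl): a long-running worker loop that runs
# functions received over a channel. Base.invokelatest calls each one in the
# latest world, so functions defined after the worker started still work.
# Run at top level; jobs, results, worker and counter are illustrative names.

jobs    = Channel{Any}(10)
results = Channel{Any}(10)

worker = @async begin
    for f in jobs                            # runs until jobs is closed and empty
        put!(results, Base.invokelatest(f))
    end
end
yield()                                      # worker starts and waits for jobs

counter = 1
put!(jobs, () -> global counter += 1)        # defined after the worker started
put!(jobs, () -> counter * 10)
close(jobs)
wait(worker)
close(results)
collected = collect(results)                 # Any[2, 20]
```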


Edit [1]: Now that works fine, thank you very much.

I included a try/catch block in my sfExec function:

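    try
        # .... (elided)
    catch exc
        if exc isa MethodError
            # the method may be too new for this task's world age: retry in the latest world
            # (arg and kw are assumed to be the evaluated arguments from the elided part)
            if x.kw === nothing
                return x.arg === nothing ? invokelatest(x.efun) : invokelatest(x.efun, arg...)
            else
                return x.arg === nothing ? invokelatest(x.efun; kw...) : invokelatest(x.efun, arg...; kw...)
            end
        end
        rethrow()   # don't silently swallow other exceptions
    end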

[1] There was still an error in my first fix.
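A self-contained sketch of the same "try the direct call, fall back to invokelatest on a MethodError" pattern follows. MyFun, pos_args, kw_args and exec_latest are hypothetical stand-ins for SimFunction and sfExec, not Simulate's actual code; positional and keyword arguments are stored as a Tuple and a NamedTuple (or nothing).

```julia
# Hypothetical stand-in for Simulate's SimFunction (not its real definition):
# a function plus optional positional and keyword arguments.
struct MyFun
    efun::Any
    arg::Union{Nothing,Tuple}
    kw::Union{Nothing,NamedTuple}
end

pos_args(x::MyFun) = x.arg === nothing ? () : x.arg
kw_args(x::MyFun)  = x.kw === nothing ? NamedTuple() : x.kw

# Try the fast direct call first; if it fails with a MethodError (for example
# because efun's method is too new for this task's world age), retry the call
# through Base.invokelatest. Any other exception is rethrown unchanged.
function exec_latest(x::MyFun)
    try
        return x.efun(pos_args(x)...; kw_args(x)...)
    catch exc
        exc isa MethodError || rethrow()
        return Base.invokelatest(x.efun, pos_args(x)...; kw_args(x)...)
    end
end

# Usage: a worker task that starts before the function it will run exists.
jobs = Channel{MyFun}(1)
worker = @async exec_latest(take!(jobs))
yield()                                      # worker starts and waits in take!

scale = (x; factor = 1) -> factor * x        # defined in a newer world
put!(jobs, MyFun(scale, (21,), (factor = 2,)))
result = fetch(worker)                       # 42
```

One trade-off of this design: the direct call stays fast in the common case, but any MethodError raised inside efun itself also triggers a second call, so side effects that happened before that error would run twice.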
