Do I need @async?

macros
reactiveprogramming
parallel

#1

I am working on some simple helper functions to make simple backtesting very easy in Julia, extending the ReactiveBasics package. Here is some example code using a simple moving-average crossover:

using ReactiveBasics

# my helper macros / functions
# signal carrying a trailing window of the last `wind` values of `sig`
macro window(sig, wind::Int)
    quote
        arr = []
        len = ($wind)
        flatmap($sig) do s
            push!(arr, s)
            length(arr) > len && shift!(arr)   # drop the oldest value
            arr |> Signal
        end
    end
end

# map `f` on updates of s1, sampling the current value of the other signals
function smap(f, s1::Signal, s2::Signal)
    flatmap(s1) do v1
        v2 = s2.value
        f(v1,v2) |> Signal
    end
end

function smap(f,s1::Signal, s2::Signal, s3::Signal...)
    flatmap(s1) do v1
        v2 = s2.value
        v3 = map(value, s3)
        f(v1,v2, v3...) |> Signal
    end
end

# like foldp, but re-runs the fold over the full accumulated history
macro myfoldp(f, v0, sig)
    quote
        arr = []
        flatmap($sig) do s
            push!(arr,s)
            foldl($f, $v0, arr) |> Signal
        end
    end
end

macro myfoldp(f, v0, sig, args...)
    quote
        arr = []
        flatmap($sig) do s
            push!(arr,s)
            func(a,b) = ($f)(a,b,($args)...)
            foldl(func, $v0, arr) |> Signal
        end
    end
end

# accumulate every value pushed to `sig` into a growing array signal
macro collect(sig)
    quote
        arr = []
        flatmap($sig) do s
            push!(arr,s) |> Signal
        end
    end
end

# setup model with Signals
setup = quote
    # generate some random data with positive edge
    returns = .5 + randn(10000)
    returns = normalize(returns)
    
    # create return and price signals
    func(a,b) = a*(1+b)
    ret = Signal(0.0)
    price = @myfoldp func 50 ret
    
    # 50 and 200 day moving average signals
    ma50 = map(mean, @window(price,50))
    ma200 = map(mean, @window(price, 200))
    
    # crossover logic
    trend = smap(ma50,ma200) do f,s
        f >= s ? 1 : -1
    end
    
    # store data in arrays
    collecs1 = map(x->(@collect x), [ret, price, ma50, ma200, trend])
end

# initial run to get first complete 200 day window
function train(x=200)
    for i=1:x
        push!(ret, returns[i])
    end
end

# run simulation for 400 days
function runsim(xi=201,xf=600)
    for i=xi:xf
        push!(ret,returns[i])
    end
end

# save results in csv
function store_results(filename)
    c1 = map(value,collecs1)
    ba = foldl(hcat, c1)
    cols = ["ret", "price", "ma50", "ma200", "trend"]
    aba = hcat(cols,transpose(ba))
    naba = permutedims(aba,[2,1])
    writecsv(filename, naba)
end

# quickly setup, train, run, and save results
function run_all(filename, x=200, y=600)
    eval(setup)
    train(x)
    runsim(x+1,y)
    store_results(filename)
end

# run it:
run_all("data.csv")

This all works as expected / desired. My question concerns this bit right here:

    ma50 = map(mean, @window(price,50))
    ma200 = map(mean, @window(price, 200))
    
    trend = smap(ma50,ma200) do f,s
        f >= s ? 1 : -1
    end

For the rest of the code, each map on a signal is chained sequentially. These two moving average signals, however, are not chained together, yet both need to update before the trend signal can give the correct value. The smap function updates on ma50, sampling the value that ma200 holds at that moment. If the chain price -> ma50 -> trend were somehow to update more quickly than price -> ma200, the logic in the trend signal would be sampling a stale value. I am not sure whether this is possible; if it is, I want to make sure that I can explicitly prevent this undesired behavior.
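To make the concern concrete, here is a minimal sketch of how sampling with .value can depend on which subscriber runs first (assuming the same ReactiveBasics push semantics used above):

```julia
using ReactiveBasics

src = Signal(0)
fast = map(x -> x + 1, src)              # a direct child of src

# `combined` samples fast.value whenever src fires; whether it sees the
# old or the new value of `fast` depends on the order in which the
# subscribers of `src` happen to run
combined = map(x -> (x, fast.value), src)

push!(src, 10)
combined.value   # (10, 11) if `fast` updated first, (10, 1) otherwise
```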

In this example case, it would be trivial to write a Signal function that calculates both the 50-day and 200-day averages off the same window; more generally, however, I want to be able to conveniently group together calculations that are needed simultaneously, and potentially run the associated functions in parallel. I thought this might do the trick:

@sync begin
    @async ma50 = map(mean, @window(price,50))
    @async ma200 = map(mean, @window(price, 200))
end
trend = smap(ma50,ma200) do f,s
    f >= s ? 1 : -1
end

However, when I tried this, none of these Signals updated properly when updates were pushed to ret. What would be a good, syntactically clean way of ensuring that my Signals update in the correct relation to each other?
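(For reference, the trivial single-window version mentioned above might look something like this sketch against the helpers defined earlier; `w[max(1, end-49):end]` takes the 50-day tail of the 200-day window:)

```julia
# Both averages are computed off one window signal, so they can
# never be out of step with each other.
win200 = @window(price, 200)
mas = map(w -> (mean(w[max(1, end-49):end]), mean(w)), win200)  # (ma50, ma200)
trend = map(t -> t[1] >= t[2] ? 1 : -1, mas)
```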


#2

Do you want the two async tasks to interleave computation? If so, you will need the two tasks to explicitly yield to each other, something like rand(Bool) && yield() at an appropriate place.


#3

I am not sure. I was able to write a macro that gets the behavior I want, but it doesn’t use any of Julia’s task/scheduling features at all, and it seems like a rather inelegant solution. Perhaps there is a better approach?

macro bundle(f,g...)
  quote
    # collect all the source signals
    coll = map(eval, ($g)) |> collect
    unshift!(coll, ($f))
    # one queue and one output signal per source
    queues, sigs = [], []
    for c in coll
      push!(queues, [])
      push!(sigs, Signal(c.value))
    end
    # when every queue holds the same (nonzero) number of values,
    # release one value from each into the output signals
    function eq()
      len = map(length, queues)
      if len[1] > 0 && foldl(*, len / len[1]) == 1
        for (i,q) in enumerate(queues)
          v = shift!(q)   # FIFO: release the oldest queued value
          push!(sigs[i], v)
        end
      end
    end
    # queue each incoming value and check whether a full set is ready
    for (i,gsig) in enumerate(coll)
      subscribe!(gsig) do s
        push!(queues[i], s)
        eq()
      end
    end
    # bundle the released values into a single array-valued signal
    flatmap(last(sigs)) do u
      out = map(value, sigs[1:end-1])
      push!(out, u)
      Signal(out)
    end
  end
end

Now, using this @bundle macro, we can create a signal that doesn’t update until all of the bundled signals have themselves updated:

f = Signal(1)
g = Signal(-2)
h = @bundle f g       # h.value = [1,-2]

push!(f, 5); h.value  # h.value = [1,-2]
push!(g, -7); h.value # h.value = [5,-7]

Using @sync and @async when mapping the relationships between different signals doesn’t seem to work; I suspect that this is because, instead of making the updates to the Signals asynchronous, it only creates the Signals asynchronously. I would probably have to rewrite the map and flatmap functions to incorporate an asynchronous structure. I don’t really care about the tasks running asynchronously; what I really want is to make sure that particular tasks happen only after (multiple) previous tasks have completed, in an event-driven way. Is this a good use case for scheduled tasks, or am I barking up the wrong tree?
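For comparison, here is an untested sketch of the same gating done with Channels and a single coordinating task instead of manual queues (the buffer size of 32 is an arbitrary choice; take! blocks, so the loop only fires once every source has delivered a value):

```julia
using ReactiveBasics

f = Signal(1)
g = Signal(-2)

# one buffered channel per source signal
chans = [Channel{Any}(32), Channel{Any}(32)]
h = Signal(Any[f.value, g.value])

# each subscriber just forwards values into its channel
subscribe!(s -> put!(chans[1], s), f)
subscribe!(s -> put!(chans[2], s), g)

# coordinating task: block until one value arrives from every channel,
# then push the combined update downstream
@async while true
    vals = Any[take!(c) for c in chans]
    push!(h, vals)
end
```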