Writing better / faster / more Julia-ish code

I am using Signals from ReactiveBasics to model time-series events (an extension of the discussion here). I want to build a signal that gives the median of the last 5 inputs to another signal. I have developed two approaches, and I suspect both are much slower than they need to be. How can I improve them?

Approach 1:

ff = Signal(0.0)

macro medWin(sig,win = 5)
    arr = []
    quote
        flatmap($(esc(sig))) do iv
            unshift!($arr, iv)
            length($arr) > $(esc(win)) ? pop!($arr) : false
            median($arr) |> Signal
        end
    end
end

function testmacro()
    @time begin
        f = Signal(0.0)
        g = @medWin(f)
        for i=1:10^5
            push!(f, i)
        end
        f,g
    end
end

testmacro()

Output:

1.243613 seconds (898.99 k allocations: 32.028 MB, 0.70% gc time)

I was concerned that using an array within the global scope to hold values would be expensive, so I tried to find another approach that uses the functions in ReactiveBasics more directly:

function lastX(u::Signal, n ::Int)
    sigs = Array{Signal}(1,n+1)
    sigs[1] = u
    for i=1:n
        sigs[i+1] = previous(sigs[i])
    end
    return zip(sigs[2:end]...)
end

function testfunction()
    @time begin
        f = Signal(0.0)
        g = lastX(f, 5)
        h = flatmap(g) do tup
            v = collect(tup)
            return median(v) |> Signal
        end
        for i=1:10^5
            push!(f, i)
        end
        f,h
    end
end

Output:

 1.755510 seconds (13.00 M allocations: 633.236 MB, 16.01% gc time)

I suspect the answer lies in the ReactiveBasics implementations of previous, subscribe!, and flatmap, but I would have to look more closely. Any insight into why this uses so much memory would help me learn.

Is my first approach as good as it gets? Or are there more improvements that can be achieved?
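On the memory question, one likely contributor is fan-out. The sketch below is written for Julia 1.x (the thread targets 0.6) and re-declares minimal stand-ins for the ReactiveBasics Signal, subscribe!, push!, map and previous, modeled on the source quoted later in the thread; zip_counted is a hypothetical replacement for zip that also counts recomputations. Because zip subscribes to all five previous signals, and each of them is updated during a single push! to f, one input produces five recomputed tuples. In lastX, each of those then goes through flatmap, which collects, takes a median and allocates a new Signal, so 10^5 pushes become 5 × 10^5 medians.

```julia
# Minimal Julia 1.x stand-ins for the ReactiveBasics definitions (assumption:
# they mirror the source quoted later in this thread, not the released package)
mutable struct Signal{T}
    value::T
    callbacks::Vector{Function}
end
Signal(val) = Signal(val, Function[])

subscribe!(f, u::Signal) = (push!(u.callbacks, f); u)

function Base.push!(u::Signal, val)
    u.value = val
    foreach(cb -> cb(val), u.callbacks)   # callbacks run in subscription order
end

function Base.map(f, u::Signal)
    s = Signal(f(u.value))
    subscribe!(x -> push!(s, f(x)), u)
    s
end

function previous(input::Signal, default = input.value)
    past = Ref(default)
    map(input) do u
        res = past[]
        past[] = u
        res
    end
end

# Hypothetical stand-in for zip(sigs...): like the varargs map, it subscribes
# to *every* input and rebuilds the tuple whenever any one of them changes.
function zip_counted(us, fires::Ref{Int})
    out = Signal(Tuple(u.value for u in us))
    for u in us
        subscribe!(u) do _
            fires[] += 1
            push!(out, Tuple(v.value for v in us))
        end
    end
    out
end

# Rebuild the lastX(f, 5) chain: f -> prev1 -> prev2 -> ... -> prev5
f = Signal(0.0)
sigs = Signal[f]
for _ in 1:5
    push!(sigs, previous(sigs[end]))
end
fires = Ref(0)
z = zip_counted(sigs[2:end], fires)

push!(f, 1.0)
println(fires[])   # 5: a single push to f rebuilds the zipped tuple five times
```

The ReactiveBasics varargs map also builds its arguments by splatting a generator, which is not type-stable and probably adds further allocations; this sketch does not measure that part.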

    arr = []
    quote
        ...
    end

I’m pretty sure that’s not what you actually want, because arr exists only at macro-expansion time, and its starting value, Any[], is inserted everywhere instead of its name. Try running macroexpand(:(@medWin Signal(0))) in the Julia REPL to see what is actually generated.
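As a hypothetical illustration for Julia 1.x (where @macroexpand is available), the stripped-down macro below has the same shape as the original @medWin: the array is a local variable of the macro body and is spliced into the returned expression. Inspecting the expansion shows that the code contains the array object itself, not a name that refers to it. The macro name spliced is illustrative only.

```julia
# Hypothetical macro with the same structure as the original @medWin:
# `arr` is a local of the macro body, so it exists only while the macro expands.
macro spliced()
    arr = []                 # created once, at expansion time
    :(push!($arr, 1))        # $arr inserts the array *object* into the code
end

ex = @macroexpand @spliced()

# The expansion is a call whose second argument is a literal Vector{Any},
# not a symbol referring to some variable.
@show ex.head
@show ex.args[2] isa Vector   # true: the array itself was spliced in
```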

Also, running your code on Julia 0.6 gives an error: lastX is not defined.

I did not expect the macro approach to work like this either; however, I tried it based on this thread. To demonstrate:

ff = Signal(0.0)
gg = @medWin(ff,3)

for i=1:10
    push!(ff, i)
    println("ff is ", ff.value)
    println("gg is ", gg.value)
    println("-----")
end

This gives the desired output for gg (the median of the last 3 values of ff; the first value is 0.5 because ff's initial value 0.0 is still in the window):

ff is 1.0
gg is 0.5
-----
ff is 2.0
gg is 1.0
-----
ff is 3.0
gg is 2.0
-----
ff is 4.0
gg is 3.0
-----
ff is 5.0
gg is 4.0
-----
ff is 6.0
gg is 5.0
-----
ff is 7.0
gg is 6.0
-----
ff is 8.0
gg is 7.0
-----
ff is 9.0
gg is 8.0
-----
ff is 10.0
gg is 9.0
-----

Also, I forgot to include my lastX function:

function lastX(u::Signal, n ::Int)
    sigs = Array{Signal}(1,n+1)
    sigs[1] = u
    for i=1:n
        sigs[i+1] = previous(sigs[i])
    end
    return zip(sigs[2:end]...)
end

I’m not sure what you are expecting, but the difference between the two is that by splicing in the value, you have

an array generated at compile time (non-thread-safe and non-reentrant) that is shared across multiple runtime executions of the same macro expansion

That doesn’t seem to be what you want, unless you want multiple invocations at runtime to affect each other or you will never use this macro inside a function or a local scope.
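A small, hypothetical Julia 1.x experiment makes the difference concrete. The two macros below (illustrative names, not from the thread) differ only in where the array is created; each is used inside a function, so its single expansion runs once per call.

```julia
# Array created at expansion time and spliced in, as in the original @medWin
macro shared_count()
    arr = Int[]              # created once, when the macro is expanded
    quote
        push!($arr, 1)       # every run of this expansion mutates the same array
        length($arr)
    end
end

# Array created inside the quote, as in the revised @medWin
macro fresh_count()
    quote
        arr = Int[]          # created anew each time the expanded code runs
        push!(arr, 1)
        length(arr)
    end
end

# Each function body holds one expansion, which runs once per call.
shared() = @shared_count()
fresh() = @fresh_count()

println([shared() for _ in 1:3])   # [1, 2, 3]: state leaks between calls
println([fresh() for _ in 1:3])    # [1, 1, 1]: each call starts from empty
```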

Your demonstration doesn’t show the difference between generating the array at compile time and at runtime, because the expanded code only runs once there; the difference appears when the same expansion runs repeatedly, for example when testmacro() is called twice. You should only do this if you really know what it is doing. Trying things and observing something that seems to work, without understanding it or checking the documentation, is one of the major reasons people write code and packages that break easily when Base changes. (It’s unlikely that this particular behavior will break, but it’s a bad habit.)


I’m afraid it’s still not runnable since previous is undefined. Can you provide a complete runnable example?

It appears that not all of the functions here on GitHub have made it into the version installed by Pkg.add("ReactiveBasics"). For completeness, I have included all of the relevant functions below:

type Signal{T}
   value::T
   callbacks::Vector{Function}   
end

Signal(val) = Signal(val, Function[])

value(u::Signal) = u.value

function subscribe!(f, u::Signal)
    push!(u.callbacks, f)
    u
end

function flatmap(f, input::Signal)
    signal = Signal(f(input.value).value)
    subscribe!(input) do u
        innersig = f(u)
        push!(signal, innersig.value)
        subscribe!(innersig) do v
            push!(signal, v)
        end
    end      
    signal
end

function Base.map(f, u::Signal)
    signal = Signal(f(u.value))
    subscribe!(x -> push!(signal, f(x)), u)
    signal
end
function Base.map(f, u::Signal, v::Signal)
    signal = Signal(f(u.value, v.value))
    subscribe!(x -> push!(signal, f(x, v.value)), u)
    subscribe!(x -> push!(signal, f(u.value, x)), v)
    signal
end
function Base.map(f, u::Signal, v::Signal, w::Signal)
    signal = Signal(f(u.value, v.value, w.value))
    subscribe!(x -> push!(signal, f(x, v.value, w.value)), u)
    subscribe!(x -> push!(signal, f(u.value, x, w.value)), v)
    subscribe!(x -> push!(signal, f(u.value, v.value, x)), w)
    signal
end
function Base.map(f, u::Signal, v::Signal, w::Signal, xs::Signal...)
    us = (u,v,w,xs...)
    signal = Signal(f((u.value for u in us)...))
    for (i,u) in enumerate(us)
        subscribe!(u) do x
            vals = f(((i == j ? x : us[j].value for j in 1:length(us))...)...)
            push!(signal, vals)
        end
    end
    signal
end

function Base.push!(u::Signal, val)
    u.value = val
    foreach(f -> f(val), u.callbacks)
end

function Base.zip(u::Signal, us::Signal...)
    map((args...) -> (args...), u, us...)
end

function previous(input::Signal, default=value(input))
    past = Ref(default)
    map(input) do u
        res = past[]
        past[] = u
        res
    end
end
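In this flatmap, every update calls f, receives a brand-new inner Signal and attaches a fresh closure to it. When f merely wraps a value, as `median(...) |> Signal` does in both approaches, map yields the same values without those per-update objects. The sketch below (Julia 1.x, with minimal stand-ins for the definitions above; wrap and created are illustrative names) checks that and counts the inner Signals.

```julia
# Minimal Julia 1.x stand-ins for the definitions quoted above
mutable struct Signal{T}
    value::T
    callbacks::Vector{Function}
end
Signal(val) = Signal(val, Function[])

subscribe!(f, u::Signal) = (push!(u.callbacks, f); u)

function Base.push!(u::Signal, val)
    u.value = val
    foreach(cb -> cb(val), u.callbacks)
end

function Base.map(f, u::Signal)
    s = Signal(f(u.value))
    subscribe!(x -> push!(s, f(x)), u)
    s
end

function flatmap(f, input::Signal)
    signal = Signal(f(input.value).value)
    subscribe!(input) do u
        innersig = f(u)                  # a brand-new Signal on every update
        push!(signal, innersig.value)
        subscribe!(innersig) do v        # plus a new closure attached to it
            push!(signal, v)
        end
    end
    signal
end

# Illustrative counter for the inner Signals that flatmap asks for
created = Ref(0)
wrap(x) = (created[] += 1; Signal(2x))

src = Signal(0.0)
viaflat = flatmap(wrap, src)      # the `... |> Signal` pattern from the thread
viamap  = map(x -> 2x, src)       # same values, no inner Signal per update

for i in 1:100
    push!(src, i)
end
println(viaflat.value == viamap.value)   # true
println(created[])                       # 101: one at construction + one per push
```

Both results are Signal{Float64} here, so the Int values pushed into src are converted on assignment and the two outputs compare equal.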

And here is my code (note that, based on the comments above, I have moved arr inside the quote of the macro):

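function lastX(u::Signal, n::Int)
    # sigs[i+1] lags sigs[i] by one update, so sigs[2:end] holds
    # the n values *before* u's latest value
    sigs = Vector{Signal}(n+1)
    sigs[1] = u
    for i=1:n
        sigs[i+1] = previous(sigs[i])
    end
    return zip(sigs[2:end]...)
end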

function testfunction()
    @time begin
        f = Signal(0.0)
        g = lastX(f, 5)
        h = flatmap(g) do tup
            v = collect(tup)
            return median(v) |> Signal
        end
        for i=1:10^5
            push!(f, i)
        end
        f,h
    end
end

macro medWin(sig,win = 5)
    quote
        arr = []
        flatmap($(esc(sig))) do iv
            unshift!(arr, iv)
            length(arr) > $(esc(win)) ? pop!(arr) : false
            median(arr) |> Signal
        end
    end
end

function testmacro()
    @time begin
        f = Signal(0.0)
        g = @medWin(f)
        for i=1:10^5
            push!(f, i)
        end
        f,g
    end
end

testfunction()

testmacro()

A bit of benchmarking (using BenchmarkTools.@benchmark), profiling (using the @profile macro), and @time on subexpressions show that by far the most expensive part of the code is:

for i=1:10^5
    push!(f, i)
end

This assigns a new value to a signal (fast) and calls its single callback (slow). Without learning the library and diving deep into your code, I don’t know what this (anonymous) callback actually is, but it is exactly your bottleneck. Optimize that function and you’ll optimize the whole code.
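Following that advice, one possible rewrite of the callback is sketched below for Julia 1.x. It is not the library's API: medwin and smallmedian are hypothetical names, and Signal, push! and map are minimal stand-ins for the ReactiveBasics definitions quoted above. It makes three changes aimed at the callback: a plain function instead of a macro (so every call gets its own buffer), a concretely typed Float64 buffer instead of Any[], and map instead of flatmap (so no Signal is allocated per update). pushfirst! is the Julia 1.x name for unshift!.

```julia
# Minimal Julia 1.x stand-ins for the ReactiveBasics pieces quoted above
mutable struct Signal{T}
    value::T
    callbacks::Vector{Function}
end
Signal(val) = Signal(val, Function[])

subscribe!(f, u::Signal) = (push!(u.callbacks, f); u)

function Base.push!(u::Signal, val)
    u.value = val
    foreach(cb -> cb(val), u.callbacks)
end

function Base.map(f, u::Signal)
    s = Signal(f(u.value))
    subscribe!(x -> push!(s, f(x)), u)
    s
end

# Hypothetical helper: median of a small vector without the Statistics stdlib
function smallmedian(v)
    s = sort(v)
    n = length(s)
    isodd(n) ? s[(n + 1) ÷ 2] : (s[n ÷ 2] + s[n ÷ 2 + 1]) / 2
end

# Hypothetical replacement for @medWin: a function with its own typed buffer
function medwin(sig::Signal, win::Int = 5)
    buf = Float64[]                      # fresh, concretely typed buffer per call
    map(sig) do x
        pushfirst!(buf, x)               # newest value first (was unshift! in 0.6)
        length(buf) > win && pop!(buf)   # drop the oldest value once the window is full
        smallmedian(buf)
    end
end

f = Signal(0.0)
g = medwin(f, 3)
for i in 1:10
    push!(f, i)
end
println(g.value)   # 9.0, matching the @medWin(ff, 3) output earlier in the thread
```

Statistics.median would give the same values; the helper only keeps the sketch dependency-free. For a window of 5, sort still allocates a tiny array per update; a preallocated scratch array or a circular buffer would remove that as well. No timings are claimed here, so measure with @time or BenchmarkTools before and after.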