[ANN] NATS.jl

Excellent! Thank you so much! I’m working with a large control system manufacturer that seems to be moving toward NATS for their edge devices, which means this package could very well end up being quite important for JuliaControl applications.

Are they using NATS client on those edge devices or communicating over MQTT using NATS as message broker? For MQTT we need JetStream enabled and probably some specific MQTT API in NATS.jl

Some docs I found:

Right, excellent point. From what I understand about the system’s configuration is that while the MQTT protocol is used inside the device network, only QoS0 is used, which makes it compatible with core NATS. Moreover, the python test code they gave me seems to only use basic NATS (no JetStream):

from pynats import NATSClient
from statistics import mean
import json

# Subscribe and print the MPU load

with NATSClient(url=NATS_CONNECTION_STRING) as client:
    samples = 10
    values = []
    deviceID = ""

    client.connect()
    
    def mpu_topic_callback(msg):
        payload = json.loads(msg.payload)
        if payload['success']:
            print(f"MPU Load is {payload['value']:.2f}")
            values.append(payload['value'])
            deviceID = payload['deviceID']
        else:
            print("ERROR: was not successful in retrieving a value")
    

    client.subscribe(subject='devicehub.alias.MPU_load_tag', callback=mpu_topic_callback)
    client.wait(count=samples)
    
    print("done!")
    print(f"The mean MPU value is {mean(values)}")
    calc_result = {
        "deviceID": deviceID,
        "value": mean(values)
    }
    client.publish(subject="mycompany.myapp.calculations.MPULoad.Mean", payload=json.dumps(calc_result))

Yes, for QoS0 it seems to be correct to use core NATS but some messages might be lost, the only thing guarantee is that you don’t get message more than once.

@RGonTheNoble, this pattern won’t work well in NATS.jl unless you use thread locks or Channel instead array, python has no multithreading so it may work there.

See PR I made today: sync subscription next by jakubwro · Pull Request #21 · jakubwro/NATS.jl · GitHub

The code you posted becomes:

using NATS, Statistics, JSON3
N = 10
nc = NATS.connect()
sub = subscribe(nc, "devicehub.alias.MPU_load_tag")
unsubscribe(nc, sub; max_msgs = N)
msgs = next(JSON3.Object, nc, sub, N)
failures = filter(m -> !m.success, msgs)
isempty(failures) || @warn "$(length(failures)) failures"
samples = map(m -> m.value, filter(m -> m.success, msgs))
publish(nc, "mycompany.myapp.calculations.MPULoad.Mean", "$(mean(samples))")

Right. That was a simple toy example, but what I’m really going for is a pattern where I have a TimeSeriesCollector (which already uses locks), that collects messages over a predefined period of time, and when the time is exceeded, that chunk is sent to whatever multivariate timeseries algorithm I want to implement that runs on a Dict{TimeSeries{T}} where T.

using TimeRecords
using Dates
import TimeRecords.findbounds

@kwdef struct TimeSeriesCollector{T}
    interval :: Second
    lock  :: ReentrantLock = ReentrantLock()
    data  :: Dict{String, TimeSeries{T}} = Dict{String, TimeSeries{T}}()
    timer :: Base.RefValue{DateTime}  = Ref(floor(now(UTC), interval))
end


function collector_callback!(dict_callback::Function, collector::TimeSeriesCollector, tagrecord::Pair{<:String, <:TimeRecord})
    (tag, rec) = tagrecord
    rectime = datetime(rec)

    #Execute callback if the time has been exceeded
    if rectime > collector.timer[]
        (snapshot, time_window) = lock(collector.lock) do
            #Record the time window and set the timer
            time_window = collector.timer[] .- (collector.interval, Second(0))
            settimer!(collector, rectime)

            #Obtain a snapshot
            snapshot = copy_and_clear!(collector)
            
            #Record the new value to the collector
            record!(collector, tagrecord)
            return (snapshot, time_window)
        end

        #Apply the callback to the data snapshot and time window
        return dict_callback(snapshot, time_window)

    else #Simply record the new data and move on
        lock(collector.lock) do
            record!(collector, tagrecord)
        end
        return nothing
    end
end

function copy_and_clear!(collector::TimeSeriesCollector)
    snapshot = deepcopy(collector.data)
    for v in values(collector.data)
        keeplatest!(v)
    end
    return snapshot
end

function record!(collector::TimeSeriesCollector{T}, tagrecord::Pair{<:AbstractString, <:TimeRecord}; warn_mismatch=false) where T
    (tag, rec) = tagrecord
    ts = get!(collector.data, tag) do
        if warn_mismatch
            @warn "Following tag '"*tag*"' does not exist in registry, creating new series"
        end
        TimeSeries{T}(TimeRecord{T}[])
    end
    push!(ts, rec)
    return nothing
end

So I guess the workflow to have this algorithm applied to a collection of multiple tags and run in the background indefinitely would go something like this?

#alg_callback(d::Dict{TimeSeries}, dt::TimeInterval) is some algorithm I want to use

const CLIENT = NATS.connect()
const COLLECTOR = TimeSeriesCollector(interval=Second(60)

function main_callback(msg)
    obj = JSON3.read(msg)
    rec = TimeRecord(obj.timestamp, obj.value)
    tag = obj.tagname
    response = collector_callback!(alg_callback, COLLECTOR, tag=>rec)
    if !isnothing(response)
         publish(CLIENT, "mycompany.myapp.calculations.my_algorithm", response)
    end
end

sub1 = subscribe(main_callback, CLIENT, "devicehub.alias.tag1")
sub2 = subscribe(main_callback, CLIENT, "devicehub.alias.tag2")

I think you might want here a single subscription if your subjects are well structured on the server:

sub = subscribe(main_callback, CLIENT, "devicehub.alias.*")

Then you can forget about locks and callbacks and iterate subscription with next.

obj = JSON3.read(msg)

This not going to work, you might want obj = JSON3.read(payload(msg)), but you can skip all this and just make your callback as:

function main_callback(msg::JSON3.Object)
    ...
end

NATS.jl will convert object for you. You can go even further, define your own transport struct and define conversion methods for this struct as I did for JSON3.Object. Remember to add them before you create connection to avoid world age problem.

function Base.show(io::IO, ::MIME_PAYLOAD, payload::JSON3.Object)
    JSON3.write(io, payload)
    nothing
end

function convert(::Type{JSON3.Object}, msg::NATS.Msg)
    JSON3.read(@view msg.payload[(begin + msg.headers_length):end])
end
    response = collector_callback!(alg_callback, COLLECTOR, tag=>rec)
    if !isnothing(response)
         publish(CLIENT, "mycompany.myapp.calculations.my_algorithm", response)
    end

Not sure what is return type of collecter_callback!, there seem to be a bug (multiple returns, no condition), but I guess you cannot just simply publish it unless it is String or JSON3.Object, but you can always implement show for NATS.MIME_PAYLOAD and your object type to make it work.

pattern where I have a TimeSeriesCollector (which already uses locks), that collects messages over a predefined period of time, and when the time is exceeded, that chunk is sent to whatever multivariate timeseries algorithm

This sounds like a long running computation, doing it in callback (or main loop if you use next) may lag whole processing pipeline. You might want to delegate it to another task created with Threads.@spawn. Also remember to run Julia with at least 2 threads, otherwise everything will be scheduled on a single thread anyway.

Thanks a lot, this is all extremely helpful! (specially the main_callback(msg::JSON3.Object) tidbit) By the way, I did plan on having the algorithm return a JSON string or something that could be serialized as one. I haven’t figured out which level I want to do the translation yet.

From your previous comments, I have a couple more questions:

(1) I can’t guarantee that the subjects will be well structured. In my experience with industrial data systems, they’re generally not well structured at all, mostly because there is no single universal hierarchy (there’s usually at least three). I would need software a pattern where I can pick and choose from a list of tags that I can subscribe to and apply the same function to. I can deal with locks if it comes down to it. If that’s the case, would the multiple subscription model below still work?

sub1 = subscribe(main_callback, CLIENT, "devicehub.alias.tag1")
sub2 = subscribe(main_callback, CLIENT, "devicehub.alias.tag2")

(2) If I do end up having to use the multiple subscription pattern, I should probably still use Threads.@spawn for the entire eval/publish block right? This way it won’t get stuck if it hits rare invocation of collector_callback! that has to evaluate the algorithm.

function main_callback(obj::JSON3.Object)
    rec = TimeRecord(obj.timestamp, obj.value)
    tag = obj.tagname
    Threads.@spawn begin
        collector_callback!(alg_callback, COLLECTOR, tag=>rec)
        response = fetch(callback_task)
        if !isnothing(response)
            publish(CLIENT, "mycompany.myapp.calculations.my_algorithm", response)
        end
    end
end

sure, it should work, but you can also workaround this by aggregating subscriptions into one stream and do not bother with multithreading at this level, you can parallelise your algorithm in isolation from messaging.

BUFFER_LENGTH = 1000
ch = Channel{JSON3.Object}(BUFFER_LENGTH)
sub1 = subscribe(CLIENT, "devicehub.alias.tag1") do msg::JSON3.Object
     put!(ch, msg)
end
sub2 = subscribe(CLIENT, "devicehub.alias.tag2") do msg::JSON3.Object
    put!(ch, msg)
end
while isopen(ch)
    json = take!(ch)
    # do logic here
end

To break the loop unsubscribe subs and close the channel.

You can do it, even stats from spawned tasks are accounted for subscription from which handler you will spawn it. Another option is to add spawn = true kw option to `subscribe, it will spawn each message handler in separate task. It was designed for long running tasks IO bound. In your mixed case you can get better performance spawning manually.

1 Like