ZMQ and Distributed

I have a problem using ZMQ with Distributed on another worker. This is my first try with Distributed package.

I am working on a project, where I receive data from a measurement device using the ZMQ protocol.
Every second I get a message with the data. I want to collect e.g. 30 seconds. before I do some post-processing. Since the messages keep coming in I wanted to use a separate process that fills the buffer and do the post-processing on the main process. In principle I could do this with @async, but I want to spread the computational load since otherwise I can’t do this in real-time.

using Distributed
addprocs(1)
@everywhere using Pkg
@everywhere Pkg.activate(".")
@everywhere using ZMQ
@everywhere using Dates   # for the time stamp
@everywhere include("./test/distributed_zmq.jl")  # Contains the functions I later use
@everywhere sock, ctx = init_zmq_client(url = source_url)

This seems to work fine.
I then start the function that fills the buffer on the second process

s = @spawnat 2 zmq_fill_buffer(sock; buffer_len = 10)

As expected this gives me the Future for this process.
Future(2, 1, 17, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)

When I want to fetch the data I get an error:

On worker 2:
StateError("Socket operation on non-socket")
Stacktrace:
  [1] _send
    @ ~/.julia/packages/ZMQ/R3wSD/src/comm.jl:14
  [2] #send#13
    @ ~/.julia/packages/ZMQ/R3wSD/src/comm.jl:45
  [3] send
    @ ~/.julia/packages/ZMQ/R3wSD/src/comm.jl:43 [inlined]
  [4] read_zmq_meta_data
    @ ~/git/02-pipesense/PipeSense.jl/test/distributed_zmq.jl:34
  [5] #zmq_fill_buffer#4
    @ ~/git/02-pipesense/PipeSense.jl/test/distributed_zmq.jl:69
  [6] #13
    @ /opt/julia/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/macros.jl:83
  [7] #103
    @ /opt/julia/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/process_messages.jl:274
  [8] run_work_thunk
    @ /opt/julia/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/process_messages.jl:63
  [9] run_work_thunk
    @ /opt/julia/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/process_messages.jl:72
 [10] #96
    @ ./task.jl:423

On line 34 I send the request to get the data:

ZMQ.send(sock, Float64(0.0))

Can I make the Socket available to a different process?
If yes, what is the problem?
If not, is there a better way of filling the buffer?

It seems somehow similar to:

Thanks for any help,
Martin

Didn’t look into this, but what does zmq_fill_buffer return?