I have a problem using ZMQ with Distributed on another worker. This is my first try with Distributed package.
I am working on a project, where I receive data from a measurement device using the ZMQ protocol.
Every second I get a message with the data. I want to collect e.g. 30 seconds. before I do some post-processing. Since the messages keep coming in I wanted to use a separate process that fills the buffer and do the post-processing on the main process. In principle I could do this with @async, but I want to spread the computational load since otherwise I can’t do this in real-time.
using Distributed
addprocs(1)
@everywhere using Pkg
@everywhere Pkg.activate(".")
@everywhere using ZMQ
@everywhere using Dates # for the time stamp
@everywhere include("./test/distributed_zmq.jl") # Contains the functions I later use
@everywhere sock, ctx = init_zmq_client(url = source_url)
This seems to work fine.
I then start the function that fills the buffer on the second process
s = @spawnat 2 zmq_fill_buffer(sock; buffer_len = 10)
As expected this gives me the Future
for this process.
Future(2, 1, 17, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), 0), nothing)
When I want to fetch the data I get an error:
On worker 2:
StateError("Socket operation on non-socket")
Stacktrace:
[1] _send
@ ~/.julia/packages/ZMQ/R3wSD/src/comm.jl:14
[2] #send#13
@ ~/.julia/packages/ZMQ/R3wSD/src/comm.jl:45
[3] send
@ ~/.julia/packages/ZMQ/R3wSD/src/comm.jl:43 [inlined]
[4] read_zmq_meta_data
@ ~/git/02-pipesense/PipeSense.jl/test/distributed_zmq.jl:34
[5] #zmq_fill_buffer#4
@ ~/git/02-pipesense/PipeSense.jl/test/distributed_zmq.jl:69
[6] #13
@ /opt/julia/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/macros.jl:83
[7] #103
@ /opt/julia/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/process_messages.jl:274
[8] run_work_thunk
@ /opt/julia/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/process_messages.jl:63
[9] run_work_thunk
@ /opt/julia/julia-1.7.1/share/julia/stdlib/v1.7/Distributed/src/process_messages.jl:72
[10] #96
@ ./task.jl:423
On line 34 I send the request to get the data:
ZMQ.send(sock, Float64(0.0))
Can I make the Socket available to a different process?
If yes, what is the problem?
If not, is there a better way of filling the buffer?
It seems somehow similar to:
Thanks for any help,
Martin