I’m trying to use a PUB socket in ZMQ.jl to send out some data. It is very slow, and at least part of the slowdown seems to come from constructing a Message. If I run @benchmark Message("1"), the minimum time is 3.6 μs. That suggests that the conversion to Message alone would limit me to about 280k one-character messages per second, which seems really slow. I tried it on v0.5.0 on my machine and on v0.5.1 on JuliaBox and saw similar behavior. Am I doing something wrong?
julia> using ZMQ, BenchmarkTools

julia> @time @benchmark Message("1")
 14.434919 seconds (7.52 M allocations: 1.834 GB, 69.05% gc time)
BenchmarkTools.Trial:
  memory estimate:  3.06 KiB
  allocs estimate:  10
  --------------
  minimum time:     3.640 μs (0.00% GC)
  median time:      4.928 μs (0.00% GC)
  mean time:        38.892 μs (82.17% GC)
  maximum time:     99.467 ms (98.67% GC)
  --------------
  samples:          10000
  evals/sample:     9
  time tolerance:   5.00%
  memory tolerance: 1.00%
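As a quick sanity check on the 280k figure, here is the arithmetic as a tiny sketch. The helper name `max_rate` is made up for illustration and is not part of ZMQ.jl or BenchmarkTools; it only turns a per-message cost into an upper bound on messages per second, assuming nothing else costs any time.

```julia
# Upper bound on messages per second if every message costs
# `seconds_per_message` and nothing else takes any time.
# (`max_rate` is a hypothetical helper, used only for this estimate.)
max_rate(seconds_per_message) = 1 / seconds_per_message

# 3.6 μs per Message("1") gives roughly 2.8e5 messages per second.
rate = max_rate(3.6e-6)
println(round(Int, rate), " messages per second at most")
```

The real limit would be lower, since the mean time in the benchmark is dominated by GC and sending the message has its own cost.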
Warning: I’m not one of the ZMQ.jl devs. I may be entirely wrong about everything.
I see the same performance on my machine. ProfileView.jl shows that almost all of the time is spent in the gc_protect_handle function, which, according to the comments, is needed to enable zero-copy ZMQ messages which share data with Julia. It looks like the idea is that, rather than copying the data into the message, ZMQ.jl shares the data between Julia and the message, and this gc_protect scheme ensures that the data isn’t garbage-collected until ZMQ is done with it.
But I think that in the case of a tiny message like "1", a copy might be more efficient. This would require changing the constructor of Message in ZMQ.jl to do something like:
- if the length of the data is less than some threshold, then:
  - instead of calling Message(origin, ::Ptr{T}, len)…
  - call the constructor Message(len::Integer), where len is the number of bytes in the Julia message data
  - get the pointer to the zmq message data by calling the zmq_msg_data C function
  - copy the Julia message data to that pointer
- otherwise, use the current shared-memory behavior
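To make the branching in those steps concrete, here is a rough sketch in plain Julia that does not touch ZMQ.jl at all. Everything in it is an assumption made for illustration: `SketchMessage` stands in for a zmq message, its `buf` field stands in for the memory that zmq_msg_data points at, and `COPY_THRESHOLD` is an arbitrary cutoff that would need to be measured. It is written for a recent Julia version, not v0.5.

```julia
# Arbitrary cutoff in bytes; a sensible value would have to be benchmarked.
const COPY_THRESHOLD = 256

# Stand-in for a zmq message (hypothetical, not the ZMQ.jl Message type).
struct SketchMessage{V<:AbstractVector{UInt8}}
    buf::V        # plays the role of the memory behind zmq_msg_data
    shared::Bool  # true when buf aliases the caller's data (zero-copy path)
end

function make_message(data::AbstractVector{UInt8}; threshold::Integer = COPY_THRESHOLD)
    if length(data) < threshold
        # Small payload: allocate a fresh buffer (like Message(len))
        # and copy the bytes into it, so no GC protection is needed.
        buf = Vector{UInt8}(undef, length(data))
        copyto!(buf, data)
        return SketchMessage(buf, false)
    end
    # Large payload: keep sharing the caller's memory, as the current
    # gc_protect-based constructor is described as doing.
    return SketchMessage(data, true)
end

small = make_message(codeunits("1"))   # copied
large = make_message(rand(UInt8, 4096)) # shared
println((small.shared, large.shared))
```

In the real package the copy branch would write through the pointer returned by zmq_msg_data instead of into a Julia array, but the decision logic would look much the same.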
This is probably not too hard to do, and I would encourage you to try it. This could be a great way to make your life better and contribute to Julia. I’m happy to clarify any of my above statements if they’re not clear.
Also, it’s probably worth posting an issue over at ZMQ.jl before diving too deeply into this, since, as I said, I’m not one of the devs and I could be totally wrong.
Thanks for the response. It looks like setindex! is defined on Message, but it doesn’t work because unsafe_store is now unsafe_store!. Fixing that seems to handle the last two points you had. Below I’ve pasted my progress towards a modification. It still needs some work, but I think it’s close. The timing is now about 140 ns per message (FWIW, pyzmq clocked in at about 800 ns per message). But I think I’m copying the data twice (once into the IOBuffer, then once into the message). I’m also accessing fields of the IOBuffer directly, which seems like a bad idea. Any more tips?
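using ZMQ, BenchmarkTools

# Replacement for the broken setindex! method (unsafe_store -> unsafe_store!)
function Base.setindex!(a::Message, v, i::Integer)
    if i < 1 || i > length(a)
        throw(BoundsError(a, i))
    end
    unsafe_store!(pointer(a), v, i)
end

function message(s)
    b = IOBuffer()
    write(b, s)          # first copy: into the IOBuffer
    m = Message(b.size)  # allocate a zmq message of the right length
    m[:] = b.data        # second copy: into the message
    m
end

@benchmark message("1")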
It seems there is another bottleneck in ZMQ.jl for small messages. If you google “zmq throughput” you’ll see a variety of sources reporting throughputs of a few (2-7) million messages per second for small messages. For example, here they show a throughput of about 2.5 million 1-byte messages per second. The test is on an older version of ZMQ, and not particularly well reported, but it’s consistent with (actually on the slow side of) other sources I’ve found.
Given that, I thought I’d try benchmarking and profiling send. It looks like get_events is the culprit. So I made a version of the send function as follows:
function sendfast(socket::Socket, zmsg::Message, SNDMORE::Bool=false)
    while true
        rc = ccall((:zmq_msg_send, zmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
                   &zmsg, socket.data, (ZMQ_SNDMORE*SNDMORE) | ZMQ_DONTWAIT)
        if rc == -1
            zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str()))
            while (get_events(socket) & POLLOUT) == 0
                wait(socket)
            end
        else
            #get_events(socket) != 0 && notify(socket)
            notify(socket)
            break
        end
    end
end
I get 1.5 μs/message with send and 0.1 μs/message with sendfast.
The removed call to get_events is a call to zmq_getsockopt with the ZMQ_EVENTS option, which reports whether a message can be sent to the socket without blocking.
I’m not sure what the best way to modify send is, but it seems like the equivalent of sendfast should be easily available or be the default. The tests pass if I replace send with sendfast, but I’m not sure the tests really cover this situation. I guess the best thing to do would be to create a test case with the desired behavior that fails with sendfast. Any ideas?