Hi everyone.
I have two threads: the first reads data from UDP, and the second processes that data. When I put `@distributed` in front of the processing loop, I get a "concurrency violation" error. This is my code:
using Distributed, Sockets
using Serialization
using ThreadPools
# Launch 10 local worker processes for `@distributed`; `lazy=false` asks for the
# worker-to-worker connections to be set up at startup rather than on first use.
addprocs(10; lazy=false);
"""
    rec_data(port::Integer, to::Sockets.InetAddr, chan::Channel)

Bind a UDP socket to `Sockets.localhost:port` and spawn a task that forwards
every received message to `chan`.

Each datagram is decoded with `deserialize`; the decoded value is `put!` on
`chan`, so it must be convertible to the channel's element type. Receiving the
symbol `:exit` stops the loop. The socket is closed when the loop ends, whether
normally or because of an error.

# Arguments
- `port`: local UDP port to listen on.
- `to`: currently unused; kept so existing callers keep working.
- `chan`: channel that receives the decoded messages.

# Returns
The spawned receiver `Task`, or `nothing` if the port could not be bound (a
message is printed to `stderr` in that case).
"""
function rec_data(port::Integer, to::Sockets.InetAddr, chan::Channel)
    sock = UDPSocket()
    if !bind(sock, Sockets.localhost, port)
        # Do not leak the socket handle when the port is already taken.
        close(sock)
        println(stderr, "port $port not available")
        return nothing
    end
    return Threads.@spawn begin
        try
            while true
                _, pck = recvfrom(sock)
                # NOTE(review): `deserialize` must only be fed data from trusted
                # senders; confirm that nothing untrusted can reach this port.
                msg = deserialize(IOBuffer(pck))
                if msg == :exit
                    break
                else
                    put!(chan, msg) # send it to the broker
                end
            end
        finally
            # Release the socket even if receiving or decoding throws.
            close(sock)
        end
    end
end
"""
    broker(chan::Channel; batchsize::Integer=10)

Consume messages from `chan` and process them in batches on the worker
processes.

Messages are collected until `batchsize` of them are available; the batch is
then handed to `do_complicated_processing_and_send`, one call per message,
through a `@distributed` loop. `@sync` makes the broker wait for the whole
batch, so worker errors surface here and the batch buffer can be reused safely.
The function returns once `chan` is closed and drained; a trailing partial
batch is not processed.

NOTE(review): `do_complicated_processing_and_send` has to be defined on every
worker (e.g. with `@everywhere`) — confirm where it is defined.

# Keywords
- `batchsize`: number of messages per distributed batch (default `10`).

# Throws
- `ArgumentError` if `batchsize` is not positive.
"""
function broker(chan::Channel; batchsize::Integer=10)
    batchsize > 0 || throw(ArgumentError("batchsize must be positive, got $batchsize"))
    batch = Vector{eltype(chan)}()
    sizehint!(batch, batchsize)
    # Iterating the channel blocks like `take!` but ends cleanly when it is closed.
    for msg in chan
        push!(batch, msg)
        length(batch) < batchsize && continue
        # Without `@sync`, `@distributed` returns immediately and errors are lost.
        @sync @distributed for i in eachindex(batch)
            do_complicated_processing_and_send(batch[i])
        end
        empty!(batch)
    end
    return nothing
end
# Buffered queue between the UDP receiver and the broker; each element is one
# message held as a byte vector.
queuelength = 8000 # or something long enough
chan = Channel{Vector{UInt8}}(queuelength)

# NOTE(review): `port` and `to` are not defined in this snippet — presumably
# they are set elsewhere; confirm before running.
# The receiver is started on thread 2 and the broker on thread 1.
rectask = ThreadPools.@tspawnat 2 rec_data(port, to, chan)
brokertask = ThreadPools.@tspawnat 1 broker(chan)
What is wrong with my code?
Thanks