UDP receive and send with Julia

Hi. I want to write a program that receives a lot of data over UDP. Next, I do some operations on the data in a function. Finally, I send my results to another UDP endpoint. These operations must be done while I am still receiving data. Can I do this in Julia?
The sample code is here.
using Sockets

function recdata()
    @async begin
        sock = UDPSocket()
        bind(sock, ip"x.x.x.x", port)
        while true
            mydata = reinterpret(Int8, recv(sock))
            func1(mydata)
        end
        close(sock)
    end
end

function func1(data)
    # some operation that takes at most 1 second
    senddata(output)
end

function senddata(output)
    data = reinterpret(Int8, output)
    soc = UDPSocket()
    send(soc, ip"y.y.y.y", port2, data)
    sleep(0.2)
end

I think I lose data with this method. In other languages such as C and Java I can do this with threads: typically I define several threads for receiving, processing and sending data.
Can I do something similar in Julia? I get the best processing-time performance with Julia and need to use it.

I reformatted your code using triple backticks (see Please read: make it easier to help you).

Normally you would use some serialization (serialize and deserialize) to transmit data via a UDP socket (see also GitHub - JuliaIO/JSON.jl: JSON parsing and printing). Can reinterpret work in your case? Transmission over UDP is done in UInt8 bytes.
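To make the two options concrete, here is a minimal sketch, assuming the Serialization standard library. The helper names to_bytes and from_bytes are hypothetical, and ComplexF32 is only an assumed element type.

```julia
using Serialization

# Hypothetical helper: turn any Julia value into a byte vector.
function to_bytes(x)
    io = IOBuffer()
    serialize(io, x)
    return take!(io)   # Vector{UInt8}, suitable as the msg argument of send
end

# Hypothetical helper: rebuild the value from the received bytes.
from_bytes(bytes::Vector{UInt8}) = deserialize(IOBuffer(bytes))

data = ComplexF32[1 + 2im, 3 - 4im]

# Option 1: serialization (carries type information, works for any value).
pck = to_bytes(data)
@assert from_bytes(pck) == data

# Option 2: reinterpret (raw bytes only; both sides must agree on the element type).
raw = collect(reinterpret(UInt8, data))
@assert reinterpret(ComplexF32, raw) == data
```

Serialization is the more general choice, while reinterpret avoids the extra encoding step when the payload is a plain numeric array.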

I have an application which reads UDP data, processes it and displays it. I don’t know if that’s similar to your case, but if it is then the following might be useful:

  • I separated the udp receive, processing and display into separate tasks running on different threads using ThreadPools.@tspawnat. For some reason Threads.@spawn assigns tasks to random threads and they can end up all running on the same one.

  • I used Channel() for a blocking, thread-safe queue to communicate between tasks. This lets the receive task carry on even when, e.g., the processing operates on larger blocks and would otherwise hold things up long enough for packets to be dropped.

  • I found I had to increase the UDP buffer size as the default kept dropping packets. You may not have to do that depending on data rate, etc. - mine runs on a Windows laptop and receives around 65 Mbps. I don’t think UDPSocket() allows you to change the buffer size directly but there’s a discussion on how to do it at Set UDPSocket reciever buffer size? - #4 by ufechner7
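The Channel idea from the list above can be sketched roughly as follows. This is only an illustration: the packets are simulated rather than read from a socket, run_pipeline is a hypothetical name, and Threads.@spawn is used here only to keep the sketch free of extra packages.

```julia
# A buffered Channel decouples the receiving task from the processing task.
function run_pipeline(npackets::Int; queuelength::Int = 1024)
    chan = Channel{Vector{UInt8}}(queuelength)

    receiver = Threads.@spawn begin
        for i in 1:npackets
            # In a real program this would be put!(chan, recv(sock)).
            put!(chan, fill(UInt8(i % 256), 8))
        end
        close(chan)   # signals that no more packets will arrive
    end

    processor = Threads.@spawn begin
        total = 0
        for pck in chan   # blocks until a packet is available
            total += length(pck)
        end
        total
    end

    wait(receiver)
    return fetch(processor)   # number of bytes the processor has seen
end

run_pipeline(100)
```

put! only blocks once the queue is full, so the queue length sets how long the processing side may stall before the receiver is held up.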

Thanks a lot. Can you explain tasks with an example?

There’s lots in the documentation about tasks, threads, etc. I’m only a beginner so there may well be better ways to do all this, but (making sure you’re running Julia with multiple threads) try

using ThreadPools

function f1()
    println("f1 in thread ", Threads.threadid())
end

task = ThreadPools.@tspawnat 2 f1()

Sorry that I didn’t notice it in my first response: you implemented a sequential server task. If you get more UDP messages than your task can handle, you will lose packets, because your recdata task is not responsive while func1 is running.

The right strategy then is to switch to a parallel server: You @spawn a parallel server task each time you receive a message and thus keep the rec_data task responsive to new messages. The pattern is as follows:

using Sockets
using Serialization

function rec_data(port::Integer, to::Sockets.InetAddr)
    sock = UDPSocket()
    if bind(sock, Sockets.localhost, port)
        Threads.@spawn begin
            while true
                from, pck = recvfrom(sock)
                # deserialize reads from an IO, so wrap the received bytes
                msg = deserialize(IOBuffer(pck))
                if msg == :exit
                    break
                else
                    # serve each message in its own task to keep this loop responsive
                    Threads.@spawn par_serve(from, to, msg)
                end
            end
            close(sock)
        end
    else
        println(stderr, "port $port not available")
    end
end

function par_serve(from, to, msg)
    # do something with msg and from
    output = some_op_taking_time(from, msg)
    # serialize writes to an IO; take! returns the resulting bytes
    io = IOBuffer()
    serialize(io, output)
    sock = UDPSocket()
    send(sock, to.host, to.port, take!(io))
    close(sock)
end

This should be sufficient for most situations. If it is not, you must increase the bandwidth of your UDP connections and serve on more ports.

Please notice that this may create a new bottleneck in your application, namely in the server you send your output to. You must handle that in a similar way or perhaps you can use a Channel for that as mentioned above.

Probably a minor point, but Threads.@spawn didn’t work very well for me as I couldn’t find any way of controlling which thread the tasks ran on. If the receive task was scheduled on the same thread as the processing/display, which happened quite often, it ended up dropping packets. It may well be that my understanding is lacking but, as I mentioned above, I used ThreadPools instead and that seems to work OK.

Thanks a lot.
My receive data rate is very high, almost 60 Mbps. I have to receive the data and then process it. The processing consists of multiplications and the like. My processing time is one second, and then I must send 50 Mbps of data out on another UDP network.

Yes, I can imagine circumstances where having the receiving task on a dedicated thread may be necessary. The Julia scheduler is not preemptive and thus a serving task running on the same thread as the receiving task may block the latter for some time. It depends mainly on the service time and the rate of incoming messages. I wonder if we could do some measurements on that.
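The blocking effect can be seen in a small sketch. It uses @async so that both tasks share one thread; the function name scheduling_order and the loop size are arbitrary choices for illustration.

```julia
# Two tasks on the same thread: the compute-bound one has no yield point,
# so the other task cannot run until it has finished.
function scheduling_order()
    order = String[]
    busy = @async begin
        push!(order, "processing start")
        s = 0
        for i in 1:10^7   # pure computation, never yields to the scheduler
            s += i
        end
        push!(order, "processing end")
        s
    end
    receiver = @async push!(order, "receiver ran")
    wait(busy)
    wait(receiver)
    return order
end

scheduling_order()
```

In the returned vector, "receiver ran" never appears between "processing start" and "processing end", which is the situation a receive task is in when it shares a thread with a long computation.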

Is one second your maximum service time? If yes, your real computation/service times should be much lower.

I confess I’ve got a bit lost now. In the code above, each packet seems to be processed and sent separately. If the processing of each packet takes around 1 s I don’t see how you’ll be able to keep up with the 60 Mbps input unless you’ve got a lot of parallel computing power available or the packets are extremely large. This isn’t really my area of expertise so I’m probably missing something.

I must explain more clearly. I wrote the processing and send functions, but I haven’t added the receiving function yet; I used rand etc. to generate the input. My inputs are 2 matrices that contain a lot of complex values. I process them and then send the result. My main question is how to add a receiving function that adds data to a circular buffer. When the queue reaches a specific size, say 10000 complex numbers, I must start processing them: each time 10000 complex numbers have been added to the circular queue, I must run the process and send functions. My input packets are extremely large. Can I add large packets to a circular buffer and process them when the buffer holds 10000 values?
We have a receive(msg, nbytes) method in VC++ for reading n bytes from the network. Is there a similar method in Julia for reading n bytes?

Please provide an MWE as described here since I’m lost with your explanation too.

n is given by the size of the UDP packet received with pck = recv(sock), i.e. length(pck).
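Since each recv returns one whole packet, collecting a fixed number of values means accumulating packets until enough have arrived. A minimal sketch, assuming the samples arrive as raw ComplexF32 values (8 bytes each), which the thread does not specify; push_packet! and its blocksize keyword are hypothetical names:

```julia
# Append the samples of one packet to `buf` and return every complete block.
function push_packet!(buf::Vector{ComplexF32}, pck::Vector{UInt8}; blocksize::Int = 10_000)
    # Assumes length(pck) is a multiple of 8 (one ComplexF32 per 8 bytes).
    append!(buf, reinterpret(ComplexF32, pck))
    blocks = Vector{ComplexF32}[]
    while length(buf) >= blocksize
        push!(blocks, buf[1:blocksize])   # copy out one full block
        deleteat!(buf, 1:blocksize)       # keep the remainder for the next packet
    end
    return blocks
end

# Simulated packet of 4000 complex values; a real one would come from recv(sock).
pck = collect(reinterpret(UInt8, rand(ComplexF32, 4000)))
buf = ComplexF32[]
push_packet!(buf, pck)            # 4000 values buffered, no block yet
push_packet!(buf, pck)            # 8000 values buffered, no block yet
blocks = push_packet!(buf, pck)   # one block of 10000 returned, 2000 values remain
```

Each returned block could then be handed to the processing and send functions.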

Yes, an MWE would be helpful, but I’m guessing that you might want a structure a bit like the following (modifying @pbayer’s code a bit). It’s close to what I do, but I’m a beginner and there may be better ways of doing it, in which case I’d love to know how to improve it.

using Sockets
using ThreadPools

function rec_data(port::Integer, to::Sockets.InetAddr, chan::Channel)
    sock = UDPSocket()
    if bind(sock, Sockets.localhost, port)
        # run the loop directly in this task so it stays on the thread chosen by @tspawnat
        try
            while true
                from, pck = recvfrom(sock)
                put!(chan, pck)  # queue the raw packet for the processing task
            end
        finally
            close(sock)  # reached when put! fails because chan was closed
        end
    else
        println(stderr, "port $port not available")
    end
end

function process(chan::Channel)
    while some_condition
        packet = take!(chan)
        # convert packet contents to complex numbers or whatever
        # put data into suitable data structure (circular buffer/array/etc)
        if enough_data
            do_complicated_processing()
            send_data()
        end
    end
end

queuelength = 8000 # or something long enough
chan = Channel{Array{UInt8,1}}(queuelength)
rectask = ThreadPools.@tspawnat 2 rec_data(port, to, chan)
proctask = ThreadPools.@tspawnat 3 process(chan)

Some means of stopping a high-priority task from being blocked would help my application. At the moment my processing is fine on a single thread but in the next stage of development I’ll want to parallelise it.

With your proposal above you are halfway to parallelization. You still have a bottleneck, namely in the do_complicated_processing(), which is done serially in your single processing task.

But you can change that to a broker task like the following:

using Sockets
using Serialization
using ThreadPools

function rec_data(port::Integer, to::Sockets.InetAddr, chan::Channel)
    sock = UDPSocket()
    if bind(sock, Sockets.localhost, port)
        # run the loop directly in this task so it stays on the thread chosen by @tspawnat
        while true
            from, pck = recvfrom(sock)
            msg = deserialize(IOBuffer(pck))  # deserialize reads from an IO
            if msg == :exit
                break
            else
                put!(chan, msg)  # send it to the broker
            end
        end
        close(sock)
    else
        println(stderr, "port $port not available")
    end
end

function broker(chan::Channel)
    while some_condition
        msg = take!(chan)
        # convert message contents to complex numbers or whatever
        # put data into suitable data structure (circular buffer/array/etc)
        if enough(data)
            # needs at least 4 threads; hands the work to a random thread >= 4
            ThreadPools.@tspawnat rand(4:Threads.nthreads()) begin
                do_complicated_processing_and_send(data)
            end
        end
    end
end

queuelength = 8000 # or something long enough
chan = Channel{Any}(queuelength)  # deserialized messages can be of any type
rectask = ThreadPools.@tspawnat 2 rec_data(port, to, chan)
brokertask = ThreadPools.@tspawnat 3 broker(chan)

The broker task is responsive and starts a parallel task on some thread >= 4 if enough data is available.

I think this is not possible at the moment, but the outlined strategy should do. Notice that a task yields to the scheduler (blocks) when it does a recv or take!. The problem is that it does not get scheduled again until the task then running on that thread has finished its work or yields. Therefore it makes sense to keep at least the rec_data task on a separate thread to stay responsive to incoming UDP messages.

I further think that the system would benefit if the data chunks going to the complicated processing don’t get too big. If the processing is shorter and done more often, the system will parallelize better.

Thanks, it sounds as though I’m not too far off the right lines and hopefully it will cover the OP’s case as well. Parallelising the processing obviously means the output ordering can’t be guaranteed so, depending on the application, some marshalling might be needed before sending.
My instincts from (limited) experience in parallel processing in C++ would be to have a number of fixed processing tasks and distribute the data via queues of some sort. Does @tspawnat have sufficiently low overhead to make that unnecessary?
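For comparison, the fixed-pool idea could be sketched roughly like this. It is an illustration under assumptions, not a measured alternative: run_pool is a hypothetical name, sum(abs2, block) stands in for the real processing, and sequence numbers are used so the results can be put back in order before sending.

```julia
# A fixed number of long-lived worker tasks pull jobs from one shared Channel.
function run_pool(blocks::Vector{Vector{Float64}}; nworkers::Int = 4)
    jobs    = Channel{Tuple{Int,Vector{Float64}}}(length(blocks))
    results = Channel{Tuple{Int,Float64}}(length(blocks))

    workers = map(1:nworkers) do _
        Threads.@spawn for (seq, block) in jobs
            put!(results, (seq, sum(abs2, block)))   # stand-in for the real work
        end
    end

    for (seq, block) in enumerate(blocks)
        put!(jobs, (seq, block))   # tag each block with its sequence number
    end
    close(jobs)                    # workers finish once the queue is drained
    foreach(wait, workers)
    close(results)

    # Results may complete out of order; sort by sequence number before sending.
    return last.(sort!(collect(results); by = first))
end

blocks = [fill(Float64(i), 10) for i in 1:20]
run_pool(blocks)
```

Whether this beats spawning one task per block depends on the workload, so it would need measuring in the actual application.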

Thanks a lot.
When I use the rec_data function, I see “port not available”.
I have another program that sends data, but my program shows “port 2000 not available” all the time.

This means that port 2000 is already used by another socket and bind(sock, Sockets.localhost, 2000) returned false.

If you have the socket variable (bound to 2000) available, you can

  • close it to allow for a new socket to bind to that port or
  • modify rec_data accordingly to receive over that socket.

If you bound a socket unwittingly to that port, you can close your Julia session to free that port again.

Or (if that port is used by another program) you must use another port.
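The behaviour can be reproduced in a short sketch. The helper try_bind is hypothetical, and port 2000 is simply the one mentioned in the thread.

```julia
using Sockets

# Hypothetical helper: return a bound socket, or `nothing` if the port is taken.
function try_bind(port::Integer)
    sock = UDPSocket()
    if bind(sock, Sockets.localhost, port)
        return sock
    end
    close(sock)
    return nothing
end

first_sock = try_bind(2000)    # nothing if another program already holds the port
second_sock = try_bind(2000)   # nothing: the port is held by first_sock or by another program

# Closing the socket that holds the port frees it for a new bind.
first_sock === nothing || close(first_sock)
```

If first_sock is already nothing, the port is held outside this Julia session and a different port is needed.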