SharedArrays in function with @distributed

Hi friends, I have a function that runs with @threads. Inside this threaded function, I use @distributed to speed up the calculation loops. My result is incorrect when I use the function, but it is correct without the function.

Hi, it’s hard to say without having a MWE, but already from what you say there seems to be a misunderstanding. Either you use Threads.@threads for ... or @distributed for ..., not both mixed together. If you can show an example of where things go wrong it is easier to help!
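To make the distinction concrete, here is a minimal sketch (not taken from the original post) that runs the same loop once with Threads.@threads and once with @distributed. The worker count, the array size, and the names a and b are assumptions chosen purely for illustration.

```julia
using Distributed, SharedArrays

# Assumption: two local worker processes are enough for the illustration.
addprocs(2)
@everywhere using SharedArrays

n = 8

# Option 1: threads inside a single process, writing to an ordinary Array.
a = zeros(Int, n)
Threads.@threads for i in 1:n
    a[i] = i^2
end

# Option 2: separate worker processes, writing to a SharedArray.
# Without a reducer, @distributed returns immediately, so @sync is
# used here to wait until every worker has finished its share.
b = SharedArray{Int}(n)
@sync @distributed for i in 1:n
    b[i] = i^2
end

println(a == b)
```

Each loop uses one parallel mechanism on its own: threads share the memory of one process, while workers are separate processes that need something like a SharedArray to write into common storage.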

Here is my sample code. Suf is a SharedArray, and I use @distributed to make the loop fast. The function readudp reads 1280 bytes of data from port 2000, and the function broker processes the data. The result in Suf is valid when I don’t run the code inside a function on a thread, but it is invalid when I use the threaded function.

using Distributed, SharedArrays, Statistics, FFTW, ThreadPools
addprocs(8)
@everywhere using SharedArrays, FFTW, Statistics
Suf=convert(SharedArray, zeros(16000,320))
function readudp(chan::Channel)
udp2=UDPSocket()
bind(udp2, ip"0.0.0.0",2000)
while true
from ,pck=recvfrom(udp2)
put!(pck)
end 
close(udp2)
end

function broker(chan::Channel)
index=0;
while true
y=take!(chan)
cvt[:, (index+1):(index+4)]=reshape(y,(320,4))
index=index+4;
If index==10000
Index=0
@distributed for (I,j) in collect(Iterators.product(1:50,1:320))
Suf[1+(i-1)×320:i×320,j]=(fft(mean(x×x',dims=1)))
end
end 
end
q=10000; chan=Channel{Array{UInt8,1}}(q)
P=ThreadPools.@tspawnat 2 readudp(chan)

P1=ThreadPools.@tspawnat 1 broker(chan)

Does nobody want to help me?

Sorry, but it’s not very easy to understand the problem. The code you posted was not correct, so I modified it a little to not have syntax errors:

using Distributed, SharedArrays, Statistics, FFTW, ThreadPools

addprocs(8)

@everywhere using SharedArrays, FFTW, Statistics, Sockets

Suf = convert(SharedArray, zeros(16000, 320))

function readudp(chan::Channel)
    udp2 = UDPSocket()
    bind(udp2, ip"0.0.0.0", 2000)
    while true
        from, pck = recvfrom(udp2)
        put!(chan, pck)  # put! takes the channel as its first argument
    end
    close(udp2)
end

function broker(chan::Channel)
    index = 0
    while true
        y = take!(chan)
        cvt[:, (index+1):(index+4)] = reshape(y, (320, 4))
        index = index + 4
        if index == 10000
            index = 0  # reset the counter (the post had `Index`, a different variable)
            @distributed for (i, j) in collect(Iterators.product(1:50, 1:320))
                Suf[1+(i-1)*320:i*320, j] = (fft(mean(x * x', dims = 1)))
            end
            end
        end
    end
end

q = 10000;
chan = Channel{Array{UInt8,1}}(q)
P = ThreadPools.@tspawnat 2 readudp(chan)

P1 = ThreadPools.@tspawnat 1 broker(chan)

However, I don’t know what this code is meant to do, nor does it run as posted. What result do you obtain, and what is the correct result? What does the serial code look like?
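One way to narrow this down is to check the distributed loop against a serial reference on a small problem. The sketch below is only an illustration: blockvalue is a hypothetical stand-in for the real fft/mean computation, and the sizes are made up. It reuses only the block indexing pattern from the code above. Note that @distributed without a reducer returns immediately, so @sync is used to wait for the workers before comparing.

```julia
using Distributed, SharedArrays

# Assumption: start two local workers only if none exist yet.
nprocs() == 1 && addprocs(2)
@everywhere using SharedArrays

# Hypothetical stand-in for the real per-block computation.
@everywhere blockvalue(i, j, len) = fill(Float64(i + 10j), len)

nblocks, ncols, len = 5, 4, 3   # small sizes chosen for illustration

# Serial reference, using the same block indexing as Suf.
ref = zeros(nblocks * len, ncols)
for i in 1:nblocks, j in 1:ncols
    ref[1+(i-1)*len:i*len, j] = blockvalue(i, j, len)
end

# Distributed version writing into a SharedArray.
out = SharedArray{Float64}(nblocks * len, ncols)
@sync @distributed for (i, j) in collect(Iterators.product(1:nblocks, 1:ncols))
    out[1+(i-1)*len:i*len, j] = blockvalue(i, j, len)
end

println(ref == out)
```

If the two arrays differ once the real computation is substituted in, the serial loop gives a known-good result to compare against, which makes it much easier to say where the distributed version goes wrong.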