How to Collect Data from an Async Call?

Hi, I have built an application with a function that collects data from an API call and then stores it in an array, as follows:

```julia
using DotEnv, HTTP, JSON3

DotEnv.config()  # load variables from .env into ENV

# Function to collect data from the API; returns the "data" field on success
function collect_data(method, url, token, body)
    r = HTTP.request(
        method,
        url,
        ["Content-Type" => "application/json", "X-Token" => token],
        JSON3.write(body);
        require_ssl_verification = false,
    )
    if r.status == 200
        try
            return JSON3.read(r.body)["data"]
        catch err
            println("Error while parsing: $err")
        end
    end
    return Dict()  # empty result on failure
end

# A preallocated array (element type Any, since results are JSON objects)
linear_data = Vector{Any}(undef, 1000)

# Sequential calls to the API (URL and params are assumed defined elsewhere)
for i in 1:1000
    linear_data[i] = collect_data("POST", URL, ENV["HERA_SECRET"], params)
end
```

But I want to convert this into asynchronous calls. To use Distributed.jl I probably need a shared array, but `SharedArray` does not support the element type `Any`. My plan was to create an array like this:

```julia
using Distributed, SharedArrays

# Add n worker processes
addprocs(n)

# This fails: SharedArray requires an isbits element type, so Any is not allowed
non_linear_data = SharedArray{Any}(1000)

@sync @distributed for i in 1:1000
    non_linear_data[i] = collect_data("POST", URL, ENV["HERA_SECRET"], params)
end
```

But I am stuck at the distributed part. Is there a better way to do this? In Go it would be easy: create a bidirectional channel, feed the results into it, and later collect the data from the channel into a slice.
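For comparison, the Go pattern described above translates fairly directly to Julia's built-in `Channel`. This is a minimal sketch, not the thread's accepted answer: `fake_request` is a hypothetical stand-in for `collect_data` that just sleeps, so the example runs without a network. Each task `put!`s an `(index, result)` pair into a buffered channel, and the main task drains the channel into a vector once all producers are done.

```julia
# Hypothetical stand-in for collect_data: simulates a slow network call
fake_request(i) = (sleep(0.1); Dict("id" => i))

function collect_with_channel(n)
    # Buffered channel holding (index, result) pairs, like a Go channel
    results = Channel{Tuple{Int,Any}}(n)
    # Launch one task per request; @sync waits until all of them finish
    @sync for i in 1:n
        @async put!(results, (i, fake_request(i)))
    end
    close(results)  # no more values will be sent
    data = Vector{Any}(undef, n)
    # Iterating a closed channel yields the buffered items, then stops
    for (i, r) in results
        data[i] = r
    end
    return data
end

data = collect_with_channel(20)
```

Because these tasks all run on one thread, each task could also write `data[i]` directly; the channel becomes more useful when a consumer processes results while producers are still running.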

FYI, there's `Transducers.dcollect`, which implements a `collect`-like API based on Distributed.jl. It would be something like:

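```julia
using Transducers

# collect_data must also be defined on the worker processes (e.g. with @everywhere)
linear_data = dcollect(collect_data("POST", URL, ENV["HERA_SECRET"], params) for i in 1:1000)
```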

You can also try the thread-based version, `tcollect`.
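For a rough idea of what a thread-based collect involves without Transducers.jl, here is a minimal sketch using only Base. It is not how `tcollect` is implemented, and `slow_square` is a hypothetical stand-in for real work. Start Julia with several threads (for example `julia --threads 4`) to get actual parallelism; with a single thread the tasks still run, just concurrently.

```julia
using Base.Threads: @spawn

# Hypothetical stand-in for an expensive call
slow_square(i) = (sleep(0.05); i^2)

# Spawn one task per item; the scheduler may run them on any available thread
tasks = [@spawn slow_square(i) for i in 1:50]

# fetch blocks until a task finishes and returns its value,
# so mapping over the task vector keeps the results in input order
results = map(fetch, tasks)
```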


Hi, thanks for the solution, though I have not tried it out yet. I was looking for more detail on how to do this rather than a magic solution, because I want to learn how to solve this kind of problem in general with `@distributed` or channels in Julia. I could not find a well-described solution. Do you know of any documentation I could read to learn this?

Because the work is I/O-bound (mostly waiting on the network), you should be able to do this even with asynchronous tasks on a single process. For example:

```julia
julia> linear_data = zeros(Int, 10);

# Linear
julia> @time for i in 1:10
           linear_data[i] = (sleep(1); i);
       end
 10.019392 seconds (50 allocations: 1.406 KiB)

# Asynchronous
julia> @time @sync for i in 1:10
           @async linear_data[i] = (sleep(1); i);
       end
  1.014280 seconds (1.78 k allocations: 95.128 KiB)

julia> linear_data
10-element Array{Int64,1}:
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
```
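With 1000 real HTTP requests you may not want all of them in flight at once. A minimal sketch, assuming a concurrency limit of 5 is reasonable for your server: Base's `asyncmap` runs the calls as tasks, caps how many run concurrently with the `ntasks` keyword, and returns the results in input order. `slow_call` is a hypothetical stand-in for `collect_data`.

```julia
# Hypothetical stand-in for an I/O-bound call such as collect_data
slow_call(i) = (sleep(0.2); i)

# Run slow_call concurrently as tasks, at most 5 at a time;
# results come back in the same order as the inputs
results = asyncmap(slow_call, 1:20; ntasks = 5)
```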

Of course, you can also distribute this across processes to speed it up even more.
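On the distributed side, one way around the `SharedArray{Any}` limitation is to skip the shared array entirely and let `pmap` from the Distributed standard library gather the results on the master process. This is a minimal sketch: the worker count of 2 is an arbitrary choice, and `slow_request` is a hypothetical stand-in for `collect_data`.

```julia
using Distributed

# Start 2 local worker processes (the count is an arbitrary choice here)
addprocs(2)

# Every worker needs the function definition, hence @everywhere.
# slow_request is a hypothetical stand-in for collect_data.
@everywhere slow_request(i) = (sleep(0.1); Dict("id" => i, "worker" => myid()))

# pmap sends each i to a free worker and gathers the return values,
# in input order, into an ordinary Vector on the master process.
# No SharedArray is involved, so non-isbits results such as Dict are fine.
results = pmap(slow_request, 1:20)
```

`@distributed` with a reducer such as `vcat` (each iteration returning a one-element vector) can also collect results without a `SharedArray`, but the Julia manual suggests `pmap` for cases where each call does a large amount of work, as a network request does.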

If you are curious about implementation strategies for a robust and composable parallel `collect`, I explained the strategy I use in the Transducers.jl tutorial: https://juliafolds.github.io/Transducers.jl/dev/tutorials/tutorial_parallel/#tutorial-parallel-collect

Cool. That’s awesome. I am reading it.
Thanks