How to achieve non-blocking IO with Julia

I have a function that is executed on a Julia thread. Inside it, an execute_psql_string_await function performs a PostgreSQL stored procedure call using the LibPQ library. Currently, my thread is blocked while this query is being executed, and I do not want this behavior. I want a non-blocking way to execute the query, meaning that I trigger the stored procedure and release the thread without waiting for the result. Any thoughts are welcome.


function sproc_caller(table_name::String, sql_string::String, request::Dict, request_id::String, sproc_name::String)
    try
        @info "Start sproc_caller table_name $(table_name) execute_psql_string, on thread $(Threads.threadid())"
        @time execute_psql_string_await(sql_string)
        @info "End sproc_caller table_name $(table_name) execute_psql_string,  on thread $(Threads.threadid())"
    catch err
        @error "Error in Stored Procedure on thread $(Threads.threadid()), Error : $(err)" exception=(err, catch_backtrace())
    end
end

Are you sure the thread is blocked? Julia distinguishes between Tasks and threads. Usually, when a Task is waiting for IO, the thread can switch to another Task. Of course, if the wait is a spin lock, busy loop, or similar, this does not work, but that is then a fundamental limitation of the library you are using and there is no way around it. Usually there is a reason why an API is designed that way, though.
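To illustrate the Task/thread distinction, here is a minimal sketch. The fake_io function is hypothetical, and sleep merely stands in for a wait on IO; nothing here touches LibPQ. Both tasks are started with @async, so they share the current thread, yet the second one starts while the first is still waiting.

```julia
# Two tasks sharing one thread; `sleep` stands in for waiting on IO.
events = String[]

# Hypothetical helper: records when the "IO" starts and ends.
function fake_io(name)
    push!(events, "start $name")
    sleep(0.2)                  # yields to the scheduler instead of blocking the thread
    push!(events, "end $name")
end

@sync begin
    @async fake_io("A")
    @async fake_io("B")
end

println(events)   # "start B" is recorded before "end A"
```

If fake_io spun in a busy loop instead of yielding, task B could not start until A had finished.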

Or do you just wish for your Task to progress instead of waiting on the return of execute_psql_string_await? In that case you can simply spawn that call in another Task so that it is executed concurrently, i.e. do Threads.@spawn execute_psql_string_await(sql_string). Another possibility could be to look for a different method, since the name execute_psql_string_AWAIT suggests that there might be a non-blocking variant, perhaps called execute_psql_string or similar.
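A minimal sketch of the spawn approach, assuming a hypothetical slow_query function as a stand-in for execute_psql_string_await (sleep simulates the query time, and the SQL string is made up):

```julia
# Hypothetical stand-in for execute_psql_string_await; `sleep` simulates the query.
slow_query(sql) = (sleep(0.2); "done: $sql")

# Spawn the call as its own Task; the caller continues immediately.
t = Threads.@spawn slow_query("CALL my_sproc();")
println("query submitted, caller keeps going")

# Only fetch if and when the result is actually needed.
result = fetch(t)
println(result)
```

If the result is never needed, the fetch can be dropped entirely. In that case errors inside the task go unnoticed unless the task body handles them itself, e.g. with the try/catch already present in sproc_caller.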


Hi @abraemer

I’ve created a worker module that exposes a Workers.@async macro. The macro submits a task to a pool of worker tasks, which then runs it on whichever worker thread is available.

module Workers

const WORK_QUEUE = Channel{Task}(1000)

"Get the current length of the provided Channel `ch`, with the `WORK_QUEUE` as default."
get_queue_tasks(ch::Channel=WORK_QUEUE) = length(ch.data)

"Check if the provided Channel `ch` is at or over capacity, with the `WORK_QUEUE` as default."
is_queue_full(ch::Channel=WORK_QUEUE) = get_queue_tasks(ch) ≥ ch.sz_max

"Get the percentage fullness of Channel `ch` when compared to its maximum capacity, with the `WORK_QUEUE` as default."
function get_percent_queue(ch::Channel=WORK_QUEUE)
    is_queue_full(ch) && return 100.0
    x = get_queue_tasks(ch)
    x == 0 && return 0.0
    return round(x / ch.sz_max * 100.0, digits=2)
end

"""
    @async expr

Adds `expr` as a new task to `Workers.WORK_QUEUE`, wrapped in a try-catch
to display any error messages.

The tasks in `Workers.WORK_QUEUE` are run on any available threads except for
thread 1, which is reserved for the HTTP handler (unless only one thread is
in use).
"""
macro async(expr)
    esc(quote
        tsk = @task begin
            try
                $expr
            catch err
                @error "Async function failed: $(typeof(err))" exception=(err, catch_backtrace())
                return err
            end
        end
        tsk.sticky = VERSION >= v"1.7" # Prevent task from switching threads
        tsk.storage = current_task().storage
        put!(Workers.WORK_QUEUE, tsk)
        tsk
    end)
end

"""
    init()

If `Threads.nthreads()` is greater than 1, initialise async function on each
thread apart from thread 1, which is left for the HTTP handler.

If only 1 thread is available,
`Workers.@async` acts in the same way as `Base.@async`.
"""
function init()
    @info "DataExporterService is starting with $(Threads.nthreads()) threads."
    tids = Threads.nthreads() == 1 ? (1:1) : 2:Threads.nthreads()
    Threads.@threads :static for _ in 1:Threads.nthreads()
        if Threads.threadid() in tids
            Base.@async begin
                for task in WORK_QUEUE
                    # Force schedule task onto this worker thread
                    _ = ccall(:jl_set_task_tid, Cint, (Any, Cint), task, Threads.threadid()-1)
                    schedule(task)
                    wait(task)
                end
            end
        end
    end
    return
end

end # module

I start the Julia application with 4 threads. The main thread runs the HTTP service, and I expect the remaining 3 threads to be available for task execution through the pool above. Tasks start executing as soon as I submit them.

Here’s how I execute those tasks:

for sproc in SPROC_REF
  sql_string = "CALL $(sproc[1]);"
  Workers.@async sproc_caller(sproc[2], sql_string, request, request_id, sproc_name, x_request_id=x_request_id)
end

As you can see, the sproc_caller function above has logging attached. My requirement is this: once I submit a task to the worker pool, I want it to be executed in the background and the thread released, so that I can use that thread to submit more tasks to the worker pool. The issue is that execute_psql_string_await waits for the SQL command to return a result, and I don’t want that. What I want is to execute the command and release the Julia thread so I can use it for other tasks.

execute_psql_string_await is a wrapper function around the LibPQ async_execute function, as follows:

function execute_psql_string_await(pg_params::PostgreSQLConnectionParams, psql_string::AbstractString, wait::Bool, options=Dict{String, String}(), nowarn=false)
    !nowarn && @warn "Executing a PSQL query from a string"
    @debug psql_string
    with_postgresql(pg_params, options) do psql
        result = async_execute(psql, psql_string)
        if wait
            return fetch(result)
        else
            return result
        end
    end
end
execute_psql_string_await(psql_string::AbstractString) = execute_psql_string_await(global_psql_config(), psql_string, false)

Further to the above, I also tried using Workers.@async with the execute_psql_string_await(sql_string) function, yet the thread hangs there until the query has finished executing. I’m not sure what other options I have for this.

Another issue with the options you suggested is that I can’t use @spawn directly, because it may put the other async tasks I’m running onto the main thread. I need to keep the main thread free at all times, as it has to accept incoming requests through the HTTP server.

So I don’t think you actually await the result of the call. The code you posted shows that fetch(result) is called only if you pass wait=true, which you don’t. Is it possible that you just see the overhead of using the execute_psql_string_await function? I found this issue which could be related:

Also I think you forgot to include the logs.

Anyway, I think there might be a much easier way of handling this whole situation since the advent of threadpools with 1.10. Simply start Julia with --threads 3,1, which gives you 3 default threads and 1 interactive thread, with the main thread being in the interactive pool. Then you replace the calls to your Workers.@async foo() with Threads.@spawn :default foo() to spawn the task on the other threadpool. You don’t need any additional worker task or queue. This will all be handled by Julia’s scheduler. You can of course track how many Tasks are currently scheduled, e.g. by using a global variable with an atomic integer that you increment/decrement in the tasks. This should also solve any issues with execute_psql_string_await, since even if this call should block, Julia can simply schedule another Task on that thread.
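A rough sketch of what that could look like, assuming Julia is launched with julia --threads 3,1. The names IN_FLIGHT and tracked_spawn are hypothetical, and the sleep call stands in for the stored procedure call:

```julia
using Base.Threads: @spawn, Atomic, atomic_add!, atomic_sub!

# Hypothetical global counter of tasks that were submitted but have not finished.
const IN_FLIGHT = Atomic{Int}(0)

# Hypothetical helper: run `f` on the :default pool and keep the counter up to date.
function tracked_spawn(f)
    atomic_add!(IN_FLIGHT, 1)
    return @spawn :default begin
        try
            f()
        finally
            atomic_sub!(IN_FLIGHT, 1)   # runs even if `f` throws
        end
    end
end

# `sleep` stands in for the stored procedure call.
tasks = [tracked_spawn(() -> (sleep(0.1); i^2)) for i in 1:4]
println("submitted ", length(tasks), " tasks")

results = fetch.(tasks)
println(results, ", still in flight: ", IN_FLIGHT[])
```

The HTTP handler stays on the interactive thread, and the spawned work is confined to the 3 default threads, so no hand-written queue or jl_set_task_tid call is needed.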
