Hi @abraemer
I’ve created a worker module, in which I’ve exposed Workers.@async
macro to pass async to a pool of tasks; which effectively uses a thread available to execute any async task.
module Workers
const WORK_QUEUE = Channel{Task}(1000)
"Get the current length of the provided Channel `ch`, with the `WORK_QUEUE` as default."
get_queue_tasks(ch::Channel=WORK_QUEUE) = length(ch.data)
"Check if the provided Channel `ch` is at or over capacity, with the `WORK_QUEUE` as default."
is_queue_full(ch::Channel=WORK_QUEUE) = get_queue_tasks(ch) ≥ ch.sz_max
"Get the percentage fullness of Channel `ch` when compared to its maximum capacity, with the `WORK_QUEUE` as default."
function get_percent_queue(ch::Channel=WORK_QUEUE)
is_queue_full(ch) && return 100.0
x = get_queue_tasks(ch)
x == 0 && return 0.0
return round(x / ch.sz_max * 100.0, digits=2)
end
"""
@async expr
Adds `expr` as a new task to `Workers.WORK_QUEUE`, wrapped in a try-catch
to display any error messages.
The tasks in `Workers.WORK_QUEUE` are run on any available threads except for
thread 1, which is reserved for the HTTP handler (unless only one thread is
in use).
"""
macro async(expr)
esc(quote
tsk = @task begin
try
$expr
catch err
@error "Async function failed: $(typeof(err))" exception=(err, catch_backtrace())
return err
end
end
tsk.sticky = VERSION >= v"1.7" # Prevent task from switching threads
tsk.storage = current_task().storage
put!(Workers.WORK_QUEUE, tsk)
tsk
end)
end
"""
init()
If `Threads.nthreads()` is greater than 1, initialise async function on each
thread apart from thread 1, which is left for the HTTP handler.
If only 1 thread is available,
`Workers.@async` acts in the same way as `Base.@async`.
"""
function init()
@info "DataExporterService is starting with $(Threads.nthreads()) threads."
tids = Threads.nthreads() == 1 ? (1:1) : 2:Threads.nthreads()
Threads.@threads :static for _ in 1:Threads.nthreads()
if Threads.threadid() in tids
Base.@async begin
for task in WORK_QUEUE
# Force schedule task onto this worker thread
_ = ccall(:jl_set_task_tid, Cint, (Any, Cint), task, Threads.threadid()-1)
schedule(task)
wait(task)
end
end
end
end
return
end
end # module
I start the julia application with 4 threads and I use main thread for the HTTP service, and I expect remaining 3 threads to be available for task execution, using the above pool, as and when I submit the tasks it starts executing;
Here’s how I execute those tasks:
for sproc in SPROC_REF
sql_string = "CALL $(sproc[1]);"
Workers.@async sproc_caller(sproc[2], sql_string, request, request_id, sproc_name, x_request_id=x_request_id)
end
And as you can see in the above sproc_caller
function I’ve logs attached; My requirement is that; Once I submit that task to the worker pool; I want that task to be expected in the background and release the thread so that I can use that thread to submit more tasks to the worker pool. But here the issue is; execute_psql_string_await
I have in the wait for the SQL command to return a results, I don’t want that; What I want is execute that and release the Julia thread for me to use for other tasks.
execute_psql_string_await
is a wrapper function around LibPQ async_execute
function as follows:
function execute_psql_string_await(pg_params::PostgreSQLConnectionParams, psql_string::AbstractString, wait::Bool, options=Dict{String, String}(), nowarn=false)
!nowarn && @warn "Executing a PSQL query from a string"
@debug psql_string
with_postgresql(pg_params, options) do psql
result = async_execute(psql, psql_string)
if wait
return fetch(result)
else
return result
end
end
end
execute_psql_string_await(psql_string::AbstractString) = execute_psql_string_await(global_psql_config(), psql_string, false)
Further to above; I also tried using Workers.@async
with execute_psql_string_await(sql_string)
function; Yet the thread is handing there until that query get executed. Not sure what other options I’ve for this.
Another issue with the options you suggested is that; I can’t directly use @spawn because it will use the main thread for other async tasks that I’m using; I need to keep the main thread free all the time as it should accept the incoming requests through the HTTP server.