Add Tasks to Background Worker

I’m trying to create a worker that picks up tasks from a global list and executes them without blocking the REPL (i.e. the worker runs in the background). In the following MWE, I’d expect adding a task to the worker to return nothing (or to print, if the function prints), and evaluating tasks to show a decreasing number of tasks until all tasks are executed. From there, the worker should wait for a new task to be pushed to the list, and pushing a new task should cause the worker to execute it.

So far, I either hit a concurrency violation, the tasks are never executed, or the program blocks the REPL.

Here is my MWE:

using Base.Threads

# Global task list, locking mechanism, and condition variable
global tasks = []
global tasks_lock = ReentrantLock() 
global new_task_condition = Threads.Condition()

# Function to add tasks to the global list
function add_task(new_task)
    lock(tasks_lock) do
        push!(tasks, new_task)
    end
    notify(new_task_condition)
end

# Function to define a worker
function worker()
    while true
        local task = nothing
        lock(tasks_lock) do
            while isempty(tasks)
                wait(new_task_condition)
            end
            task = popfirst!(tasks)
        end
        task()
    end
end

# Start a worker in the background
Threads.@spawn worker()

# Add tasks to the queue
add_task(() -> println("Hello, world!"))
add_task(() -> println("Goodbye, world!"))
add_task(() -> println(sum(1:1000)))
add_task(() -> (sleep(2); println("Task 2 seconds")))

This snippet gives a concurrency violation.

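To wait on a Threads.Condition, you must already be holding the condition’s lock (hence the concurrency violation; the error message does tell you that the lock must be held). The same requirement applies to notify, which your add_task calls without holding the lock. Because a Threads.Condition carries its own lock, it can also serve as the lock for the task list, so the separate tasks_lock is not needed. The other problem is that your worker function takes tasks_lock and never releases it: wait(new_task_condition) only releases the condition’s lock while it sleeps, not tasks_lock, so your add_task function can never acquire tasks_lock to add a task to the global array.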
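A minimal sketch of the MWE reworked along those lines (assuming Julia 1.3 or later for Threads.@spawn): tasks_lock is dropped, wait and notify both run while new_task_condition is locked, and each task is executed only after the lock is released. The Function[] element type and the worker_task name are my own additions, not part of the question.

```julia
using Base.Threads

# Queue of zero-argument functions, guarded only by the condition's own lock
const tasks = Function[]
const new_task_condition = Threads.Condition()

function add_task(new_task)
    lock(new_task_condition) do
        push!(tasks, new_task)
        notify(new_task_condition)   # notify also requires holding the lock
    end
    return nothing
end

function worker()
    while true
        # wait() releases the condition's lock while sleeping and reacquires it on wake-up
        task = lock(new_task_condition) do
            while isempty(tasks)
                wait(new_task_condition)
            end
            popfirst!(tasks)
        end
        task()   # run outside the lock so add_task is never blocked by a slow task
    end
end

const worker_task = Threads.@spawn worker()

add_task(() -> println("Hello, world!"))
add_task(() -> println(sum(1:1000)))
```

Calling add_task from the REPL now returns nothing right away, while the worker runs the queued functions in the background.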

As an alternative, I’d suggest using a Channel! It’s essentially a queue plus a lock and condition, and it handles all of these tricky concurrency patterns internally.
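For comparison, a sketch of the same worker built on a Channel, keeping the question’s add_task name as a thin wrapper around put!. The jobs name and the try/catch that logs failures instead of stopping the worker are assumptions for illustration.

```julia
using Base.Threads

# Unbounded queue of zero-argument functions; put! never blocks
const jobs = Channel{Function}(Inf)

# Same name as in the question, but the Channel does all the locking
add_task(f) = (put!(jobs, f); nothing)

# Iterating a Channel blocks while it is empty and ends once it is closed and drained
function worker(ch::Channel{Function})
    for job in ch
        try
            job()
        catch err
            # Log and keep going so one failing job does not stop the worker
            @error "job failed" exception=(err, catch_backtrace())
        end
    end
end

const worker_task = Threads.@spawn worker(jobs)

add_task(() -> println("Hello, world!"))
add_task(() -> println(sum(1:1000)))
```

Calling close(jobs) later lets the worker finish whatever is still queued and then return.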


Here’s a small example of the channel-based approach:

using Base.Threads
channel = Channel{Task}(Inf)

function consumer(channel::Channel)
    while true
        task = take!(channel)
        schedule(task)   # start the queued Task
        wait(task)       # block this consumer until the Task finishes
    end
end

worker1 = @spawn consumer(channel)
worker2 = @spawn consumer(channel)
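One caveat with the while true / take! loop: once the channel is closed and empty, take! throws, so the consumers end with an error. A sketch of a variant that iterates the channel with for instead, so the consumers exit cleanly after close and can be waited on; the workers vector is a hypothetical name.

```julia
using Base.Threads

const channel = Channel{Task}(Inf)

# `for` over a Channel takes items until the channel is closed *and* empty,
# so the loop ends cleanly instead of throwing from take!
function consumer(ch::Channel{Task})
    for task in ch
        schedule(task)
        wait(task)    # run one task at a time per consumer
    end
end

const workers = [Threads.@spawn consumer(channel) for _ in 1:2]

put!(channel, @task println("Hello, world!"))
put!(channel, @task println(sum(1:1000)))

# Shutdown: no more put! after this; tasks already queued still run
close(channel)
foreach(wait, workers)   # returns once both consumers have drained the channel
```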

Then add tasks with put!:

put!(channel, @task println("Hello, world!"))
put!(channel, @task println("Goodbye, world!"))
put!(channel, @task (sleep(2); println("Task 2 seconds")))
put!(channel, @task println(sum(1:1000)))

When I add the tasks in the REPL:

julia> put!(channel, @task println("Hello, world!"))
Task (runnable) @0x000000011b71de40

julia> put!(channel, @task println("Goodbye, world!"))
Hello, world!
Task (runnable) @0x000000011b71e230

julia> put!(channel, @task (sleep(2); println("Task 2 seconds")))
Goodbye, world!
Task (runnable) @0x000000011b71e620

julia> put!(channel, @task println(sum(1:1000)))
Task (runnable) @0x000000011b71eb60

julia> 500500
Task 2 seconds
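The Task (runnable) lines appear because put! returns the value it stored, here the Task itself (ending the REPL line with ; suppresses that echo). A sketch of using that return value to wait for a specific task and get its result with fetch; it re-declares a single for-based consumer so the block runs on its own.

```julia
using Base.Threads

const channel = Channel{Task}(Inf)

# Single consumer; `for` lets it stop cleanly if the channel is ever closed
function consumer(ch::Channel{Task})
    for task in ch
        schedule(task)
        wait(task)
    end
end

const worker = Threads.@spawn consumer(channel)

# put! returns the Task it stored, so we can keep a handle to it
t = put!(channel, @task sum(1:1000))

# fetch waits for the task to finish and returns its value (500500 here)
result = fetch(t)
println(result)
```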

OMG, so simple. Much appreciated!