Idiomatic way to create non-sticky tasks?

Hi. I have an application that creates an Array of tasks and then launches them asynchronously (preferably in parallel). The time of launching is not necessarily the time of creation, hence @spawn is not ideal.

Is there a macro like @task that creates non-sticky tasks? Right now, I create tasks roughly like this:

TaskArray = Array{Task,length(ndims)}(undef,ndims...)

Fill the array of tasks. Currently I am letting them be sticky if the original task is sticky. Is there a map-like function that could modify the attributes of an array's elements in place, so I could make them all non-sticky in one line?

Then I do

schedule.(TaskArray)

Also, if I have a multidimensional array of Tasks, what is the scheduling order?

Say TaskArray = fill(@task myfunction(argument), dim1, dim2). Would it traverse dim1_idx = 1:dim1 first for dim2 = 1, then do the same for dim2 = 2, and so on through all n dimensions?

The documentation is a little hard to follow. Are there other ways to create tasks, instead of the @task macro, where we can specify these attributes?

Tasks have no reliable scheduling order. However, there is a “depth-first” kind of scheduling, i.e. if a Task spawns further Tasks then those are prioritized. See

You can use foreach:

tasks = [...]
foreach(t->t.sticky = false, tasks)
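
and later, when you actually want to launch and wait for them, something along these lines:

foreach(schedule, tasks)
foreach(wait, tasks)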

No, there is no built-in for that. You’ll need to set the sticky flag manually or create a new macro that does that for you, e.g.

macro mytask(args)
    return quote
        # esc so that the user's expression resolves in the caller's scope
        let t = @task $(esc(args))
            t.sticky = false
            t
        end
    end
end
t = @mytask println(5)
@assert !t.sticky

It feels like this might be an XY-problem kind of question, because your questions seem a bit odd and oddly specific to me.
You seem to care about performance. However, having lots of (small?) tasks is not very efficient due to per-task overhead. If you create tasks in a multi-dimensional array, then it would probably be more efficient to instead split the indices and create only a limited number of tasks that each cover some range of indices.
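
For instance, a minimal sketch of that chunking idea (the work function and chunk count here are just made up for illustration):

using Base.Threads: @spawn, nthreads

work(i) = i^2   # stand-in for the real per-index computation

function run_chunked(indices; ntasks = nthreads())
    chunks = Iterators.partition(indices, cld(length(indices), ntasks))
    tasks = map(chunks) do chunk
        @spawn map(work, chunk)   # one task per chunk of indices, not one per index
    end
    return reduce(vcat, fetch.(tasks))
end

run_chunked(1:100)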
If you describe the problem you are trying to solve, then perhaps you can get better guidance. So feel invited to share more about the origin of these questions.

1 Like

The tasks are not particularly small. I have a 2-D task array of size about n x y (where n is typically between 1 and 5, and y is typically 6).

The tasks are assigned in this manner: all dim2 elements for a given dim1 element are populated, i.e. Tasks[idx,:] is populated one at a time along with a validity checker. Once all tasks are populated and all validity checks pass, all tasks could be run in parallel across any dimension.

The reason to prefer scheduling in Tasks[:,idx] order first is that these tasks are pre-processing steps for another set of operations that run in sequence after each Tasks[:,idx] is complete. Then there is another set of post-processing tasks to be scheduled (these can run in parallel like Tasks, provided the required Tasks[:,idx] and the operations after them are complete). The flow can be summarized as

Pre-process[:,1] || Pre-process[:,2] || … || Pre-process[:,end]
↓ ↓ ↓
Process[1] → Process[2] → … → Process[end]
↓ ↓ ↓
Post-process[:,1] || Post-process[:,2] || … || Post-process[:,end]

The → and ↓ mark dependencies, while || marks independence. n is a variable number based on user input (and it’s useful to keep y parametric as well).

I think both the macro and foreach are good options.

For your setting, a pretty standard construction is to use back-pressure. That is: you impose a limit that PreProcess(i) cannot start before Process(i-n1) is done, and that Process(i) cannot start until PostProcess(i-n2) is done, with generous values of n1 and n2; otherwise a job is scheduled once its other prerequisites are done (PreProcess(i) and Process(i-1) for Process(i); Process(i) for PostProcess(i)).

This is especially important if any of your steps expand memory usage, e.g. if PreProcess(i) creates a large array that only gets freed once Process(i) or PostProcess(i) is done with it.

You don’t really need to create most of the tasks before scheduling them: Process(i) can create the tasks for Process(i+1), PostProcess(i) and PreProcess(i+n1), and can manage the vectors of tasks (since it’s effectively under a mutex anyway, no extra locks required).

Regardless of how you end up coding the thing, you need to make sure that the back-pressure exists, in order to prevent a situation where you first spend lots of time doing all the PreProcess steps, and then linearly do the Process / PostProcess steps in order, using only a single core.
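
To make that concrete, here is a minimal back-pressure sketch, assuming made-up stage functions and a single window `ahead` playing the role of n1 (the Process→PostProcess limit would work the same way):

using Base.Threads: @spawn

preprocess(i)  = i          # stand-ins for the real stages
process(i)     = nothing
postprocess(i) = nothing

function run_pipeline(n; ahead = 4)
    # `tokens` acts as a counting semaphore: PreProcess(i) may only start once
    # Process(i - ahead) has finished and returned a token.
    tokens = Channel{Nothing}(ahead)
    for _ in 1:ahead
        put!(tokens, nothing)
    end

    pre_tasks = map(1:n) do i
        @spawn begin
            take!(tokens)        # back-pressure: wait for a free slot
            preprocess(i)
        end
    end

    post_tasks = Task[]
    for i in 1:n
        wait(pre_tasks[i])       # Process(i) needs PreProcess(i)
        process(i)               # the Process steps run in sequence
        put!(tokens, nothing)    # release a slot, allowing PreProcess(i + ahead) to start
        push!(post_tasks, @spawn postprocess(i))
    end
    foreach(wait, post_tasks)
end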

1 Like

You might want to look at StableTasks.jl (GitHub - JuliaFolds2/StableTasks.jl): type-stable multithreaded tasks in Julia.

2 Likes

Awesome. I’ll look into this as well.

I have another scenario that I’d like to have suggestion on.

I have some logic that does something like this:

Task1 = @spawn nothing  # initial placeholder tasks so the first wait calls succeed
Task2 = @spawn nothing

for index in range(1,iterations)
    arg_iterable_Task1 = view(arg_task1,index,:)
    arg_iterable_Task2 = view(arg_task2,index,:)
    Task1 = @spawn function1!(arg1,arg_func1...,arg_iterable_Task1)
    wait(Task2)
    wait(Task1)
    function_require_sync(arg1,arg2,arg_sync...)
    Task2 = @spawn function2!(arg2,arg_func2...,arg_iterable_Task2)
end

wait(Task2)

In this case Task1 and Task2 are fairly short (but not super short). The code runs faster and allocates almost nothing without spawning tasks. I am curious whether there is a way to use a lighter-weight task spawning mechanism, just as an experiment.

Also, once Task2 is spawned, if I update arg_iterable_Task2, the existing task should use the existing variable, right? The idea is to concurrently run the (i+1)-th iteration of Task1 with the i-th iteration of Task2 and sync them.

I did some benchmarking. If run as plain function calls instead of tasks, each of them takes about 2 µs per iteration.

Spawning as a task using @spawn adds overhead and is hence slower (but actually not by much from a latency perspective; the time grows to about 3x the previous one). I’m surprised it is able to manage inter-thread communication within 5 µs or so! There are a lot of allocations. I am curious whether there’s a way to manually manage these communications better, e.g. by capturing the necessary number of threads before the for loop and then scheduling onto those threads (or using channels in a similar way), and additionally providing hints about timing, analogous to sizehint!.

I think you might be getting a bit carried away and losing sight of your original goal. Didn’t you originally want to do performance optimization? It seems to me that you are currently trying to model your data dependencies using Tasks, and the two goals are actually at odds.

If performance is the goal, then I’d like to point out that this is way too small a job to put in a Task. As you noticed, a Task comes with a performance overhead of ~5 µs. So I recommend workloads that take at least 1 ms, but the larger the better (as long as you can keep all threads busy, of course).
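
A rough way to see that per-task overhead on your own machine (just spawn-and-wait on empty tasks; exact numbers will vary):

using Base.Threads: @spawn

function spawn_overhead(n)
    t = @elapsed for _ in 1:n
        wait(@spawn nothing)
    end
    return t / n   # seconds per spawn + wait, typically on the order of microseconds
end

spawn_overhead(10_000)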

2 Likes

Got it, thanks. For the original question I’ve got good leads and it works great.

I have restructured my code to remove the spawn for these tasks (the top-level tasks got a speed benefit of 3-4x over single-threaded). But this experiment also showed me a potential bug/issue/warning.

using .Threads

function displayNumber(inputNumber::Int)
    println(inputNumber)
end

function updateNumber(iteration_stride::Int,num_iteration_strides::Int)
    iterable_index = collect(1:iteration_stride:iteration_stride*num_iteration_strides)
    current_val = 1
    TaskVal::Task = @spawn ()  # placeholder task so the first wait(TaskVal) succeeds
    for idy in range(1,1)
        for idx in range(1,iteration_stride*num_iteration_strides)
            if((current_val < length(iterable_index)) & (idx == iterable_index[current_val]))
                current_val += 1
            end
            wait(TaskVal)
            TaskVal = @spawn displayNumber(current_val)  # captures the variable current_val, not its value at this point
        end
    end
end

updateNumber(3,10)

The above code can cause race-condition issues, depending on whether the variable was updated before the spawned task actually started running.
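
A minimal illustration of the capture behaviour, and the $ interpolation that @spawn supports for snapshotting a value at spawn time (not the fix I ended up using, just for reference):

using Base.Threads: @spawn

current_val = 1
t1 = @spawn (sleep(0.1); println(current_val))   # captures the variable, so it may print 2
t2 = @spawn (sleep(0.1); println($current_val))  # $ copies the value 1 into the task at spawn time
current_val = 2
wait(t1); wait(t2)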

Regarding the second question on the two tasks, the intention was to see whether more performance could be squeezed out. Spawning tasks just maps well to it, if only the overhead weren’t there. I’m still curious whether there is something like this I could do at the Base.ThreadingUtilities or Base.LLVM level to get the fine-grained control I’m looking for, like hacking the runtime (or even building a new macro that could be optimized for this kind of synchronization).

Something like Task1 = build_tight_task(function1), where build_tight_task takes a function as an argument, along with argument datatypes, to create a compiled object that could bind to a physical thread, be invoked with arguments at runtime, and work with the same wait function (overloaded to handle this struct).

using .Threads

function updateNumber_producer(iteration_stride::Int,num_iteration_strides::Int,Communicator::Channel{Int})
    iterable_index = collect(1:iteration_stride:iteration_stride*num_iteration_strides)
    current_val = 1
    for idx in range(1,iteration_stride*num_iteration_strides)
        if((current_val < length(iterable_index)) & (idx == iterable_index[current_val]))
            current_val += 1
        end
        put!(Communicator,current_val)
    end
end

function displayNumber(iteration_stride::Int,num_iteration_strides::Int,Communicator::Channel{Int})
    current_val::Int = 0
    for idx in range(1,iteration_stride*num_iteration_strides)
        current_val = take!(Communicator)
        println(current_val)
    end
end

function updateNumber_spawn(iteration_stride::Int,num_iteration_strides::Int,Communicator::Channel{Int})
    UpdaterTask = @spawn updateNumber_producer(iteration_stride,num_iteration_strides,Communicator)
    DisplayTask = @spawn displayNumber(iteration_stride,num_iteration_strides,Communicator)
    wait(UpdaterTask)
    wait(DisplayTask)
end

function updateNumber_task(iteration_stride::Int,num_iteration_strides::Int,Communicator::Channel{Int})
    UpdaterTask = @task updateNumber_producer(iteration_stride,num_iteration_strides,Communicator)
    DisplayTask = @task displayNumber(iteration_stride,num_iteration_strides,Communicator)
    schedule(UpdaterTask)
    schedule(DisplayTask)
    wait(UpdaterTask)
    wait(DisplayTask)
end

Communicator = Channel{Int}(1)
updateNumber_task(3,10,Communicator)
updateNumber_spawn(3,10,Communicator)
close(Communicator)

This code fixes the above issue. (Or one could also define another variable current_val_buffer and do current_val_buffer = current_val after wait(TaskVal)).

Channels also reduce the overhead (possibly because I only launch 2 tasks instead of 2*num_iterations tasks). The overhead (disabling the print and using @btime) is now about 2 µs for spawn (which possibly has to manage multiple threads). Using @task-created tasks, if I leave sticky = true I get 1 µs, and setting sticky = false I get 2 µs, similar to spawn.

The above toy task is too quick to benefit from multithreading, but I guess if my task were slightly larger, this could actually benefit it. Is there any way to reduce the latency of channels further?

The latency on Channels comes from waiting on conditions and locks, which involves the scheduler. It is possible to avoid it by using Threads.SpinLock instead of Channel. Spinlocks are generally the fastest way to synchronize a few threads because they use hardware atomics, but you’ll have to do all the stuff yourself, and be careful not to deadlock.
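
For reference, the usual safe pattern with Threads.SpinLock looks something like this (same-task lock/unlock around a very short critical section, not a message-passing handshake):

using Base.Threads: SpinLock, @spawn

const counter_lock = SpinLock()
const counter = Ref(0)

tasks = map(1:100) do _
    @spawn begin
        lock(counter_lock)
        try
            counter[] += 1   # very short critical section
        finally
            unlock(counter_lock)
        end
    end
end
foreach(wait, tasks)
counter[]   # == 100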

1 Like

IMO this is pretty bad advice. Hand-rolling your own channel is really easy to get wrong and really hard to get fast. I think there is a missing one-way, one-to-one channel that could have lower latency and higher performance by giving up some flexibility, but that belongs in a package (or Base), not hand-rolled by everyone who needs it.

5 Likes

Thank you @sgaure and @Oscar_Smith. For the original question, the change with channels would be either to use mutable values passed into both tasks with an unbuffered channel for synchronization, or to use a buffered channel that passes the values between the tasks. So the modified code for the above task will be

Note: I forgot to mention that function_require_sync is also a mutating function; it should have been named function_require_sync! and it modifies arg1, which is then used by Task1.

# Constructing multi-channel communicators.

# Create channels between each pair of processes.
# T is the element type of the channels, passed in explicitly.
function create_communicators(::Type{T},Num_processes::Integer) where T
    Num_channel_pairs = binomial(Num_processes,2)
    communicators = Array{Channel{T},2}(undef,2,Num_channel_pairs)
    return communicators
end

# Method 1: Using mutable objects, and simple sync messages. The channel will be unbuffered, and mutable objects will be passed as argument.
function function1_loop!(arg1,arg_task1,communicators::Array{Channel{T},2},arg_func1...) where T
    sync_message::T = 1
    iterations = size(arg_task1,1)
    for index in range(1,iterations)
        arg_iterable_Task1 = view(arg_task1,:,index)
        take!(communicators[1,1])
        function1!(arg1,arg_func1...,arg_iterable_Task1)
        put!(communicators[2,1],sync_message)
        # Here arg1 (a mutable object) is mutated and shared between the two functions.
        # If required arg1, could be the sync message being passed between the two functions.
    end
end

function function2_loop!(arg1,arg2,arg_task2,communicators::Array{Channel{T},2},arg_func2...) where T
    sync_message::T = 1
    iterations = size(arg_task2,1)
    for index in range(1,iterations)
        arg_iterable_Task2 = view(arg_task2,:,index)
        take!(communicators[2,1])
        function_require_sync!(arg1,arg2,arg_sync...) # Modifies arg1 in place to be used by function1! in the next iteration.
        put!(communicators[1,1],sync_message)
        # If required arg1, could be the sync message being passed between the two functions.
        function2!(arg2,arg_func2...,arg_iterable_Task2)
    end
end

function asynchronous_compute_unbuffered!(arg2,arg_task1,arg_task2)
    arg1 = some_mutable_object()
    arg_func1 = tuple(some_value...)
    arg_func2 = tuple(some_value...)

    Num_processes = 2
    communicators = create_communicators(Int,Num_processes)  # Int sync messages
    for i in eachindex(communicators)
        communicators[i] = Channel{Int}(0)  # a fresh unbuffered channel per slot
    end

    # Alternately, arg1 could be the sync message being passed between the two functions.
    # In that case, the functions do not consume arg1 as argument, and arg1 is no longer mutable.
    Task_1_loop = @spawn function1_loop!(arg1,arg_task1,communicators,arg_func1...)
    Task_2_loop = @spawn function2_loop!(arg1,arg2,arg_task2,communicators,arg_func2...)
    # put! on an unbuffered channel blocks until the matching take!, so prime the
    # handshake only after the consuming task has been spawned.
    put!(communicators[1,1],1)
    wait(Task_1_loop)           # Task 1 must finish consuming its sync messages first
    take!(communicators[1,1])   # consume Task 2's final sync message so its last put! can return
    wait(Task_2_loop)
    close.(communicators)
end

# Method 2: Using channels as sync messages, passing arguments as channel messages.

# Alternate version passing arg1 as a sync message.
function function1_loop!(arg_task1,communicators::Array{Channel{T},2},arg_func1...) where T
    iterations = size(arg_task1,1)
    for index in range(1,iterations)
        arg_iterable_Task1 = view(arg_task1,:,index)
        arg1 = take!(communicators[1,1])
        arg2 = function1(arg1,arg_func1...,arg_iterable_Task1)
        put!(communicators[2,1],arg2)
    end
end

function function2_loop!(arg_task2,communicators::Array{Channel{T},2},arg_func2...) where T
    iterations = size(arg_task2,1)
    for index in range(1,iterations)
        arg_iterable_Task2 = view(arg_task2,:,index)
        arg2 = take!(communicators[2,1])
        arg1 = function_require_sync(arg2,arg_sync...) # arg_sync assumed available from the surrounding scope
        put!(communicators[1,1],arg1)
        function2!(arg2,arg_func2...,arg_iterable_Task2)
    end
end

function asynchronous_compute_buffered!(arg_task1,arg_task2)
    arg1 = some_object()
    arg_func1 = tuple(some_value...)
    arg_func2 = tuple(some_value...)

    Num_processes = 2
    # Sketch assumption: both channels carry arg1's type.
    communicators = create_communicators(typeof(arg1),Num_processes)
    communicators[1] = Channel{typeof(arg1)}(1) # How to optimize if arg1 is an array or matrix? Channel{eltype(arg1)}(length(arg1)) if arg1 is an array?
    communicators[2] = Channel{typeof(arg1)}(1) # Ideally this would hold whatever function1 returns; Channel{eltype(arg2)}(length(arg2)) ??

    put!(communicators[1,1],arg1)
    Task_1_loop = @spawn function1_loop!(arg_task1,communicators,arg_func1...)
    Task_2_loop = @spawn function2_loop!(arg_task2,communicators,arg_func2...)
    wait(Task_2_loop)
    arg1_out = take!(communicators[1,1])
    close.(communicators)
    return arg1_out
end

Can anyone teach me how to make a package that makes these communications faster? Ideally, it should support both cases mentioned above. I’m willing to delve into the deeper parts of the libraries / Base Julia to do this. Ideally, it should be able to reuse the existing channel infrastructure with a macro on the put! and take!.

I figured out a scheme that works, with two locks:

Initially, lock 1 is open and lock 2 is closed.

Thread 1 checks if lock 1 is open, executes its task, closes lock 1, and opens lock 2.

Thread 2 checks if lock 2 is open, executes its task, closes lock 2, and opens lock 1.

This gives a criss-cross handshake between each pair of channels.

using .Threads
function Thread1Sim(locks)
    for idx in range(1,1000)
        while(islocked(locks[1]))
        end
        lock(locks[1])
        # Do required task
        unlock(locks[2])
    end
end

function Thread2Sim(locks)
    for idx in range(1,1000)
        while(islocked(locks[2]))
        end
        lock(locks[2])
        # Do required task before Task 1 runs again.
        unlock(locks[1])
        # Do remaining task.
    end
end

locks = [SpinLock() for _ in range(1,2)] #Threads.SpinLock()
lock(locks[2])

function spawn_tasks(locks)
    T1 = @spawn Thread1Sim(locks)
    T2 = @spawn Thread2Sim(locks)
    wait(T1)
    wait(T2)
end

In the mutable (unbuffered) case, this would translate to:

  1. Convert take!(channel[id1]) to
    while(islocked(locks[id1]))
    end
    lock(locks[id1])
  2. Convert put!(channel[id2]) to
    unlock(locks[id2])

In the buffered-channel case, after the while loop in the take! equivalent ends, it would take the data out (maybe there’s a way to use LLVM for this as well, to sync messages quickly). As long as there is no contention, just pure message passing, this logic would work even for put!, where the code puts the message first and then unlocks.

I believe this should be a separate package and not part of the Base Channel, unless there’s a way to handle contention. If there is a way to handle contention, then it could use a macro to hint that the logic is already contention-free, allowing aggressive compiler optimization. If the code is contention-free, and in scenarios where the tasks are forced to run on a single thread (resource limitations, etc.), then the compiler could even fuse the tasks together.

I have a few questions about existing channel methods. Why does put!(channel,value) return a value? Why does put! not accept just a channel in the unbuffered case, i.e. why does put!(channel) throw a MethodError? (Is this planned to change? Unbuffered channels are a recent addition, if I’m understanding the commit history right.) In general, if a function returns something and the return value is not consumed by a variable or by IO (e.g. in the REPL), is it, or can it be, compiled away?

Can you explain your issue again?

The following looks like nonsense to me:

        while(islocked(locks[1]))
        end
        lock(locks[1])

This spin-loop is almost literally what lock does.

If you don’t like the built-in locks, I’d suggest using atomics / rolling your own, instead of mis-using the built-in ones.

Your construction is somewhat broken in that it doesn’t contain a GC safe-point in the spin-loop, and will therefore deadlock on GC. Furthermore, you should use the pause instruction of your CPU (that is designed for spinlocks! It saves power and hands more resources to sibling hyperthreads, and doesn’t hammer your inter-core connections as much).

I mean, there are things to dislike about the built-in locks: The finalizer thing is a kludge, and I think many use-cases would profit from not doing that (better expected runtime, worse outliers if finalizers happen to be scheduled).

Also, using a spinlock seems super wrong to me: All reasonable lock implementations spin for a while. You could rightfully argue that Julia ReentrantLock should try a couple of “fast” (ccall(:jl_cpu_suspend, Cvoid, ()), i.e. pause instruction) spins before asking the scheduler to find other work to do (Base.yield()). Not sure why it does it this way.
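
For what it’s worth, a spin-wait that addresses those two points might look roughly like this (using the same runtime call mentioned above for the pause hint; treat it as a sketch, not a recommendation):

function spin_until(pred)
    while !pred()
        ccall(:jl_cpu_suspend, Cvoid, ())   # pause-style hint for the CPU
        GC.safepoint()                      # give GC a safe-point so it cannot deadlock on this loop
    end
end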

PS. Another thing that is wrong: You take the lock on one thread and unlock it on a different one. This has the issue that the runtime then believes, forever, that your main thread is in a critical section and suppresses finalizers. I am not entirely sure how officially this is forbidden, though – is this your bug, and I missed the docs that forbid it? Is it mitigated in the runtime (e.g.: Periodically reset the critical section counter, e.g. on each GC pass)? Or is it a runtime bug (not mitigated, and you’re allowed to do that)?

2 Likes

Thank you. I’m sorry for the code incorrectness; I did the best I could with the documentation I could find and the experimentation I could do! This area seems quite sparsely documented and I could not find many references.

I was looking for a way to synchronize and/or send messages between two threads more quickly than standard Channels. Everything from the 6th comment in this thread onward is about that.