I’ve used Tasks a fair bit – my production code regularly runs hundreds of thousands of tasks. You’ll see a slight performance hit if you’re trying to create very optimized code, as well as a fair bit of memory allocations. But Tasks and Channels are a very nice way to enforce a highly concurrent model where most computation can run in any order. And if you have dependencies, the easiest way is usually to have a parent task start its child tasks.
If you want extreme generality, you can actually use a Channel{Task}
and I use that in some places. But it’s less efficient, so Channels of more concrete specific types are usually better.
With Channels, you rarely need to use wait
or notify
in my experience.
Here is a relatively simple example using Tasks and Channels to make a threadsafe dict:
function task_consumer(channel::Channel{Task})::Nothing
for task in channel
schedule(task)
wait(task)
end
end
global main_thread_task_queue = Channel{Task}(task_consumer, Inf)
"""
ChannelBasedDict{K, V}()
Yet another thread-safe dict.
"""
struct ChannelBasedDict{K, V} <: AbstractDict{K, V}
dict::Dict{K, V}
channel::Channel{Task}
function ChannelBasedDict{K, V}() where {K, V}
channel = main_thread_task_queue
dict = Dict{K, V}()
return new{K, V}(dict, channel)
end
end
function Base.setindex!(d::ChannelBasedDict{K, V}, v::V, k::K) where {K, V}
task = @task d.dict[k] = v
put!(d.channel, task)
return task
end
function Base.getindex(d::ChannelBasedDict{K, V}, k::K)::V where {K, V}
task = @task return d.dict[k]
put!(d.channel, task)
return fetch(task)
end
function Base.haskey(d::ChannelBasedDict{K, V}, k::K)::Bool where {K, V}
task = @task haskey(d.dict, k)
put!(d.channel, task)
return fetch(task)
end
function Base.get(d::ChannelBasedDict{K, V}, k::K, default::V)::V where {K, V}
task = @task get(d.dict, k, default)
put!(d.channel, task)
return fetch(task)
end
function get_nonblocking!(f::Function, d::ChannelBasedDict{K, V}, k::K)::Task where {K, V}
task = @task begin
if !haskey(d.dict, k)
d.dict[k] = f()
end
return d.dict[k]
end
put!(d.channel, task)
return task
end
function Base.get!(f::Function, d::ChannelBasedDict{K, V}, k::K)::V where {K, V}
return fetch(get_nonblocking!(f, d, k))
end
function Base.length(d::ChannelBasedDict{K, V})::Int where {K, V}
task = @task begin
return length(d.dict)
end
put!(d.channel, task)
return fetch(task)
end
function Base.iterate(d::ChannelBasedDict{K, V}) where {K, V}
task = @task begin
return iterate(d.dict)
end
put!(d.channel, task)
return fetch(task)
end
function Base.iterate(d::ChannelBasedDict{K, V}, index) where {K, V}
task = @task begin
return iterate(d.dict, index)
end
put!(d.channel, task)
return fetch(task)
end