Runnable code and benchmark at this gist.
This is a widespread pattern for columnar-storage access: usually each column is too large to fit in RAM, so it's chunked into clusters, and each cluster is individually compressed on disk.
On getindex(col, idx), you first need to find the cluster, decompress it, compute the localidx, and return the data to the user. But the user is very likely to immediately getindex(col, idx+1) as well, so you want to cache the decompressed cluster.
And the question is: what's the best cache design such that the user can do
@threads for i in .....? For the longest time we have used the illegal approach in UnROOT.jl, and it has very good performance (beating C++ in some workloads), but we can't silently give users incorrect results… (rest assured, the single-threaded performance was good AND correct)
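The access pattern described above can be sketched like this (all names here, ChunkedColumn and decompress, are hypothetical stand-ins; in-memory byte vectors play the role of compressed blobs on disk):

```julia
# Hypothetical sketch of the columnar access pattern: the column is split
# into fixed-size clusters; getindex locates the cluster, "decompresses"
# it, and indexes into the local buffer.
struct ChunkedColumn
    clusters::Vector{Vector{UInt8}}  # stand-in for compressed blobs on disk
    cluster_len::Int
end

# Stand-in for the expensive disk-read + decompression step.
decompress(col::ChunkedColumn, c::Int) = Int.(col.clusters[c])

function Base.getindex(col::ChunkedColumn, idx::Int)
    c = (idx - 1) ÷ col.cluster_len + 1           # which cluster
    localidx = (idx - 1) % col.cluster_len + 1    # offset within cluster
    buf = decompress(col, c)                      # expensive without a cache!
    return buf[localidx]
end

col = ChunkedColumn([UInt8[1, 2, 3], UInt8[4, 5, 6]], 3)
col[4]  # decompresses cluster 2, returns 4
```

Note that col[4] followed by col[5] decompresses cluster 2 twice; caching the last decompressed cluster is exactly what the rest of the discussion is about.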
Actually, @quinnj might be interested in this. I have wanted to add this to Arrow (note: right now we can't lazily read a compressed Arrow file). Adding caching like this would fix problems such as:
Some additional context:

1. We want users to be able to tap into multi-threading easily, without re-writing their loop body

   for evt in tree

   users used to be able to simply write

   @threads for evt in tree
       func(evt.col1) # usually pushing into histograms, it's thread-safe
   end
2. The workload is usually not compute dominated. If it were super CPU-limited, we could use anything, like a Channel. But it's not.
3. Another reason we can't use a Channel: evt is actually lazy (a read is only triggered by evt.col1), because there might be 1000+ columns. For this reason, we can't just push evt into a Channel, because that would require eagerly reading all columns, which is bad.
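The lazy evt in point 3 can be sketched roughly like this (LazyEvent and the Dict-backed "tree" are hypothetical stand-ins, not the actual UnROOT.jl types):

```julia
# Hypothetical sketch of a lazy row view: accessing `evt.col1` reads only
# that one column at that one index, so a 1000-column table stays cheap
# as long as the loop body only touches a few columns.
struct LazyEvent
    tree::Dict{Symbol,Vector{Int}}  # stand-in for on-disk columns
    idx::Int
end

function Base.getproperty(evt::LazyEvent, name::Symbol)
    name in (:tree, :idx) && return getfield(evt, name)
    # read-on-access: only the requested column is touched
    return getfield(evt, :tree)[name][getfield(evt, :idx)]
end

tree = Dict(:col1 => [10, 20], :col2 => [30, 40])
evt = LazyEvent(tree, 2)
evt.col1  # reads only col1 at index 2, i.e. 20
```

Pushing such an evt into a Channel would defeat the point: to make it safe to hand off, you'd have to materialize all columns first.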
task_local_storage is not ideal

Task-local storage is no good because if you have 100 evts in a cluster, and the user spawned a few tasks within that cluster, you end up decompressing that cluster more than once.
They may also hit OOM if they use the
@sync for ... @spawn pattern, because then you decompress every cluster at the same time, with high multiplicity.
Could you elaborate on this? I don’t understand what you mean.
So say you have 10^4 elements, roughly chunked into 10^3 elements per cluster. If the user did
@sync for ... @spawn, you now have roughly 10^4 tasks.
Imagine each of them checking whether it has a cached result in "task local storage": they won't find any, so they will start decompressing things far more often than actually needed.
Because the first 10^3 elements belong to the same "cluster", we really should only read from disk and decompress this thing once; with one task per element, you're going to do it about 10^3 times.
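Here is a toy version of that over-decompression, assuming a hypothetical cached_cluster helper that caches in task-local storage (the "decompression" is just a fill, counted with an atomic):

```julia
using Base.Threads

# Count how many times the expensive step actually runs.
const decompress_count = Atomic{Int}(0)

# Hypothetical helper: "decompress cluster c, caching in task-local storage".
function cached_cluster(c::Int)
    get!(task_local_storage(), (:cluster, c)) do
        atomic_add!(decompress_count, 1)  # a real decompression happened
        fill(c, 1000)                     # stand-in for decompressed data
    end
end

# 10^4 elements, 10^3 per cluster => only 10 distinct clusters, but one
# task per element means every task's TLS starts empty: every single
# access misses the cache and decompresses again.
@sync for i in 1:10_000
    Threads.@spawn begin
        c = (i - 1) ÷ 1000 + 1
        cached_cluster(c)[1]
    end
end

decompress_count[]  # 10_000 decompressions instead of the ideal 10
```

Each spawned task has its own fresh task-local storage, so the cache hit rate here is exactly zero.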
Okay, but why would you do
@sync for ... @spawn? That's almost always a bad idea.
Because according to people, that's almost always a better idea than
@threads (it's more composable etc.).
But the point is I can't control how users are going to schedule it; I don't want to give them OOM if they somehow schedule work over my data the wrong way (they may be using it indirectly through some parallel framework).
I hope people aren't actually saying that, because it isn't true. Just because people say that
@threads is bad does not mean you should be blindly spawning
N tasks for a threaded loop over a user-provided data structure.
That hasn't been true since Julia 1.8. See
??Threads.@threads, particularly the now-default :dynamic schedule.
It is still true for
:static in Julia >= 1.8, or for older Julia versions.
julia> @time begin
           Threads.@threads for i in 1:Threads.threadpoolsize()
               Threads.@threads for j in 1:Threads.threadpoolsize()
                   sleep(1)
               end
           end
       end
  1.090039 seconds (69.56 k allocations: 4.747 MiB, 207.46% compilation time)
I ran 36^2 = 1296 parallel
sleep(1) tasks. Sleeping for a total of 1296 seconds takes only 1 second, because the sleeping is all done in parallel (
sleep doesn't block / lets other tasks be scheduled).
No problem nesting here.
Ah yeah, that's true, and it's good to know I don't have to tell everyone the
@threads pattern is completely bad.
Yeah, and this is part of the problem I face here, right? I need a cache structure that's robust against N tasks >> threadpoolsize. Before, it was easy: just index with
threadid(), since the number of threads is how much CPU the user wants to saturate, and tasks couldn't migrate between threads.
Now any "task local" (but not thread local) buffer can't work as easily, because over-spawned tasks would excessively read from disk / decompress.
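One possible direction, sketched under assumption (this is not the actual UnROOT.jl design): a per-cluster lock plus a per-cluster slot, so at most one task ever decompresses a given cluster, no matter how many tasks the user spawns or how they are scheduled.

```julia
using Base.Threads

# Hypothetical task-robust cache: one lock and one slot per cluster.
# Whichever task arrives first decompresses; everyone else blocks briefly
# on that cluster's lock and then reuses the cached result.
struct ClusterCache{T}
    locks::Vector{ReentrantLock}
    slots::Vector{Union{Nothing,T}}
end

ClusterCache{T}(nclusters::Int) where {T} =
    ClusterCache{T}([ReentrantLock() for _ in 1:nclusters],
                    Vector{Union{Nothing,T}}(nothing, nclusters))

function get_cluster!(decompress::Function, cache::ClusterCache{T}, c::Int) where {T}
    lock(cache.locks[c]) do
        if cache.slots[c] === nothing
            cache.slots[c] = decompress(c)  # runs at most once per cluster
        end
        return cache.slots[c]::T
    end
end

cache = ClusterCache{Vector{Int}}(10)
get_cluster!(c -> fill(c, 1000), cache, 3)  # first call decompresses
get_cluster!(c -> fill(c, 1000), cache, 3)  # second call is a cache hit
```

The trade-off is that this keeps every touched cluster alive; a real implementation would likely add eviction (e.g. LRU, or keeping only the most recent cluster per column) to bound memory even under the @sync for ... @spawn pattern.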