I’m trying to parallelize mapping a very large iterator (a FASTA.Reader from the FASTX package).
The mapping operation is quite costly, and I have many cores, so I thought using threads would be appropriate. Ideally, I should reuse the same thread for many mappings as otherwise thread creation will have a significant performance impact.
I’m quite new to Julia and used to functional programming, so I have experienced a bit of friction with mapping and iterators vs collections. I note that the regular map only works for collections, but FASTA.Reader is an iterator of unknown length. Hence, the threaded drop-in replacement ThreadsX.map can’t be used.
What is the most elegant solution to this? The only thing I could come up with that is both threaded and sends batches of elements to map is a hand-coded solution based on channels. Alternatively, should I give up on threads and look for other parallel solutions?
The notion of “collection” for ThreadsX etc. is very generic, and it doesn’t have to be something backed by RAM. If the FASTA file format is parallel-friendly, i.e., it is possible to recover the parser state when you start from the middle of a file, it should be possible to implement SplittablesBase.halve by re-opening the file. In fact, this is how SplittablesBase.jl supports String: you can recover the “parser state” for UTF-8 from the middle of the byte stream. Somebody has to implement SplittablesBase.halve for FASTA.Reader, though.
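To make that concrete, here is a minimal sketch, assuming a hypothetical FastaChunk iterator over the records in a byte range of a file. FastaChunk is not part of FASTX, and a real implementation would also need Base.iterate, SplittablesBase.amount, and handling for chunks that contain no further record:

using SplittablesBase

# Hypothetical iterator over the FASTA records whose headers start
# within the byte range from:to of the file at `path` (0-based offsets,
# as used by seek/position).
struct FastaChunk
    path::String
    from::Int
    to::Int
end

function SplittablesBase.halve(c::FastaChunk)
    # Re-open the file and recover the "parser state" from the middle:
    # scan forward from the midpoint to the next record header (a line
    # starting with '>'), just as halve on a String scans forward to
    # the next UTF-8 character boundary.
    boundary = open(c.path) do io
        seek(io, (c.from + c.to) ÷ 2)
        readuntil(io, "\n>")
        position(io) - 1  # offset of the '>' beginning the next record
    end
    return (FastaChunk(c.path, c.from, boundary - 1),
            FastaChunk(c.path, boundary, c.to))
end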
using FASTX

# Some expensive function: repeatedly XOR with random numbers until the
# value has at least 26 trailing zero bits; returns the elapsed time.
function foo(x::FASTA.Record)
    t1 = time()
    x = UInt(length(x.sequence))
    while trailing_zeros(x) < 26
        x ⊻= rand(UInt)
    end
    time() - t1
end

# Iterate the reader on a single thread, but spawn each call to foo as
# its own task; @sync waits for all spawned tasks to finish
# (the results are discarded here).
function bar(path)
    open(FASTA.Reader, path) do reader
        @sync for record in reader
            Threads.@spawn foo(record)
        end
    end
end
There should be a high-level, functional way of doing this. @tkf, is there really no way to simply apply a function to an iterator of unknown length, given that we don’t care about multithreading the iteration, only the application of the function?
Edit: To give a little more detail for anyone reading: the FASTX.Reader iterator is complicated to multithread - not impossible, just difficult. But it’s also very fast. For almost all use cases, the bottleneck is not the iterator itself, but whatever processing is done to each element it returns. That is really the focus here: how do you let the iterator stay single-threaded, but distribute the elements it returns across threads to be processed?
Well, it depends. For example, if you don’t care about the ordering, you can “broadcast” the work to multiple tasks using a Channel and Threads.foreach. If you can afford a somewhat large buffer and/or each function call has more or less the same execution time, I think it’s possible to write a generic multi-threaded reduction for purely sequential iterators. If you don’t need the generic reduction and just need map in particular, I think the “large buffering” requirement is not critical (as you are going to store everything anyway). I just wanted to suggest halve first since it’s much more composable and generic.
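To make the first option concrete, here is a sketch of the Channel + Threads.foreach pattern (available since Julia 1.6), reusing foo from the code above; process_unordered is a made-up name, and the records are processed in no particular order:

using FASTX

function process_unordered(path)
    open(FASTA.Reader, path) do reader
        # One producer task iterates the reader sequentially and feeds
        # a buffered channel; spawn=true lets it run on any thread.
        ch = Channel{FASTA.Record}(128; spawn = true) do ch
            for record in reader
                put!(ch, record)
            end
        end
        # Several consumer tasks pull records off the channel and run
        # the expensive function in parallel (results are discarded
        # here, as in bar above).
        Threads.foreach(ch; ntasks = Threads.nthreads()) do record
            foo(record)
        end
    end
end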
I found a simple and elegant solution: ThreadPools.tmap, which is a drop-in replacement for map.
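For example, a sketch (with foo as above and "file.fa" as a placeholder path):

using FASTX, ThreadPools

results = open(FASTA.Reader, "file.fa") do reader
    # tmap appears to collect its input first anyway (see the caveats
    # below), so collecting explicitly just makes that cost visible.
    ThreadPools.tmap(foo, collect(reader))
end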
@stillyslalom Base.map can handle iterators. map(identifier, open(FASTA.Reader, "file.fa")) works fine. ThreadsX.map doesn’t work in this case. That’s what confused me.
The example you provided with Iterators.filter does indeed work, so I wonder: what are the interface differences between Iterators.filter and FASTA.Reader?
@jakobnissen Thanks for that code. Perhaps ThreadPools.tmap is a good option in this case. I still don’t get why @stillyslalom’s example works with ThreadsX.map but it doesn’t work on a FASTA.Reader.
If ThreadPools.tmap fits your use-case, then I think that’s great! But I’d point out a few caveats:
It looks like ThreadPools.tmap collects its input into an array first. It’s a robust strategy, but not optimal: for example, you cannot interleave the iteration and the computation this way.
The main goal of ThreadPools.jl is to separate latency-critical code from throughput-oriented code. ThreadPools.jl achieves this with a clever trick, but unfortunately it impedes dynamic scheduling by the Julia runtime. As a result, using it at the library level means we lose the composable nested parallelism that Julia’s parallel runtime is designed to support.
ThreadsX.map converts iterator transformations (e.g., Iterators.filter) to transducers and runs the reduction on the innermost iterator. So, if you have ThreadsX.map(f, (x for x in xs if p(x))) (or, equivalently, ThreadsX.map(f, Iterators.filter(p, xs))), what matters is whether the underlying collection xs supports the SplittablesBase.jl API.
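For instance, something like this should work, even though the filtered generator itself has unknown length, because the innermost collection (the range xs) is splittable:

using ThreadsX

xs = 1:1_000_000
ThreadsX.map(x -> x^2, (x for x in xs if isodd(x)))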
This is based on a new generic NondeterministicThreading transducer that supports non-deterministic reductions, assuming only associativity (but not commutativity). There are a lot of optimization opportunities left on the table, but I hope it’s already useful in many cases.
As one might expect from Amdahl’s law, the function to be parallelized has to be much slower than the iteration and the overhead of the parallelization.
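Concretely, if a fraction p of the total work is parallelizable across N threads, Amdahl’s law bounds the speedup by 1 / ((1 - p) + p / N), so when the sequential iteration accounts for a large share of the time, adding threads cannot help much.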