Multithreaded mapping of an iterator (not a collection)

I’m trying to parallelize mapping a very large iterator (a FASTA.Reader from the FASTX package).

The mapping operation is quite costly, and I have many cores, so I thought using threads would be appropriate. Ideally, I should reuse the same thread for many mappings as otherwise thread creation will have a significant performance impact.

I’m quite new to Julia and used to functional programming, so I have experienced a bit of friction with mapping and with iterators vs. collections. I note that the regular map only works for collections (but FASTA.Reader is an iterator, as it has unknown length). Hence, the threaded drop-in replacement ThreadsX.map can’t be used.

What is the most elegant solution to this? The only thing I could come up with that is both threaded and sends batches of elements to map is a hand-coded solution based on channels. Alternatively, should I give up on threads and look for other parallel solutions?
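Roughly, the channel-based version I have in mind looks like this (just a sketch; the name channel_map, the worker count, and the unordered output are my own choices):

```julia
# Sketch: one task feeds the iterator into an input channel; nworkers
# tasks pull elements, apply f, and push results to an output channel.
# The result order is NOT preserved.
function channel_map(f, itr; nworkers = Threads.nthreads())
    inch = Channel(nworkers) do ch
        foreach(x -> put!(ch, x), itr)
    end
    out = Channel(Inf)
    @sync for _ in 1:nworkers
        Threads.@spawn for x in inch
            put!(out, f(x))
        end
    end
    close(out)
    collect(out)
end
```

It works, but it feels like a lot of machinery for what is conceptually just a map.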


You may want to take a second look at ThreadsX.map - it can handle iterators (as can Base.map).

julia> v = Iterators.filter(<(.5), 0:0.2:1)
Base.Iterators.Filter{Base.Fix2{typeof(<), Float64}, StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}}}(Base.Fix2{typeof(<), Float64}(<, 0.5), 0.0:0.2:1.0)

julia> length(v)
ERROR: MethodError: no method matching length(::Base.Iterators.Filter{Base.Fix2{typeof(<), Float64}, StepRangeLen{Float64, Base.TwicePrecision{Float64}, Base.TwicePrecision{Float64}}})

julia> map(i -> (i, Threads.threadid()), v)
3-element Vector{Tuple{Float64, Int64}}:
 (0.0, 1)
 (0.2, 1)
 (0.4, 1)

julia> ThreadsX.map(i -> (i, Threads.threadid()), v)
3-element Vector{Tuple{Float64, Int64}}:
 (0.0, 1)
 (0.2, 6)
 (0.4, 2)

Broadcasting only works on iterators with known size and shape, but that is not the case for map. Perhaps you are mixing them up?

The notion of “collection” for ThreadsX etc. is very generic, and it doesn’t have to be a thing that is backed by RAM. If the FASTA file format is parallel-friendly and it is possible to recover the parser state when starting from the middle of a file, it should be possible to implement SplittablesBase.halve by re-opening the file. In fact, this is how SplittablesBase.jl supports String; you can recover the “parser state” for UTF-8 from the middle of a byte stream. Somebody has to implement SplittablesBase.halve for FASTA.Reader, though.
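For intuition, this is roughly what halve gives you on a String (a sketch; assumes SplittablesBase is installed):

```julia
using SplittablesBase: halve

# halve splits a collection into two halves that can be reduced
# independently; for a String it picks a valid UTF-8 boundary, so
# multi-byte characters are never cut in half.
left, right = halve("déjà vu, déjà vu")

# Recombining the two halves recovers the original string.
join([collect(left); collect(right)])
```

A parallel reduction then recurses: halve each half again until the pieces are small enough, process them on separate tasks, and combine.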

Hello, @yarnton, and welcome!

Here’s a low level solution:

using FASTX

# Some expensive dummy function: XOR with random numbers until the
# lowest 26 bits are all zero, then return the elapsed time in seconds.
function foo(x::FASTA.Record)
    t1 = time()
    x = UInt(length(x.sequence))
    while trailing_zeros(x) < 26
        x ⊻= rand(UInt)
    end
    time() - t1
end

function bar(path)
    open(FASTA.Reader, path) do reader
        # Iterate the reader on this task, but run foo on each record in
        # its own task; @sync waits for all spawned tasks to finish.
        @sync for record in reader
            Threads.@spawn foo(record)
        end
    end
end

There should be a way of doing this high-level, functionally. @tkf is there really no way to simply apply a function to an iterator of unknown length - given that we don’t care about multithreading of the iteration, only the application of the function?

Edit: To give a little more detail to anyone reading: The FASTX.Reader iterator is complicated to multithread - not impossible, just difficult. But it’s also very fast. For almost all use cases, the bottleneck is not the iterator itself, but whatever processing is done to each element that is returned from the iterator. That is really what’s at focus here: How do you just let the iterator be single-threaded, but distribute the elements returned from the iterator to different threads to be processed?

Well, it depends. For example, if you don’t care about the ordering, you can “broadcast” the work to multiple tasks using a Channel and Threads.foreach. If you can afford a somewhat large buffer and/or each function call has more or less the same execution time, I think it’s possible to write a generic multi-threaded reduction for purely sequential iterators. If you don’t need the generic reduction and just need map in particular, I think the “large buffering” requirement is not critical (as you are going to store everything anyway). I just wanted to suggest halve first since it’s much more composable and generic.
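A sketch of that Channel-based approach, assuming you don’t care about result ordering (the name tforeach is mine):

```julia
# Sketch: feed a sequential iterator into a Channel, and let
# Threads.foreach pull from it on multiple tasks. The iterator is only
# ever advanced by the single feeding task; the side effects of f may
# happen in any order.
function tforeach(f, itr; ntasks = Threads.nthreads())
    ch = Channel(ntasks) do ch
        foreach(x -> put!(ch, x), itr)
    end
    Threads.foreach(f, ch; ntasks = ntasks)
end
```

Here f runs on up to ntasks tasks concurrently while the iteration itself stays single-threaded.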

(Edit: I think I’ll implement it in ThreadsX soon-ish unless somebody beats me 🙂 https://github.com/tkf/ThreadsX.jl/issues/161)


Thanks for all the detailed replies.

I found a simple and elegant solution: ThreadPools.tmap, which is a drop-in replacement for map.
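Minimal usage, assuming ThreadPools is installed:

```julia
using ThreadPools

# tmap is a threaded drop-in for map; unlike my channel attempt,
# it preserves the input order of the results.
squares = ThreadPools.tmap(x -> x^2, 1:8)
```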

@stillyslalom Base.map can handle iterators. map(identifier, open(FASTA.Reader, "file.fa")) works fine, but ThreadsX.map doesn’t in this case. That’s what confused me.

The example you provided with Iterators.filter does indeed work, so I wonder what the interface differences are between Iterators.filter and FASTA.Reader.

@jakobnissen Thanks for that code. Perhaps ThreadPools.tmap is a good option in this case. I still don’t get why @stillyslalom’s example works with ThreadsX.map but a FASTA.Reader doesn’t.

@tkf Thanks for your explanations too. I have a new account and I can only mention two users at a time!

@DNF Yes, the fact that ThreadsX.map doesn’t work on a FASTA.Reader confused me.


If ThreadPools.tmap fits your use-case, then I think that’s great! But I’d point out a few caveats:

  • It looks like ThreadPools.tmap collects the input into an array first. It’s a robust strategy but not optimal. For example, you cannot interleave the iteration and the computation this way.
  • It pre-computes the output element type using compiler internals. Thus, it is not a typocalypse-free solution. Practically, it means that updating julia can break your code.
  • The main goal of ThreadPools.jl is to separate latency-critical code from throughput-oriented code. ThreadPools.jl achieves this with a clever trick, but unfortunately that trick impedes dynamic scheduling by the julia runtime. As a result, using it at the library level means we lose the composable nested parallelism that Julia’s parallel runtime is designed to support.

ThreadsX.map converts the iterator transformations (e.g., Iterators.filter) to transducers and runs the reduction on the innermost iterator. So, if you have ThreadsX.map(f, (x for x in xs if p(x))) (or equivalently ThreadsX.map(f, Iterators.filter(p, xs))), what matters is whether the underlying collection xs supports the SplittablesBase.jl API.
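Concretely, this is why the earlier filter example works even though the filtered iterator itself has unknown length: the splitting happens on the underlying range, not on the Filter wrapper (a sketch; assumes ThreadsX is installed):

```julia
using ThreadsX

xs = 0:0.2:1  # a range; ranges support SplittablesBase.halve

# The filter is fused into the reduction as a transducer, and xs is
# what gets split across threads. Result order is preserved.
ThreadsX.map(x -> 2x, Iterators.filter(<(0.5), xs))
# → [0.0, 0.4, 0.8]
```

A FASTA.Reader, by contrast, is itself the innermost iterator and has no halve method, which is why the same call fails there.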


OK, I implemented ThreadsX.mapi that sequentially iterates over the input collection:

https://github.com/tkf/ThreadsX.jl/pull/162

This is based on a new generic NondeterministicThreading transducer that supports non-deterministic reduction that only assumes associativity (but not commutativity). There are a lot of optimization opportunities left on the table but I hope it’s already useful in many cases.
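Usage looks like plain map (a sketch against the PR as written; the API may still change):

```julia
using ThreadsX

# The input is iterated sequentially on a single task; the function is
# applied on multiple threads, and results come back in input order.
ThreadsX.mapi(x -> x^2, Iterators.filter(isodd, 1:10))
# → [1, 9, 25, 49, 81]
```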

As one might expect from Amdahl’s law, the function to be parallelized has to be much slower than the iteration and the overhead of the parallelization.


@tkf Thanks!!! I’ll give it a try.

Works really well, @tkf. Many thanks! The alternative, ThreadPools, has the stability and efficiency issues you discussed.