Using IOStreams with Distributed parallelism

I’ve got a serial function:

function serial_mem(::Type{M}, input::DatastoreBuffer{<:ReadDatastore}, count_mode::CountMode, range::AbstractRange = 1:length(input)) where {M<:AbstractMer}
    @info "[Process: $(myid())] Collecting kmers from $(length(range)) reads in read datastore $(name(ReadDatastores.datastore(input)))"
    all_mers = collect_mers(M, count_mode, input, range)
    @info "[Process: $(myid())] Collapsing collected kmers into counts"
    return collapse_into_counts(all_mers)
end

I wanted to run this distributed. The variable input is a type that references another type, which in turn references an IOStream: DatastoreBuffer → ReadDatastore → IOStream.

So I made the following function:

function dist_mem(::Type{M}, input::DatastoreBuffer{<:ReadDatastore}, count_mode::CountMode) where {M<:AbstractMer}
    @info "Splitting all reads in $(name(ReadDatastores.datastore(input))), across $(nprocs()) processes and counting kmers"
    local_counts = Vector{Vector{MerCount{M}}}(undef, nprocs())
    @sync for p in procs()
        @async local_counts[p] = remotecall_fetch(serial_mem, p, M, input, count_mode, p:nprocs():length(input))
    end
    @info "Merging all counts from $(nprocs()) processes"
    final_count = local_counts[1]
    @inbounds for i in 2:lastindex(local_counts)
        unsafe_merge_into!(final_count, local_counts[i])
    end
    return final_count
end

However, if I try to run this:

julia> using Distributed, BioSequences, ReadDatastores

julia> addprocs(4, exeflags="--project=.")
4-element Array{Int64,1}:
 2
 3
 4
 5

julia> @everywhere using MerCounting
[ Info: Precompiling MerCounting [a20136b7-8e32-4c10-91d3-7060c0bd8ec7]

julia> ds = @openreads "full-paired.prseq"
Paired Read Datastore 'full-paired': 988980 reads (494490 pairs)

julia> ds = buffer(ds)
Buffered Paired Read Datastore 'full-paired': 988980 reads (494490 pairs)

julia> MerCounting.Counters.dist_mem(DNAMer{31}, ds, MerCounting.CANONICAL)
[ Info: Splitting all reads in full-paired, across 5 processes and counting kmers
[ Info: [Process: 1] Collecting kmers from 197796 reads in read datastore full-paired
[ Info: [Process: 1] Collapsing collected kmers into counts
[ Info: [Process: 2] Collecting kmers from 197796 reads in read datastore full-paired
[ Info: [Process: 3] Collecting kmers from 197796 reads in read datastore full-paired
[ Info: [Process: 4] Collecting kmers from 197796 reads in read datastore full-paired
[ Info: [Process: 5] Collecting kmers from 197796 reads in read datastore full-paired
ERROR: TaskFailedException:
On worker 2:
SystemError: seek: Bad file descriptor
#systemerror#44 at ./error.jl:134
systemerror at ./error.jl:134 [inlined]
seek at ./iostream.jl:108
collect_mers at /Users/bward/.julia/packages/ReadDatastores/8WVvz/src/sequence-buffer.jl:64
serial_mem at /Users/bward/.julia/packages/MerCounting/Mv5HZ/src/counters/serial_mem.jl:39
#108 at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/process_messages.jl:294
run_work_thunk at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/process_messages.jl:79
macro expansion at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/process_messages.jl:294 [inlined]
#107 at ./task.jl:333
Stacktrace:
 [1] #remotecall_fetch#145(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::typeof(remotecall_fetch), ::Function, ::Distributed.Worker, ::Type, ::Vararg{Any,N} where N) at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:390
 [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Type, ::Vararg{Any,N} where N) at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:382
 [3] #remotecall_fetch#148(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::typeof(remotecall_fetch), ::Function, ::Int64, ::Type, ::Vararg{Any,N} where N) at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:417
 [4] remotecall_fetch at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:417 [inlined]
 [5] (::MerCounting.Counters.var"#1#2"{Mer{DNAAlphabet{2},31},DatastoreBuffer{PairedReads{DNAAlphabet{4}}},Canonical,Array{Array{MerCount{Mer{DNAAlphabet{2},31}},1},1},Int64})() at ./task.jl:333

...and 3 more exception(s).

Stacktrace:
 [1] sync_end(::Array{Any,1}) at ./task.jl:300
 [2] dist_mem(::Type{Mer{DNAAlphabet{2},31}}, ::DatastoreBuffer{PairedReads{DNAAlphabet{4}}}, ::Canonical) at ./task.jl:319
 [3] top-level scope at REPL[6]:1

I hit a bad file descriptor error.

If I rewrite things so that each process opens its own buffered datastore:

function _do_serial_mem(::Type{M}, ::Type{D}, filename::String, count_mode::CountMode) where {M<:AbstractMer,D<:ReadDatastore}
    ds = open(D, filename)
    part = myid():nprocs():length(ds)
    counts = serial_mem(M, buffer(ds), count_mode, part)
    close(ds)
    return counts
end

function dist_mem2(::Type{M}, ::Type{D}, filename::String, count_mode::CountMode) where {M<:AbstractMer,D<:ReadDatastore}
    @info "Splitting all reads in $filename, across $(nprocs()) processes and counting kmers"
    local_counts = Vector{Vector{MerCount{M}}}(undef, nprocs())
    @sync for p in procs()
        @async local_counts[p] = remotecall_fetch(_do_serial_mem, p, M, D, filename, count_mode)
    end
    @info "Merging all counts from $(nprocs()) processes"
    final_count = local_counts[1]
    @inbounds for i in 2:lastindex(local_counts)
        unsafe_merge_into!(final_count, local_counts[i])
    end
    return final_count
end

Then it works:

julia> using Distributed, BioSequences, ReadDatastores

julia> addprocs(4, exeflags="--project=.")
4-element Array{Int64,1}:
 2
 3
 4
 5

julia> @everywhere using MerCounting
[ Info: Precompiling MerCounting [a20136b7-8e32-4c10-91d3-7060c0bd8ec7]

julia> o = MerCounting.Counters.dist_mem2(DNAMer{31}, PairedReads{DNAAlphabet{4}}, "full-paired.prseq", MerCounting.CANONICAL)

I was wondering if there’s a way to make the first version work, so it’s a bit more convenient to just call the dist_mem method on a datastore that is already loaded on the master process. Perhaps there’s a way to make the datastores get sent to worker processes properly, without the bad file descriptor error?

The trouble is that when you open a file for reading, the OS gives you a “file descriptor” which is only valid for that process (I believe it’s actually just a number that the OS maps to an open file). So passing that file descriptor to another process is useless, because that process hasn’t told the OS to open the file, and even if it did, it would probably get a different file descriptor. And worse, Distributed can span multiple machines, and a file descriptor would definitely have no meaning on another machine…
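
You can see this concretely (an illustrative sketch, not code from the package — I’m just reusing the file from above):

# The fd is an index into *this* process's open-file table, so a copy
# of the number means nothing (or something else entirely) on a worker.
io = open("full-paired.prseq")
fd(io)                              # e.g. 20 — only valid in this process

# The IOStream struct itself serializes, but the fd buried inside it
# doesn't refer to an open file on worker 2, so any I/O there fails:
remotecall_fetch(seekstart, 2, io)  # SystemError: seek: Bad file descriptor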

So really you just have 2 options. The first would be to use multiple threads. That will work since each thread is in the same process and uses the same file descriptor. HOWEVER, you probably need to protect access to the file: if two threads do a seek() then a read() around the same time, the last seek() wins and both threads would read at that offset.
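
For example, a minimal sketch of that kind of protection (read_record_at and io_lock are names I’ve made up for illustration):

# One shared stream, one lock: holding the lock makes the seek+read
# pair atomic with respect to the other threads.
const io_lock = ReentrantLock()

function read_record_at(io::IOStream, offset::Integer, nbytes::Integer)
    lock(io_lock) do
        seek(io, offset)   # no other thread can sneak in a seek()
        read(io, nbytes)   # before this read() has finished
    end
end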

The other option is to have each Distributed instance open the file for reading, which is what you’re doing in your rewrite. :slight_smile:

That makes a lot of sense. I’ve changed it so that if you pass the method a datastore, it gets the filename and each process opens its own version. That way the user can do this with datastores they’ve already opened in their session, without having to call the method with a filename and an expected datastore type, so the API is nicer.
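
Roughly like this (sketch only — filename(ds) below stands in for however you actually recover the backing file path from a datastore):

# Convenience method: accept an already-open (buffered) datastore,
# recover its backing file, and delegate to the filename-based version.
function dist_mem(::Type{M}, input::DatastoreBuffer{D}, count_mode::CountMode) where {M<:AbstractMer,D<:ReadDatastore}
    ds = ReadDatastores.datastore(input)
    return dist_mem2(M, D, filename(ds), count_mode)  # filename() assumed
end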

I’m open to trying a multithreaded version, but I’m not familiar with protecting blocks of code using locks and so on, and I rarely see examples of Julia code doing it.
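
From the Base docs for ReentrantLock, the basic idioms seem to be these (generic sketch, nothing MerCounting-specific):

# The two standard ways to guard a critical section in Julia:
l = ReentrantLock()

# 1. Explicit lock/unlock, with try/finally so the lock is always released:
lock(l)
try
    # ... touch the shared resource ...
finally
    unlock(l)
end

# 2. The do-block form, which wraps the try/finally for you:
lock(l) do
    # ... touch the shared resource ...
end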