I’ve got a serial function:
"""
    serial_mem(::Type{M}, input, count_mode, range = 1:length(input)) where {M<:AbstractMer}

Count kmers of type `M` on the current process, using only the reads of `input`
whose indices lie in `range`.

# Arguments
- `input::DatastoreBuffer{<:ReadDatastore}`: buffered read datastore to read from.
- `count_mode::CountMode`: forwarded unchanged to `collect_mers`.
- `range::AbstractRange`: read indices to process; defaults to every read.

# Returns
Whatever `collapse_into_counts` produces for the collected kmers. Presumably this is a
`Vector{MerCount{M}}`, since `dist_mem` stores it as one — confirm.
"""
function serial_mem(::Type{M}, input::DatastoreBuffer{<:ReadDatastore},
                    count_mode::CountMode,
                    range::AbstractRange = 1:length(input)) where {M<:AbstractMer}
    pid = myid()
    dsname = name(ReadDatastores.datastore(input))
    nreads = length(range)
    @info "[Process: $(pid)] Collecting kmers from $(nreads) reads in read datastore $(dsname)"
    collected = collect_mers(M, count_mode, input, range)
    @info "[Process: $(pid)] Collapsing collected kmers into counts"
    counts = collapse_into_counts(collected)
    return counts
end
I wanted to run it in a distributed fashion. The argument `input`
is of a type that references another type, which in turn references an `IOStream`: `DatastoreBuffer` → `ReadDatastore` → `IOStream`.
So I wrote the following function:
"""
    dist_mem(::Type{M}, input, count_mode) where {M<:AbstractMer}

Count kmers of type `M` in `input` by splitting its reads across every process in
`procs()` (including the master). Each process counts its share with `serial_mem`,
and the per-process results are merged on the caller with `unsafe_merge_into!`.

Reads are dealt out round-robin. The `i`-th process in `procs()` handles reads
`i:nparts:length(input)`. The position in `procs()` is used rather than the process id,
so the split stays complete and non-overlapping even when worker ids are not
contiguous (for example after `rmprocs` followed by `addprocs`).

NOTE(review): `input` is serialized and sent to every worker by `remotecall_fetch`.
If the underlying datastore holds an open `IOStream`, its file descriptor is only
valid in the process that opened it, so workers fail with
"SystemError: seek: Bad file descriptor". Until the datastore type gets custom
serialization that reopens the file on the receiving side, use the
`dist_mem2`/`_do_serial_mem` approach, where each worker opens its own datastore.
"""
function dist_mem(::Type{M}, input::DatastoreBuffer{<:ReadDatastore},
                  count_mode::CountMode) where {M<:AbstractMer}
    # Snapshot the process list once so sizing, partitioning and merging all agree.
    pids = procs()
    nparts = length(pids)
    @info "Splitting all reads in $(name(ReadDatastores.datastore(input))), across $(nparts) processes and counting kmers"
    local_counts = Vector{Vector{MerCount{M}}}(undef, nparts)
    @sync for (i, p) in enumerate(pids)
        # Index by position `i`, not process id `p`: ids need not be 1:nparts.
        @async local_counts[i] = remotecall_fetch(serial_mem, p, M, input, count_mode,
                                                  i:nparts:length(input))
    end
    @info "Merging all counts from $(nparts) processes"
    final_count = local_counts[1]
    for i in 2:nparts
        unsafe_merge_into!(final_count, local_counts[i])
    end
    return final_count
end
However, when I try to run it:
julia> using Distributed, BioSequences, ReadDatastores
julia> addprocs(4, exeflags="--project=.")
4-element Array{Int64,1}:
2
3
4
5
julia> @everywhere using MerCounting
[ Info: Precompiling MerCounting [a20136b7-8e32-4c10-91d3-7060c0bd8ec7]
julia> ds = @openreads "full-paired.prseq"
Paired Read Datastore 'full-paired': 988980 reads (494490 pairs)
julia> ds = buffer(ds)
Buffered Paired Read Datastore 'full-paired': 988980 reads (494490 pairs)
julia> MerCounting.Counters.dist_mem(DNAMer{31}, ds, MerCounting.CANONICAL)
[ Info: Splitting all reads in full-paired, across 5 processes and counting kmers
[ Info: [Process: 1] Collecting kmers from 197796 reads in read datastore full-paired
[ Info: [Process: 1] Collapsing collected kmers into counts
[ Info: [Process: 2] Collecting kmers from 197796 reads in read datastore full-paired
[ Info: [Process: 3] Collecting kmers from 197796 reads in read datastore full-paired
[ Info: [Process: 4] Collecting kmers from 197796 reads in read datastore full-paired
[ Info: [Process: 5] Collecting kmers from 197796 reads in read datastore full-paired
ERROR: TaskFailedException:
On worker 2:
SystemError: seek: Bad file descriptor
#systemerror#44 at ./error.jl:134
systemerror at ./error.jl:134 [inlined]
seek at ./iostream.jl:108
collect_mers at /Users/bward/.julia/packages/ReadDatastores/8WVvz/src/sequence-buffer.jl:64
serial_mem at /Users/bward/.julia/packages/MerCounting/Mv5HZ/src/counters/serial_mem.jl:39
#108 at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/process_messages.jl:294
run_work_thunk at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/process_messages.jl:79
macro expansion at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/process_messages.jl:294 [inlined]
#107 at ./task.jl:333
Stacktrace:
[1] #remotecall_fetch#145(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::typeof(remotecall_fetch), ::Function, ::Distributed.Worker, ::Type, ::Vararg{Any,N} where N) at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:390
[2] remotecall_fetch(::Function, ::Distributed.Worker, ::Type, ::Vararg{Any,N} where N) at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:382
[3] #remotecall_fetch#148(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::typeof(remotecall_fetch), ::Function, ::Int64, ::Type, ::Vararg{Any,N} where N) at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:417
[4] remotecall_fetch at /Users/bward/repos/julia13/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:417 [inlined]
[5] (::MerCounting.Counters.var"#1#2"{Mer{DNAAlphabet{2},31},DatastoreBuffer{PairedReads{DNAAlphabet{4}}},Canonical,Array{Array{MerCount{Mer{DNAAlphabet{2},31}},1},1},Int64})() at ./task.jl:333
...and 3 more exception(s).
Stacktrace:
[1] sync_end(::Array{Any,1}) at ./task.jl:300
[2] dist_mem(::Type{Mer{DNAAlphabet{2},31}}, ::DatastoreBuffer{PairedReads{DNAAlphabet{4}}}, ::Canonical) at ./task.jl:319
[3] top-level scope at REPL[6]:1
I get a bad file descriptor error.
If I rewrite things so that each process opens its own buffered datastore:
"""
    _do_serial_mem(::Type{M}, ::Type{D}, filename, count_mode) where {M<:AbstractMer,D<:ReadDatastore}

Open the datastore of type `D` at `filename` on the calling process and count kmers of
type `M` in this process's share of its reads with `serial_mem`. The datastore is
closed before returning, even if counting throws.

Opening the file locally avoids sending an object that holds an `IOStream` between
processes.
"""
function _do_serial_mem(::Type{M}, ::Type{D}, filename::String,
                        count_mode::CountMode) where {M<:AbstractMer,D<:ReadDatastore}
    ds = open(D, filename)
    try
        # NOTE(review): this round-robin share assumes process ids are exactly
        # 1:nprocs(). With non-contiguous worker ids, reads are duplicated or skipped.
        part = myid():nprocs():length(ds)
        return serial_mem(M, buffer(ds), count_mode, part)
    finally
        close(ds)
    end
end
"""
    dist_mem2(::Type{M}, ::Type{D}, filename, count_mode) where {M<:AbstractMer,D<:ReadDatastore}

Count kmers of type `M` in the datastore of type `D` stored at `filename`, using every
process in `procs()`. Each process opens its own copy of the datastore through
`_do_serial_mem`, so no open file handle is sent between processes. The per-process
results are merged on the caller with `unsafe_merge_into!`.

NOTE(review): `_do_serial_mem` chooses each process's reads from `myid()` and
`nprocs()`, which assumes process ids are exactly `1:nprocs()`. Results here are stored
by position in `procs()`, so non-contiguous ids no longer cause a BoundsError. The read
split itself is still only correct for contiguous ids.
"""
function dist_mem2(::Type{M}, ::Type{D}, filename::String,
                   count_mode::CountMode) where {M<:AbstractMer,D<:ReadDatastore}
    # Snapshot the process list once so sizing and filling `local_counts` agree.
    pids = procs()
    nparts = length(pids)
    @info "Splitting all reads in $filename, across $(nparts) processes and counting kmers"
    local_counts = Vector{Vector{MerCount{M}}}(undef, nparts)
    @sync for (i, p) in enumerate(pids)
        # Index by position `i`, not process id `p`: ids need not be 1:nparts.
        @async local_counts[i] = remotecall_fetch(_do_serial_mem, p, M, D, filename, count_mode)
    end
    @info "Merging all counts from $(nparts) processes"
    final_count = local_counts[1]
    for i in 2:nparts
        unsafe_merge_into!(final_count, local_counts[i])
    end
    return final_count
end
Then it works:
julia> using Distributed, BioSequences, ReadDatastores
julia> addprocs(4, exeflags="--project=.")
4-element Array{Int64,1}:
2
3
4
5
julia> @everywhere using MerCounting
[ Info: Precompiling MerCounting [a20136b7-8e32-4c10-91d3-7060c0bd8ec7]
julia> o = MerCounting.Counters.dist_mem2(DNAMer{31}, PairedReads{DNAAlphabet{4}}, "full-paired.prseq", MerCounting.CANONICAL)
I was wondering whether there is a way to make the first version work, since it would be more convenient to just call `dist_mem` on a datastore that is already loaded on the master process. Perhaps there is a way to make the datastores get sent to the worker processes properly, without the bad file descriptor error?