I’ve been trying to create code which creates a producer/consumer type process using tasks, and I’m having some trouble, if people don’t mind commenting on a somewhat verbose real world example. This is for code that will go in BioJulia if it proves successful.
I’ve noticed in 0.6 the tasks documentation task documentation has changed and shows you how to create channels that tasks can put and take values from, and that is what I have tried to do.
Below is my producer code. I don’t think the internal bit-fiddling details are overly relevant, but it takes binary encoded data chunks of two BioJulia sequences, and aligns them, and stores them in a BitsChunk struct, and put!
s the BitsChunk on a channel for the consumer to take and compute with. The function aligned_bits
creates and returns the Channel, using anonymous function because of the 1-parameter requirement as stated in the docs for creating Channels with tasks.
The producer:
struct BitsChunk
abits::UInt64
bbits::UInt64
ishead::Bool
istail::Bool
remaining::Int
end
@inline function remaining(bc::BitsChunk)
return bc.remaining
end
@inline function abits(bc::BitsChunk)
return bc.abits
end
@inline function bbits(bc::BitsChunk)
return bc.bbits
end
@inline function bit_chunks(bc::BitsChunk)
return abits(bc), bbits(bc)
end
@inline function ishead(bc::BitsChunk)
return bc.ishead
end
@inline function istail(bc::BitsChunk)
return bc.istail
end
function aligned_bits(a::BioSequence{A}, b::BioSequence{A}) where {A}
if length(a) > length(b)
return Channel((c::Channel{BitsChunk}) -> _aligned_bits(c, b, a), ctype=BitsChunk)
else
return Channel((c::Channel{BitsChunk}) -> _aligned_bits(c, a, b), ctype=BitsChunk)
end
end
function _aligned_bits(c::Channel{BitsChunk},
a::BioSequence{A},
b::BioSequence{A}) where {A}
@assert length(a) ≤ length(b)
nexta = bitindex(a, 1)
stopa = bitindex(a, endof(a) + 1)
nextb = bitindex(b, 1)
stopb = bitindex(b, endof(b) + 1)
#=
Note that updating `nextb` or `nexta` by 64, increases the chunk
index, but the `offset(nextb)` will remain the same.
The first thing we need to sort out is to correctly align the head of
sequence / subsequence `a`s data such that the offset of `nexta` is
essentially reduced to 0.
With sequence / subsequence `a` aligned, from there, we only need to
worry about the alignment of sequence / subsequence `b` with respect to `a`.
=#
if nexta < stopa && offset(nexta) != 0
# Here we shift the first data chunks to the right so as the first
# nucleotide of the seq/subseq is the first nibble / pair of bits.
x = a.data[index(nexta)] >> offset(nexta)
y = b.data[index(nextb)] >> offset(nextb)
# Check there is something to go and get from the next chunk of `b`.
# Check like so: `64 - offset(nextb) < stopb - nextb`.
# If this is not true of `b`, then it is certainly not true of `a`.
if offset(nextb) > offset(nexta) && 64 - offset(nextb) < stopb - nextb
y |= b.data[index(nextb) + 1] << (64 - offset(nextb))
end
#=
Check if the chunk of `a` we are currently aligning contains the end
of seq/subseq `a`.
Check by seeing if the distance to the end of the sequence is smaller
than the distance to the next word.
If so, the mask used needs to be defined to account for this:
`mask(stopa - nexta)`, otherwise the mask simply needs to be
`mask(64 - offset(nexta))`.
Finally, move position markers by k, meaning they move to either the
next chunk, or the end of the sequence if it is in the current integer.
=#
rem_word = 64 - offset(nexta)
rem_stop = stopa - nexta
rem = ifelse(rem_stop < rem_word, rem_stop, rem_word)
m = mask(rem)
put!(c, BitsChunk(x & m, y & m, true, false, rem))
nexta += rem
nextb += rem
end
@assert offset(nexta) == 0
if offset(nextb) == 0 # data are aligned with each other.
while stopa - nexta ≥ 64
x = a.data[index(nexta)]
y = b.data[index(nextb)]
put!(c, BitsChunk(x, y, false, false, stopa - nexta))
nexta += 64
nextb += 64
end
if nexta < stopa # process the remaining tail.
x = a.data[index(nexta)]
y = b.data[index(nextb)]
rem = stopa - nexta
m = mask(rem)
put!(c, BitsChunk(x & m, y & m, false, true, rem))
end
elseif nexta < stopa # b is unaligned with a.
y = b.data[index(nextb)]
nextb += 64
while stopa - nexta ≥ 64
x = a.data[index(nexta)]
z = b.data[index(nextb)]
y = y >> offset(nextb) | z << (64 - offset(nextb))
put!(c, BitsChunk(x, y, false, false, stopa - nexta))
y = z
nexta += 64
nextb += 64
end
if nexta < stopa # process the remaining tail.
x = a.data[index(nexta)]
y = y >> offset(nextb)
if 64 - offset(nextb) < stopa - nexta
y |= b.data[index(nextb)] << (64 - offset(nextb))
end
rem = stopa - nexta
m = mask(rem)
put!(c, BitsChunk(x & m, y & m, false, true, rem))
end
end
end
I have verified that this producer works:
using BioSequences
a = rna"CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUCCGUCUAUACCCCAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU"
b = rna"UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS"
julia> ab = BioSequences.aligned_bits(a, b)
Channel{BioSequences.BitsChunk}(sz_max:0,sz_curr:1)
julia> collect(ab)
4-element Array{BioSequences.BitsChunk,1}:
BioSequences.BitsChunk(0xa32379f2192e05c8, 0x1441448124224142, false, false, 248)
BioSequences.BitsChunk(0x7fc498ef583668a2, 0x2242214114441888, false, false, 184)
BioSequences.BitsChunk(0x5df4cbb4cf24508b, 0x2181828422814828, false, false, 120)
BioSequences.BitsChunk(0x00648bbcdb0dac9e, 0x0088114482181222, false, true, 56)
# Yaay, I get my lovely aligned binary data chunks to work with in the consumer.
The consumer, take!
s the BitsChunks
structs and does some counting for me: It creates the channel, and as you can see in several places take!
s values from the channel.
function bitpar_counter(::Type{S}, a::BioSequence{A}, b::BioSequence{A}) where {S<:Site,A}
println("Open channel.")
bits_channel = aligned_bits(a, b)
counts = bp_start_counter(S, A)
println("Take first block.")
block = take!(bits_channel)
println("Block:")
println(block)
x, y = bit_chunks(block)
counts = bp_update_counter(counts, bp_chunk_count(S, A, x, y))
println("First counts: ", counts)
println("Was it an aligned head block: ", ishead(block))
println("Correct for emptyspace: ", bp_correct_emptyspace(S, A))
if ishead(block) && bp_correct_emptyspace(S, A)
nempty = div(64, bitsof(A)) - div(remaining(block), bitsof(A))
println("Empty: ", nempty)
counts = bp_emptyspace_correction(nempty, counts)
println("Corrected counts: ", counts)
end
while isopen(bits_channel)
println("In the body code block.")
block = take!(bits_channel)
println("Block: ")
println(block)
x, y = bit_chunks(block)
counts = bp_update_counter(counts, bp_chunk_count(S, A, x, y))
println("Counts: ", counts)
end
if istail(block)
println("In the tail code block.")
if bp_correct_emptyspace(S, A)
nempty = div(64, bitsof(A)) - div(remaining(block), bitsof(A))
counts = bp_emptyspace_correction(nempty, counts)
end
end
return counts
end
There are quite a few debugging println
statements in the above consumer, that allowed me to see that it works:
julia> using BioSequences
INFO: Recompiling stale cache file /Users/bward/.julia/lib/v0.6/BioSequences.ji for module BioSequences.
julia> a = rna"CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUCCGUCUAUACCCCAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU"
89nt RNA Sequence:
CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUC…CAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU
julia> b = rna"UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS"
62nt RNA Sequence:
UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS
julia> count(Mismatch, a, b)
Open channel.
Take first block.
Block:
BioSequences.BitsChunk(0xa32379f2192e05c8, 0x1441448124224142, false, false, 248)
First counts: 15
Was it an aligned head block: false
Correct for emptyspace: false
In the body code block.
Block:
BioSequences.BitsChunk(0x7fc498ef583668a2, 0x2242214114441888, false, false, 184)
Counts: 30
In the body code block.
Block:
BioSequences.BitsChunk(0x5df4cbb4cf24508b, 0x2181828422814828, false, false, 120)
Counts: 45
In the body code block.
Block:
BioSequences.BitsChunk(0x00648bbcdb0dac9e, 0x0088114482181222, false, true, 56)
Counts: 59
In the tail code block.
59
So I’m satisfied with what I made, until I comment out the println statements, all of a sudden it does not work, it tells me the channel get’s closed:
julia> using BioSequences
INFO: Recompiling stale cache file /Users/bward/.julia/lib/v0.6/BioSequences.ji for module BioSequences.
julia> a = rna"CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUCCGUCUAUACCCCAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU"
89nt RNA Sequence:
CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUC…CAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU
julia> b = rna"UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS"
62nt RNA Sequence:
UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS
julia> count(Mismatch, a, b)
ERROR: InvalidStateException("Channel is closed.", :closed)
Stacktrace:
[1] try_yieldto(::Base.##296#297{Task}, ::Task) at ./event.jl:189
[2] wait() at ./event.jl:234
[3] take_unbuffered(::Channel{BioSequences.BitsChunk}) at ./channels.jl:340
[4] take!(::Channel{BioSequences.BitsChunk}) at ./channels.jl:317
[5] bitpar_counter(::Type{BioSequences.Mismatch}, ::BioSequences.BioSequence{BioSequences.RNAAlphabet{4}}, ::BioSequences.BioSequence{BioSequences.RNAAlphabet{4}}) at /Users/bward/.julia/v0.6/BioSequences/src/bioseq/site_counting/count_sites_bitpar.jl:36
[6] count(::Type{BioSequences.Mismatch}, ::BioSequences.BioSequence{BioSequences.RNAAlphabet{4}}, ::BioSequences.BioSequence{BioSequences.RNAAlphabet{4}}) at /Users/bward/.julia/v0.6/BioSequences/src/bioseq/site_counting/site_counting.jl:119
I find this odd, as my consumers while loop depends on the isopen
condition of the channel, and I would assume the very first take! statement would have something to take.
Can anyone point me as to why I’m getting this error occur when I take debug lines out of my code, that - on the face of it - I do not expect to affect the running of the code?
(My working copy of this is on this branch: https://github.com/Ward9250/BioSequences.jl/tree/bitaligned_iter if anyone wanted to download it, if the reason for the error is not a simple one.)
Thanks,
Ben