Confusion about tasks and Channels

I’ve been trying to create code which creates a producer/consumer type process using tasks, and I’m having some trouble, if people don’t mind commenting on a somewhat verbose real world example. This is for code that will go in BioJulia if it proves successful.

I’ve noticed in 0.6 the tasks documentation task documentation has changed and shows you how to create channels that tasks can put and take values from, and that is what I have tried to do.

Below is my producer code. I don’t think the internal bit-fiddling details are overly relevant, but it takes binary encoded data chunks of two BioJulia sequences, and aligns them, and stores them in a BitsChunk struct, and put!s the BitsChunk on a channel for the consumer to take and compute with. The function aligned_bits creates and returns the Channel, using anonymous function because of the 1-parameter requirement as stated in the docs for creating Channels with tasks.

The producer:

struct BitsChunk
    abits::UInt64
    bbits::UInt64
    ishead::Bool
    istail::Bool
    remaining::Int
end

@inline function remaining(bc::BitsChunk)
    return bc.remaining
end

@inline function abits(bc::BitsChunk)
    return bc.abits
end

@inline function bbits(bc::BitsChunk)
    return bc.bbits
end

@inline function bit_chunks(bc::BitsChunk)
    return abits(bc), bbits(bc)
end

@inline function ishead(bc::BitsChunk)
    return bc.ishead
end

@inline function istail(bc::BitsChunk)
    return bc.istail
end

function aligned_bits(a::BioSequence{A}, b::BioSequence{A}) where {A}
    if length(a) > length(b)
        return Channel((c::Channel{BitsChunk}) -> _aligned_bits(c, b, a), ctype=BitsChunk)
    else
        return Channel((c::Channel{BitsChunk}) -> _aligned_bits(c, a, b), ctype=BitsChunk)
    end
end

function _aligned_bits(c::Channel{BitsChunk},
                      a::BioSequence{A},
                      b::BioSequence{A}) where {A}

    @assert length(a) ≤ length(b)
    nexta = bitindex(a, 1)
    stopa = bitindex(a, endof(a) + 1)
    nextb = bitindex(b, 1)
    stopb = bitindex(b, endof(b) + 1)
    #=
    Note that updating `nextb` or `nexta` by 64, increases the chunk
    index, but the `offset(nextb)` will remain the same.

    The first thing we need to sort out is to correctly align the head of
    sequence / subsequence `a`s data such that the offset of `nexta` is
    essentially reduced to 0.
    With sequence / subsequence `a` aligned, from there, we only need to
    worry about the alignment of sequence / subsequence `b` with respect to `a`.
    =#
    if nexta < stopa && offset(nexta) != 0
        # Here we shift the first data chunks to the right so as the first
        # nucleotide of the seq/subseq is the first nibble / pair of bits.
        x = a.data[index(nexta)] >> offset(nexta)
        y = b.data[index(nextb)] >> offset(nextb)
        # Check there is something to go and get from the next chunk of `b`.
        # Check like so: `64 - offset(nextb) < stopb - nextb`.
        # If this is not true of `b`, then it is certainly not true of `a`.
        if offset(nextb) > offset(nexta) && 64 - offset(nextb) < stopb - nextb
            y |= b.data[index(nextb) + 1] << (64 - offset(nextb))
        end
        #=
        Check if the chunk of `a` we are currently aligning contains the end
        of seq/subseq `a`.
        Check by seeing if the distance to the end of the sequence is smaller
        than the distance to the next word.

        If so, the mask used needs to be defined to account for this:
        `mask(stopa - nexta)`, otherwise the mask simply needs to be
        `mask(64 - offset(nexta))`.

        Finally, move position markers by k, meaning they move to either the
        next chunk, or the end of the sequence if it is in the current integer.
        =#
        rem_word = 64 - offset(nexta)
        rem_stop = stopa - nexta
        rem = ifelse(rem_stop < rem_word, rem_stop, rem_word)
        m = mask(rem)
        put!(c, BitsChunk(x & m, y & m, true, false, rem))
        nexta += rem
        nextb += rem
    end
    @assert offset(nexta) == 0
    if offset(nextb) == 0 # data are aligned with each other.
        while stopa - nexta ≥ 64
            x = a.data[index(nexta)]
            y = b.data[index(nextb)]
            put!(c, BitsChunk(x, y, false, false, stopa - nexta))
            nexta += 64
            nextb += 64
        end
        if nexta < stopa # process the remaining tail.
            x = a.data[index(nexta)]
            y = b.data[index(nextb)]
            rem = stopa - nexta
            m = mask(rem)
            put!(c, BitsChunk(x & m, y & m, false, true, rem))
        end
    elseif nexta < stopa # b is unaligned with a.
        y = b.data[index(nextb)]
        nextb += 64
        while stopa - nexta ≥ 64
            x = a.data[index(nexta)]
            z = b.data[index(nextb)]
            y = y >> offset(nextb) | z << (64 - offset(nextb))
            put!(c, BitsChunk(x, y, false, false, stopa - nexta))
            y = z
            nexta += 64
            nextb += 64
        end
        if nexta < stopa # process the remaining tail.
            x = a.data[index(nexta)]
            y = y >> offset(nextb)
            if 64 - offset(nextb) < stopa - nexta
                y |= b.data[index(nextb)] << (64 - offset(nextb))
            end
            rem = stopa - nexta
            m = mask(rem)
            put!(c, BitsChunk(x & m, y & m, false, true, rem))
        end
    end
end

I have verified that this producer works:

using BioSequences

a = rna"CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUCCGUCUAUACCCCAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU"
b = rna"UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS"

julia> ab = BioSequences.aligned_bits(a, b)
Channel{BioSequences.BitsChunk}(sz_max:0,sz_curr:1)

julia> collect(ab)
4-element Array{BioSequences.BitsChunk,1}:
 BioSequences.BitsChunk(0xa32379f2192e05c8, 0x1441448124224142, false, false, 248)
 BioSequences.BitsChunk(0x7fc498ef583668a2, 0x2242214114441888, false, false, 184)
 BioSequences.BitsChunk(0x5df4cbb4cf24508b, 0x2181828422814828, false, false, 120)
 BioSequences.BitsChunk(0x00648bbcdb0dac9e, 0x0088114482181222, false, true, 56) 

# Yaay, I get my lovely aligned binary data chunks to work with in the consumer.

The consumer, take!s the BitsChunks structs and does some counting for me: It creates the channel, and as you can see in several places take!s values from the channel.

function bitpar_counter(::Type{S}, a::BioSequence{A}, b::BioSequence{A}) where {S<:Site,A}
    println("Open channel.")
    bits_channel = aligned_bits(a, b)
    counts = bp_start_counter(S, A)
    println("Take first block.")
    block = take!(bits_channel)
    println("Block:")
    println(block)
    x, y = bit_chunks(block)
    counts = bp_update_counter(counts, bp_chunk_count(S, A, x, y))
    println("First counts: ", counts)
    println("Was it an aligned head block: ", ishead(block))
    println("Correct for emptyspace: ", bp_correct_emptyspace(S, A))
    if ishead(block) && bp_correct_emptyspace(S, A)
        nempty = div(64, bitsof(A)) - div(remaining(block), bitsof(A))
        println("Empty: ", nempty)
        counts = bp_emptyspace_correction(nempty, counts)
        println("Corrected counts: ", counts)
    end
    while isopen(bits_channel)
        println("In the body code block.")
        block = take!(bits_channel)
        println("Block: ")
        println(block)
        x, y = bit_chunks(block)
        counts = bp_update_counter(counts, bp_chunk_count(S, A, x, y))
        println("Counts: ", counts)
    end
    if istail(block)
        println("In the tail code block.")
        if bp_correct_emptyspace(S, A)
            nempty = div(64, bitsof(A)) - div(remaining(block), bitsof(A))
            counts = bp_emptyspace_correction(nempty, counts)
        end
    end
    return counts
end

There are quite a few debugging println statements in the above consumer, that allowed me to see that it works:

julia> using BioSequences
INFO: Recompiling stale cache file /Users/bward/.julia/lib/v0.6/BioSequences.ji for module BioSequences.

julia> a = rna"CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUCCGUCUAUACCCCAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU"
89nt RNA Sequence:
CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUC…CAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU

julia> b = rna"UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS"
62nt RNA Sequence:
UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS

julia> count(Mismatch, a, b)
Open channel.
Take first block.
Block:
BioSequences.BitsChunk(0xa32379f2192e05c8, 0x1441448124224142, false, false, 248)
First counts: 15
Was it an aligned head block: false
Correct for emptyspace: false
In the body code block.
Block: 
BioSequences.BitsChunk(0x7fc498ef583668a2, 0x2242214114441888, false, false, 184)
Counts: 30
In the body code block.
Block: 
BioSequences.BitsChunk(0x5df4cbb4cf24508b, 0x2181828422814828, false, false, 120)
Counts: 45
In the body code block.
Block: 
BioSequences.BitsChunk(0x00648bbcdb0dac9e, 0x0088114482181222, false, true, 56)
Counts: 59
In the tail code block.
59

So I’m satisfied with what I made, until I comment out the println statements, all of a sudden it does not work, it tells me the channel get’s closed:

julia> using BioSequences
INFO: Recompiling stale cache file /Users/bward/.julia/lib/v0.6/BioSequences.ji for module BioSequences.

julia> a = rna"CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUCCGUCUAUACCCCAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU"
89nt RNA Sequence:
CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUC…CAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU

julia> b = rna"UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS"
62nt RNA Sequence:
UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS

julia> count(Mismatch, a, b)
ERROR: InvalidStateException("Channel is closed.", :closed)
Stacktrace:
 [1] try_yieldto(::Base.##296#297{Task}, ::Task) at ./event.jl:189
 [2] wait() at ./event.jl:234
 [3] take_unbuffered(::Channel{BioSequences.BitsChunk}) at ./channels.jl:340
 [4] take!(::Channel{BioSequences.BitsChunk}) at ./channels.jl:317
 [5] bitpar_counter(::Type{BioSequences.Mismatch}, ::BioSequences.BioSequence{BioSequences.RNAAlphabet{4}}, ::BioSequences.BioSequence{BioSequences.RNAAlphabet{4}}) at /Users/bward/.julia/v0.6/BioSequences/src/bioseq/site_counting/count_sites_bitpar.jl:36
 [6] count(::Type{BioSequences.Mismatch}, ::BioSequences.BioSequence{BioSequences.RNAAlphabet{4}}, ::BioSequences.BioSequence{BioSequences.RNAAlphabet{4}}) at /Users/bward/.julia/v0.6/BioSequences/src/bioseq/site_counting/site_counting.jl:119

I find this odd, as my consumers while loop depends on the isopen condition of the channel, and I would assume the very first take! statement would have something to take.

Can anyone point me as to why I’m getting this error occur when I take debug lines out of my code, that - on the face of it - I do not expect to affect the running of the code?

(My working copy of this is on this branch: https://github.com/Ward9250/BioSequences.jl/tree/bitaligned_iter if anyone wanted to download it, if the reason for the error is not a simple one.)

Thanks,
Ben

I’ve come up with a version which works, using the loop api for channels, basically taking the while loop out and switching it with a for loop. This works now, but if someone knows why it wasn’t working in the first place, what it was about Channels or Tasks I was forgetting or not seeing I would be eternally grateful.

function bitpar_counter(::Type{S}, a::BioSequence{A}, b::BioSequence{A}) where {S<:Site,A}
    bits_channel = aligned_bits(a, b)
    counts = bp_start_counter(S, A)
    block = take!(bits_channel)
    x, y = bit_chunks(block)
    counts = bp_update_counter(counts, bp_chunk_count(S, A, x, y))
    if ishead(block) && bp_correct_emptyspace(S, A)
        nempty = div(64, bitsof(A)) - div(remaining(block), bitsof(A))
        counts = bp_emptyspace_correction(nempty, counts)
    end
    for block in bits_channel
        x, y = bit_chunks(block)
        counts = bp_update_counter(counts, bp_chunk_count(S, A, x, y))
    end
    if istail(block) && bp_correct_emptyspace(S, A)
        nempty = div(64, bitsof(A)) - div(remaining(block), bitsof(A))
        counts = bp_emptyspace_correction(nempty, counts)
    end
    return counts
end
julia> using BioSequences
INFO: Recompiling stale cache file /Users/bward/.julia/lib/v0.6/BioSequences.ji for module BioSequences.

julia> a = rna"CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUCCGUCUAUACCCCAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU"
89nt RNA Sequence:
CGAGCCGCAUGGAGGAUUUAGGGAAGACCGCCUCUGAUC…CAUACUGGAAUUCGUCAAUAACGGCCCUCAUCUCUUUUU

julia> b = rna"UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS"
62nt RNA Sequence:
UKR-BCWACNWVMCMYCYUSSMURNBUWGKNVHU-RGCNKGHHKGNDRBWKYD-HDKHHUGS

julia> count(Mismatch, a, b)
59

EDIT: This solution is also significantly slower than the non-task based version of the code currently in BioJulia, even when checking for and removing type ambiguity flagged in @code_warntype. So there must be some performance penalty to using channels and tasks to achieve iteration?