ANN: Transducers.jl 0.3. taking "zeros" seriously, type stability improvements, fusible groupby, OnlineStats, "GPU support", and more

I don’t think so. If r/fold doesn’t terminate, it is wasting time and also violating Reduced semantics: it shouldn’t call the reducing/combining function again once that function has returned a Reduced. My impression is that r/fold simply does not support reduced, since it predates transducers.

No, it doesn’t have to be. With associativity, you can use (a + b) + (c + d) (reduce-order) instead of ((a + b) + c) + d (foldl-order). (a + b) + (c + d) is great because you can run a + b and c + d in parallel. But it does not change the result if + really is associative.
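The regrouping can be checked directly. The following is only a small sketch in Base Julia; it uses string concatenation (associative but not commutative) for the first check, and the variable names are purely illustrative:

```julia
a, b, c, d = "a", "b", "c", "d"

foldl_order  = ((a * b) * c) * d   # strictly left to right
reduce_order = (a * b) * (c * d)   # two independent halves, then one combine

@assert foldl_order == reduce_order == "abcd"

# Floating-point addition is only approximately associative,
# so regrouping can change the last bits of the result:
x = (0.1 + 0.2) + 0.3
y = 0.1 + (0.2 + 0.3)
@assert x != y
```

The second half is the caveat behind "if + really is associative": with floating-point numbers the two groupings agree only up to rounding.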

Now, if you have an element z such that for all x you have op(z, x) = op(x, z) = z, you can skip some computations. Multiplication is a good example ((z, op) = (0, *)). Reduced is also such an example (as long as the reducing function is associative). Consider the following computation with some reducing function @ that may return a Reduced:

ans = (a @ b) @ (c @ d) @ (e @ f) @ (g @ h)
    =   (i    @    j)   @   (k    @    l)
    =         m         @         n

where the letter just below @ indicates that it is the output of the computation above (e.g., i = a @ b).

If you have two threads, you may start computing a @ b and c @ d in parallel. If c @ d returns a Reduced before a @ b finishes the computations, you can cancel the tasks for e @ f, g @ h and k @ l. However, if you first start computing a @ b and e @ f in parallel and e @ f returns a Reduced, you should not terminate c @ d, i @ j etc. but it is OK to terminate g @ h.
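In its simplest sequential form, the "zero" idea looks like the sketch below. It is plain Base Julia, not Transducers.jl code; `prod_until_zero` is a hypothetical helper, and it assumes integer inputs (for floats, `0 * Inf` is `NaN`, so 0 is not a true zero there):

```julia
# Left fold of `*` that stops as soon as the accumulator hits the
# absorbing element 0. Assumes integer elements.
function prod_until_zero(xs)
    acc = one(eltype(xs))
    steps = 0                    # number of multiplications actually performed
    for x in xs
        acc *= x
        steps += 1
        iszero(acc) && break     # nothing later can change the result
    end
    return acc, steps
end

result, steps = prod_until_zero([3, 0, 7, 9, 11])
@assert (result, steps) == (0, 2)   # the last three elements are never touched
```

A Reduced plays the same role as the 0 here: once it appears, the remaining work to its right can be skipped.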

This may be the source of confusion. Those equations do not hold in general. For example, if I have A = reduced(a) then I have

(A + B) + C = reduced(a) + C = reduced(a)
A + (B + C) = reduced(a)

OK, so you take the first reduced result. Where first means earlier in the collection if you were to iterate through in a serial manner.

Exactly!
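As a rough illustration of "the first reduced result wins", here is a divide-and-conquer search in Base Julia. `Stop` is a hypothetical stand-in for Reduced, and `find_first` is an illustrative name; the sketch runs sequentially, whereas a threaded version would start both halves and discard or cancel the right half once the left one returns a `Stop`:

```julia
# Hypothetical marker standing in for Transducers.jl's `Reduced`.
struct Stop{T}
    value::T
end

# Return `Stop(x)` for the earliest `x` in `xs` satisfying `pred`, else `nothing`.
function find_first(pred, xs, lo = firstindex(xs), hi = lastindex(xs))
    if hi - lo < 4                       # small base case: plain sequential scan
        for i in lo:hi
            pred(xs[i]) && return Stop(xs[i])
        end
        return nothing
    end
    mid = (lo + hi) ÷ 2
    left = find_first(pred, xs, lo, mid)
    left isa Stop && return left         # the right half is irrelevant now
    return find_first(pred, xs, mid + 1, hi)
end

hit = find_first(iseven, [1, 3, 5, 7, 9, 4, 6, 8, 10, 12])
@assert hit.value == 4                   # earliest match in serial order
```

Even if the right half finished first and found 12, the answer would still be 4, because the left result takes precedence whenever it is a `Stop`.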

Hi,

I am trying to establish a means to merge two sorted integer list sources (sorted within their start and end ranges only), currently obtained from a memory-mapped matrix. The start and end values of the lists are stored externally to the source, to the extent that I can guarantee (for the sake of argument) that

some_value_a = { StartIndex = 1 , EndIndex = 5 }
some_value_b = { StartIndex = 6 , EndIndex = 70 }

etc. etc. from which I am able to process all of the indexes for given values; which I then use as indexes into a “dataFile” from where the matrix pointer was originally defined.

As a result, my need is to be able to (initially) merge values from a given column of the matrix to process as a filter. This works very efficiently and my current strategy is to pull the values into vectors, then apply a classic merge algorithm against x collections (in pairs).

However, this is only really suitable for testing, since my eventual row count will be in the millions. I will also have to merge many values from single columns and intersect the merge outputs across columns, and I would also like to sensibly partition my files going forward. To this end, I instead tried to iterate (with Base.iterate) over the “left” and “right” merge set pairs, but found this cumbersome. That led me to research further and find Transducers, which appears to satisfy this need quite well.

My attempt, however, is causing a stack overflow:

using Transducers , StructTypes
using Random
#1:40 |> Partition(8) |> Filter(x -> prod(x) % 11 == 0) |> Cat() |> Scan(+) |> sum
using Transducers: Transducer, R_, next, inner, xform
using Transducers: start, complete, wrap, unwrap, wrapping


struct ColumnMerge <: Transducer
    #history::Int
    skip::Int
    leftStartVal::Int
    rightStartVal::Int
    leftendRow::Int
    rightendRow::Int
    mergeCol #:: = queryArray[queryArrayCol]
    #queryArray
end


function Transducers.start(rf::R_{ColumnMerge}, result)
    buffer = []
  #  rng = MersenneTwister(xform(rf).seed)
  #  totalLength = 1
    currentIndex = 1
    left = xform(rf).leftStartVal
    right = xform(rf).rightStartVal
    leftendRow = xform(rf).leftendRow
    rightendRow = xform(rf).rightendRow
    queryArray = xform(rf).mergeCol
    private_state = ( left , right , leftendRow, rightendRow , queryArray)
    println(private_state)
    return wrap(rf, private_state, start(inner(rf), result))
end


function Transducers.next(rf::R_{ColumnMerge}, result, left , right , setlength)
    wrapping(rf, result) do (left, right , setlength), iresult
#global returnVal
      #  println(private_state)
        println( (left, right , setlength), iresult)
      if  left <= xform(rf).leftendRow  && right <= xform(rf).rightendRow
        leftVal = xform(rf).mergeCol[left]
        rightVal = xform(rf).mergeCol[right]
        if leftVal < rightVal
            left += 1        
                result = leftVal
        else
                right += 1
                result = rightVal
        end
        setlength += 1
    end
println( (left, right , setlength), iresult)
      iresult = next(inner(rf), result, left , right , setlength)
        return (left, right , setlength), iresult
    end
end

function Transducers.complete(rf::R_{ColumnMerge}, result)
   # (buffer, _), iresult = unwrap(rf, result)
  #  for x in buffer
   #  iresult = next(inner(rf), iresult, x)
  #  end
    
    #    return complete(inner(rf), iresult)
end

v = [1,2,3,4,5,6,7,8,9,10]
a = ColumnMerge(0,1,5,4,10,v)


foreach(a , 1:5) do x
        
    println(x)
    
end

throwing:

StackOverflowError:

Stacktrace:
[1] next(::Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}}, ::Transducers.PrivateState{Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}},Tuple{Int64,Int64,Int64,Int64,Array{Int64,1}},Nothing}, ::Int64) at C:\Users\doliver.julia\packages\Transducers\AgX5g\src\core.jl:491
[2] (::Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}})(::Transducers.PrivateState{Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}},Tuple{Int64,Int64,Int64,Int64,Array{Int64,1}},Nothing}, ::Int64) at C:\Users\doliver.julia\packages\Transducers\AgX5g\src\core.jl:255
… (the last 2 lines are repeated 39990 more times)
[79983] next(::Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}}, ::Transducers.PrivateState{Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}},Tuple{Int64,Int64,Int64,Int64,Array{Int64,1}},Nothing}, ::Int64) at C:\Users\doliver.julia\packages\Transducers\AgX5g\src\core.jl:491

As you can probably tell, I have attempted to follow the “writing Transducers” example, and I would be most grateful if you could help get this working, as the mistake isn’t clear to me at all. (I haven’t reviewed the source code yet, and the relationship between private_state and the calling code wasn’t clear to me from the documentation.)

Further, how would I go about merging a vector of value pointers in this way? I “clearly” only need a final output vector of size StartIndex + EndIndex across all values and believe I wouldn’t require any intermediaries here given what Transducers describes, i.e, the iterated merge of one pair could feed the iterated merge of another etc. or, all values could be merged in one iteration approach by scanning the value pointers.

Finally, after having obtained all values from a given column, my need is to then accommodate values from other columns, to the extent that the result would be the intersection of merges, representing the actual values selected; like an indexed query engine of sorts. e.g.:

valueVectors = Vector{Vector{Int64}}()

# MergeValues stands for the (not yet written) merge of one column's spans
for columnValues in columnValuesList
    push!(valueVectors, MergeValues(columnValues))
end

# copy so that intersect! does not mutate the first merged vector
placeholder = copy(valueVectors[1])
for v in valueVectors[2:end]
    intersect!(placeholder, v)
end

There would appear to be considerable opportunity to do this not only in a threaded / distributed way, but also by leveraging Transducers and foldl (or maybe even Folds.jl, which I came across recently?) in the way that you articulate in your video. However, I have been unable to make real inroads as yet.
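For reference, the intersection step on its own is already a fold in Base Julia. The sketch below uses made-up stand-in data for the merged vectors, and `intersect_sorted` is a hypothetical helper that exploits the sorted order; nothing here is Transducers.jl API:

```julia
# Made-up stand-ins for the merged, sorted row indexes of three columns.
valueVectors = [[1, 3, 5, 7, 9], [3, 4, 5, 9], [0, 3, 9, 12]]

# Generic version: fold the vectors with Base's `intersect`.
selected = reduce(intersect, valueVectors)
@assert selected == [3, 9]

# Linear-time variant for sorted inputs (two-cursor walk).
function intersect_sorted(a, b)
    out = similar(a, 0)
    i, j = firstindex(a), firstindex(b)
    while i <= lastindex(a) && j <= lastindex(b)
        if a[i] == b[j]
            push!(out, a[i]); i += 1; j += 1
        elseif a[i] < b[j]
            i += 1
        else
            j += 1
        end
    end
    return out
end

@assert reduce(intersect_sorted, valueVectors) == [3, 9]
```

Because set intersection is associative, the same reducing function could in principle be handed to a parallel reduce, which is the property discussed earlier in this thread.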

Regards

If you want to implement something that acts as a “data source”, I’d recommend implementing it as a “foldable”, which, confusingly (sorry!), I also called a reducible in the Transducers.jl manual: Writing reducibles · Transducers.jl

But I think implementing foldables directly with Transducers.jl can be a bit low-level (not super hard, though). FYI, FGenerators.jl provides nice syntactic sugar for implementing foldables:

using FGenerators

struct ColumnMerge{T} <: Foldable
    leftStartVal::Int
    rightStartVal::Int
    leftendRow::Int
    rightendRow::Int
    mergeCol::T
end

@fgenerator(foldable::ColumnMerge) do
    left = foldable.leftStartVal
    right = foldable.rightStartVal
    while left <= foldable.leftendRow && right <= foldable.rightendRow
        leftVal = foldable.mergeCol[left]
        rightVal = foldable.mergeCol[right]
        if leftVal == rightVal
            @yield (left = left, right = right, value = leftVal)
            left += 1
            right += 1
        elseif leftVal < rightVal
            left += 1
        else
            right += 1
        end
    end
end

foreach(ColumnMerge(1, 6, 5, 10, [1:5; [1, 2, 3, 6, 7]])) do x
    @show x
end
#=
Prints:
x = (left = 1, right = 6, value = 1)
x = (left = 2, right = 7, value = 2)
x = (left = 3, right = 8, value = 3)
=#

# `ColumnMerge` works with Transducers.jl:
using Transducers

xs = ColumnMerge(1, 6, 5, 10, [1:5; [1, 2, 3, 6, 7]]) |>
    Filter(x -> isodd(x.value)) |>
    collect

@show xs
#=
Prints:
2-element Array{NamedTuple{(:left, :right, :value),Tuple{Int64,Int64,Int64}},1}:
 (left = 1, right = 6, value = 1)
 (left = 3, right = 8, value = 3)
=#

I’ve been following Transducers off and on for a while now. Exciting to see you are at 0.4.5! The problem you’re solving is a big deal, so thank you and everyone who has contributed :).

Is there a roadmap for getting to 1.0? If not, how stable is the syntax likely to be in the <1.0 era?

Hi, good to know that you are interested in Transducers.jl!

There’s no super concrete plan, but I’m thinking of splitting the interface part of Transducers.jl into a very lightweight “FoldsBase.jl” package. It requires a bit of juggling of the API and some breaking changes. I plan to do this in 0.5.x. If it works well, I’ll just promote it to 1.0. Anyway, that’s just my rough idea, so… let’s see :)


FWIW I hope you don’t feel pressured to commit to API stability anytime soon. I feel there’s still a lot of possibilities to explore.


Yeah, my intention wasn’t to push you toward finalizing anything. I just like to ask this about packages before I decide whether to consider using them, especially for production purposes.

Many thanks for providing this example. I was able to implement this use case successfully using FGenerators.jl (posting in case it is useful for anyone):

using FGenerators

struct ColumnMerge{T} <: Foldable
    leftStartVal::Int
    rightStartVal::Int
    leftendRow::Int
    rightendRow::Int
    mergeCol::T
end

@fgenerator function MergeRanges(foldable::ColumnMerge)
    left = foldable.leftStartVal
    right = foldable.rightStartVal

    # both ranges still have rows: yield the smaller head
    while left <= foldable.leftendRow && right <= foldable.rightendRow
        leftVal = foldable.mergeCol[left]
        rightVal = foldable.mergeCol[right]
        if leftVal < rightVal
            left += 1
            @yield leftVal
        else
            right += 1
            @yield rightVal
        end
    end

    # drain whatever is left in the left range
    while left <= foldable.leftendRow
        @yield foldable.mergeCol[left]
        left += 1
    end

    # drain whatever is left in the right range
    while right <= foldable.rightendRow
        @yield foldable.mergeCol[right]
        right += 1
    end
end

However, my end goal (apologies if this wasn’t clear before) is still to try and merge K “span values”, i.e. not just two as above.

This is where I originally started to consider foldl / Iterators.accumulate(f, itr; [init]) over the @fgenerator function MergeRanges (which didn’t work, since the result isn’t iterable and calling collect would defeat the purpose), or maybe just taking the individual spans and doing something naive (pseudocode):

mutable struct ColumnValues{T} <: Foldable
    startRow::Int
    endRow::Int    
    mergeCol::T
end


@fgenerator function IterateColValues(foldable::ColumnValues)
    index = foldable.startRow
    while index <= foldable.endRow
        returnVal = foldable.mergeCol[index]
        @yield returnVal
        index += 1
    end
end


columnValuesA = ColumnValues(1,2,columnToQuery)
columnValuesB = ColumnValues(5,6,columnToQuery)
        .
        .
        .
        .

columnValuesN = ColumnValues(100,10000,columnToQuery)

columnValues = Vector{ColumnValues}()

push!(columnValues, columnValuesA)
push!(columnValues, columnValuesB)
push!(columnValues, columnValuesN)


valueStore = Vector{Int64}()
currentMin = 0
getNext = Dict{Int64,Bool}()
pushmin = false
while setLength <= totalSetLength
    for (f, index) in columnValues
        if getNext[index]
            value = IterateColValues(f).GetNext
        else
            value = IterateColValues(f).Current
        end
        # test whether value is the next minimum
        if value < currentMin
            currentMin = value
            pushmin = true
            getNext[index] = true
        else
            getNext[index] = false
        end
    end
    if pushmin
        push!(valueStore, currentMin)
        totalSetLength += 1
        pushmin = false
    end
end

I am aware that merging across K sorted vectors is normally solved with something like a minimally sized heap (which is no doubt in a Julia package) or a sorted dictionary. However, I would still need to call some version of “next” against each iterator to eliminate the need for intermediary vectors. The aim is to leverage the sorted property of my vectors on disk and stream the data in one pass, since I am very resource constrained in my deployment environment (hence the choice of memory-mapped files etc.).
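To make the one-pass requirement concrete, here is a minimal sketch in Base Julia, not a Transducers.jl or FGenerators.jl solution. `kway_merge` and its `spans` argument are hypothetical names. It keeps one cursor per span and picks the minimum head by a linear scan, so each output costs O(K); a heap over the K heads would reduce that to O(log K). Values are passed to a callback rather than collected, so no intermediary vectors are built:

```julia
# Stream the merge of K sorted row ranges of `col`, calling `f` on each value.
# `spans` holds (startRow, endRow) tuples; each range is assumed to be sorted.
function kway_merge(f, col, spans)
    cursors = [first(s) for s in spans]   # next unread row of each span
    stops = [last(s) for s in spans]      # last row of each span
    while true
        best = 0                          # span currently holding the minimum head
        for k in eachindex(cursors)
            cursors[k] > stops[k] && continue        # this span is exhausted
            if best == 0 || col[cursors[k]] < col[cursors[best]]
                best = k
            end
        end
        best == 0 && return nothing       # every span is exhausted
        f(col[cursors[best]])
        cursors[best] += 1
    end
end

col = [1, 4, 9, 2, 3, 10, 0, 5]           # three sorted runs: rows 1-3, 4-6, 7-8
out = Int[]
kway_merge(x -> push!(out, x), col, [(1, 3), (4, 6), (7, 8)])
@assert out == [0, 1, 2, 3, 4, 5, 9, 10]
```

The callback style mirrors what a foldable does: the data source drives the loop and hands each element to the consumer, so the `push!` above could be swapped for any other consumer.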

Is there anything in Transducers / Folds.jl / something else that might be able to simplify this? For what it’s worth, this is “another” incredible Julia package that is far in advance of things I’ve encountered in other languages: the whole ecosystem and language design is so impressive.

Regards
