Hi,
I am trying to find a way to merge two sorted integer lists (each sorted only within its own start and end range), currently obtained from a memory-mapped matrix. The start and end of each list are stored externally to the source, so that I can guarantee (for the sake of argument) that
some_value_a = { StartIndex = 1 , EndIndex = 5 }
some_value_b = { StartIndex = 6 , EndIndex = 70 }
etc., from which I am able to process all of the indexes for the given values; I then use these as indexes into a “dataFile” from which the matrix pointer was originally defined.
As a result, my need is to be able to (initially) merge values from a given column of the matrix to process as a filter. This works very efficiently and my current strategy is to pull the values into vectors, then apply a classic merge algorithm against x collections (in pairs).
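For concreteness, the pairwise step I have in mind looks roughly like the sketch below. The name `merge_ranges`, its arguments, and the sample data are placeholders of mine, not part of any library, and the sketch assumes each index range is sorted ascending within the column.

```julia
# Hypothetical helper: merge two sorted index ranges of one column into a new vector.
# Assumes col[lrange] and col[rrange] are each sorted ascending.
function merge_ranges(col::AbstractVector{T}, lrange::UnitRange{Int}, rrange::UnitRange{Int}) where {T}
    out = Vector{T}(undef, length(lrange) + length(rrange))
    l, r = first(lrange), first(rrange)
    lend, rend = last(lrange), last(rrange)
    k = 0
    # Take the smaller head until one side runs out.
    while l <= lend && r <= rend
        k += 1
        if col[l] <= col[r]
            out[k] = col[l]
            l += 1
        else
            out[k] = col[r]
            r += 1
        end
    end
    # Copy whatever remains on either side.
    while l <= lend
        k += 1
        out[k] = col[l]
        l += 1
    end
    while r <= rend
        k += 1
        out[k] = col[r]
        r += 1
    end
    return out
end

col = [2, 5, 9, 1, 3, 4, 10]   # made-up data: rows 1:3 and 4:7 are each sorted
merged = merge_ranges(col, 1:3, 4:7)
```

This allocates one output vector per pair, which is the intermediate I would like to avoid once the row counts grow.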
However, this is only really suitable for testing, since my eventual row count will be in the millions. I will also have to merge many values from single columns and intersect the merged outputs across columns, and I would also like to partition my files sensibly going forward. To this end, I first tried iterating (with Base.iterate) over the “left” and “right” merge set pairs, but found this cumbersome, which led me to research further and find Transducers, which appears to satisfy this need quite well.
My attempt, however, is causing a stack overflow:
using Transducers , StructTypes
using Random
#1:40 |> Partition(8) |> Filter(x -> prod(x) % 11 == 0) |> Cat() |> Scan(+) |> sum
using Transducers: Transducer, R_, next, inner, xform
using Transducers: start, complete, wrap, unwrap, wrapping
struct ColumnMerge <: Transducer
    #history::Int
    skip::Int
    leftStartVal::Int
    rightStartVal::Int
    leftendRow::Int
    rightendRow::Int
    mergeCol #:: = queryArray[queryArrayCol]
    #queryArray
end
function Transducers.start(rf::R_{ColumnMerge}, result)
    buffer = []
    # rng = MersenneTwister(xform(rf).seed)
    # totalLength = 1
    currentIndex = 1
    left = xform(rf).leftStartVal
    right = xform(rf).rightStartVal
    leftendRow = xform(rf).leftendRow
    rightendRow = xform(rf).rightendRow
    queryArray = xform(rf).mergeCol
    private_state = (left, right, leftendRow, rightendRow, queryArray)
    println(private_state)
    return wrap(rf, private_state, start(inner(rf), result))
end
function Transducers.next(rf::R_{ColumnMerge}, result, left, right, setlength)
    wrapping(rf, result) do (left, right, setlength), iresult
        #global returnVal
        # println(private_state)
        println((left, right, setlength), iresult)
        if left <= xform(rf).leftendRow && right <= xform(rf).rightendRow
            leftVal = xform(rf).mergeCol[left]
            rightVal = xform(rf).mergeCol[right]
            if leftVal < rightVal
                left += 1
                result = leftVal
            else
                right += 1
                result = rightVal
            end
            setlength += 1
        end
        println((left, right, setlength), iresult)
        iresult = next(inner(rf), result, left, right, setlength)
        return (left, right, setlength), iresult
    end
end
function Transducers.complete(rf::R_{ColumnMerge}, result)
    # (buffer, _), iresult = unwrap(rf, result)
    # for x in buffer
    #     iresult = next(inner(rf), iresult, x)
    # end
    # return complete(inner(rf), iresult)
end
v = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
a = ColumnMerge(0, 1, 5, 4, 10, v)
foreach(a, 1:5) do x
    println(x)
end
throwing:
StackOverflowError:
Stacktrace:
[1] next(::Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}}, ::Transducers.PrivateState{Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}},Tuple{Int64,Int64,Int64,Int64,Array{Int64,1}},Nothing}, ::Int64) at C:\Users\doliver.julia\packages\Transducers\AgX5g\src\core.jl:491
[2] (::Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}})(::Transducers.PrivateState{Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}},Tuple{Int64,Int64,Int64,Int64,Array{Int64,1}},Nothing}, ::Int64) at C:\Users\doliver.julia\packages\Transducers\AgX5g\src\core.jl:255
… (the last 2 lines are repeated 39990 more times)
[79983] next(::Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}}, ::Transducers.PrivateState{Transducers.Reduction{ColumnMerge,Transducers.BottomRF{Transducers.SideEffect{var"#15#16"}}},Tuple{Int64,Int64,Int64,Int64,Array{Int64,1}},Nothing}, ::Int64) at C:\Users\doliver.julia\packages\Transducers\AgX5g\src\core.jl:491
As you can probably tell, I have attempted to follow the “writing Transducers” example, and I should be most grateful if you could help in getting this working, as the mistake isn’t clear to me at all. (I haven’t reviewed the source code yet, and the relationship between the private_state and the calling code wasn’t clear to me from the documentation.)
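Comparing my code against the RandomRecall example in the documentation, I notice three differences that may be relevant, though I am not certain any of them is the cause: the documented `next` method takes a single `input` argument where mine takes three, the tuple I build in `start` has five elements while the `do` block in `next` destructures three, and my `complete` returns nothing. Below is a sketch of the documented shape adapted to my case. `ColumnMergeSketch` and its field names are illustrative only, the incoming input is ignored and merely drives one merge step, and I am assuming `wrap`, `unwrap`, `wrapping` and `collect(xf, itr)` behave as the manual describes.

```julia
using Transducers
using Transducers: Transducer, R_, next, inner, xform
using Transducers: start, complete, wrap, unwrap, wrapping

# Hypothetical variant of ColumnMerge; the name and fields are illustrative.
struct ColumnMergeSketch <: Transducer
    leftStart::Int
    leftEnd::Int
    rightStart::Int
    rightEnd::Int
    mergeCol::Vector{Int}
end

function Transducers.start(rf::R_{ColumnMergeSketch}, result)
    # Private state holds only what changes between steps: the two cursors.
    private_state = (xform(rf).leftStart, xform(rf).rightStart)
    return wrap(rf, private_state, start(inner(rf), result))
end

# next receives exactly one input per step; here it only drives the merge forward.
function Transducers.next(rf::R_{ColumnMergeSketch}, result, _input)
    wrapping(rf, result) do (left, right), iresult
        xf = xform(rf)
        hasleft = left <= xf.leftEnd
        hasright = right <= xf.rightEnd
        if hasleft && (!hasright || xf.mergeCol[left] <= xf.mergeCol[right])
            iresult = next(inner(rf), iresult, xf.mergeCol[left])
            left += 1
        elseif hasright
            iresult = next(inner(rf), iresult, xf.mergeCol[right])
            right += 1
        end
        # If both sides are exhausted, the inner result passes through unchanged.
        return (left, right), iresult
    end
end

function Transducers.complete(rf::R_{ColumnMergeSketch}, result)
    _private_state, iresult = unwrap(rf, result)
    return complete(inner(rf), iresult)
end

v = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
merged = collect(ColumnMergeSketch(1, 4, 5, 10, v), 1:10)
```

Driving the merge from a dummy input such as `1:10` feels unnatural to me, so I would welcome a pointer to a more idiomatic way of expressing a source-like merge.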
Further, how would I go about merging a vector of value pointers in this way? I “clearly” only need a final output vector whose size is the sum of (EndIndex - StartIndex + 1) across all values, and I believe I wouldn’t require any intermediaries here given what Transducers describes, i.e., the iterated merge of one pair could feed the iterated merge of another, or all values could be merged in a single pass by scanning the value pointers.
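To illustrate the single-pass idea without Transducers, here is a minimal sketch in plain Julia. `merge_all` and the sample data are hypothetical, and the sketch assumes every range is sorted ascending within the column. It keeps one cursor per range and picks the smallest head at each step, so the only allocation is the final output.

```julia
# Hypothetical helper: merge any number of sorted index ranges of one column in a
# single pass, keeping one cursor per range and picking the smallest head each step.
function merge_all(col::AbstractVector{T}, ranges::Vector{UnitRange{Int}}) where {T}
    cursors = [first(r) for r in ranges]
    total = mapreduce(length, +, ranges; init = 0)
    out = Vector{T}(undef, total)
    for k in 1:total
        best = 0
        for (i, r) in enumerate(ranges)
            cursors[i] > last(r) && continue   # this range is exhausted
            if best == 0 || col[cursors[i]] < col[cursors[best]]
                best = i
            end
        end
        out[k] = col[cursors[best]]
        cursors[best] += 1
    end
    return out
end

col = [2, 5, 9, 1, 3, 4, 10, 6, 7]   # made-up data: rows 1:3, 4:7 and 8:9 are each sorted
merged = merge_all(col, [1:3, 4:7, 8:9])
```

The linear scan over cursors costs time proportional to the number of ranges per output element, so for many ranges a heap over the cursors would presumably be the better choice.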
Finally, after having obtained all values from a given column, my need is to then accommodate values from other columns, to the extent that the result would be the intersection of merges, representing the actual values selected; like an indexed query engine of sorts. e.g.:
valueVectors = Vector{Vector{Int64}}()
for columnValues in columnValuesList
    push!(valueVectors, MergeValues(columnValues))
end
placeholder = copy(valueVectors[1])
for v in valueVectors[2:end]
    intersect!(placeholder, v)
end
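Since each merged vector is already sorted, the intersection could also be done with a two-cursor scan rather than a set-based `intersect!`. The following is a sketch under that assumption. `intersect_sorted` and the sample vectors are hypothetical, and it further assumes the values within each vector are unique.

```julia
# Hypothetical helper: intersect two ascending-sorted vectors with a two-cursor scan.
function intersect_sorted(a::AbstractVector{T}, b::AbstractVector{T}) where {T}
    out = T[]
    i, j = firstindex(a), firstindex(b)
    while i <= lastindex(a) && j <= lastindex(b)
        if a[i] < b[j]
            i += 1
        elseif a[i] > b[j]
            j += 1
        else
            push!(out, a[i])   # present in both inputs
            i += 1
            j += 1
        end
    end
    return out
end

# Made-up merged outputs, one per column.
valueVectors = [[1, 3, 5, 7, 9], [3, 4, 5, 9, 11], [0, 3, 9, 12]]
selected = foldl(intersect_sorted, valueVectors)
```

Because the output of each step is itself sorted, the function composes under `foldl`, which is what makes me think a fold-based or parallel formulation should be possible.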
There appears to be considerable opportunity not only to do this in a threaded or distributed way, but also to leverage Transducers and foldl (or maybe even Folds.jl, which I came across recently?) in the way that you describe in your video, but I have been unable to make real inroads as yet.
Regards