Transducers and reduce-like reduction

Dear All,

This question is mainly for @tkf. I would like to ask whether there is a reducer in Transducers.jl that implements reduce-like parallelism. Using your example, I am looking for this:

(a + b) + (c + d) =
                                   +
                                  / \
                                 /   \
                                /     \        reduce-like grouping
                               +       +
                              / \     / \
                             a   b   c   d

I have tried foldxt as foldxt(merge_dicts, Map(mapfun), items), but it does not seem to implement the above.

My problem is the complexity of merge_dicts, which scales linearly with the size of its input, and that input keeps growing as more items are created. The mapfun creates sorted arrays, which are merged in merge_dicts.
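
For illustration, the grouping in the diagram can be sketched in plain Julia without Transducers.jl. The helpers `merge_sorted` and `treereduce` are hypothetical names made up for this sketch (they are not part of any library), and `merge_sorted` only stands in for whatever merge_dicts does with the sorted arrays.

```julia
# Hypothetical helper: merge two sorted vectors into one sorted vector.
# Cost is linear in length(a) + length(b), like the merge described above.
function merge_sorted(a::Vector{T}, b::Vector{T}) where {T}
    out = Vector{T}(undef, length(a) + length(b))
    i = j = k = 1
    while i <= length(a) && j <= length(b)
        if a[i] <= b[j]
            out[k] = a[i]; i += 1
        else
            out[k] = b[j]; j += 1
        end
        k += 1
    end
    while i <= length(a)        # copy what is left of a
        out[k] = a[i]; i += 1; k += 1
    end
    while j <= length(b)        # copy what is left of b
        out[k] = b[j]; j += 1; k += 1
    end
    return out
end

# Hypothetical helper: divide-and-conquer reduction.
# Splits the input in half, reduces each half, then combines the two results,
# which yields the grouping (a + b) + (c + d) for four items.
function treereduce(op, xs::AbstractVector)
    n = length(xs)
    n == 0 && throw(ArgumentError("cannot reduce an empty collection"))
    n == 1 && return xs[firstindex(xs)]
    mid = firstindex(xs) + n ÷ 2 - 1
    left = treereduce(op, view(xs, firstindex(xs):mid))
    right = treereduce(op, view(xs, mid+1:lastindex(xs)))
    return op(left, right)
end

# Show the grouping symbolically.
grouping = treereduce((a, b) -> "($a + $b)", ["a", "b", "c", "d"])

# Merge four small sorted chunks pairwise.
chunks = [[1, 5], [2, 9], [3, 4], [0, 7]]
result = treereduce(merge_sorted, chunks)
```

With n chunks of roughly equal size m, this grouping performs about n·m·log2(n) element copies in total, whereas a left-to-right fold that repeatedly merges one small chunk into an ever-growing accumulator performs on the order of n²·m.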

Thanks for an answer in advance.

merge(::AbstractDict, ::AbstractDict) is a reduce-compatible op (i.e., a monoid). See the Combining containers tutorial for more information. It also includes strategies for decreasing the cost in the base cases.
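
To make the monoid claim concrete, here is a small check in plain Julia. The dictionaries are made-up illustration data. The sketch only verifies associativity and the identity element on these inputs, and it does not show how Transducers.jl schedules the reduction.

```julia
# Made-up inputs for illustration.
a = Dict(:x => 1)
b = Dict(:x => 2, :y => 3)
c = Dict(:y => 4, :z => 5)
e = Dict{Symbol,Int}()          # empty Dict acts as the identity element

# Associativity: the grouping does not change the result.
left_grouped  = merge(merge(a, b), c)
right_grouped = merge(a, merge(b, c))

# Identity: merging with an empty Dict changes nothing.
identity_ok = merge(e, a) == a && merge(a, e) == a

# The same holds for mergewith with an associative combiner such as +.
addcounts  = mergewith(+)
tree_total = addcounts(addcounts(a, b), addcounts(c, e))   # (a + b) + (c + e)
fold_total = addcounts(addcounts(addcounts(a, b), c), e)   # ((a + b) + c) + e
```

Because the result is independent of the grouping, a parallel fold is free to combine partial results pairwise rather than strictly left to right.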

Thanks,

I will take a look at it.

Hi,

I have been playing with various strategies. Needless to say, @tkf's tutorials are awesome. I have decided to go with Distributed-based parallelism instead of threads, because I am reading a lot of files and I am afraid that the GC would intervene too much. My current approach is executing

foldxd(mergewith(append!), Map(flatten2dict), collect(enumerate(files)))

where the function flatten2dict creates a Dict, which is then reduced by mergewith. The problem is that the produced Dicts are enormous, and it crashes with this error:

ERROR: On worker 3:
peer 4 didn't connect to 3 within 59.999993085861206 seconds
Stacktrace:
  [1] error
    @ ./error.jl:33
  [2] wait_for_conn
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/cluster.jl:194
  [3] check_worker_state
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/cluster.jl:168
  [4] send_msg_
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/messages.jl:165
  [5] send_msg
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/messages.jl:122 [inlined]
  [6] #remotecall_fetch#143
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:389 [inlined]
  [7] remotecall_fetch
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:386
  [8] #remotecall_fetch#146
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
  [9] remotecall_fetch
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
 [10] call_on_owner
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:494 [inlined]
 [11] fetch
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:533
 [12] iterate
    @ ./generator.jl:47 [inlined]
 [13] collect_to!
    @ ./array.jl:724
 [14] collect_to_with_first!
    @ ./array.jl:702
 [15] _collect
    @ ./array.jl:696
 [16] collect_similar
    @ ./array.jl:606 [inlined]
 [17] map
    @ ./abstractarray.jl:2294 [inlined]
 [18] #199
    @ ~/.julia/packages/Transducers/4zaoS/src/dreduce.jl:100
 [19] #106
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:278
 [20] run_work_thunk
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:63
 [21] macro expansion
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:278
 [22] #105
    @ ./task.jl:411
Stacktrace:
  [1] #remotecall_fetch#143
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:394 [inlined]
  [2] remotecall_fetch(f::Function, w::Distributed.Worker, args::Vector{Future})
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:386
  [3] remotecall_fetch(f::Function, id::Int64, args::Vector{Future}; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421
  [4] remotecall_fetch(f::Function, id::Int64, args::Vector{Future})
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421
  [5] remotecall_pool(rc_f::Function, f::Function, pool::WorkerPool, args::Vector{Future}; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/workerpool.jl:123
  [6] remotecall_pool
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/workerpool.jl:121 [inlined]
  [7] #remotecall_fetch#182
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/workerpool.jl:229 [inlined]
  [8] remotecall_fetch(f::Function, pool::WorkerPool, args::Vector{Future})
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/workerpool.jl:229

Can anyone please help me resolve this issue? My hunch is that serializing the Dicts and sending them between processes is complicated.
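
One way to test that hunch is to measure how many bytes a single intermediate result takes once serialized, using the Serialization standard library that Distributed relies on to move values between processes. The sketch below is only a measurement aid under stated assumptions: `fake_dict` is a hypothetical stand-in for one flatten2dict result (the real key and value types are not shown in this thread), and `serialized_bytes` and `roundtrip` are helper names invented here. It does not diagnose the connection timeout itself.

```julia
using Serialization

# Hypothetical stand-in for one flatten2dict result:
# nkeys string keys, each mapped to a vector of nvals integers.
fake_dict(nkeys, nvals) = Dict("key$i" => collect(1:nvals) for i in 1:nkeys)

# Serialize x into an in-memory buffer and report the number of bytes written.
function serialized_bytes(x)
    io = IOBuffer()
    serialize(io, x)
    return length(take!(io))
end

# Serialize and deserialize x in memory, mimicking a transfer between processes.
function roundtrip(x)
    io = IOBuffer()
    serialize(io, x)
    seekstart(io)
    return deserialize(io)
end

d = fake_dict(100, 1_000)
nbytes = serialized_bytes(d)
println("serialized size: ", nbytes, " bytes")

# Time a full serialize/deserialize cycle for one such Dict.
elapsed = @elapsed copy_of_d = roundtrip(d)
println("round trip took ", elapsed, " seconds")
```

Running the same measurement on a real flatten2dict result would show whether the per-message size is large enough for transfer cost to matter relative to the time spent reading the files.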
