Dear All,
This question is mainly for @tkf. I would like to ask whether there is a reducer in Transducers.jl that implements reduce-like
parallelism. Using your example, I am looking for this:
(a + b) + (c + d) =
       +
      / \
     /   \
    /     \      reduce-like grouping
   +       +
  / \     / \
 a   b   c   d
I have tried foldxt as foldxt(merge_dicts, Map(mapfun), items), but it does not seem to implement the above.
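To make the two groupings concrete, here is a minimal sketch in plain Julia (no Transducers.jl involved). The helper `tree_reduce` and the op `show_group` are hypothetical names introduced only for illustration; the op builds a string that records how its arguments were combined, so the balanced grouping can be compared with the left-to-right grouping of `foldl`.

```julia
# Hypothetical helper: reduce `xs` by recursively splitting the index range in
# half, so that the combination order forms a balanced tree.
function tree_reduce(op, xs, lo=firstindex(xs), hi=lastindex(xs))
    lo > hi && throw(ArgumentError("cannot reduce an empty collection"))
    lo == hi && return xs[lo]
    mid = div(lo + hi, 2)
    return op(tree_reduce(op, xs, lo, mid), tree_reduce(op, xs, mid + 1, hi))
end

# An op that records how its two arguments were grouped.
show_group(a, b) = "($a + $b)"

items = ["a", "b", "c", "d"]

tree_grouping = tree_reduce(show_group, items)  # "((a + b) + (c + d))"
left_grouping = foldl(show_group, items)        # "(((a + b) + c) + d)"

println(tree_grouping)
println(left_grouping)
```

Passing a recording op like `show_group` to any reducer is a cheap way to inspect which grouping that reducer actually uses.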
My problem is the complexity of merge_dicts, which scales linearly with the size of its input, and that input grows as more items are merged. The mapfun creates sorted arrays, which are merged in merge_dicts.
Thanks in advance for an answer.
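The effect of the grouping on the total cost can be sketched as follows. This is an illustration under assumptions, not the original code: `merge_sorted` is a hypothetical stand-in for merge_dicts that merges two sorted vectors in linear time, and `merge_work` counts how many elements are written across all merges for a given reducer.

```julia
# Hypothetical stand-in for merge_dicts: merge two sorted vectors in linear time.
function merge_sorted(a::Vector{Int}, b::Vector{Int})
    out = Vector{Int}(undef, length(a) + length(b))
    i = j = 1
    for k in eachindex(out)
        if j > length(b) || (i <= length(a) && a[i] <= b[j])
            out[k] = a[i]; i += 1
        else
            out[k] = b[j]; j += 1
        end
    end
    return out
end

# Hypothetical helper: balanced (tree-shaped) reduction over an indexable collection.
function tree_reduce(op, xs, lo=firstindex(xs), hi=lastindex(xs))
    lo > hi && throw(ArgumentError("cannot reduce an empty collection"))
    lo == hi && return xs[lo]
    mid = div(lo + hi, 2)
    return op(tree_reduce(op, xs, lo, mid), tree_reduce(op, xs, mid + 1, hi))
end

# Run `reducer` with an op that also counts the elements written by each merge.
function merge_work(reducer, chunks)
    work = Ref(0)
    counted(a, b) = (work[] += length(a) + length(b); merge_sorted(a, b))
    result = reducer(counted, chunks)
    return result, work[]
end

# 64 sorted chunks of 100 elements each (deterministic, interleaved values).
chunks = [collect(i:64:6400) for i in 1:64]

left, left_work = merge_work(foldl, chunks)
tree, tree_work = merge_work(tree_reduce, chunks)

println("left fold: ", left_work, " elements written")  # 207900
println("tree fold: ", tree_work, " elements written")  # 38400
```

With n chunks of equal size k, the left fold writes on the order of k·n² elements in total, because the accumulated result is copied again at every step, while the balanced grouping writes about k·n·log2(n).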
tkf
August 9, 2021, 10:05pm
merge(::AbstractDict, ::AbstractDict) is a reduce-compatible op (aka monoid). See Combining containers for more information. This tutorial also includes strategies for decreasing the cost in the base cases.
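As a small check of what "reduce-compatible" means here, the following sketch verifies on made-up example data that merging is associative and that an empty Dict acts as the identity element. The dictionaries `d1`, `d2`, `d3` are hypothetical, and `mergewith` (available since Julia 1.5) is used with the non-mutating `vcat` so that the inputs are left untouched.

```julia
# Made-up example data: Dicts mapping a key to a vector of values.
d1 = Dict("a" => [1], "b" => [2])
d2 = Dict("b" => [3])
d3 = Dict("c" => [4], "a" => [5])

# Associativity of plain `merge` (the last value wins on key collisions).
merge_left  = merge(merge(d1, d2), d3)
merge_right = merge(d1, merge(d2, d3))

# Associativity of `mergewith(vcat, ...)` (values are concatenated on collisions).
with_left  = mergewith(vcat, mergewith(vcat, d1, d2), d3)
with_right = mergewith(vcat, d1, mergewith(vcat, d2, d3))

# An empty Dict is the identity element on either side.
e = Dict{String,Vector{Int}}()
identity_ok = merge(e, d1) == d1 && merge(d1, e) == d1

println(merge_left == merge_right)  # true
println(with_left == with_right)    # true
println(identity_ok)                # true
```

Because the grouping does not change the result, the same op can be applied in any tree shape over neighbouring chunks of the input, as long as the left-to-right order of the operands is preserved.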
Thanks,
I will take a look at it.
Hi,
I have been playing with various strategies. Needless to say, @tkf's tutorials are awesome. I have decided to go with Distributed-based parallelism instead of threads, because I am reading a lot of files and I am afraid that the GC would interfere too much. My current approach is executing
foldxd(mergewith(append!), Map(flatten2dict), collect(enumerate(files)))
where the function flatten2dict creates a Dict, which is reduced by mergewith. The problem is that the produced Dicts are enormous and it crashes with the error
ERROR: On worker 3:
peer 4 didn't connect to 3 within 59.999993085861206 seconds
Stacktrace:
[1] error
@ ./error.jl:33
[2] wait_for_conn
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/cluster.jl:194
[3] check_worker_state
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/cluster.jl:168
[4] send_msg_
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/messages.jl:165
[5] send_msg
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/messages.jl:122 [inlined]
[6] #remotecall_fetch#143
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:389 [inlined]
[7] remotecall_fetch
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:386
[8] #remotecall_fetch#146
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
[9] remotecall_fetch
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
[10] call_on_owner
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:494 [inlined]
[11] fetch
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:533
[12] iterate
@ ./generator.jl:47 [inlined]
[13] collect_to!
@ ./array.jl:724
[14] collect_to_with_first!
@ ./array.jl:702
[15] _collect
@ ./array.jl:696
[16] collect_similar
@ ./array.jl:606 [inlined]
[17] map
@ ./abstractarray.jl:2294 [inlined]
[18] #199
@ ~/.julia/packages/Transducers/4zaoS/src/dreduce.jl:100
[19] #106
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:278
[20] run_work_thunk
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:63
[21] macro expansion
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:278
[22] #105
@ ./task.jl:411
Stacktrace:
[1] #remotecall_fetch#143
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:394 [inlined]
[2] remotecall_fetch(f::Function, w::Distributed.Worker, args::Vector{Future})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:386
[3] remotecall_fetch(f::Function, id::Int64, args::Vector{Future}; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421
[4] remotecall_fetch(f::Function, id::Int64, args::Vector{Future})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421
[5] remotecall_pool(rc_f::Function, f::Function, pool::WorkerPool, args::Vector{Future}; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/workerpool.jl:123
[6] remotecall_pool
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/workerpool.jl:121 [inlined]
[7] #remotecall_fetch#182
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/workerpool.jl:229 [inlined]
[8] remotecall_fetch(f::Function, pool::WorkerPool, args::Vector{Future})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/workerpool.jl:229
Can anyone please help me resolve this issue? My hunch is that serializing the Dicts and sending them between processes is what makes this complicated.
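One way to test that hunch is to measure how large a Dict becomes once serialized, using the Serialization standard library that Distributed builds on. This is only a rough sketch: `serialized_bytes` is a hypothetical helper, the example Dicts are made up, and the byte count approximates rather than reproduces what is actually sent between workers.

```julia
using Serialization

# Hypothetical helper: number of bytes `x` occupies once serialized.
function serialized_bytes(x)
    io = IOBuffer()
    serialize(io, x)
    return length(take!(io))
end

# Made-up Dicts of different sizes standing in for the output of flatten2dict.
small = Dict(i => collect(1:10) for i in 1:100)
large = Dict(i => collect(1:10) for i in 1:10_000)

println("small: ", serialized_bytes(small), " bytes")
println("large: ", serialized_bytes(large), " bytes")

# Round trip through a buffer, timing both directions together.
io = IOBuffer()
elapsed = @elapsed begin
    serialize(io, large)
    seekstart(io)
    restored = deserialize(io)
end
println("round trip of large Dict took ", elapsed, " seconds")
```

Running the same measurement on a Dict produced from a single real file would show how many bytes each merge step has to move, which helps decide whether the payload size is a plausible culprit.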