I think this is a fairly common scenario:
You have a collection of files,
and each file contains a set of records (e.g. one JSON entry per line).
And you wish to lazily load up those records, without caring about maintaining order…
Possibly you want to do something with the records like filter some of them out or transform them etc.
You want to do it lazily because you maybe can’t (or don’t want to) hold all the data in memory at once,
or maybe you just want to be able to test your algorithm on the first 10_000 before scaling up to the full dataset.
Channels are totally the tool for the job when it comes to lazily loaded data.
(See my blog post on them)
But how do you scale that up to having many "feeders"
`put!`ing onto a shared channel?
I’ve abstracted that functionality, first for Async, and then for Distributed code.
I’m not particularly happy with the name.
It is a kind of mapreduce (but that is not saying much: almost any operation can be described as "a kind of mapreduce").
It doesn’t just apply to file loading:
any time you have a collection of things that each can become a collection of things,
and you want all the things gathered up, this works.
`mapflatten` is a good name.
Async should help with this,
since IO is the kind of thing async normally helps with.
(I’m not sure exactly how that works, but apparently it does? Maybe it is that things other than disk IO can be getting done while we wait for the OS to get stuff from disk.)
Async isn’t true parallelism; it is just swapping Tasks when one is blocked or yields.
Only one piece of Julia code runs at a time.
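A minimal sketch of that task swapping, using nothing beyond Base: an unbuffered channel blocks the producer task on `put!`, and it is at that blocking point that Julia switches to another task.

```julia
ch = Channel{Int}(0)  # unbuffered: put! blocks until someone take!s

# Scheduled as a task; it runs when the main task blocks and yields.
@async put!(ch, 42)

# take! blocks, the scheduler swaps to the producer task, which put!s,
# and control comes back here. Only one task was ever running at a time.
x = take!(ch)
@assert x == 42
```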
```julia
function async_channel_load(fileload_func, filenames; ctype=Any, csize=2^16)
    Channel(ctype=ctype, csize=csize) do ch
        @sync for fn in filenames
            @async fileload_func(ch, fn)
        end
        # channel will be closed once this do block completes.
    end
end
```
The `@sync` is needed so we do not exit the loop before all the contained `@async`s are done.
`@async` starts a new task for each file.
The `Channel` `do` block starts a task of its own, to run that `do` block,
and so `async_channel_load` actually returns the channel almost immediately.
Most of the loading will happen as you try to read from the channel.
This is fairly simple now that it is in front of me.
It took me a while to work out, as I’d not seen any examples.
There is one in the manual (IIRC) that does have a channel being fed by multiple `@async`s,
but it doesn’t close the channel when it is done.
Not closing channels when all feeders are done is bad, as it means the consumer can’t tell that there is nothing left
(and so you can’t use it with a `for` loop directly).
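To illustrate with a toy sketch (using the `Channel{T}(func, size)` constructor form, available in Julia 1.3+): the `do`-block constructor closes the channel once the block returns, and that closing is exactly what lets a consuming `for` loop terminate.

```julia
# The do-block form closes `ch` when the feeder block finishes,
# so the consumer's `for` loop knows there is nothing left.
ch = Channel{Int}(10) do ch
    for i in 1:3
        put!(ch, i)
    end
end  # `ch` is closed here, once the feeder task completes

results = Int[]
for x in ch  # terminates cleanly because `ch` gets closed
    push!(results, x)
end
@assert results == [1, 2, 3]
```

If the feeder never closed the channel, that `for` loop would block forever waiting for more items.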
We have a bunch of gzipped text files, with a JSON entry on each line.
We want to parse them, and return all the ones which have a
quality rating above 0.5.
```julia
using JSON, CodecZlib, Glob

data = async_channel_load(glob("*.gz", datadir)) do ch, fn
    for line in eachline(GzipDecompressorStream(open(fn)))
        datum = JSON.parse(line)
        if datum["quality"] > 0.5
            put!(ch, datum)
        end
    end
end

collect(Base.Iterators.take(data, 10_000))  # Put the first 10_000 into a Vector
```
The Distributed version is more fiddly,
but it is true parallelism.
If there is solid computation to be done on each item,
and the time taken to do the inter-process communication isn’t larger than the processing time,
then this is going to be faster.
As a rule of thumb: if you are tossing out a substantial portion of all the records you load from each file
(tossing out as in not `put!`ing them),
this will be pretty good.
```julia
using Distributed

function distributed_channel_load(fileload_func, filenames; ctype=Any, csize=2^16)
    Channel(ctype=ctype, csize=csize) do local_ch
        remote_ch = RemoteChannel(()->local_ch)
        c_pool = CachingPool(workers())
        file_dones = remotecall.(fileload_func, c_pool, remote_ch, filenames)
        # Wait till all the files are done
        for file_done in file_dones
            wait(file_done)
        end
        clear!(c_pool)
        # channel will be closed once this do block completes.
    end
end
```
This is a bit more complex, but actually not a lot once you’ve grokked the Async version.
We need to make a
`RemoteChannel` to wrap our
`Channel`, so that it is available to the workers.
We use a
`CachingPool` so that we only send
`fileload_func` to each worker once (in case it is a big closure),
and we `clear!` it at the end to free that memory back up on the workers.
We use a broadcasted
`remotecall`, which takes the place of a loop of
`@spawn`s. I’ve never liked that macro: it doesn’t seem to give much that
`remotecall` doesn’t, and macros are intrinsically harder to reason about than functions (since they could be doing anything).
The `remotecall`s each return a
`Future` that will be ready when the
`remotecall` has completed.
So we loop over
`wait`ing on them, to make sure we don’t close the channel before they are all done.
The loop of
`wait`s takes the place of the `@sync` in the Async version.
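A tiny sketch of that `Future`/`wait` pattern, run against process 1 so it works even with no extra workers added:

```julia
using Distributed

fut = remotecall(+, 1, 1, 2)  # returns a Future immediately
wait(fut)                     # blocks until the remotecall has completed
@assert fetch(fut) == 3       # fetch retrieves the result
```

In `distributed_channel_load` we only `wait`, not `fetch`, because the results come back through the channel; the `Future`s are just completion markers.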
This is basically the same example as before.
```julia
using Glob
@everywhere using JSON, CodecZlib  # the do block runs on the workers, which need these loaded too

data = distributed_channel_load(glob("*.gz", datadir)) do ch, fn
    for line in eachline(GzipDecompressorStream(open(fn)))
        datum = JSON.parse(line)
        if datum["quality"] > 0.5
            put!(ch, datum)
        end
    end
end

collect(Base.Iterators.take(data, 10_000))  # Put the first 10_000 into a Vector
```
I might turn this into a blog post or maybe a package later.