I think this is a fairly common scenario:
You have a collection of files,
and each file contains a set of records (e.g. one JSON entry per line).
And you wish to lazily load up those records, without caring about maintaining order…
Possibly you also want to do something with the records, like filtering some of them out, or transforming them.
You want to do it lazily because you maybe can’t (or don’t want to) hold all the data in memory at once,
or maybe you just want to be able to test your algorithm on the first 10_000 before scaling up to the full dataset.
Channels are totally the tool for the job when it comes to lazily loaded data.
(See my blog post on them)
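As a minimal reminder of that pattern, here is a hypothetical single-feeder sketch (using the Julia 1.3+ Channel constructor form, where the do block is the feeder and the channel closes when it finishes):

```julia
# One task feeds the channel; items are produced lazily as they are consumed.
ch = Channel{Int}(2) do ch    # buffer size 2
    for i in 1:5
        put!(ch, i^2)         # blocks whenever the buffer is full
    end
end                           # channel closes when the do block finishes

collect(ch)  # → [1, 4, 9, 16, 25]
```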
But how do you scale that up to having many “feeders” put!ing onto a shared channel?
I’ve abstracted that functionality, first for Async, and then for Distributed code.
I’m not particularly happy with the name channel_load.
It is a kind of mapreduce
(but that is not saying much: almost any operation can be described as “a kind of map reduce”).
It doesn’t just apply to file loading:
any time you have a collection of things that can each become a collection of things,
and you want all the things gathered up, this works.
Maybe mapflatten is a good name. Or lazy_mapflatten.
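To make the name concrete, an eager equivalent of the idea is just a flatten over a map (eager_mapflatten is a hypothetical name here, not part of the code above):

```julia
# Eager equivalent of the "mapflatten" idea: each input expands to a
# collection, and all the results are gathered into one flat collection.
eager_mapflatten(f, xs) = collect(Iterators.flatten(map(f, xs)))

eager_mapflatten(x -> 1:x, [2, 3])  # → [1, 2, 1, 2, 3]
```

The channel versions below do the same thing, but lazily, and without guaranteeing order.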
Async loading
Async should help with this,
since IO is the kind of thing async normally helps with.
(I’m not sure exactly how that works, but apparently it does? Maybe it is that things other than disk IO can get done while we wait for the OS to fetch stuff from disk.)
Async isn’t true parallelism; it is just swapping Tasks when one is blocked or yields.
Only one piece of Julia code runs at a time.
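You can see the task-swapping directly in a toy example like this (the exact interleaving is a scheduler detail, but on a single-threaded FIFO scheduler the tasks hand off at each yield point):

```julia
# Tasks only swap at yield points (I/O, sleep, yield(), channel ops, …);
# their bodies never run simultaneously.
order = String[]
@sync for name in ["a", "b"]
    @async begin
        push!(order, "$name-start")
        yield()                    # hand control to the other task
        push!(order, "$name-end")
    end
end
order  # → ["a-start", "b-start", "a-end", "b-end"]
```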
function async_channel_load(fileload_func, filenames; ctype=Any, csize=2^16)
    Channel(ctype=ctype, csize=csize) do ch
        @sync for fn in filenames
            @async fileload_func(ch, fn)
        end
        # channel will be closed once this do block completes.
    end
end
The @sync is needed so we don’t exit this loop before all the contained @asyncs are done.
The @async starts a new task for each file.
The Channel do block starts a task of its own, to run that do block,
and so async_channel_load actually returns the channel almost immediately.
Most of the loading will happen as you try to read from the channel.
This is fairly simple now that it is in front of me.
It took me a while to work out, as I’d not seen any examples.
There is one in the manual (IIRC) that does have a channel being fed by multiple @asyncs,
but it doesn’t close the channel when it is done.
Not closing the channel when all the feeders are done is bad, as it means the consumer can’t tell that there is nothing left
(and so you can’t use it with a for loop directly).
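The for-loop point is easy to demonstrate: iteration over a channel only terminates because the channel is closed.

```julia
# Once a channel is closed, iteration terminates cleanly; an unclosed
# channel would leave the for loop blocked forever waiting for more items.
ch = Channel{Int}(4)
foreach(i -> put!(ch, i), 1:3)
close(ch)                 # signals "no more items"

total = 0
for x in ch               # terminates because the channel is closed
    total += x
end
total  # → 6
```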
For example
We have a bunch of gzipped text files, with a JSON entry on each line.
We want to parse them,
and return all the ones which have a quality rating above 0.5.
using JSON, CodecZlib, Glob

data = async_channel_load(glob("*.gz", datadir)) do ch, fn
    for line in eachline(GzipDecompressorStream(open(fn)))
        datum = JSON.parse(line)
        if datum["quality"] > 0.5
            put!(ch, datum)
        end
    end
end
collect(Base.Iterators.take(data, 10_000)) # Put the first 10_000 into a Vector
Distributed loading
This is more fiddly,
but it is true parallelism.
If there is solid computation to be done on each item,
and the time taken for the inter-process communication isn’t larger than the processing time,
then this is going to be faster.
As a rule of thumb: if you are tossing out a substantial portion of all the records you load from each file
(tossing out as in not put!ing them),
this will work pretty well.
function distributed_channel_load(fileload_func, filenames; ctype=Any, csize=2^16)
    Channel(ctype=ctype, csize=csize) do local_ch
        remote_ch = RemoteChannel(() -> local_ch)

        c_pool = CachingPool(workers())
        file_dones = remotecall.(fileload_func, c_pool, remote_ch, filenames)

        # Wait till all the files are done
        for file_done in file_dones
            wait(file_done)
        end
        clear!(c_pool)
        # channel will be closed once this do block completes.
    end
end
This is a bit more complex, but actually not a lot once you’ve grokked the Async version.
We need to make a RemoteChannel to wrap our Channel,
so that it is available to the workers.
We use a CachingPool so that we only send fileload_func to each worker once (in case it is a big closure),
and we clear! it at the end to free that memory back up on the workers.
We use a broadcasted remotecall, which takes the place of the loop of @asyncs.
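The RemoteChannel part is less magic than it sounds: it is just a serializable handle, and operations through it are forwarded to the Channel it wraps. A tiny sketch (this one wraps a fresh Channel rather than an existing one, and works even with no workers added, since process 1 counts):

```julia
using Distributed

# A RemoteChannel is a serializable handle to a Channel living on one
# process; put!/take! through the handle are forwarded to that Channel.
rc = RemoteChannel(() -> Channel{Int}(10))
put!(rc, 42)
take!(rc)  # → 42
```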
(We could use @spawn, but I’ve never liked that macro; it doesn’t seem to give much that remotecall doesn’t, and macros are intrinsically harder to reason about than functions (since they could be doing anything).)
The remotecalls each return a Future that will be ready when the remotecall has completed.
So we loop over them, waiting on each, to make sure we don’t close the channel before they are all done.
The loop of waits takes the place of the @sync.
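That Future-then-wait dance in isolation looks like this (worker 1 is the master process, so this runs even without addprocs):

```julia
using Distributed

# remotecall returns immediately with a Future; wait blocks until the
# remote call has completed, and fetch retrieves its result.
fut = remotecall(+, 1, 2, 3)
wait(fut)          # like one iteration of the wait loop above
fetch(fut)  # → 5
```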
For example
This is basically the same example as before.
using JSON, CodecZlib, Glob

data = distributed_channel_load(glob("*.gz", datadir)) do ch, fn
    for line in eachline(GzipDecompressorStream(open(fn)))
        datum = JSON.parse(line)
        if datum["quality"] > 0.5
            put!(ch, datum)
        end
    end
end

collect(Base.Iterators.take(data, 10_000)) # Put the first 10_000 into a Vector
</collect>
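One gotcha worth noting: for the distributed version the workers have to exist, and the packages the closure uses have to be loaded on every worker. A setup sketch along these lines (worker count is just an example):

```julia
using Distributed
addprocs(4)                        # or however many cores you want to use
@everywhere using JSON, CodecZlib  # the closure's dependencies must be loaded on every worker
```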
Thoughts?
I might turn this into a blog post or maybe a package later.