Example of Async/Distributed loading into a single shared Channel

I think this is a fairly common senario:
You have a collection of files,
and each file contains a set of records (e.g. one JSON entry per line).
And you which to kinda lazily load up those records, without caring about maintaining order…
Possibly you want to do something with the records like filter some of them out or transform them etc.
You want to do it lazily because you maybe can’t (or don’t want to) hold all the data in memory at once,
or maybe you just want to be able to test your algorithm on the first 10_000 before scaling up to the full dataset.
Channels are totally the tool for the job when it comes to lazily loaded data.
(See my blog post on them)
But how do you scale that up for having many “feeders” put!ing on to a shared channel?

I’ve abstracted that functionality first for for Async, and then for Distributed code.
I’m not particularly happy with the name, channel_load
It is a kind of mapreduce (but that is not saying much: almost any operation can be described as “a kind of map reduce”).
It doesn’t just apply to file loading:
any time you have a collection of things that each can become a collection of things,
and you want all the things gathered up this works.
Maybe mapflatten is a good name. lazy_mapflatten.

Async loading

Async should help with this,
since IO is the kinda thing async normally helps with.
(I’m not sure how that works, but apparently it does? Maybe it is that things other than disk IO can be getting done will we wait for the OS to get stuff from disk).
Async isn’t true parallism, it is just swapping Tasks when one is blocked/yeilds.
Only one bit of julia code runs at a time.

function async_channel_load(fileload_func, filenames; ctype=Any, csize=2^16)
    Channel(ctype=ctype], csize=csize) do ch
        @sync for fn in filenames
            @async fileload_func(ch, fn)
        end
		# channel will be closed once this do block completes.
    end
end

The @sync needed to not exist this loop before all contrained @asyncs are done.
The @async starts the new tasks for each file.
The Channel do starts a task of its own, to run that do block.
and so the async_channel_load actually returns the channel almost immediately.
Most of the loading will happen as you try to read from the channel.

This is fairly simply now that it is in front of me.
It took me a while to work out, as I’ld not seen any examples.
There is one in the manual (IIRC) that does have a channel being feed by multiple asyncs,
but it doesn’t close the channel when it is done.
Not closing channels when all feeders are done is bad, as it means the consumer can’t tell that there is nothing left.
(and so you can’t use it with a for loop directly).

For example

We have a bunch of gzipped textfiles, with a json entry on each-line.
We want to parse them,
and return all the ones which have a quality rating above 0.5.

using JSON, CodecZlib, Glob

data = async_channel_load(glob("*.gz", datadir)) do fn, ch
	for line in eachline(GzipDecompressorStream(open(fn)))
		datum = JSON.parse(line)
		if datum["quality"] > 0.5
			put!(ch, datum)
		end
	end
end

collect(Base.Iterators.take(data, 10_000)) # Put the first 10_000 into a Vector

Distributed loading

This is more fiddly.
But it is true parallelism.
If there is solid computation to be done on each item,
and the time taken to do the inter-process communication isn’t larger than the processing time,
then this is going to be faster.
I guess as a rule of thumb, if you are tossing out a substantial portion of all records you load from each file.
(Tossing out as in not put! in them).
This will be pretty good.

function distributed_channel_load(fileload_func, filenames; ctype=Any, csize=2^16)
    Channel(ctype=ctype, csize=csize) do local_ch
		remote_ch = RemoteChannel(()->local_ch)

		c_pool = CachingPool(workers())
		file_dones = remotecall.(fileload_func, c_pool, remote_ch, filenames)

		# Wait till all the all files are done
		for file_done in file_dones
			wait(file_done)
		end 
		clear!(c_pool)
		# channel will be closed once this do block completes.
	end
end

This is a bit more complex. But actually not a lot once you’ve gronked the Async version.
We need to make a RemoteChannel to wrap our Channel so that it is available to the workers.
We use a CachingPool which we clear! at the end, so that we only send fileload_func to each worker once (in case it is a big closure),
and we clear! it to free that memory back up on the workers at the end.

We use a broadcasted remotecall which tasks the place of the loop of @asyncs.
(Could use @spawn but I’ve never liked that macro, it doesn’t seem to give much that remote_call doesn’t, and macros are intrincially harder to reason about than functions (since they could be doing anything))
The remotecalls each return a Future that will be ready, when the remotecall has completed.
So we loop over waiting for them, to make sure we don’t close the channel before they are all done.
The loop of waits takes the place of the @sync.

For example

This is the same example as before; basically.

using JSON, CodecZlib, Glob

distributed_channel_load(glob("*.gz", datadir)) do fn, ch
	for line in eachline(GzipDecompressorStream(open(fn)))
		datum = JSON.parse(line)
		if datum["quality"] > 0.5
			put!(ch, datum)
		end
	end
end

collect(Base.Iterators.take(data, 10_000)) # Put the first 10_000 into a Vector

Thoughts?

I might turn this into a blog post or maybe a package later.

1 Like

I made this into a blog post a while back,
forgot to link it here
https://white.ucc.asn.au/2018/07/14/Asynchronous-and-Distributed-File-Loading.html

4 Likes

Really awesome write up here @oxinabox! Definitely going to help me out a lot.