Help refactoring deprecated Task() to Channels


#1

I was reading through this blog post, Machine Learning and Parallel Processing in Julia, Part I, and saw a pattern I use a lot: walking through directories, reading and writing files, etc.

I currently use Go for that kind of work and want to see the Julia way. The example from the blog post works, but I’m trying to refactor it to use channels because of deprecation warnings like this one:

WARNING: Task iteration is now deprecated. Use Channels for inter-task communication

However I’m having trouble getting the channel semantics correct (e.g., unable to co-ordinate between a put!() and take!() in the readFile, addData, buildDataSet functions posted below). Are there any blog posts or tutorials that could lead me through this particular case?

I’ve read through a similar discourse post reading-and-processing-data-files-concurrently but it does not address the specific pattern below.

Here is the code I’m working with:

function readFile(path::String)
    for (root, dirs, files) in walkdir(path)
        for filename in files 
            if !(filename in SKIPFILES) && filename[1] != '.'
                fullname = joinpath(root, filename)
                if isfile(fullname)
                    pastHeader, lines = false, Vector{String}()
                    open(fullname) do f 
                        for line in eachline(f)
                            if !isvalid(line)
                                line = decode(convert(Array{UInt8,1}, line), "LATIN1")
                            end
                            line = chomp(line)
                            if pastHeader
                                push!(lines, line)
                            elseif endof(line) == 0
                                pastHeader = true
                            end
                        end
                    end
                    content = join(lines, NEWLINE)
                    produce(fullname,content)
                end
            end
        end
    end
end

function addData!(df::DataFrame, path::String, classification::String)
    for (filename, text) in Task(()->readFile(path))
        push!(df, @data([text, classification, filename]))
    end
end

function buildDataSet(sources)
    df = DataFrame(text = Vector{String}(), class = Vector{String}(), index = Vector{String}())
    for (path, classification) in sources
        addData!(df, joinpath(SPAMROOT, path), classification)
    end
    return df
end

#2

If all you want to do is yield items lazily and consume them in a loop, then this example might help:

julia> function producer()
         Channel() do channel
           for i in 1:10
             println("producing $i")
             put!(channel, i)
           end
         end
       end
producer (generic function with 1 method)

julia> function consumer()
         for item in producer()
           @show item
         end
       end
consumer (generic function with 1 method)

julia> consumer()
producing 1
item = 1
producing 2
item = 2
producing 3
item = 3
producing 4
item = 4
producing 5
item = 5
producing 6
item = 6
producing 7
item = 7
producing 8
item = 8
producing 9
item = 9
producing 10
item = 10

The above uses the Channel(::Function) constructor, which handles all the work of creating and scheduling the producer task for you. From the ?Channel docs:

Channel(func::Function; ctype=Any, csize=0, taskref=nothing)

  Creates a new task from func, binds it to a new channel of type ctype and size csize, and schedules the task, all in a single call.

  func must accept the bound channel as its only argument.

  If you need a reference to the created task, pass a Ref{Task} object via keyword argument taskref.

  Returns a Channel.

  julia> chnl = Channel(c->foreach(i->put!(c,i), 1:4));
  
  julia> typeof(chnl)
  Channel{Any}
  
  julia> for i in chnl
             @show i
         end;
  i = 1
  i = 2
  i = 3
  i = 4
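For intuition, here is a rough sketch of what the Channel(::Function) constructor saves you from writing by hand. The helper name manual_channel is made up for illustration, and this is not Base's actual implementation, just the same three steps (create, schedule, bind) spelled out with calls that exist in Base:

```julia
# A hand-rolled approximation of Channel(func); `manual_channel` is a hypothetical name.
function manual_channel(func)
    chnl = Channel{Any}(0)      # unbuffered: each put! waits for a matching take!
    task = @async func(chnl)    # create and schedule the producer task
    bind(chnl, task)            # close chnl automatically when the task finishes
    return chnl
end

chnl = manual_channel(c -> foreach(i -> put!(c, i), 1:4))
items = collect(chnl)           # drains the channel until the producer is done
```

Because the channel is bound to the task, the consuming loop ends on its own once the producer returns; no explicit close is needed.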

#3

Nice, i somehow missed the Channel docs, i saw the c = Channel(producer) example in Control Flow and couldn’t quite grok it… this makes a lot more sense.

Thank you. headed to meetings, will definitely try this!


#4

Sure enough, simple and works.
I’m gonna work on it a bit. In Go I’d typically have a goroutine per file or per batch of files. I imagine I’d have to rework it quite a bit if I wanted to process large sets of files across multiple CPUs, per the discourse post here: reading-and-processing-data-files-concurrently

Thank you again.
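As a point of comparison with the goroutine-per-file habit, a minimal sketch of "one task per item" using @sync and @async might look like the following. The names process_each and work are assumptions for illustration. Note that these tasks are concurrent, not parallel: they share one thread, so this mostly helps when the work waits on I/O, and spreading work over multiple CPUs would need a different mechanism.

```julia
# One task per item, all results funneled into a shared channel.
# `process_each` and `work` are hypothetical names, not part of the original code.
function process_each(items, work)
    results = Channel{Any}(length(items))   # buffer big enough that no put! blocks
    @sync for item in items
        @async put!(results, work(item))    # roughly the "go func() { ... }()" step
    end
    close(results)                          # all tasks are done once @sync returns
    return collect(results)                 # buffered items can still be drained after close
end

lengths = process_each(["a", "bb", "ccc"], length)
```

The order of results depends on when each task finishes, so treat the returned vector as unordered.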

I did notice that if I put the Channel() do block inside the scope of the for (root, dirs, files) in walkdir(path) block, I get an error.

ERROR: MethodError: no method matching start(::Void)
Closest candidates are:
  start(::SimpleVector) at essentials.jl:258
  start(::Base.MethodList) at reflection.jl:560
  start(::ExponentialBackOff) at error.jl:107

Here is the refactored bit:

function readFile(path::String)
    Channel() do chan
        for (root, dirs, files) in walkdir(path)
            for filename in files 
                if !(filename in SKIPFILES) && filename[1] != '.'
                    fullname = joinpath(root, filename)
                    if isfile(fullname)
                        pastHeader, lines = false, Vector{String}()
                        open(fullname) do f 
                            for line in eachline(f)
                                if !isvalid(line)
                                    line = decode(convert(Array{UInt8,1}, line), "LATIN1")
                                end
                                line = chomp(line)
                                if pastHeader
                                    push!(lines, line)
                                elseif endof(line) == 0
                                    pastHeader = true
                                end
                            end
                        end
                        content = join(lines, NEWLINE)
                        put!(chan, (fullname,content))
                    end
                end
            end
        end
    end
end

function addData!(df::DataFrame, path::String, classification::String)
    for (filename, text) in readFile(path)
        push!(df, @data([text, classification, filename]))
    end
end

function buildDataSet(sources)
    df = DataFrame(text = Vector{String}(), class = Vector{String}(), index = Vector{String}())
    for (path, classification) in sources
        addData!(df, joinpath(SPAMROOT, path), classification)
    end
    return df
end

#5

i don’t have all of the libraries installed to run your code verbatim, but from some preliminary fooling around in the repl i think you can take these steps to refactor to a channel:

  1. modify readFile to take in a channel.
function readFile(path::String, channel::Channel{Tuple{String, String}}) 
# Type of channel should be determined based on the type below. 
# based on how i read the callsite i'm pretty sure it's a tuple of strings, but i could be wrong. 
  2. in readFile replace produce with a call to put! to the channel
                        end
                    end
                    content = join(lines, NEWLINE)
                    put!(channel, (fullname,content))
                end
  3. close the channel at the end of readFile
        end
    end
    close(channel)
end
  4. introduce a channel at the addData! callsite, call readFile against the channel, and then iterate across the channel.
function addData!(df::DataFrame, path::String, classification::String)
    channel = Channel{Tuple{String,String}}(16) # Pick a number, the empty constructor is deprecated.
    @async readFile(path, channel) # run the producer as a task; a plain call would block in put! once the buffer fills
    for (filename, text) in channel
        push!(df, @data([text, classification, filename]))
    end
end

And I think that should do it!
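To make the explicit-channel steps concrete without the DataFrames dependency, here is a small self-contained sketch. The functions produce_pairs and consume_pairs are stand-ins invented for illustration (they play the roles of readFile and addData!). One thing to watch: the producer has to run in its own task, because a synchronous call would block in put! as soon as the buffer fills, before the consuming loop ever starts.

```julia
# Producer: pushes (name, text) tuples into a caller-supplied channel, then closes it.
# `produce_pairs` is a hypothetical stand-in for readFile.
function produce_pairs(names, channel::Channel{Tuple{String,String}})
    for n in names
        put!(channel, (n, uppercase(n)))
    end
    close(channel)   # lets the consumer's for loop terminate
end

# Consumer: creates the channel, starts the producer as a task, then iterates.
# `consume_pairs` is a hypothetical stand-in for addData!.
function consume_pairs(names)
    channel = Channel{Tuple{String,String}}(2)   # deliberately smaller than the input
    @async produce_pairs(names, channel)
    out = Tuple{String,String}[]
    for (name, text) in channel
        push!(out, (name, text))
    end
    return out
end

pairs_seen = consume_pairs(["a", "b", "c", "d", "e"])
```

If the producer can throw, the channel is never closed and the consumer would wait forever; binding the channel to the task with bind is one way to guard against that.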


#6

(My last post got held up in the mod queue, you basically did everything i was going to suggest lmao.)

You mentioned:

I did notice that if I put the Channel() do block inside the scope of the for (root, dirs, files) in walkdir(path) block, I get an error.

Julia returns the value of the last expression evaluated in a function. In the first case, when your Channel() do chan block wraps the for loop, the last expression is the entire do block, so readFile returns your freshly created channel. However, when you move the Channel() do block inside the for (root, dirs, files) in walkdir(path) loop, the last expression in readFile is the for loop itself, which evaluates to nothing.

you can check this out in the repl

you_get_nothing = for x in 1:10
    x
end
# you'd think the loop would evaluate to the last x value once it's done...
# but that's not true. you get nothing.
println(you_get_nothing === nothing) # prints true

What’s happening is that readFile() hands nothing to the for loop…

    for (filename, text) in readFile(path)

which calls a start method to iterate over the result of readFile(path), as part of the iterator protocol that the for loop uses. Since the returned value was nothing, Julia looks for a start method that accepts an argument of type Void, and since it can’t find one, it gives you a MethodError! :smile_cat:

One note: Void is being renamed to Nothing in 0.7, which is so much easier to remember.
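Here is a stripped-down sketch of the two placements side by side, with the file handling removed. The names broken_reader and fixed_reader are made up for illustration. On newer Julia versions the error text mentions iterate instead of start, but the cause is the same.

```julia
# Channel created INSIDE the loop: the for loop is the function's last
# expression, so the function returns nothing (and the channels are discarded).
function broken_reader(names)
    for n in names
        Channel() do chan
            put!(chan, n)
        end
    end
end

# Channel wraps the loop: the do block is the last expression,
# so the function returns the channel itself.
function fixed_reader(names)
    Channel() do chan
        for n in names
            put!(chan, n)
        end
    end
end

broken_result = broken_reader(["a", "b"])
fixed_result = collect(fixed_reader(["a", "b"]))
```

Here broken_result is nothing, which is exactly the value the for loop in addData! was trying to iterate over.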


#7

sweet, thanks for reply. When I’m done with meetings I’m gonna try adding channels too, and play with splitting out the loop at the root directory so I can get channels looping over subdirectories. Thanks again, exactly the patterns i need to get running.
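For the "channels looping over subdirectories" idea, one possible shape is sketched below, under the assumption that each immediate subdirectory of the root gets its own lazily filled channel. The helper names files_in and channels_by_subdir are hypothetical and not from the thread.

```julia
# Lazily yield every file path under `dir` through a channel.
# `files_in` is a hypothetical helper.
function files_in(dir)
    Channel() do chan
        for (root, dirs, files) in walkdir(dir)
            for name in files
                put!(chan, joinpath(root, name))
            end
        end
    end
end

# Build one channel per immediate subdirectory of `rootdir`.
# `channels_by_subdir` is a hypothetical helper.
function channels_by_subdir(rootdir)
    subdirs = filter(isdir, [joinpath(rootdir, name) for name in readdir(rootdir)])
    return Dict(d => files_in(d) for d in subdirs)
end
```

Each channel's producer task starts right away but blocks at its first put! until someone iterates that channel, so directories nobody consumes are never walked in full.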