Help refactoring deprecated Task() to Channels

Was reading through this blog post: Machine Learning and Parallel Processing in Julia, Part I, and saw a pattern I use a lot: walk through directories, read/write files, etc…

I currently use Go for that kind of work and want to see the Julia way. The example from the blog post works, but I’m trying to refactor it to use channels given deprecation warnings like these:

WARNING: Task iteration is now deprecated. Use Channels for inter-task communication

However I’m having trouble getting the channel semantics correct (e.g., unable to co-ordinate between a put!() and take!() in the readFile, addData, buildDataSet functions posted below). Are there any blog posts or tutorials that could lead me through this particular case?

I’ve read through a similar discourse post reading-and-processing-data-files-concurrently but it does not address the specific pattern below.

Here is the code I’m working with:

# Walk `path` recursively and emit a (fullname, content) pair for each readable
# file, using the deprecated `produce`/Task-iteration mechanism (the subject of
# this thread's refactor). Skips files listed in SKIPFILES and dotfiles.
function readFile(path::String)
    for (root, dirs, files) in walkdir(path)
        for filename in files 
            # Skip known-bad files and hidden (dot-prefixed) files.
            if !(filename in SKIPFILES) && filename[1] != '.'
                fullname = joinpath(root, filename)
                if isfile(fullname)
                    # Collect only the lines after the first blank line,
                    # i.e. strip the leading header block of each file.
                    pastHeader, lines = false, Vector{String}()
                    open(fullname) do f 
                        for line in eachline(f)
                            # Re-decode lines that are not valid UTF-8;
                            # assumes LATIN1 input — TODO confirm encoding.
                            if !isvalid(line)
                                line = decode(convert(Array{UInt8,1}, line), "LATIN1")
                            end
                            line = chomp(line)
                            if pastHeader
                                push!(lines, line)
                            elseif endof(line) == 0  # blank line marks end of header
                                pastHeader = true
                            end
                        end
                    end
                    content = join(lines, NEWLINE)
                    # `produce` is deprecated — this is what triggers the
                    # warning; see the Channel-based version later in the thread.
                    produce(fullname,content)
                end
            end
        end
    end
end

# Append one row (text, classification, filename) to `df` for every file
# yielded by `readFile(path)`. Iterating a `Task` like this is the deprecated
# pattern that produces the warning quoted at the top of the thread.
function addData!(df::DataFrame, path::String, classification::String)
    for (filename, text) in Task(()->readFile(path))
        push!(df, @data([text, classification, filename]))
    end
end

# Assemble a DataFrame with columns (text, class, index) from every
# (path, classification) pair in `sources`; paths are resolved under SPAMROOT.
function buildDataSet(sources)
    dataset = DataFrame(text = Vector{String}(), class = Vector{String}(), index = Vector{String}())
    for source in sources
        path, classification = source
        addData!(dataset, joinpath(SPAMROOT, path), classification)
    end
    return dataset
end

If all you want to do is yield items lazily and consume them in a loop, then this example might help:

julia> function producer()
         Channel() do channel
           for i in 1:10
             println("producing $i")
             put!(channel, i)
           end
         end
       end
producer (generic function with 1 method)

julia> function consumer()
         for item in producer()
           @show item
         end
       end
consumer (generic function with 1 method)

julia> consumer()
producing 1
item = 1
producing 2
item = 2
producing 3
item = 3
producing 4
item = 4
producing 5
item = 5
producing 6
item = 6
producing 7
item = 7
producing 8
item = 8
producing 9
item = 9
producing 10
item = 10

The above uses the Channel(::Function) constructor, which handles all the work of creating and scheduling the producer task for you. From the ?Channel docs:

Channel(func::Function; ctype=Any, csize=0, taskref=nothing)

  Creates a new task from func, binds it to a new channel of type ctype and size csize, and schedules the task, all in a single call.

  func must accept the bound channel as its only argument.

  If you need a reference to the created task, pass a Ref{Task} object via keyword argument taskref.

  Returns a Channel.

  julia> chnl = Channel(c->foreach(i->put!(c,i), 1:4));
  
  julia> typeof(chnl)
  Channel{Any}
  
  julia> for i in chnl
             @show i
         end;
  i = 1
  i = 2
  i = 3
  i = 4
2 Likes

Nice, I somehow missed the Channel docs. I saw the c = Channel(producer) example in Control Flow and couldn’t quite grok it… this makes a lot more sense.

Thank you. Headed to meetings — will definitely try this!

Sure enough, simple and works.
I’m gonna work it a bit. In Go I’d typically have a goroutine per file or per batch of files. I imagine I’d have to rework it quite a bit if I wanted to run a channel on multiple CPUs over large sets of files, per the discourse post here: reading-and-processing-data-files-concurrently

Thank you again.

I did notice that if I put the Channel() do block inside the scope of the for (root, dirs, files) in walkdir(path) block, I get an error.

ERROR: MethodError: no method matching start(::Void)
Closest candidates are:
  start(::SimpleVector) at essentials.jl:258
  start(::Base.MethodList) at reflection.jl:560
  start(::ExponentialBackOff) at error.jl:107

Here is the refactored bit:

# Walk `path` recursively and lazily yield (fullname, content) tuples over a
# Channel; consume with `for (name, content) in readFile(path)`.
# Skips files listed in SKIPFILES and hidden (dot-prefixed) files.
#
# NOTE: the `Channel() do ... end` block must be the function's last (returned)
# expression. If it is nested inside the walkdir loop, the function returns the
# loop's value (`nothing`), which is not iterable — that is the
# `MethodError: no method matching start(::Void)` reported above.
function readFile(path::String)
    # `ctype` makes this a concretely typed Channel{Tuple{String,String}}
    # instead of Channel{Any}, so iteration on the consumer side is type-stable
    # (see the Channel(func; ctype, csize) docs quoted earlier in the thread).
    Channel(ctype=Tuple{String,String}) do chan
        for (root, dirs, files) in walkdir(path)
            for filename in files
                if !(filename in SKIPFILES) && filename[1] != '.'
                    fullname = joinpath(root, filename)
                    if isfile(fullname)
                        # Collect only the lines after the first blank line,
                        # i.e. strip the leading header block of each file.
                        pastHeader, lines = false, Vector{String}()
                        open(fullname) do f
                            for line in eachline(f)
                                # Re-decode lines that are not valid UTF-8;
                                # assumes LATIN1 input — TODO confirm encoding.
                                if !isvalid(line)
                                    line = decode(convert(Array{UInt8,1}, line), "LATIN1")
                                end
                                line = chomp(line)
                                if pastHeader
                                    push!(lines, line)
                                elseif endof(line) == 0  # blank line ends the header
                                    pastHeader = true
                                end
                            end
                        end
                        content = join(lines, NEWLINE)
                        put!(chan, (fullname, content))
                    end
                end
            end
        end
    end
end

# Append one row (text, classification, filename) to `df` for every file
# produced by the `readFile` channel.
function addData!(df::DataFrame, path::String, classification::String)
    for entry in readFile(path)
        name, body = entry
        push!(df, @data([body, classification, name]))
    end
end

# Build the full dataset DataFrame by running every (path, classification)
# source through addData!; paths are resolved relative to SPAMROOT.
function buildDataSet(sources)
    result = DataFrame(text = Vector{String}(), class = Vector{String}(), index = Vector{String}())
    foreach(sources) do (path, classification)
        addData!(result, joinpath(SPAMROOT, path), classification)
    end
    return result
end

sweet, thanks for reply. When I’m done with meetings I’m gonna try adding channels too, and play with splitting out the loop at the root directory so I can get channels looping over subdirectories. Thanks again, exactly the patterns i need to get running.