Was reading through this blog post: Machine Learning and Parallel Processing in Julia, Part I and saw a pattern I use a lot: walk through directories read/write files, etc…
I currently use go for that kind of work, want to see the julia way. The example from the blog post works but I’m trying to refactor it to use channels given deprecation warnings like these
WARNING: Task iteration is now deprecated. Use Channels for inter-task communication
However I’m having trouble getting the channel semantics correct (e.g., unable to co-ordinate between a put!()
and take!()
in the readFile
, addData
, buildDataSet
functions posted below). Are there any blog posts or tutorials that could lead me through this particular case?
I’ve read through a similar discourse post reading-and-processing-data-files-concurrently but it does not address the specific pattern below.
Here is the code I’m working with:
function readFile(path::String)
for (root, dirs, files) in walkdir(path)
for filename in files
if !(filename in SKIPFILES) && filename[1] != '.'
fullname = joinpath(root, filename)
if isfile(fullname)
pastHeader, lines = false, Vector{String}()
open(fullname) do f
for line in eachline(f)
if !isvalid(line)
line = decode(convert(Array{UInt8,1}, line), "LATIN1")
end
line = chomp(line)
if pastHeader
push!(lines, line)
elseif endof(line) == 0
pastHeader = true
end
end
end
content = join(lines, NEWLINE)
produce(fullname,content)
end
end
end
end
end
function addData!(df::DataFrame, path::String, classification::String)
for (filename, text) in Task(()->readFile(path))
push!(df, @data([text, classification, filename]))
end
end
function buildDataSet(sources)
df = DataFrame(text = Vector{String}(), class = Vector{String}(), index = Vector{String}())
for (path, classification) in sources
addData!(df, joinpath(SPAMROOT, path), classification)
end
return df
end