I’m doing multi-processing in a “cheat” way, with my long-running code encapsulated in a Node Puppeteer script. To avoid overloading the server, I run at most 12 jobs simultaneously. I hacked the code below together, but given the documentation on awesome tools like Channel, I was wondering if there is a better way. A minor annoyance is also that my progress bar doesn’t seem to update every time I increment the job counter (it might be too fast to update), so it takes a while before it shows 12 active tasks.
# Define the number of concurrent tasks
max_concurrent_tasks = 12
running_tasks = 0
tasks = []
fail_jobs = 0

p = Progress(nrow(rows))
for row in eachrow(rows)
    next!(p; showvalues = [("Case #", row.case_number), ("Borough", row.borough), ("Auction Date", row.auction_date), ("# active tasks", running_tasks), ("# failed", fail_jobs)])
    while running_tasks >= max_concurrent_tasks
        for (case_number, process) in tasks
            if !success(process)
                continue
            end
            fail_jobs += process.exitcode
            running_tasks -= 1
            filter!(tsk -> tsk[1] != case_number, tasks)
        end
        sleep(3) # Wait for a slot to be available
    end
    tsk = run(`node scrapers/notice_of_sale.js $(row.case_number) $(row.borough) $(row.auction_date)`, wait=false)
    push!(tasks, (row.case_number, tsk))
    running_tasks += 1
end
Here are some ways you could get rid of the explicit polling and waiting, and optionally also of the scheduling itself.
Manual scheduling, communication using a Channel
If you’re okay with doing a bit of Task management yourself, you could use a Channel to just replace the explicit polling and waiting. E.g.
using Random, ProgressMeter, DataFrames

function gen_data(nrows, show=true)
    rows = DataFrame(case_number=1:nrows, name=[randstring(5) for _ = 1:nrows])
    show && display(rows)
    return rows
end
function process_data(rows, max_concurrent_tasks, show_progress_bar=false)
    tasks = Task[]
    channel = Channel{Tuple{Int, Int}}(max_concurrent_tasks)
    failed_jobs = 0
    running_tasks = 0
    finished_tasks = 0
    if show_progress_bar
        pb = Progress(nrow(rows))
        out_stream = devnull
    else
        pb = nothing
        out_stream = stdout
    end
    for row in eachrow(rows)
        show_progress_bar && next!(pb; showvalues = [("Case #", row.case_number), ("name", row.name), ("# active tasks", running_tasks), ("# failed", failed_jobs), ("# finished", finished_tasks)])
        task = Task() do
            let row = row
                # ignorestatus: don't throw on a nonzero exit code, which would
                # kill this task before it could put! its result onto the channel
                p = run(pipeline(ignorestatus(`cmd /C echo Start case number $(row.case_number) \& ping -n 6 127.0.0.1 \> nul \& echo Done with $(row.case_number)`), stdout=out_stream), wait=true)
                put!(channel, (row.case_number, p.exitcode))
            end
        end
        while running_tasks >= max_concurrent_tasks
            # Wait until a previous task finishes
            # ("if" instead of "while" should also be fine)
            finished_case_number, exitcode = take!(channel)
            exitcode == 0 || (failed_jobs += 1; @warn "Processing case #$finished_case_number failed!")
            running_tasks -= 1
            finished_tasks += 1
        end
        push!(tasks, task)
        schedule(task)
        running_tasks += 1
    end
    wait.(tasks)
    running_tasks = 0
    finished_tasks = length(tasks)  # i.e. nrow(rows)
    while !isempty(channel)
        finished_case_number, exitcode = take!(channel)
        exitcode == 0 || (failed_jobs += 1; @warn "Processing case #$finished_case_number failed!")
    end
    println("\nNumber of failed jobs: $failed_jobs")
    show_progress_bar && finish!(pb)
end
Here I’m just wrapping the run call in a Task, which then communicates with the master task via the channel. When we don’t have the capacity to start another job, take!(channel) blocks until some previous job has finished and has put its result into the channel.
(The run command I’m using on Windows just prints some info and waits for 5 seconds, simulating 5 seconds of processing time. On Unix there are probably nicer ways to achieve such a ‘sleep’ of 5 s, but I had to resort to a somewhat hacky approach suggested on StackExchange. These prints do interfere with the progress bar, so either I show a progress bar and suppress the prints, or vice versa.)
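On Unix, an equivalent dummy command could simply go through a shell and the standard sleep utility. A sketch with the same print-wait-print behaviour, to be dropped in place of the cmd /C line above:

# Print, wait ~5 s, print again (Unix equivalent of the Windows ping trick)
p = run(pipeline(ignorestatus(`sh -c "echo Start case number $(row.case_number); sleep 5; echo Done with $(row.case_number)"`), stdout=out_stream), wait=true)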
Output
The outputs I then get are like
julia> process_data(gen_data(10, false), 4, false)
Start case number 1
Start case number 2
Start case number 3
Start case number 4
Done with 1
Done with 2
Done with 4
Done with 3
Start case number 5
Start case number 6
Start case number 7
Start case number 8
Done with 5
Start case number 9
Done with 6
Done with 7
Start case number 10
Done with 8
Done with 9
Done with 10
Number of failed jobs: 0
to illustrate (to the degree I can using static text) that the scheduling works fine. (Note that, like in your code, the progress bar gets updated when we want to start a job. At the final update (for case #10), only 5 jobs have finished, 4 are busy, and we still want to schedule one more. You might want to change this behaviour so that 100% is only shown when all jobs have finished; see the sketch below.)
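A minimal sketch of that variant (handle_finished! is a hypothetical helper of mine, not part of the code above): advance the bar once per finished job by calling this in both places where results are taken from the channel, instead of the next! at the top of the for loop.

# Hypothetical helper: advance the progress bar once per *finished* job,
# so that 100% is only reached when the last result has been taken
function handle_finished!(pb, channel, failed_jobs)
    finished_case_number, exitcode = take!(channel)
    if exitcode != 0
        failed_jobs += 1
        @warn "Processing case #$finished_case_number failed!"
    end
    next!(pb; showvalues = [("Last finished case", finished_case_number), ("# failed", failed_jobs)])
    return failed_jobs  # caller reassigns: failed_jobs = handle_finished!(...)
end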
Via OhMyThreads
Instead of scheduling everything yourself in the main task, you could alternatively use a package like OhMyThreads.jl to do this for you.
No progress bar
To illustrate that the core scheduling part is quite compact, I will first omit a progress bar.
using OhMyThreads

function process_data_omt(rows, max_concurrent_tasks)
    failed_jobs = tmapreduce(+, eachrow(rows); ntasks=max_concurrent_tasks) do row
        # ignorestatus: don't throw when the process exits with a nonzero code
        p = run(ignorestatus(`cmd /C echo Start case number $(row.case_number) \& ping -n 6 127.0.0.1 \> nul \& echo Done with $(row.case_number)`))
        return p.exitcode == 0 ? 0 : 1  # i.e. Int(p.exitcode != 0)
    end
    println("\nNumber of failed jobs: $failed_jobs")
end
Output
julia> process_data_omt(gen_data(10, false), 4)
Start case number 1
Start case number 7
Start case number 9
Start case number 4
Done with 9
Done with 7
Done with 4
Done with 1
Start case number 2
Start case number 10
Start case number 5
Start case number 8
Done with 8
Done with 10
Done with 2
Done with 5
Start case number 6
Start case number 3
Done with 6
Done with 3
Number of failed jobs: 0
With progress bar (including showvalues)
As next!(::Progress) should be thread-safe, you could still use a progress bar. To maintain variables such as running_tasks, you could use atomics to update them safely from concurrently running tasks.
using .Threads: Atomic, atomic_add!
function process_data_omt_pb(rows, max_concurrent_tasks)
    pb = Progress(nrow(rows))
    failed_jobs = Atomic{Int}(0)
    running_tasks = Atomic{Int}(0)
    finished_tasks = Atomic{Int}(0)
    tmap(eachrow(rows); ntasks=max_concurrent_tasks) do row
        next!(pb; showvalues = [("Case #", row.case_number), ("name", row.name), ("# active tasks", running_tasks[]), ("# failed", failed_jobs[]), ("# finished", finished_tasks[])])
        atomic_add!(running_tasks, 1)
        p = run(ignorestatus(`cmd /C ping -n 6 127.0.0.1 \> nul`))
        atomic_add!(running_tasks, -1)
        atomic_add!(finished_tasks, 1)
        # Use atomic_add! here too; failed_jobs[] += 1 would be a non-atomic
        # read-modify-write and could drop increments under concurrency
        p.exitcode == 0 || (atomic_add!(failed_jobs, 1); @warn "Processing case #$(row.case_number) failed!")
        return nothing
    end
    println("\nNumber of failed jobs: $(failed_jobs[])")
    finish!(pb)
end
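This is called like before, e.g.

julia> process_data_omt_pb(gen_data(10, false), 4)

Since this dummy command prints nothing itself (its output goes to nul), the prints no longer fight with the progress bar.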
I tried messing with the manual approach and I couldn’t get it to work. The problem was always that the channel got filled up by 4 tasks which all exited successfully very quickly, and then the while loop just got stuck because no more put! was coming, since there were no more running tasks. It only works if at least 1 out of the 4 tasks is guaranteed to take a significant amount of time to complete.
I came up with this solution that worked pretty well.
p = Progress(nrow(rows))
Threads.@threads :dynamic for row in eachrow(rows)
    next!(p; showvalues = [("Case #", row.case_number), ("Borough", row.borough), ("Auction Date", row.auction_date)])
    try
        args = [row.case_number, row.borough, row.auction_date]
        run(pipeline(`node scrapers/notice_of_sale.js $args`, devnull))
    catch e
        println("Error downloading filings for $(row.case_number) $(row.borough)")
    end
end
I don’t see, theoretically, why that would be the case, nor can I replicate it. If I remove the ping -n 6 127.0.0.1 ‘sleep’ in my process_data above, so that the process just prints twice and finishes immediately, I correctly get
julia> process_data(gen_data(10, false), 4, false)
Start case number 1
Start case number 4
Start case number 3
Start case number 2
Done with 1
Done with 4
Done with 3
Done with 2
Start case number 5
Done with 5
Start case number 6
Done with 6
Start case number 7
Done with 7
Start case number 8
Done with 8
Start case number 9
Done with 9
Start case number 10
Done with 10
Number of failed jobs: 0
false
as output. If the run processes finish faster than we can handle them in the main task, the execution will mostly just be sequential. Note also that running_tasks is technically not the same as count(istaskstarted.(tasks)) - count(istaskdone.(tasks)): here running_tasks counts tasks that have been scheduled but whose results we have not yet taken from the channel.
If you’re happy with this solution, then that’s all that matters, of course. Though let me point out that (as far as I know) Threads.@threads will always use Threads.nthreads() tasks and so cannot be told to use max_concurrent_tasks of them (especially relevant when max_concurrent_tasks is larger than the number of cores on the machine). If you run Julia with julia -t 1, all your run processes will execute sequentially.
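For completeness, one pattern that does respect max_concurrent_tasks regardless of the thread count (my suggestion, not from the posts above) is to guard plain @async tasks with a Base.Semaphore; since the heavy lifting happens in external node processes, no extra Julia threads are needed. A minimal sketch, assuming the same rows DataFrame:

using ProgressMeter, DataFrames

function process_rows_sem(rows, max_concurrent_tasks)
    p = Progress(nrow(rows))
    sem = Base.Semaphore(max_concurrent_tasks)
    @sync for row in eachrow(rows)
        @async begin
            Base.acquire(sem)  # blocks until one of the max_concurrent_tasks slots frees up
            try
                run(pipeline(ignorestatus(`node scrapers/notice_of_sale.js $(row.case_number) $(row.borough) $(row.auction_date)`), devnull))
            finally
                Base.release(sem)
                next!(p)  # advance the bar only when a job has actually finished
            end
        end
    end
    finish!(p)
end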
So, very weird behavior here. When I copy-paste your code into my file, everything runs fine. But when I replace the dummy data rows with my actual DataFrame, it only processes the first max_concurrent_tasks worth of tasks and then halts.
Problem between chair and keyboard. My case numbers are Strings (blame the court system, not me), while the channel was statically typed as Tuple{Int, Int}. Weirdly, it never gave an error! There should have been a run-time error when trying to put a String into an Int, but instead it just hangs forever. Presumably the conversion error is thrown inside the spawned Task, where it dies unobserved, so the main task keeps waiting on take! for a result that never arrives.
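A minimal, self-contained reproduction of the hang:

ch = Channel{Tuple{Int, Int}}(4)
t = @async put!(ch, ("A123", 0))  # converting "A123" to Int fails *inside* the task
take!(ch)  # hangs forever: the failed task never managed to put! anything
# The error only surfaces if you explicitly wait on the task:
# wait(t)  # throws a TaskFailedException wrapping the conversion MethodError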