Managing concurrent running shell tasks without a manual counter

I’m doing multi-processing in a “cheat way”, with my long-running code encapsulated in a Node Puppeteer script. I’m trying not to overload the server, so I have at most 12 jobs running simultaneously. I hacked this together, but given the documentation on awesome tools like Channel, I was wondering if there is a better way. A minor annoyance is that my progress bar doesn’t seem to update every time I increment the job counter (the updates might be too fast), so it doesn’t show 12 active tasks until after a while.

    # Define the number of concurrent tasks
    max_concurrent_tasks = 12
    running_tasks = 0
    tasks = []
    fail_jobs = 0

    p = Progress(nrow(rows))
    for row in eachrow(rows)
        next!(p; showvalues = [("Case #", row.case_number), ("Borough", row.borough), ("Auction Date", row.auction_date), ("# active tasks", running_tasks), ("# failed", fail_jobs)])
        while running_tasks >= max_concurrent_tasks
            # Iterate over a copy, since finished jobs are removed from `tasks` below
            for (case_number, process) in copy(tasks)
                # process_exited does not block, unlike success(process)
                process_exited(process) || continue
                fail_jobs += (process.exitcode != 0)  # count failures, not exit codes
                running_tasks -= 1
                filter!(tsk -> tsk[1] != case_number, tasks)
            end
            # Still full: wait for a slot to become available
            running_tasks >= max_concurrent_tasks && sleep(3)
        end

        # Start the scraper without waiting for it to finish
        tsk = run(`node scrapers/notice_of_sale.js $(row.case_number) $(row.borough) $(row.auction_date)`, wait=false)
        push!(tasks, (row.case_number, tsk))
        running_tasks += 1
    end

Here are some ways you could get rid of the explicit polling and waiting, and optionally also of the scheduling itself.

Manual scheduling, communication using a Channel

If you’re okay with doing a bit of Task management yourself, you could use a Channel to just replace the explicit polling and waiting. E.g.

using Random, ProgressMeter, DataFrames

function gen_data(nrows, show=true)
	rows = DataFrame(case_number=1:nrows, name=[randstring(5) for _ = 1:nrows])
	show && display(rows)
	return rows
end

function process_data(rows, max_concurrent_tasks, show_progress_bar=false)
	tasks = Task[]
	channel = Channel{Tuple{Int, Int}}(max_concurrent_tasks)
	failed_jobs = 0
	running_tasks = 0
	finished_tasks = 0

	if show_progress_bar
		pb = Progress(nrow(rows))
		out_stream = devnull
	else
		pb = nothing
		out_stream = stdout
	end

	for row in eachrow(rows)
		show_progress_bar && next!(pb; showvalues = [("Case #", row.case_number), ("name", row.name), ("# active tasks", running_tasks), ("# failed", failed_jobs), ("# finished", finished_tasks)])
        
		task = Task() do 
			let row = row
				p = run(pipeline(`cmd /C echo Start case number $(row.case_number) \& ping -n 6 127.0.0.1 \> nul \& echo Done with $(row.case_number)`, stdout=out_stream), wait=true)
				put!(channel, (row.case_number, p.exitcode))
			end
		end
		while running_tasks >= max_concurrent_tasks
			# Wait until a previous task finishes
			# ("if" instead of "while" should also be fine)
			finished_case_number, exitcode = take!(channel)
			exitcode == 0 || (failed_jobs += 1; @warn "Processing case #$finished_case_number failed!")
			running_tasks -= 1
			finished_tasks += 1
		end
		
		push!(tasks, task)
		schedule(task)
		running_tasks += 1
	end

	wait.(tasks)
	running_tasks = 0
	finished_tasks = length(tasks)  # i.e. nrow(rows)

	while !isempty(channel)
		finished_case_number, exitcode = take!(channel)
		exitcode == 0 || (failed_jobs += 1; @warn "Processing case #$finished_case_number failed!")
	end
	println("\nNumber of failed jobs: $failed_jobs")

	show_progress_bar && finish!(pb)
end

Here I’m just wrapping the run Process into a Task, which then communicates with the master task via the channel. When we don’t have the capacity to start another job, take!(channel) blocks until some previous job has finished and put its result into the channel (so that !isempty(channel)).

(The run command I’m using on Windows just prints some info and waits for 5 seconds, simulating 5 seconds of processing time. On Unix there are probably nicer ways to achieve such a ‘sleep’ of 5 s, but I had to resort to a somewhat hacky approach suggested on StackExchange. These prints do interfere with the progress bar, so either I show a progress bar and suppress the prints, or vice versa.)
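
For readers who want to try the pattern without a Windows shell, here is a minimal, platform-independent sketch of the same idea. It is not the code from this post: fake_job and run_limited are hypothetical names, and a short sleep stands in for the external process. It only illustrates that take!(channel) blocks the main task until one of the jobs calls put!.

```julia
# Hypothetical stand-in for an external process: sleeps briefly,
# then reports its id together with a pretend exit code of 0.
function fake_job(channel, id)
    sleep(0.1 * rand())
    put!(channel, (id, 0))
end

function run_limited(njobs, max_concurrent)
    channel = Channel{Tuple{Int, Int}}(max_concurrent)
    running = 0
    finished = Int[]
    for id in 1:njobs
        if running >= max_concurrent
            # Blocks the main task until some running job calls put!
            done_id, _ = take!(channel)
            push!(finished, done_id)
            running -= 1
        end
        @async fake_job(channel, id)
        running += 1
    end
    # Every job still running will put! exactly once, so take exactly that many results
    for _ in 1:running
        done_id, _ = take!(channel)
        push!(finished, done_id)
    end
    return finished
end

finished = run_limited(10, 4)
println("Finished jobs in completion order: ", finished)
```

Draining by count at the end (instead of waiting on the tasks and then checking isempty) is just one possible design choice; it assumes every job eventually reports back through the channel.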

Output

The outputs I then get are like

julia> process_data(gen_data(10, false), 4, false)
Start case number 1
Start case number 2
Start case number 3
Start case number 4
Done with 1
Done with 2
Done with 4
Done with 3
Start case number 5
Start case number 6
Start case number 7
Start case number 8
Done with 5
Start case number 9
Done with 6
Done with 7
Start case number 10
Done with 8
Done with 9
Done with 10

Number of failed jobs: 0
julia> process_data(gen_data(10, true), 4, true)
10×2 DataFrame
 Row │ case_number  name
     │ Int64        String
─────┼─────────────────────
   1 │           1  hpfGF
   2 │           2  GddR3
   3 │           3  Usbvx
   4 │           4  4NUnn
   5 │           5  V32Nt
   6 │           6  i5KDS
   7 │           7  ARb7w
   8 │           8  EY3aY
   9 │           9  WSreI
  10 │          10  m7duo
Progress: 100%|█████████████████████████████████████████████████████████████████████████████████| Time: 0:00:10
  Case #:          10
  name:            m7duo
  # active tasks:  4
  # failed:        0
  # finished:      5

Number of failed jobs: 0

to illustrate (to the degree I can using static text) that the progress bar works fine. (Note that, like in your code, the progress bar gets updated when we want to start a job. In the final update (for case #10), only 5 jobs have finished, 4 are busy, and we still want to schedule one more. You might want to change this behaviour so that 100% is only shown when all jobs have finished.)

Via OhMyThreads

Instead of scheduling everything yourself in the main task, you could alternatively use a package like OhMyThreads.jl to do this for you.

No progress bar

To illustrate that the core scheduling part is quite compact, I will first omit a progress bar.

using OhMyThreads
function process_data_omt(rows, max_concurrent_tasks)
	failed_jobs = tmapreduce(+, eachrow(rows); ntasks=max_concurrent_tasks) do row
		p = run(`cmd /C echo Start case number $(row.case_number) \& ping -n 6 127.0.0.1 \> nul \& echo Done with $(row.case_number)`)
		return p.exitcode == 0 ? 0 : 1  # i.e. Int(p.exitcode != 0)
	end
	println("\nNumber of failed jobs: $failed_jobs")
end

Output

julia> process_data_omt(gen_data(10, false), 4)
Start case number 1
Start case number 7
Start case number 9
Start case number 4
Done with 9
Done with 7
Done with 4
Done with 1
Start case number 2
Start case number 10
Start case number 5
Start case number 8
Done with 8
Done with 10
Done with 2
Done with 5
Start case number 6
Start case number 3
Done with 6
Done with 3

Number of failed jobs: 0

With progress bar (including showvalues)

As next!(::Progress) should be thread-safe, you could still use a progress bar. To maintain variables such as running_tasks, you could use atomics to update them asynchronously.

using Base.Threads: Atomic, atomic_add!

function process_data_omt_pb(rows, max_concurrent_tasks)
	pb = Progress(nrow(rows))
	
	failed_jobs = Atomic{Int}(0)
	running_tasks = Atomic{Int}(0)
	finished_tasks = Atomic{Int}(0)
	
	tmap(eachrow(rows); ntasks=max_concurrent_tasks) do row
		next!(pb; showvalues = [("Case #", row.case_number), ("name", row.name), ("# active tasks", running_tasks[]), ("# failed", failed_jobs[]), ("# finished", finished_tasks[])])
		
		atomic_add!(running_tasks, 1)
		p = run(`cmd /C ping -n 6 127.0.0.1 \> nul`)
		atomic_add!(running_tasks, -1)
		atomic_add!(finished_tasks, 1)
		p.exitcode == 0 || (atomic_add!(failed_jobs, 1); @warn "Processing case #$(row.case_number) failed!")
		
		return nothing
	end
	println("\nNumber of failed jobs: $(failed_jobs[])")
	
	finish!(pb)
end

Output

julia> process_data_omt_pb(gen_data(10, true), 4)
10×2 DataFrame
 Row │ case_number  name
     │ Int64        String
─────┼─────────────────────
   1 │           1  N2x3n
   2 │           2  pUj49
   3 │           3  C7WgA
   4 │           4  g2u95
   5 │           5  1jyBo
   6 │           6  UR5L2
   7 │           7  pQAmC
   8 │           8  fihqN
   9 │           9  Kkf1g
  10 │          10  mGgpT
Progress: 100%|█████████████████████████████████████████████████████████████████████████████████| Time: 0:00:10
  Case #:          3
  name:            C7WgA
  # active tasks:  1
  # failed:        0
  # finished:      7

Number of failed jobs: 0
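
As a small aside on the atomic counters, here is a minimal Base-only sketch of why atomic_add! is the safe way to update them from several tasks. The function name count_atomically is hypothetical and only serves the illustration.

```julia
using Base.Threads: Atomic, atomic_add!, @spawn

# Increment a shared counter from n concurrently scheduled tasks.
function count_atomically(n)
    counter = Atomic{Int}(0)
    @sync for _ in 1:n
        @spawn atomic_add!(counter, 1)  # one indivisible read-modify-write
    end
    return counter[]                    # read the final value
end

println(count_atomically(1_000))
```

By contrast, counter[] += 1 is a separate read followed by a separate write, so updates from different threads can overwrite each other.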

I tried messing with the manual approach and I couldn’t get it to work. The problem was always that the channel got filled up with 4 tasks, then they all exited successfully very quickly, and then the while loop just got stuck because there was no more put! coming, since there were no more running tasks. It only works if at least 1 out of 4 tasks is guaranteed to take a significant amount of time to complete.

I came up with this solution that worked pretty well.

    p = Progress(nrow(rows))
    Threads.@threads :dynamic for row in eachrow(rows)
        next!(p; showvalues = [("Case #", row.case_number), ("Borough", row.borough), ("Auction Date", row.auction_date)])
        try
            args = [row.case_number, row.borough, row.auction_date]
            # Run the scraper and discard its output; a non-zero exit code throws
            run(pipeline(`node scrapers/notice_of_sale.js $args`, devnull))
        catch e
            println("Error downloading filings for $(row.case_number) $(row.borough)")
        end
    end

I don’t theoretically see why that would be the case, nor can I replicate it. If I remove the ping -n 6 127.0.0.1 sleep in my process_data above, so that the process just prints twice and finishes immediately, I correctly get

julia> process_data(gen_data(10, false), 4, false)
Start case number 1
Start case number 4
Start case number 3
Start case number 2
Done with 1
Done with 4
Done with 3
Done with 2
Start case number 5
Done with 5
Start case number 6
Done with 6
Start case number 7
Done with 7
Start case number 8
Done with 8
Start case number 9
Done with 9
Start case number 10
Done with 10

Number of failed jobs: 0
false

as output. If the run Processes finish faster than we can handle them in the main task, the execution will mostly just be sequential. Note also that running_tasks is technically not the same as count(istaskstarted.(tasks)) - count(istaskdone.(tasks)).

If you’re happy with this solution, then that’s all that matters, of course. Though let me point out that (as far as I know) Threads.@threads will always use Threads.nthreads() and so cannot be controlled to make use of max_concurrent_tasks (especially when this is larger than the number of cores on the machine). If you run Julia with julia -t 1, all your run processes will execute sequentially.
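
One possible way to get a limit that does not depend on the thread count is Base’s asyncmap with its ntasks keyword. The following is only a sketch under assumptions: run_capped and fake_scrape are hypothetical names, and sleep stands in for the call to run, which similarly lets other tasks proceed while it waits for the process.

```julia
# Sketch: cap concurrency with asyncmap's ntasks keyword instead of Threads.@threads.
function run_capped(case_numbers, max_concurrent_tasks)
    running = 0  # plain Ints suffice here: asyncmap's tasks are scheduled cooperatively
    peak = 0
    # Hypothetical stand-in for the scraper; a real version would call run(...) here
    fake_scrape = function (case_number)
        running += 1
        peak = max(peak, running)
        sleep(0.2)   # yields to the other tasks while "the process" is busy
        running -= 1
        return 0     # pretend exit code
    end
    exitcodes = asyncmap(fake_scrape, case_numbers; ntasks=max_concurrent_tasks)
    return exitcodes, peak
end

exitcodes, peak = run_capped(1:10, 4)
println("Failed jobs: ", count(!=(0), exitcodes), ", peak concurrency: ", peak)
```

Because the limit comes from the number of tasks rather than the number of threads, this should also overlap the jobs when Julia is started with julia -t 1.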

So very weird behavior here. When I copy paste your code into my file, everything runs fine. But when I replace the dummy data rows with my actual DataFrame, it only processes the first max_concurrent_tasks worth of tasks and then halts.

Problem between chair and keyboard. My case numbers are Strings (blame the court system, not me). You statically typed the channel to be {Int, Int}. Weirdly it never gave an error! This is a weird bug, should have been a run time error when trying to put a string into an Int but instead it just hangs forever.
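
A likely explanation for the missing error, sketched below under assumptions: the put! fails inside the worker Task, and an exception in a Task only surfaces once something waits on that Task. In process_data the main task is blocked in take!(channel) before it reaches wait.(tasks), so it never sees the failure. The case number "2024-001" is made up for illustration.

```julia
# Same element type as the channel in process_data
channel = Channel{Tuple{Int, Int}}(4)

# A String case number cannot be converted to Int, so put! throws inside the task.
# The exception stays inside the task until someone waits on it.
task = @async put!(channel, ("2024-001", 0))

failure = try
    wait(task)   # rethrows the task's failure in the waiting task
    nothing
catch err
    err
end

println("Task failed:       ", istaskfailed(task))
println("Surfaced error:    ", typeof(failure))
println("Channel is empty:  ", isempty(channel))
```

Declaring the channel with the actual column type, for example Channel{Tuple{String, Int}}, should avoid the mismatch.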
