Keeping threads busy with Julia 1.3 multithreading

I’m filtering 185 large historical monthly CSV files and am allocating a task to process each month. When I make the initial 185 spawn requests, the system starts out using all 8 available threads on my machine, but then slowly winds down until, near the end, only 1 thread is busy. This method takes 3.5 hours.

When I limit the number of pending requests to 8, the system is kept busy more of the time, but there are still stretches where the active thread count drops and then ramps back up to 7, the limit I’ve set for Julia. This method takes 2.5 hours, an hour faster.

I am using Ubuntu 18.04 on an Intel machine.

Here is the code that makes the bulk 185 spawn requests:

using Dates

println(Dates.format(now(), "HH:MM"))

@sync for filename in readdir(inpath)
    Threads.@spawn process_month(filename)   # one task per monthly file
end

println(Dates.format(now(),"HH:MM"))

Here is the code that limits the number of pending requests to 8:

println(Dates.format(now(),"HH:MM"))

NUM_THREADS = 8

threadflags = zeros(Int, NUM_THREADS)        # 0 = slot free, 1 = slot busy

@sync for filename in readdir(inpath)

    # wait until one of the NUM_THREADS slots frees up
    while isnothing(findfirst(isequal(0), threadflags))
        sleep(1)
    end

    i = findfirst(isequal(0), threadflags)

    threadflags[i] = 1
    # process_month(filename, i) presumably resets threadflags[i] to 0 when it finishes
    Threads.@spawn process_month(filename, i)

end

println(Dates.format(now(),"HH:MM"))
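
For reference, the same cap could presumably also be expressed with Base.Semaphore instead of a flag array; an untested sketch:

sem = Base.Semaphore(8)                   # at most 8 months in flight at once

@sync for filename in readdir(inpath)
    Base.acquire(sem)                     # blocks while 8 tasks are already running
    Threads.@spawn try
        process_month(filename)
    finally
        Base.release(sem)                 # free a slot as soon as this month finishes
    end
end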

Any tips for keeping more threads busy more often?

Are you sure you have fully optimized the worker function? If not, working on the multithreading will probably be a harder way to get the same potential benefits.

The worker is simple, so I don’t know how to optimize further.

    # read one month's file, keep only the rows whose symbol is in filterlist,
    # and append the result to the shared df_data
    df = readtable(f, eltypes = types_filter)
    df = df[findall(in(filterlist), df[!, :symbol]), :]

    append!(df_data, df)

Orthogonal issue (and sorry to take over your thread), but wouldn’t readtable in multiple threads be really slow because of the hard drive jumping around to get the data?

Interesting idea: read and write the data from a single thread, and do the filtering in separate threads.
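
Something like this is what I have in mind (an untested sketch, reusing inpath, filterlist, types_filter, df_data, and readtable from the snippets above):

using DataFrames

raw     = Channel{DataFrame}(8)    # tables read by the single reader task
results = Channel{DataFrame}(8)    # filtered tables headed back to one collector

reader = @async begin              # all file reads stay on one task
    for filename in readdir(inpath)
        put!(raw, readtable(joinpath(inpath, filename), eltypes = types_filter))
    end
    close(raw)
end

filters = [Threads.@spawn begin    # filtering fans out over the thread pool
    for df in raw
        put!(results, df[findall(in(filterlist), df[!, :symbol]), :])
    end
end for _ in 1:max(1, Threads.nthreads() - 1)]

collector = @async begin           # a single task appends to df_data
    for df in results
        append!(df_data, df)
    end
end

foreach(wait, filters)
close(results)
wait(collector)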

I am using an SSD rather than a rotating disk, if that matters.

Still, I wonder why there is such a big difference in performance between these two methods. From watching the 2019 JuliaCon video about the new 1.3 threading, it was clear that you can make many more requests than your physical thread count and the scheduler will keep the machine busy until all of them are satisfied. That happens for roughly the first few minutes, but then it degrades, and often only a single thread is operating on the data.

For an SSD, my guess is that parallel reads would be better, but benchmarking is always smart.

Did you measure the time it takes to run each process_month? Is it possible that a few files are much larger than the others?
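
A quick, serial way to check, assuming process_month and inpath as in your snippets:

times = Dict{String,Float64}()
for filename in readdir(inpath)
    times[filename] = @elapsed process_month(filename)   # seconds spent on this file
end
sort(collect(times); by = last, rev = true)[1:5]          # the five slowest months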

Aside: Is df_data in append!(df_data,df) shared across threads? If so, I think your code as-is would corrupt the data.

Aside^2: I think the best strategy for using threads is to use a map/reduce/filter framework. For example, using Transducers.reduce, your code is simply something like reduce(append!!, Map(process_month′), readdir(inpath); init=Empty(DataFrame), basesize=1), provided that process_month′ is a function that returns a DataFrame. Demo:

julia> using Transducers, BangBang, DataFrames

julia> reduce(append!!, Map(x -> DataFrame(a=[x])), 1:2; init=Empty(DataFrame), basesize=1)
2×1 DataFrame
│ Row │ a     │
│     │ Int64 │
├─────┼───────┤
│ 1   │ 1     │
│ 2   │ 2     │

I’ve been doing more experiments trying to keep more threads running.

What is working for me is to not give up threads until all the work is completed. In the earlier attempts described above, I would give up a thread after a month was processed and then request a thread for the next month. That resulted in an inconsistent thread count, ranging from the maximum down to a single thread.

The approach that works is to start a task that pulls months to process from a channel. I start 6 such tasks, leaving 2 threads for the system; any more than that and I start seeing inconsistent thread behavior.
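
Roughly, the working version looks like this (a simplified sketch of what I’m running):

months = readdir(inpath)

jobs = Channel{String}(length(months))
foreach(m -> put!(jobs, m), months)
close(jobs)                               # workers stop once the channel drains

@sync for _ in 1:6                        # 6 long-lived workers; 2 threads left for the system
    Threads.@spawn for filename in jobs   # each worker keeps pulling months
        process_month(filename)           # until none remain
    end
end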

So I believe the scheduler is not working as advertised. Multithreading in 1.3 is experimental, so I’ll revisit this when there are improvements.

If you think this is a bug, would you consider opening an issue?

As tkf said, mutating a variable concurrently from multiple threads is problematic; you should protect your df_data with some sort of lock to avoid intermixing. (Sorry if you’re already doing that.)
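
A minimal sketch of what I mean, with a hypothetical df_lock guarding the shared DataFrame:

df_lock = ReentrantLock()                 # one lock guarding df_data

# inside the worker, instead of a bare append!:
lock(df_lock) do
    append!(df_data, df)                  # only one task mutates df_data at a time
end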

There is no corruption issue here. This thread is about inconsistent thread scheduling.