Multithreaded CSV writes

Hi! I am trying to write many data frames to CSV format in parallel. Interestingly, the multi-threaded version is slower than doing it sequentially, and it comes with a very high GC cost. Any idea what’s happening? What would be the best way to optimize writing these files?

Here’s my MWE:

# PoC - performance of multi-threaded write vs. sequential
using CSV, DataFrames, Dates, Random

# Build a data frame with a date column, a random-string column, and `cols` random float columns

function make_df(rows, cols)
    df = DataFrame(date = repeat([Date(now())], rows), name = [randstring(32) for i in 1:rows])
    insertcols!(df, ["x$i" => rand(rows) for i in 1:cols]...)
    return df
end

output_path(i) = joinpath(mkdir(tempname()), "df$i.csv")

# Set up test data
df = make_df(100_000, 50)
dfs = [df for i in 1:30];  # 30 references to the same data frame, not copies

# ============== Serial writes ============

function write_serial(dfs)
    for i in 1:length(dfs)
        CSV.write(output_path(i), dfs[i])
    end
end

@time write_serial(dfs)
@time write_serial(dfs)
@time write_serial(dfs)

#=
julia> @time write_serial(dfs)
 42.513265 seconds (806.20 M allocations: 20.267 GiB, 6.13% gc time)

julia> @time write_serial(dfs)
 49.565514 seconds (806.19 M allocations: 20.266 GiB, 5.28% gc time)

julia> @time write_serial(dfs)
 48.972818 seconds (806.19 M allocations: 20.266 GiB, 5.48% gc time)
=#

# ============== Multi-threaded writes ============

using CSV
using Base.Threads

@show Threads.nthreads()

function write_multithreaded(dfs)
    Threads.@threads for i in 1:length(dfs)
        CSV.write(output_path(i), dfs[i])
    end
end

@time write_multithreaded(dfs)
@time write_multithreaded(dfs)
@time write_multithreaded(dfs)

#=
julia> @time write_multithreaded(dfs)
 66.760075 seconds (806.25 M allocations: 20.270 GiB, 84.84% gc time)

julia> @time write_multithreaded(dfs)
 65.836329 seconds (806.19 M allocations: 20.266 GiB, 86.49% gc time)

julia> @time write_multithreaded(dfs)
 66.630212 seconds (806.19 M allocations: 20.266 GiB, 88.42% gc time)
=#

Julia version:

julia> versioninfo()
Julia Version 1.5.1
Commit 697e782ab8 (2020-08-25 20:08 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Xeon(R) CPU E5-2699 v4 @ 2.20GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-9.0.1 (ORCJIT, broadwell)

Package versions:

  [336ed68f] CSV v0.7.7
  [a93c6f00] DataFrames v0.21.7

I/O is blocking and shared between threads. You need multiple processes if you want parallel I/O.

Why would they block? Every thread writes to a different file… :thinking:

I would try async instead of multithreaded:

function write_async(dfs)
    @sync for i in 1:length(dfs)
        @async CSV.write(output_path(i), dfs[i])
    end
end

Well, it gives me the same performance as doing it sequentially. I want to write these files as quickly as I can…

julia> @time write_async(dfs)
 51.096555 seconds (806.22 M allocations: 20.268 GiB, 6.99% gc time)

julia> @time write_async(dfs)
 50.875884 seconds (806.19 M allocations: 20.266 GiB, 6.40% gc time)

julia> @time write_async(dfs)
 51.142538 seconds (806.19 M allocations: 20.266 GiB, 6.36% gc time)

It’s not a Julia thing; it’s just how multi-threading works in general. The threads share the same process, so the blocking I/O device is shared among all of them.

When you mix I/O and compute like this, I think it might be beneficial to have more parallel tasks than the number of CPU cores. If the number of data frames is not huge, just replacing @async in @ericphanson’s code with Threads.@spawn could be enough.
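A minimal sketch of that swap, for illustration only. The `writer(path, item)` argument and `demo_writer` are placeholders I made up so the snippet runs without CSV.jl; in the thread's code the call would be `CSV.write(output_path(i), dfs[i])`.

```julia
# One task per file, scheduled across all available threads.
# `writer` is a hypothetical stand-in for `CSV.write`.
function write_spawned(writer, paths, items)
    @sync for (path, item) in zip(paths, items)
        Threads.@spawn writer(path, item)
    end
end

# Placeholder writer: one comma-separated line per row.
function demo_writer(path, rows)
    open(path, "w") do io
        for r in rows
            println(io, join(r, ','))
        end
    end
end

dir = mktempdir()
paths = [joinpath(dir, "df$i.csv") for i in 1:8]
items = [[(i, j, rand()) for j in 1:1000] for i in 1:8]
write_spawned(demo_writer, paths, items)
```

Unlike `@async`, tasks created with `Threads.@spawn` may run on any thread, so the formatting work can proceed in parallel rather than only being interleaved on one thread.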

If the number of data frames is much larger than the number of cores, I think you’d need a threaded loop with load balancing. For example, you can use FLoops.jl (JuliaFolds/FLoops.jl on GitHub) with basesize = length(dfs) ÷ (8 * Threads.nthreads()) (8 is just a guess; you’d need to find a good number larger than 1).
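To make the basesize idea concrete without depending on FLoops.jl, here is a rough Base-only sketch that splits the indices into chunks of `basesize` and spawns one task per chunk. This is not the FLoops API; `write_chunked`, `writer`, and `factor` are names invented for the example, and the `max(1, …)` guard is my addition because the formula yields 0 when there are fewer data frames than `8 * Threads.nthreads()`.

```julia
# Spawn one task per chunk of `basesize` files. More chunks than threads
# lets the scheduler balance uneven file sizes.
# `writer` is a hypothetical stand-in for `CSV.write`.
function write_chunked(writer, paths, items; factor = 8)
    n = length(paths)
    basesize = max(1, n ÷ (factor * Threads.nthreads()))
    @sync for chunk in Iterators.partition(1:n, basesize)
        Threads.@spawn for i in chunk
            writer(paths[i], items[i])
        end
    end
    return basesize
end

# Placeholder writer: one value per line.
demo_writer(path, values) = write(path, join(values, '\n'))

dir = mktempdir()
paths = [joinpath(dir, "df$i.csv") for i in 1:200]
items = [rand(50) for _ in 1:200]
used_basesize = write_chunked(demo_writer, paths, items)
```

The `factor` keyword plays the role of the 8 in the post; it would need tuning on real data.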

I/O in Julia yields (switches tasks) so I’m not sure what the worry is?

(Julia uses libuv, which provides high-performance non-blocking I/O. So I think there is a good chance that Julia’s task system is very good at this kind of operation. Though, having read the internals a bit, I’m not sure if Julia uses libuv for file system I/O.)

Sorry, “blocking” was a misnomer. I meant to say that there’s only one I/O “head” per process. So I believe the threads in multi-threaded parallelism share that I/O head (from the OS scheduler’s perspective, if you want to call it that).

It may help for me to provide a little more context.

My real use case involves computing many data frames (some of them are quite large) and I can use multi-threading to parallelize the compute process. But, I still need to persist the results and so I am mixing I/O in some way.

The challenge is that I don’t want to use multi-processing for the compute step either, because there is a large amount of non-numeric reference data that cannot be shared easily across processes; otherwise I would have to take the hit of passing the data via IPC.

Use a different file format? I wouldn’t use CSV in performance-critical code.

I am considering Parquet.jl but it doesn’t support writing Date columns yet. On the bright side, I can use the same multi-threading loop for writing the same data frames (without the Date column) and it would be 10x faster and without any GC problem.

Unfortunately, I don’t have too many choices as the files will be consumed by some other downstream processes.

Hmm… I guess I still don’t get the concern. You can do multiple I/O operations concurrently even without multi-threading (see Python’s async/await and JavaScript), so it is already beneficial to use multiple tasks. In addition, if you need to interleave computations, using multi-threading makes sense. That’s why things like Go and Tokio (as well as Julia) have an M:N scheduler.

I was just thinking that if a function f() uses 10% of total disk I/O, running it on 10 threads still only uses 10%, but if you run it in 10 processes you can potentially use 100% of the disk I/O bandwidth. Maybe I’m misunderstanding this.

I think the main benefit in a use case like this comes from interleaving I/O-intensive work and CPU-intensive work. Julia’s worker threads can (in principle) do some CPU-intensive work without being blocked by I/O. So, you can do some computation (e.g., formatting floating-point numbers) while the OS is talking to the hardware (or to some kind of in-memory cache subsystem that abstracts away the physical disks).
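As a toy illustration of the two kinds of work, the sketch below separates the CPU-bound formatting step from the I/O step inside each task, so that one task can be formatting while another is waiting on its write. This is not how CSV.write is implemented; `render` and `write_interleaved` are made-up names, and whether the phases actually overlap depends on the thread count and on how the runtime handles file writes.

```julia
# CPU-bound phase: format all rows into an in-memory byte buffer.
function render(rows)
    buf = IOBuffer()
    for r in rows
        join(buf, r, ',')
        print(buf, '\n')
    end
    return take!(buf)
end

# Each task formats its own table, then hands the bytes to the OS in one call.
function write_interleaved(paths, items)
    @sync for (path, rows) in zip(paths, items)
        Threads.@spawn begin
            bytes = render(rows)   # CPU-bound
            write(path, bytes)     # I/O-bound
        end
    end
end

dir = mktempdir()
paths = [joinpath(dir, "df$i.csv") for i in 1:4]
items = [[(i, j) for j in 1:2] for i in 1:4]
write_interleaved(paths, items)
```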

Just as a future reference (for Julia ≥ 1.6), Threads.foreach with ntasks > Threads.nthreads() is also a good API for this; see the Multi-Threading page of the Julia documentation.

It’s a simple function, so on older Julia versions I guess copying it into your code base is a good option.
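A sketch of how that could look, assuming Julia ≥ 1.6. `Threads.foreach` consumes a `Channel`, so the file indices are fed through one. `write_foreach` and `writer` are hypothetical names, and the default `ntasks = 2 * Threads.nthreads()` is only a guess at a reasonable oversubscription.

```julia
# Feed job indices through a channel and let `ntasks` tasks drain it.
# `writer` is a hypothetical stand-in for `CSV.write`.
function write_foreach(writer, paths, items; ntasks = 2 * Threads.nthreads())
    jobs = Channel{Int}(length(paths)) do ch
        for i in 1:length(paths)
            put!(ch, i)
        end
    end
    Threads.foreach(jobs; ntasks = ntasks) do i
        writer(paths[i], items[i])
    end
end

# Placeholder writer: one value per line.
demo_writer(path, values) = write(path, join(values, '\n'))

dir = mktempdir()
paths = [joinpath(dir, "df$i.csv") for i in 1:30]
items = [rand(100) for _ in 1:30]
write_foreach(demo_writer, paths, items)
```

Because the tasks pull indices from a shared channel, a slow file does not hold up the rest of a pre-assigned chunk.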

I write in parallel in JDF.

Interesting. I just looked further into the Julia source code for writing to files. It appears to be using libuv already but since it does not pass any callback function it is effectively doing synchronous I/O.

According to the libuv documentation (Filesystem):

All filesystem functions have two forms - synchronous and asynchronous.

The synchronous forms automatically get called (and block) if the callback is null. The return value of functions is a libuv error code. This is usually only useful for synchronous calls. The asynchronous form is called when a callback is passed and the return value is 0.

Please correct me if I’m wrong.

I’m unsure if that’s the case. CSV.jl utilizes multi-threading to speed up reading the same file. Is this “I/O head” a bottleneck for writes only?

Another thing is that when I replace the CSV.write call with Feather.write, I get a 3x speed-up with 24 threads and it had very little GC (6%). If there’s only a single “I/O head”, then why would I get any speed-up?

My main question is about the excessive GC with CSV.write, which only happens when I run it with multiple threads. I have now tested a semaphore-enabled version that controls the number of threads. You can see that GC time picks up very quickly with just a few threads:

julia> @time write_multithreaded_sem(dfs, 2)   # 2 threads
 41.316944 seconds (833.75 M allocations: 21.591 GiB, 28.37% gc time)

julia> @time write_multithreaded_sem(dfs, 5)   # 5 threads
 32.083467 seconds (833.19 M allocations: 21.563 GiB, 55.97% gc time)

julia> @time write_multithreaded_sem(dfs, 10)  # 10 threads
 42.505263 seconds (833.19 M allocations: 21.563 GiB, 72.00% gc time)
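The semaphore-enabled function itself is not shown in the post. One way such a limiter could be written is sketched below; this is a guess at the general shape, not the poster's code. `write_limited`, `writer`, and `counting_writer` are hypothetical names, and the `sleep` only simulates I/O latency so that the limit is observable.

```julia
# Allow at most `limit` writes to run at the same time.
# `writer` is a hypothetical stand-in for `CSV.write`.
function write_limited(writer, paths, items, limit)
    sem = Base.Semaphore(limit)
    @sync for i in 1:length(paths)
        Threads.@spawn begin
            Base.acquire(sem)
            try
                writer(paths[i], items[i])
            finally
                Base.release(sem)
            end
        end
    end
end

# Placeholder writer that records the peak number of concurrent writes.
active = Threads.Atomic{Int}(0)
peak = Threads.Atomic{Int}(0)

function counting_writer(path, rows)
    now_active = Threads.atomic_add!(active, 1) + 1  # atomic_add! returns the old value
    Threads.atomic_max!(peak, now_active)
    sleep(0.01)                                      # simulated I/O latency
    write(path, join(rows, '\n'))
    Threads.atomic_sub!(active, 1)
    return nothing
end

dir = mktempdir()
paths = [joinpath(dir, "df$i.csv") for i in 1:12]
items = [string.(1:100) for _ in 1:12]
write_limited(counting_writer, paths, items, 3)
```

Releasing in a `finally` block keeps the permit count correct even if one write throws.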

Yeah, that’s one thing. There are also ios_* functions that wrap non-libuv functions: https://github.com/JuliaLang/julia/blob/master/src/support/ios.c