CSV in Parallel Error

I have a very large (160 GB) csv file. I had the idea to do some processing of the file in parallel, but I’m stuck on an error in CSV.jl. What I wanted to do was use CSV.Rows on non-overlapping chunks of the file across different local processes, do my task, then combine the results.

I find that I am unable to move CSV.Rows objects across processes - this error is Example 1. I also find that I get an error if I try to read parts of the file using CSV.File and move those pieces - this error is Example 2.

Example 1:


using Distributed, Tables, CSV 
addprocs(4)

@everywhere using Tables, CSV 

# create CSV.Rows for non-overlapping pieces of 10_000 lines each
d1 = CSV.Rows("large_file.tsv"; header = 1, limit = 10_000);
d2 = CSV.Rows("large_file.tsv"; header = 1, skipto = 10_001, limit = 10_000);

c2 = RemoteChannel( ()->Channel{CSV.Rows}(10), 2)
put!(c2, d2)

# Fails with the following message:

ERROR: IOError: write: broken pipe (EPIPE)
Stacktrace:Worker 2 terminated.

 [1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
 [2] wait() at ./event.jl:255
 [3] uv_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:853
 [4] unsafe_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:911
 [5] unsafe_write at ./io.jl:509 [inlined]
 [6] macro expansion at ./gcutils.jl:87 [inlined]
 [7] write at ./io.jl:532 [inlined]
 [8] serialize_array_data at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:246 [inlined]
 [9] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Array{UInt8,1}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:263
 [10] serialize_any(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Any) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:638
 [11] serialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:617 [inlined]
 [12] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Tuple{Distributed.RRID,CSV.Rows{false,false,false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:196
 [13] serialize_msg(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Distributed.CallMsg{:call_fetch}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:90
 [14] #invokelatest#1 at ./essentials.jl:742 [inlined]
 [15] invokelatest at ./essentials.jl:741 [inlined]
 [16] send_msg_(::Distributed.Worker, ::Distributed.MsgHeader, ::Distributed.CallMsg{:call_fetch}, ::Bool) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:185
 [17] send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:134 [inlined]
 [18] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:374
 [19] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:371
 [20] #remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406
 [21] remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
 [22] call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:479 [inlined]
 [23] put!(::RemoteChannel{Channel{CSV.Rows}}, ::CSV.Rows{false,false,false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:572
 [24] top-level scope at none:0

This also fails if I try to move CSV.File objects instead - Example 2:


using Distributed, Tables, CSV

addprocs(4)
4-element Array{Int64,1}:
 2
 3
 4
 5

@everywhere using Tables, CSV

d1 = CSV.File("large_file.tsv"; header = 1, limit = 10_000)

c2 = RemoteChannel( ()-> Channel{CSV.File}(10), 2)

put!(c2, d1)
ERROR: IOError: write: broken pipe (EPIPE)
Stacktrace:Worker 2 terminated.

 [1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
 [2] wait() at ./event.jl:255
 [3] uv_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:853
 [4] unsafe_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:911
 [5] unsafe_write at ./io.jl:509 [inlined]
 [6] macro expansion at ./gcutils.jl:87 [inlined]
 [7] write at ./io.jl:532 [inlined]
 [8] serialize_array_data at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:246 [inlined]
 [9] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Array{UInt8,1}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:263
 [10] serialize_any(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Any) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:638
 [11] serialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:617 [inlined]
 [12] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Tuple{Distributed.RRID,CSV.File}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:196
 [13] serialize_msg(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Distributed.CallMsg{:call_fetch}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:90
 [14] #invokelatest#1 at ./essentials.jl:742 [inlined]
 [15] invokelatest at ./essentials.jl:741 [inlined]
 [16] send_msg_(::Distributed.Worker, ::Distributed.MsgHeader, ::Distributed.CallMsg{:call_fetch}, ::Bool) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:185
 [17] send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:134 [inlined]
 [18] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:374
 [19] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:371
 [20] #remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406
 [21] remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
 [22] call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:479 [inlined]
 [23] put!(::RemoteChannel{Channel{CSV.File}}, ::CSV.File) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:572
 [24] top-level scope at none:0

I appreciate any advice! Julia VERSION is v"1.1.1". CSV version is v0.5.11

I believe the author of CSV.jl is working on this now that the new threading stuff is available on 1.3, so it will probably be possible eventually (as an option in the package).

In the meantime, one thing you could try is to split the file up and use JuliaDB which I think can read files in parallel. They have some examples in their docs.

I don’t have any suggestions for getting your custom solution working, though.
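For the "split the file up" step, here is a minimal sketch using only Base Julia. The function name `split_with_header` and the `.part` suffix are made up for illustration, and it assumes one record per line (no quoted fields containing newlines). Each piece repeats the header so it can be read on its own.

```julia
# Split a delimited text file into pieces of at most `rows_per_part` data rows.
# Hypothetical helper; assumes every record sits on exactly one line.
function split_with_header(path, rows_per_part; prefix = path * ".part")
    parts = String[]
    open(path, "r") do io
        header = readline(io)          # first line is the header
        out = nothing                  # currently open output file, if any
        n = 0                          # data rows written to the current piece
        for line in eachline(io)
            if out === nothing || n == rows_per_part
                out === nothing || close(out)
                push!(parts, string(prefix, length(parts) + 1))
                out = open(parts[end], "w")
                println(out, header)   # repeat the header in every piece
                n = 0
            end
            println(out, line)
            n += 1
        end
        out === nothing || close(out)
    end
    return parts
end

# Small demonstration on a throwaway file with 5 data rows
demo = tempname()
write(demo, "id\tvalue\n1\ta\n2\tb\n3\tc\n4\td\n5\te\n")
parts = split_with_header(demo, 2)
println(length(parts), " pieces written")
```

For a 160 GB file this is a single sequential pass, so it is only worth doing if the pieces will be read more than once.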

Can you share what OS you’re using? I don’t see any broken pipe errors on OSX. I’m running v"1.3.0-alpha.0" as well, but I’m not sure that should make any difference vs 1.1.1.

One thing you could try is mmapping the file yourself and passing that directly to CSV.jl, like:

using Mmap
filebuf = Mmap.mmap("large_file.tsv")
d1 = CSV.Rows(filebuf; header=1, limit=10000)
d2 = CSV.Rows(filebuf; header=1, skipto=10001, limit=10000)

Mac OS Mojave 10.14.6

Using Mmap also leads to an error:


using Distributed 

addprocs(4)

@everywhere using Tables, CSV, Mmap 

filebuf = Mmap.mmap("big_file.tsv")

d1 = CSV.Rows(filebuf; header = 1, limit = 10_000);
d2 = CSV.Rows(filebuf; header = 1, skipto = 10_001, limit = 10_000);

c2 = RemoteChannel( ()-> Channel{CSV.Rows}(10), 2)

put!(c2, d2)

ERROR: IOError: write: broken pipe (EPIPE)
Stacktrace:Worker 2 terminated.

 [1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
 [2] wait() at ./event.jl:255
 [3] uv_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:853
 [4] unsafe_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:911
 [5] unsafe_write at ./io.jl:509 [inlined]
 [6] macro expansion at ./gcutils.jl:87 [inlined]
 [7] write at ./io.jl:532 [inlined]
 [8] serialize_array_data at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:246 [inlined]
 [9] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Array{UInt8,1}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:263
 [10] serialize_any(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Any) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:638
 [11] serialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:617 [inlined]
 [12] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Tuple{Distributed.RRID,CSV.Rows{false,false,false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:196
 [13] serialize_msg(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Distributed.CallMsg{:call_fetch}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:90
 [14] #invokelatest#1 at ./essentials.jl:742 [inlined]
 [15] invokelatest at ./essentials.jl:741 [inlined]
 [16] send_msg_(::Distributed.Worker, ::Distributed.MsgHeader, ::Distributed.CallMsg{:call_fetch}, ::Bool) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:185
 [17] send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:134 [inlined]
 [18] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:374
 [19] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:371
 [20] #remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406
 [21] remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
 [22] call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:479 [inlined]
 [23] put!(::RemoteChannel{Channel{CSV.Rows}}, ::CSV.Rows{false,false,false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:572
 [24] top-level scope at none:0

I’ll check if I get the same error on v"1.3.0-alpha" and update.

Different error on v"1.3.0-alpha.0".


using Distributed

addprocs(4)
4-element Array{Int64,1}:
 2
 3
 4
 5

@everywhere using Tables, CSV, Mmap

filebuf = Mmap.mmap("big_file.tsv")

d1 = CSV.Rows(filebuf; header = 1, limit = 10_000);
d2 = CSV.Rows(filebuf; header = 1, skipto = 10_001, limit = 10_000);

c2 = RemoteChannel( ()-> Channel{CSV.Rows}(10), 2)

put!(c2, d2)

ERROR: IOError: write: invalid argument (EINVAL)
Stacktrace:
 [1] uv_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:948
 [2] unsafe_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:1002
 [3] unsafe_write at ./io.jl:593 [inlined]
 [4] macro expansion at ./gcutils.jl:91 [inlined]
 [5] write at ./io.jl:616 [inlined]
 [6] serialize_array_data at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:246 [inlined]
 [7] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Array{UInt8,1}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:263
 [8] serialize_any(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Any) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:629
 [9] serialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:608 [inlined]
 [10] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Tuple{Distributed.RRID,Int64,CSV.Rows{false,false,false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Serialization/src/Serialization.jl:196
 [11] serialize_msg(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Distributed.CallMsg{:call_fetch}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Distributed/src/messages.jl:90
 [12] #invokelatest#1 at ./essentials.jl:708 [inlined]
 [13] invokelatest at ./essentials.jl:707 [inlined]
 [14] send_msg_(::Distributed.Worker, ::Distributed.MsgHeader, ::Distributed.CallMsg{:call_fetch}, ::Bool) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Distributed/src/messages.jl:185
 [15] send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Distributed/src/messages.jl:134 [inlined]
 [16] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::typeof(remotecall_fetch), ::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:385
 [17] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:382
 [18] #remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::typeof(remotecall_fetch), ::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:417
 [19] remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:417 [inlined]
 [20] call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:490 [inlined]
 [21] put!(::RemoteChannel{Channel{CSV.Rows}}, ::CSV.Rows{false,false,false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.3/Distributed/src/remotecall.jl:591
 [22] top-level scope at REPL[8]:1

julia> VERSION
v"1.3.0-alpha.0"

Ah, I understand better what’s going on here now. The problem is that when you do put!(c2, d2), it’s actually trying to serialize the entire underlying file buffer to the remote process, which of course is very wasteful/problematic. (I didn’t hit it in my case because my new machine is pretty beefy and I was using a much smaller file, though still several GB.) Anyway, the solution here is to have each process do its own mmap/file loading, so something like:

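using Distributed
addprocs(2)
@everywhere using CSV

nrows = 1_000_000  # total number of data rows; placeholder, set this for your file

@sync @distributed for i = 0:10_000:(nrows - 1)
    # line 1 is the header, so the data rows of chunk i start at line i + 2
    d = CSV.Rows("large_file.tsv"; header=1, skipto=i + 2, limit=10_000)
    # process d here, on the worker that created it
end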

There are obviously a few different flavors here with Distributed constructs like @fetch, etc., but hopefully you get the idea.
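To see why put! ends up moving the whole buffer, here is a small sketch using the Serialization stdlib that shows up in the stack traces above. `BufferChunk` is a made-up stand-in for any object that keeps a reference to the full file buffer. It is not CSV.jl's actual type, and the sizes are arbitrary.

```julia
using Serialization

# Hypothetical stand-in for an object that references the whole file buffer
struct BufferChunk
    buf::Vector{UInt8}   # the entire file contents (or mmapped region)
    first_row::Int
    limit::Int
end

buf = zeros(UInt8, 10_000_000)            # pretend this is the file (10 MB)
chunk = BufferChunk(buf, 10_001, 10_000)  # "only" 10_000 rows of interest

io = IOBuffer()
serialize(io, chunk)                      # the referenced buffer is written out in full
nbytes = position(io)
println("object holding the buffer: ", nbytes, " bytes")

# Sending a description of the chunk instead is tiny by comparison
io2 = IOBuffer()
serialize(io2, (path = "large_file.tsv", first_row = 10_001, limit = 10_000))
small_bytes = position(io2)
println("description of the chunk:  ", small_bytes, " bytes")
```

So the cheap thing to send between processes is the path plus the row range, with each worker opening the file itself.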

Thanks very much for your help. This looks better than my eventual solution, which I think (???) also works.


using Distributed 
addprocs()

@everywhere using CSV

@everywhere function define_and_work(; start_row = 2, stop_row = start_row + 99_999)
    # limit is a number of rows, not a stopping row, so derive it from the range
    f1 = CSV.Rows("large_file.tsv"; skipto = start_row, limit = stop_row - start_row + 1)
    for el in f1
        # do something
    end
    return # something
end

function work()
    wrk = workers()
    to_save = Array{Any,1}(undef, length(wrk))
    @sync for (k, w) in enumerate(wrk)
        @async begin
            # index by position k, since worker ids are not guaranteed to be 2, 3, ...
            first_row = (k - 1) * 100_000 + 2   # line 1 is the header
            to_save[k] = remotecall_fetch(define_and_work, w; start_row = first_row, stop_row = first_row + 99_999)
        end
    end
    return to_save
end

outp = work()
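Another flavor of the same idea, sketched with pmap so the per-chunk results come back as an ordinary array that can be combined afterwards. The names `count_chunk` and `run_chunks` are made up, row counting stands in for the real task, and the tiny temporary file is only there so the sketch runs on its own. It assumes CSV.jl is installed for the workers and that the total number of data rows is known.

```julia
using Distributed
addprocs(2)
@everywhere using CSV

# Hypothetical per-chunk task: each worker opens the file itself and
# returns something small (here, the number of rows it saw).
@everywhere function count_chunk(path, first_line, nrows)
    n = 0
    for row in CSV.Rows(path; delim = '\t', header = 1, skipto = first_line, limit = nrows)
        n += 1
    end
    return n
end

# Only the path and the row range travel to the workers, never the data.
function run_chunks(path, total_rows, chunk)
    starts = 2:chunk:(total_rows + 1)      # line 1 is the header
    return pmap(s -> count_chunk(path, s, chunk), starts)
end

# Small stand-in file: a header plus 25 data rows
path = tempname() * ".tsv"
open(path, "w") do io
    println(io, "id\tvalue")
    for i in 1:25
        println(io, i, '\t', 2i)
    end
end

counts = run_chunks(path, 25, 10)
total = sum(counts)                        # combine step
println("rows per chunk: ", counts, ", total: ", total)
```

One caveat, as far as I understand it: skipto still has to scan from the top of the file to find the starting line, so later chunks do some redundant reading.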