I have a very large (160 GB) CSV file. I had the idea to process the file in parallel, but I'm stuck on an error in CSV.jl. What I wanted to do was use CSV.Rows on separate, non-overlapping chunks of the file across different local processes, do my task on each chunk, then combine the results.
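The plan described above can be sketched without moving any reader objects between processes: each worker receives only the file path and a line range, opens the file itself, and returns a small result that the main process combines. This is a minimal stdlib-only sketch under stated assumptions: process_chunk, the summing task and small_file.tsv are hypothetical stand-ins for the real task and large_file.tsv.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # start two local workers unless some already exist

# Hypothetical per-chunk task: sum the first tab-separated field of file lines
# skipto .. skipto + limit - 1. Only the path and two integers cross process
# boundaries; the file itself is opened on the worker. In the real program this
# loop could instead iterate a CSV.Rows built here with header = 1, skipto and
# limit (an assumption, not tested with CSV.jl v0.5.11).
@everywhere function process_chunk(path::AbstractString, skipto::Int, limit::Int)
    open(path) do io
        total = 0
        for line in Iterators.take(Iterators.drop(eachline(io), skipto - 1), limit)
            total += parse(Int, first(split(line, '\t')))
        end
        total
    end
end

# Small stand-in for large_file.tsv: one header line plus 100 data lines.
path = joinpath(mktempdir(), "small_file.tsv")
open(path, "w") do io
    println(io, "a\tb")
    for i in 1:100
        println(io, i, '\t', 2i)
    end
end

chunk = 25
starts = 2:chunk:101   # first line of each chunk (line 1 is the header)
partials = pmap(s -> process_chunk(path, s, chunk), starts)   # one call per chunk
total = sum(partials)  # combine step on the main process
```

Because each worker returns a single integer, the combine step here is just sum; a more involved task would return a small per-chunk summary and merge those on the main process.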
I find that I can't move CSV.Rows objects between processes (Example 1). I also get an error if I read part of the file with CSV.File and then try to move that piece to another process (Example 2).
Example 1:
using Distributed, Tables, CSV
addprocs(4)
@everywhere using Tables, CSV
# create CSV.Rows for non-overlapping pieces of 10_000 rows each
# (line 1 is the header, so d1 covers lines 2 to 10_001 and d2 starts at line 10_002)
d1 = CSV.Rows("large_file.tsv"; header = 1, limit = 10_000);
d2 = CSV.Rows("large_file.tsv"; header = 1, skipto = 10_002, limit = 10_000);
c2 = RemoteChannel( ()->Channel{CSV.Rows}(10), 2)
put!(c2, d2)
# Fails with the following message:
ERROR: IOError: write: broken pipe (EPIPE)
Stacktrace:
Worker 2 terminated.
[1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
[2] wait() at ./event.jl:255
[3] uv_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:853
[4] unsafe_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:911
[5] unsafe_write at ./io.jl:509 [inlined]
[6] macro expansion at ./gcutils.jl:87 [inlined]
[7] write at ./io.jl:532 [inlined]
[8] serialize_array_data at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:246 [inlined]
[9] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Array{UInt8,1}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:263
[10] serialize_any(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Any) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:638
[11] serialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:617 [inlined]
[12] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Tuple{Distributed.RRID,CSV.Rows{false,false,false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:196
[13] serialize_msg(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Distributed.CallMsg{:call_fetch}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:90
[14] #invokelatest#1 at ./essentials.jl:742 [inlined]
[15] invokelatest at ./essentials.jl:741 [inlined]
[16] send_msg_(::Distributed.Worker, ::Distributed.MsgHeader, ::Distributed.CallMsg{:call_fetch}, ::Bool) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:185
[17] send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:134 [inlined]
[18] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:374
[19] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:371
[20] #remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406
[21] remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
[22] call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:479 [inlined]
[23] put!(::RemoteChannel{Channel{CSV.Rows}}, ::CSV.Rows{false,false,false,Parsers.Options{false,true,false,Missing,UInt8,Nothing}}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:572
[24] top-level scope at none:0
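In the trace above, frames [12], [10] and [9] show the serializer working through the CSV.Rows value and then writing a raw Array{UInt8,1} to the worker's socket, which is where the broken pipe occurs. One plausible explanation (an assumption, not confirmed from the CSV.jl v0.5.11 source) is that the Rows object keeps a reference to the bytes of the whole file, so put! tries to send all of them rather than just a 10_000-row chunk. The sketch below uses a hypothetical LazyChunkReader struct, not the real CSV.Rows, to show that serialize writes every field, including a large byte buffer.

```julia
using Serialization

# Hypothetical stand-in for a lazy row reader: it keeps a reference to the
# bytes of the WHOLE file plus the bounds of its own chunk. This is NOT the
# real definition of CSV.Rows; it only mimics that kind of layout.
struct LazyChunkReader
    buf::Vector{UInt8}   # every byte of the file
    skipto::Int          # first line of this reader's chunk
    limit::Int           # number of rows in the chunk
end

# Number of bytes `serialize` writes for a value (roughly what put! ships).
function serialized_size(x)
    io = IOBuffer()
    serialize(io, x)
    return position(io)
end

file_bytes = rand(UInt8, 10_000_000)           # ~10 MB stand-in for a file
reader = LazyChunkReader(file_bytes, 2, 10_000)

println(serialized_size(reader))       # more than 10_000_000: the whole buffer is written
println(serialized_size((2, 10_000)))  # just the chunk bounds: a few bytes
```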
This also fails if I try to move CSV.File objects instead (Example 2):
using Distributed, Tables, CSV
addprocs(4)  # returns the worker ids [2, 3, 4, 5]
@everywhere using Tables, CSV
d1 = CSV.File("large_file.tsv"; header = 1, limit = 10_000)
c2 = RemoteChannel( ()-> Channel{CSV.File}(10), 2)
put!(c2, d1)
ERROR: IOError: write: broken pipe (EPIPE)
Stacktrace:
Worker 2 terminated.
[1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
[2] wait() at ./event.jl:255
[3] uv_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:853
[4] unsafe_write(::Sockets.TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:911
[5] unsafe_write at ./io.jl:509 [inlined]
[6] macro expansion at ./gcutils.jl:87 [inlined]
[7] write at ./io.jl:532 [inlined]
[8] serialize_array_data at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:246 [inlined]
[9] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Array{UInt8,1}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:263
[10] serialize_any(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Any) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:638
[11] serialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:617 [inlined]
[12] serialize(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Tuple{Distributed.RRID,CSV.File}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Serialization/src/Serialization.jl:196
[13] serialize_msg(::Distributed.ClusterSerializer{Sockets.TCPSocket}, ::Distributed.CallMsg{:call_fetch}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:90
[14] #invokelatest#1 at ./essentials.jl:742 [inlined]
[15] invokelatest at ./essentials.jl:741 [inlined]
[16] send_msg_(::Distributed.Worker, ::Distributed.MsgHeader, ::Distributed.CallMsg{:call_fetch}, ::Bool) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:185
[17] send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/messages.jl:134 [inlined]
[18] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:374
[19] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:371
[20] #remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406
[21] remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
[22] call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:479 [inlined]
[23] put!(::RemoteChannel{Channel{CSV.File}}, ::CSV.File) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:572
[24] top-level scope at none:0
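If the aim in Example 2 is to hand already-parsed rows to a worker, one option (an assumption, not tested against CSV.jl v0.5.11) is to copy the columns into plain Vectors, for instance with collect, before calling put!, so the value sent holds only that chunk's data. This stdlib-only sketch mirrors the RemoteChannel setup from Example 2 with such a plain NamedTuple payload; the column names and values are made up.

```julia
using Distributed
nprocs() == 1 && addprocs(1)   # start one local worker unless some already exist
w = first(workers())

# Made-up stand-in for one chunk whose columns were copied into plain Vectors.
chunk = (id = collect(1:10_000), value = collect(0.5:0.5:5_000.0))
T = typeof(chunk)

# Same pattern as Example 2: a channel whose buffer lives on worker `w`.
c2 = RemoteChannel(() -> Channel{T}(10), w)
put!(c2, chunk)   # ships only the two vectors (about 160 KB)

# Let the owning worker take the chunk and reduce it locally.
id_sum = remotecall_fetch(ch -> sum(take!(ch).id), w, c2)
```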
I appreciate any advice! My Julia VERSION is v"1.1.1" and my CSV version is v0.5.11.