JuliaDB Parallel/Distributed Computing

I’m trying to learn how to do distributed/parallel computing using JuliaDB, and I’m running into several issues that I don’t understand.

First, I loaded 4 CSV files, each of which is 3GB - 4GB in size, like this:

using JuliaDB

loadtable(glob("data/*.csv"), output = "bin", chunks = 16, type_detect_rows = 2000)

I thought that the chunks = 16 argument would result in a distributed table with 16 chunks, but it didn’t do that (I guess it is just for merging CSVs together, not splitting them?). In the bin directory I have 4 files, and when I call load("bin") I get a distributed table in 4 chunks that contains nearly 16 million rows and 286 columns.
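This is easy to see in the REPL, since the printed summary of a distributed table includes the chunk count (a minimal check; the actual row count is elided here):

julia> tbl = load("bin")
Distributed Table with ... rows in 4 chunks: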

I then wrote a function to do some filtering/aggregating of the data and used @time to test how long its execution takes:

using JuliaDB, JuliaDBMeta, OnlineStats, Statistics
using JuliaDBMeta: @filter

tbl = load("bin")

function by_age(lower::Int64, upper::Int64, table)
    # keep only rows where AGEP is present and falls in [lower, upper]
    result = @applychunked table begin
        @where !ismissing(:AGEP) &&
        upper >= :AGEP >= lower
    end
    # sum the person weights (PWGTP) of the surviving rows
    return reduce(+, result; select = :PWGTP)
end

@time by_age(35, 44, tbl)

The call to by_age takes around 60 seconds each time (after the initial run which takes > 2 mins):

julia> @time by_age(35, 44, tbl)
 67.165804 seconds (329.40 M allocations: 38.674 GiB, 9.58% gc time)
40735140

julia> @time by_age(35, 44, tbl)
 60.377765 seconds (328.61 M allocations: 38.634 GiB, 10.46% gc time)
40735140

julia> @time by_age(35, 44, tbl)
 58.394598 seconds (328.61 M allocations: 38.634 GiB, 10.79% gc time)
40735140

julia> @time by_age(35, 44, tbl)
 57.983220 seconds (328.61 M allocations: 38.634 GiB, 10.90% gc time)
40735140

Interestingly, if I swap the + in the by_age function with Sum() from OnlineStats, it’s not any faster:

function by_age(lower::Int64, upper::Int64, table)
    result = @applychunked table begin
        @where !ismissing(:AGEP) &&
        upper >= :AGEP >= lower
    end
    return reduce(Sum(), result; select = :PWGTP) # changed + to Sum()
end

julia> @time by_age(35, 44, tbl)
 58.877263 seconds (327.32 M allocations: 38.635 GiB, 10.90% gc time)
Sum: n=1845493 | value=4.07351e7

julia> @time by_age(35, 44, tbl)
 58.634385 seconds (326.76 M allocations: 38.606 GiB, 10.81% gc time)
Sum: n=1845493 | value=4.07351e7

julia> @time by_age(35, 44, tbl)
 60.791893 seconds (326.76 M allocations: 38.606 GiB, 10.78% gc time)
Sum: n=1845493 | value=4.07351e7

julia> @time by_age(35, 44, tbl)
 60.314000 seconds (326.76 M allocations: 38.606 GiB, 10.87% gc time)
Sum: n=1845493 | value=4.07351e7

After this, I loaded the Distributed package and added an additional process for a total of 2 processes (workers?). I’m really not sure what the correct way to do this is:

using Distributed

while length(procs()) < 2
    addprocs(1)
end

@everywhere using JuliaDB, JuliaDBMeta, OnlineStats, Statistics
@everywhere using JuliaDBMeta: @filter

I’m not sure if I need the @everywhere as I’ve placed it in the snippet above.
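One way to check whether a package actually made it onto a worker is to ask that worker directly (a minimal sketch using only the stdlib Distributed API):

julia> fetch(@spawnat 2 isdefined(Main, :JuliaDB))  # true if worker 2 has loaded JuliaDB
true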

The code goes on like this:

tbl = load("bin")

@everywhere function by_age(lower::Int64, upper::Int64, table)
    result = @applychunked table begin
        @where !ismissing(:AGEP) &&
        upper >= :AGEP >= lower
    end
    return reduce(Sum(), result; select = :PWGTP)
end

Again, I don’t know if I need the @everywhere before the function definition.

I then proceeded to call the by_age function with @time, and after the first two calls (which were slower than before I added Distributed and the additional process) it crashed Julia and my IDE (VS Code with the Julia extension). I tried this again, and multiple calls to by_age always crash the program.

I’m hoping someone can explain what the correct way to do this is. Any assistance would be greatly appreciated!!

So I changed the code as follows, and it didn’t speed up the initial run of the by_age function at all, and it still crashes if I call that function more than once (this does not happen when I only have one process/worker):

using Distributed

addprocs(3)

using JuliaDB, JuliaDBMeta, OnlineStats, Statistics
using JuliaDBMeta: @filter

tbl = load("bin")

function by_age(lower::Int64, upper::Int64, table)
    result = @applychunked table begin
        @where !ismissing(:AGEP) &&
        upper >= :AGEP >= lower
    end
    return reduce(+, result; select = :PWGTP)
end

The error message said there was a ReadOnlyMemoryError on Worker 3…

Just a guess, but can you check out the latest master of MemPool and see if that fixes the crash (and start from the beginning, if you don’t mind…)? It would also be great to post a full stacktrace if you can get one.
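For reference, checking out master would look something like this (equivalently, add MemPool#master in the Pkg REPL):

using Pkg
Pkg.add(PackageSpec(name = "MemPool", rev = "master"))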

Regarding CSV chunking, I’m working on reviving a PR which would do chunked reading of one or more CSVs (splitting each CSV individually), and which will take good advantage of multiple workers if available: https://github.com/JuliaComputing/JuliaDB.jl/issues/288. It’s still a WIP, but it’s something to track if you’d like.

Your guess about chunks is correct.

The correct sequence of commands is:

using Distributed
addprocs(4) # do not make this higher than the number of physical cores, and make sure you have enough memory
@everywhere using JuliaDB, JuliaDBMeta, OnlineStats, Statistics
load("bin") # load is aware of the workers connected to your master process and utilizes them!

Your by_age only needs to be defined on the master process; there is no need for an @everywhere.
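The reason is that by_age itself only ever runs on the master: the code @applychunked ships to the workers consists of anonymous functions, which Julia serializes automatically, whereas named functions are only looked up by name on the receiving worker. A minimal sketch of that distinction (assuming a worker with id 2 exists):

fetch(@spawnat 2 (x -> 2x)(21))  # works: the anonymous function is serialized to the worker
f(x) = 2x                        # a named function, defined only on the master
fetch(@spawnat 2 f(21))          # fails unless f is also defined on the worker via @everywhere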

Sum() really is for extremely large data sets. I am not surprised it is not faster.
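For reference, the OnlineStats API is tiny; a minimal sketch (Sum, fit! and value are standard OnlineStats exports):

using OnlineStats

o = fit!(Sum(Int), [10, 20, 30])  # stream values into the running statistic
value(o)                          # 60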

Does Julia crash outright, or is there an error message after which you can continue to execute commands?
Please report the error message either way and, if possible, a minimal working example that reproduces it.

@jpsamaroo @MaximilianJHuber Thanks to you both for the help, I really appreciate it! I tweaked the code a bit this morning and I’m now unable to reproduce the problem. The only change I made was to add the indexcols argument when calling loadtable:

loadtable(glob("data/*.csv"), output = "bin", chunks = 4, type_detect_rows = 2000, indexcols = [:ST])

After that, I did everything exactly as I did yesterday and the code is running just fine (albeit not much faster than without adding additional processes):

using Distributed
addprocs(3)
@everywhere using JuliaDB, JuliaDBMeta, OnlineStats, Statistics
@everywhere using JuliaDBMeta: @filter

# loadtable(glob("data/*.csv"), output = "bin", chunks = 4, type_detect_rows = 2000, indexcols = [:ST])

tbl = load("bin")

function by_age(lower::Int64, upper::Int64, state::Int64)
    @applychunked tbl begin
        @where :ST == state &&
        !ismissing(:AGEP) &&
        upper >= :AGEP >= lower
        @groupby _ :PUMA { total = sum(:PWGTP) }
    end
end

Calling the function multiple times results in the following:

julia> @time by_age(24, 35, 12)
  2.340554 seconds (3.67 k allocations: 164.938 KiB)
Distributed Table with 151 rows in 1 chunks:
PUMA  total
───────────
101   30554
102   18177
500   46048
901   25682
902   12937
903   17459
904   18965
1101  21653
1102  18368
1103  16801
⋮

julia> @time by_age(24, 35, 12)
  2.391163 seconds (3.67 k allocations: 164.813 KiB)

julia> @time by_age(24, 35, 12)
  2.178894 seconds (3.67 k allocations: 164.813 KiB)

julia> @time by_age(24, 35, 12)
  2.320857 seconds (3.67 k allocations: 164.813 KiB)

(each call returns the same 151-row distributed table shown above)

For your reference, the data I am working with is from the U.S. Census Bureau and can be downloaded here: https://www2.census.gov/programs-surveys/acs/data/pums/2017/5-Year/csv_pus.zip

The ZIP file contains 4 CSV files (psam_pusa, psam_pusb, psam_pusc, psam_pusd). These are the files that I’m loading via the loadtable function in my code above. You should be able to reproduce my code with these files. The CSV files range in size from 3.1GB - 4.3GB and after converting to IndexedTables they balloon in size to 6.2GB - 8.5GB.

My CPU has 8 cores and I’m only working with 8GB RAM.
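For reference, both figures can be queried from within Julia (a minimal sketch using Base.Sys; note that CPU_THREADS counts logical threads, which may be double the physical core count):

Sys.CPU_THREADS             # number of logical CPU threads
Sys.total_memory() / 2^30   # total RAM in GiB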

Great to hear that it works now! Would you mind filing an issue and pointing out how adding indexcols fixes the crash? (If you haven’t already; I haven’t checked.)

At this point, is the only issue the fact that using multiple workers doesn’t speed things up? If so, could you try making your query a bit larger (so it takes longer) and watch your OS’s process monitor to see if all CPUs are engaged? Whether you’re saturating your disk I/O bandwidth would also be good information to have (in case you’re bottlenecking on I/O).
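If it helps with locating them, each worker can report its OS pid, which you can then look up in the process monitor (a minimal sketch using the stdlib Distributed API):

using Distributed

[remotecall_fetch(getpid, w) for w in workers()]  # OS pids of all worker processes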

I have the census dataset on hand, so if I get some spare time later today or tomorrow, I will try your MWE locally and see if I can debug why you aren’t seeing a speedup.

The ballooning of the size is alright. I do the math here. There is also some code that makes the file much smaller, though I do not understand how. It is a bit suspicious, hence the issue on GitHub.

Is the run-time too slow for you? You could @select only the columns you need, which would reduce the file size by a lot and probably speed things up.
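Something along these lines should work (a hedged sketch using JuliaDB’s select and save; the column list is just the ones your query touches, and "bin_small" is a made-up output directory):

small = select(tbl, (:ST, :PUMA, :AGEP, :PWGTP))  # keep only the columns the query needs
save(small, "bin_small")                          # persist the slimmed-down table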

So the issue isn’t fixed after all. If I run the by_age function without filtering down to the state level (I forgot I had made this change), it again crashes when I call it more than once or twice. This time I was able to capture a portion of the error output (my REPL turned into gibberish after the crash):

ERROR:       From worker 4:     ReadOnlyMemoryError()ProcessExitedException()
try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at .\event.jl:196
wait() at .\event.jl:255
wait(::Condition) at .\event.jl:46
wait_impl at .\channels.jl:362 [inlined]
wait at .\channels.jl:358 [inlined]
take_buffered(::Channel{Any}) at .\channels.jl:317
take!(::Channel{Any}) at .\channels.jl:315
take!(::Distributed.RemoteValue) at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.1\Distributed\src\remotecall.jl:576
#remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Dagger.Context, ::Vararg{Any,N} where N) at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.1\Distributed\src\remotecall.jl:375
remotecall_fetch(::Function, ::Distributed.Worker, ::Dagger.Context, ::Vararg{Any,N} where N) at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.1\Distributed\src\remotecall.jl:371
#remotecall_fetch#152(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Int64, ::Dagger.Context, ::Vararg{Any,N} where N) at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.1\Distributed\src\remotecall.jl:406
remotecall_fetch at C:\cygwin\home\Administrator\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.1\Distributed\src\remotecall.jl:406 [inlined]
macro expansion at C:\Users\mthel\.julia\packages\Dagger\sdZXi\src\scheduler.jl:272 [inlined]
(::getfield(Dagger.Sch, Symbol("##13#14")){Dagger.Context,Dagger.OSProc,Int64,getfield(JuliaDBMeta, Symbol("##22#23")){getfield(Main, Symbol("##3#5")){Int64,Int64}},Tuple{Dagger.Chunk{Any,MemPool.FileRef}},Channel{Any},Bool,Bool,Bool})() at .\task.jl:259

Also, I tried running all of this on a friend’s machine (16-core CPU and 32GB of RAM) and it all worked flawlessly. I was able to monitor the CPUs, and all of them engaged as expected… I’m wondering if I’m just running up against hardware limitations on my machine?

I would hope that your 8 CPU 8 GB RAM machine would be suitable for use with JuliaDB; I think this is just a bug somewhere that needs fixing.

It looks like the worker crashed while in the middle of switching tasks during a wait() call; could you post the output of versioninfo()?

@mthelm85 can you please first split the CSV files:

files = glob("data/*.csv")

for f in files
    open(f) do file
        i = 1
        g = open(f * "1.csv", "w") # placeholder handle so the first close(g) below is valid

        header = readline(file)    # remember the header so each piece gets a copy

        for ln in eachline(file)
            if i % 1000000 == 1    # start a new piece every million rows
                close(g)
                g = open(f * string(div(i - 1, 1000000) + 1) * ".csv", "w")
                write(g, header * "\n") # write header
            end

            write(g, ln * "\n")
            i += 1
        end
        close(g)
    end
end

rm.(files) # remove the original large CSVs

loadtable(glob("data/*.csv"),
    output = "bin",
    chunks = length(glob("data/*.csv")))

and see if the error still appears? Splitting the files up this way should allow your 8GB of RAM to easily hold multiple chunks (each < 2GB).
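You can sanity-check the piece sizes from Julia before loading (filesize is in Base):

maximum(filesize.(glob("data/*.csv"))) / 2^20  # size of the largest piece, in MiB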

Okay, I gave it a try. On my 16GB RAM, 4-core machine I have to split the CSVs, otherwise I get an out-of-memory error during loadtable.

Then I run:

using JuliaDB, JuliaDBMeta

tbl = load("C:/Users/Max/Desktop/databin")

function by_age(lower::Int64, upper::Int64, state::Int64)
    @applychunked tbl begin
        @where :ST == state &&
        !ismissing(:AGEP) &&
        upper >= :AGEP >= lower
        @groupby _ :PUMA { total = sum(:PWGTP) }
    end
end

@time by_age(24, 35, 12)

and get:

Internal error: encountered unexpected error in runtime:
ReadOnlyMemoryError()
reset_page at /home/Administrator/buildbot/worker/package_win64/build/src\gc.c:1045 [inlined]
add_page at /home/Administrator/buildbot/worker/package_win64/build/src\gc.c:1080
jl_gc_alloc at /home/Administrator/buildbot/worker/package_win64/build/src\gc.c:1142
_new_array_ at /home/Administrator/buildbot/worker/package_win64/build/src\array.c:100
_new_array at /home/Administrator/buildbot/worker/package_win64/build/src\array.c:160 [inlined]
jl_alloc_array_1d at /home/Administrator/buildbot/worker/package_win64/build/src\array.c:420
Type at .\boot.jl:402 [inlined]
Type at .\abstractdict.jl:509 [inlined]
Type at .\abstractdict.jl:675 [inlined]
adce_pass! at .\compiler/ssair\passes.jl:877
run_passes at .\compiler/ssair\driver.jl:126
optimize at .\compiler\optimize.jl:164
typeinf at .\compiler\typeinfer.jl:35
abstract_call_method_with_const_args at .\compiler\abstractinterpretation.jl:198
abstract_call_gf_by_type at .\compiler\abstractinterpretation.jl:107
abstract_call at .\compiler\abstractinterpretation.jl:776
abstract_eval_call at .\compiler\abstractinterpretation.jl:805
abstract_eval at .\compiler\abstractinterpretation.jl:890
typeinf_local at .\compiler\abstractinterpretation.jl:1135
typeinf_nocycle at .\compiler\abstractinterpretation.jl:1191
typeinf at .\compiler\typeinfer.jl:14
typeinf_edge at .\compiler\typeinfer.jl:497
abstract_call_method at .\compiler\abstractinterpretation.jl:345
abstract_call_gf_by_type at .\compiler\abstractinterpretation.jl:85
abstract_call at .\compiler\abstractinterpretation.jl:776
abstract_eval_call at .\compiler\abstractinterpretation.jl:805
abstract_eval at .\compiler\abstractinterpretation.jl:890
typeinf_local at .\compiler\abstractinterpretation.jl:1135

[....too long to post here....]

Please submit a bug report with steps to reproduce this fault, and any error messages that follow (in their entirety). Thanks.
Exception: UNKNOWN at 0x7ff845459129 -- RaiseException at C:\WINDOWS\System32\KERNELBASE.dll (unknown line)
in expression starting at no file:0
RaiseException at C:\WINDOWS\System32\KERNELBASE.dll (unknown line)
Unwind_RaiseException at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\libgcc_s_seh-1.dll (unknown line)
_cxa_throw at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\libstdc++-6.dll (unknown line)
Znwy at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\libstdc++-6.dll (unknown line)
ZNSt6vectorIN4llvm5SUnitESaIS1_EE7reserveEy at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm18ScheduleDAGSDNodes15BuildSchedUnitsEv at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm18ScheduleDAGSDNodes15BuildSchedGraphEPNS_9AAResultsE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm28createHybridListDAGSchedulerEPNS_16SelectionDAGISelENS_10CodeGenOpt5LevelE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm16SelectionDAGISel17CodeGenAndEmitDAGEv at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm16SelectionDAGISel20SelectAllBasicBlocksERKNS_8FunctionE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm16SelectionDAGISel20runOnMachineFunctionERNS_15MachineFunctionE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm16createX86ISelDagERNS_16X86TargetMachineENS_10CodeGenOpt5LevelE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm19MachineFunctionPass13runOnFunctionERNS_8FunctionE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm13FPPassManager13runOnFunctionERNS_8FunctionE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm13FPPassManager11runOnModuleERNS_6ModuleE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm6legacy15PassManagerImpl3runERNS_6ModuleE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)

signal (22): SIGABRT
in expression starting at no file:0
crt_sig_handler at /home/Administrator/buildbot/worker/package_win64/build/src/home/Administrator/buildbot/worker/package_win64/build/src\signals-win.c:91
raise at C:\WINDOWS\System32\msvcrt.dll (unknown line)
abort at C:\WINDOWS\System32\msvcrt.dll (unknown line)
ZN9__gnu_cxx27__verbose_terminate_handlerEv at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\libstdc++-6.dll (unknown line)
ZGTtNSt16invalid_argumentD1Ev at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\libstdc++-6.dll (unknown line)
ZSt9terminatev at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\libstdc++-6.dll (unknown line)
_cxa_throw at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\libstdc++-6.dll (unknown line)
Znwy at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\libstdc++-6.dll (unknown line)
ZN4llvm9DWARFUnit42determineStringOffsetsTableContributionDWOERNS_18DWARFDataExtractorEy at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZNK4llvm9DWARFUnit19extractDIEsToVectorEbbRSt6vectorINS_19DWARFDebugInfoEntryESaIS2_EE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm9DWARFUnit23getSubroutineForAddressEy at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm9DWARFUnit25getInlinedChainForAddressEyRNS_15SmallVectorImplINS_8DWARFDieEEE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)
ZN4llvm12DWARFContext25getInliningInfoForAddressEyNS_19DILineInfoSpecifierE at C:\Users\Max\AppData\Local\Julia-1.1.0\bin\LLVM.dll (unknown line)

or

fatal: error thrown and no exception handler available.
ReadOnlyMemoryError()
reset_page at /home/Administrator/buildbot/worker/package_win64/build/src\gc.c:1045 [inlined]
add_page at /home/Administrator/buildbot/worker/package_win64/build/src\gc.c:1080
jl_gc_pool_alloc at /home/Administrator/buildbot/worker/package_win64/build/src\gc.c:1142

Plus, most of the time I execute this code, some other application (Slack, a Chrome tab, VS Code, …) crashes!

@mthelm85 I opened an issue on GitHub with an MWE.

To be honest, I do not believe the issue will be fixed soon enough to avoid holding up your research, because the developers of the JuliaDB stack are busy with other things. However, let me propose a workaround:

Only load the columns that you need with loadtable:

loadtable(glob("data/*.csv"), 
    output = "bin", 
    type_detect_rows = 2000,
    indexcols = [:SERIALNO], 
    datacols = [:PUMA, :AGEP, :PWGTP],
    chunks = length(glob("data/*.csv"))
)

I tested it and did not get the wild error above.

@MaximilianJHuber @jpsamaroo You are awesome, thank you so much. I really appreciate your assistance.

The output of versioninfo() is:

Julia Version 1.1.0
Commit 80516ca202 (2019-01-21 21:24 UTC)
Platform Info:
  OS: Windows (x86_64-w64-mingw32)
  CPU: Intel(R) Core(TM) i7-3770 CPU @ 3.40GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-6.0.1 (ORCJIT, ivybridge)
Environment:
  JULIA_EDITOR = "C:\Users\mthel\AppData\Local\Programs\Microsoft VS Code Insiders\Code - Insiders.exe"