I have many data files to process (calculate the median of each row, then compare the medians with one another) and I would like to process them in parallel, since the first steps are independent.
I tried the following approach after running
export JULIA_NUM_THREADS=4
in the Linux terminal,
but I don’t see any speed-up, even on big data frames of 100 MB. So what is the best approach?
Thank you very much!
f = readdir("data/")
Threads.@threads for file in f
    dataFile = "data/" * file
    readTable(dataFile, sep, h)
end
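For reference, here is a minimal sketch of the per-file work in present-day Julia (1.x, not the 0.6 used in this thread), assuming each table has already been loaded as a numeric matrix. The names `row_medians`, `tables` and `per_table` are hypothetical, and since the post does not say how the medians are compared, the comparison step is only a placeholder. Results go into a preallocated vector indexed by the loop counter, so threads never `push!` into a shared array.

```julia
using Statistics  # median

# Median of each row of a numeric matrix.
row_medians(m::AbstractMatrix{<:Real}) = [median(view(m, i, :)) for i in axes(m, 1)]

# Hypothetical stand-ins for the loaded data files (4 tables, 1000 rows each).
tables = [rand(1000, 20) for _ in 1:4]

# One result slot per table; each iteration writes only to its own slot,
# so no lock is needed.
medians = Vector{Vector{Float64}}(undef, length(tables))
Threads.@threads for i in eachindex(tables)
    medians[i] = row_medians(tables[i])
end

# Placeholder comparison: median of each table's row medians, then the largest.
per_table = median.(medians)
best = argmax(per_table)
println("per-table medians: ", per_table, "; largest in table ", best)
```

Start Julia with more than one thread (for example via JULIA_NUM_THREADS) for the loop to actually run in parallel.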
Are you sure that disk I/O is not the bottleneck? For a similar problem, which was not CPU-bound, I experienced a slight slowdown from parallelization even with an SSD. With an HDD it could be much worse.
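One way to check is to time the raw read separately from the parsing. This is a sketch for Julia 1.x using a synthetic temp file (the data and names are hypothetical). Because the file has just been written it is probably still in the OS page cache, so the read time underestimates a cold disk; both timings also include first-call compilation, so run it twice and use the second numbers.

```julia
# Write a synthetic tab-separated numeric file (hypothetical data) to a temp path.
path, io = mktemp()
for _ in 1:20_000
    println(io, join(rand(10), '\t'))
end
close(io)

# Raw disk read: bytes only, no parsing.
t_io = @elapsed bytes = read(path)

# CPU work: split each line and parse every field as Float64.
t_parse = @elapsed rows = [parse.(Float64, split(line, '\t')) for line in eachline(IOBuffer(bytes))]

println("read: ", t_io, " s, parse: ", t_parse, " s")
```

If the parse time dominates, the job is CPU-bound and threads can help; if the read time dominates, extra threads mostly compete for the same disk.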
Recently I wrote a program in Go to read a 175 MB FASTA file; reading it took less than 1 second.
When I read two 105 MB data frames (which are of course much more complex than FASTA files) sequentially in Julia, it takes 9.5 s (excluding precompilation time) with 100% CPU load the whole time, which indicates that the CPU is the bottleneck.
When I tried the code above, the run time was the same (9.5 s), with 100% load on a single CPU core.
Then, if I export JULIA_NUM_THREADS=4 in the terminal and try again, I run into serious trouble (see the segmentation fault below).
So I have the feeling that my code has a big problem or that my approach is wrong. But I do know that disk speed is not the bottleneck in my case.
signal (11): Segmentation fault
unknown function (ip: 0xa)
Allocations: 2356870 (Pool: 2355558; Big: 1312); GC: 2
Segmentation fault (core dumped)
Fred, I have to ask. How many CPU cores do you have? Also do you have hyperthreading enabled?
If we guess that you have two cores with HT enabled, how about using only two threads?
You could either switch off HT in the BIOS, or, on Linux, take every second logical CPU offline.
Actually, this is a case where I would expect HT to help: if the process is stalled waiting for I/O, the other thread on that core should get some work done.
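A quick way to see how many logical CPUs the OS reports versus how many threads Julia is actually using; this is a sketch for Julia 1.x. For the i7-4770 in the `versioninfo()` output later in the thread, that CPU has 4 physical cores and 8 hardware threads with HT on. The `ids` vector is just an illustration.

```julia
# Logical CPUs reported by the OS; with Hyper-Threading each physical core counts twice.
println("logical CPUs:  ", Sys.CPU_THREADS)

# Threads this Julia session started with (JULIA_NUM_THREADS, or `julia -t N` in 1.5+).
println("Julia threads: ", Threads.nthreads())

# Record which thread ran each iteration of a @threads loop.
ids = zeros(Int, 8)
Threads.@threads for i in 1:8
    ids[i] = Threads.threadid()
end
println("thread ids used: ", unique(ids))
```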
You have two issues: speed, and the segmentation fault (which you did not mention before). I would try to deal with the latter first. If you post a MWE, you may get help here.
This is a working example. I did not manage to attach the data files; only image files are allowed.
To see the segmentation fault, try this example before and after typing
export JULIA_NUM_THREADS=2
in the terminal.
using DataFrames
# read dataframe
function readTable(file, sep, h)
    println("Reading...\t", file)
    x = readtable(file, separator = sep, header = h) # read data file
    return x
end
function main()
    sep = '\t' # table separator
    h = true # table header
    # process data
    f = ["data.csv", "data2.csv"]
    Threads.@threads for file in f
        tab = readTable(file, sep, h)
    end
end
##########################################
tic()
main()
toc()
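`tic()`/`toc()` are Julia 0.6 functions and were removed in Julia 1.0; in 1.x the usual replacements are `@elapsed` and `@time`. The sketch below, with a hypothetical `work()` standing in for `main()`, also shows one way to exclude compilation from a measurement, as in the "excluding precompilation time" figure quoted earlier: call the function once before timing it.

```julia
using Statistics  # median

# Hypothetical workload standing in for main().
work() = sum(median(rand(10_000)) for _ in 1:100)

work()                     # first call includes JIT compilation; not timed
t = @elapsed work()        # wall-clock time of the second call, in seconds
println("elapsed time: ", t, " seconds")

@time work()               # also reports allocations and GC time
```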
I’m not sure what readtable calls internally, but I found that using CSV.read and specifying the column types was the fastest way to read CSVs. It seems readtable also has an option to specify types.
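A stdlib-only sketch (Julia 1.x) of why declared column types help: when every column's type is known up front, each field is parsed straight into that type and no per-column type guessing is needed. This is not how CSV.jl is implemented. The input text, column layout and the names `parsefield` and `read_typed` are hypothetical; they mirror the `types=Dict(1=>String)` and `null="NA"` options in the CSV.read call further down the thread.

```julia
# Hypothetical input: a header line, then a String ID column and numeric columns.
text = """
id\ta\tb\tc
r1\t1.5\t2.0\t3.5
r2\tNA\t4.0\t1.0
"""

# Numeric fields go straight to Float64; "NA" becomes `missing`.
parsefield(s) = s == "NA" ? missing : parse(Float64, s)

function read_typed(io::IO)
    header = split(readline(io), '\t')
    ids = String[]
    nums = Vector{Vector{Union{Missing,Float64}}}()
    for line in eachline(io)
        isempty(line) && continue
        fields = split(line, '\t')
        push!(ids, String(fields[1]))  # column 1 declared as String
        push!(nums, Union{Missing,Float64}[parsefield(f) for f in fields[2:end]])
    end
    return header, ids, nums
end

header, ids, nums = read_typed(IOBuffer(text))
```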
$ julia DataThreadCSV.jl
WARNING: Method definition ==(Base.Nullable{S}, Base.Nullable{T}) in module Base at nullable.jl:238 overwritten in module NullableArrays at /home/fred/.julia/v0.6/NullableArrays/src/operators.jl:99.
Reading… data.csv
Reading… data2.csv
elapsed time: 3.875885983 seconds
# DataThreadCSV.jl
using CSV
##########################################
# read dataframe
function readTable(file, sep, h)
    println("Reading...\t", file)
    x = CSV.read(file; delim = sep, types=Dict(1=>String), header = h, null="NA") # read data file
    return x
end
function main()
    sep = '\t' # table separator
    h = true # table header
    # process data
    f = ["data.csv", "data2.csv"]
    Threads.@threads for file in f
        tab = readTable(file, sep, h)
    end
end
##########################################
tic()
main()
toc()
# DataThread.jl
using DataFrames
# read dataframe
function readTable(file, sep, h)
    println("Reading...\t", file)
    x = readtable(file, separator = sep, header = h) # read data file
    return x
end
function main()
    sep = '\t' # table separator
    h = true # table header
    # process data
    f = ["data.csv", "data2.csv"]
    Threads.@threads for file in f
        tab = readTable(file, sep, h)
    end
end
##########################################
tic()
main()
toc()
versioninfo()
Julia Version 0.6.0
Commit 9036443 (2017-06-19 13:05 UTC)
Platform Info:
OS: Linux (x86_64-pc-linux-gnu)
CPU: Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
WORD_SIZE: 64
BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Haswell)
LAPACK: libopenblas64_
LIBM: libopenlibm
LLVM: libLLVM-3.9.1 (ORCJIT, haswell)
$ julia DataThreadCSV.jl
WARNING: Method definition ==(Base.Nullable{S}, Base.Nullable{T}) in module Base at nullable.jl:238 overwritten in module NullableArrays at /home/fred/.julia/v0.6/NullableArrays/src/operators.jl:99.
Reading... data.csv
Reading... data2.csv
elapsed time: 3.938964452 seconds
$ export JULIA_NUM_THREADS=4
$ julia DataThreadCSV.jl
WARNING: Method definition ==(Base.Nullable{S}, Base.Nullable{T}) in module Base at nullable.jl:238 overwritten in module NullableArrays at /home/fred/.julia/v0.6/NullableArrays/src/operators.jl:99.
Reading... Reading... data2.csv
Error thrown in threaded loop on thread 1: UndefRefError()
Error thrown in threaded loop on thread 0: ErrorException("unlock count must match lock count")elapsed time: ERROR: fatal: error thrown and no exception handler available.
UndefRefError()
rec_backtrace at /home/centos/buildbot/slave/package_tarball64/build/src/stackwalk.c:84
record_backtrace at /home/centos/buildbot/slave/package_tarball64/build/src/task.c:245
jl_throw at /home/centos/buildbot/slave/package_tarball64/build/src/task.c:564
shift! at ./array.jl:788
wait at ./event.jl:223
uv_write at ./stream.jl:811
unsafe_write at ./stream.jl:832
print at ./strings/io.jl:122 [inlined]
#with_output_color#509 at ./util.jl:404
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
jl_apply at /home/centos/buildbot/slave/package_tarball64/build/src/julia.h:1424 [inlined]
jl_f__apply at /home/centos/buildbot/slave/package_tarball64/build/src/builtins.c:426
#with_output_color at ./<missing>:0
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
jl_apply at /home/centos/buildbot/slave/package_tarball64/build/src/julia.h:1424 [inlined]
jl_f__apply at /home/centos/buildbot/slave/package_tarball64/build/src/builtins.c:426
#print_with_color#510 at ./util.jl:417
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
jl_apply at /home/centos/buildbot/slave/package_tarball64/build/src/julia.h:1424 [inlined]
jl_f__apply at /home/centos/buildbot/slave/package_tarball64/build/src/builtins.c:426
#print_with_color at ./<missing>:0
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
display_error at ./client.jl:131
unknown function (ip: 0x7fb5a806cb2d)
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
display_error at ./client.jl:140
unknown function (ip: 0x7fb5a806c596)
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
do_call at /home/centos/buildbot/slave/package_tarball64/build/src/interpreter.c:75
eval at /home/centos/buildbot/slave/package_tarball64/build/src/interpreter.c:242
eval_body at /home/centos/buildbot/slave/package_tarball64/build/src/interpreter.c:539
jl_toplevel_eval_body at /home/centos/buildbot/slave/package_tarball64/build/src/interpreter.c:511
jl_toplevel_eval_flex at /home/centos/buildbot/slave/package_tarball64/build/src/toplevel.c:571
jl_toplevel_eval_in at /home/centos/buildbot/slave/package_tarball64/build/src/builtins.c:496
eval at ./boot.jl:235
unknown function (ip: 0x7fb5c1b7c39f)
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
_start at ./client.jl:417
unknown function (ip: 0x7fb5c1bbe6e8)
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
unknown function (ip: 0x401e41)
unknown function (ip: 0x401662)
__libc_start_main at /build/glibc-bfm8X4/glibc-2.23/csu/../csu/libc-start.c:291
unknown function (ip: 0x40170c)
I/O is not yet thread-safe. However, if all the files you read fit into memory, you can read them concurrently using @async (inside a @sync block) and then process them in parallel on multiple threads.
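A minimal sketch of that two-stage pattern, written for Julia 1.x rather than 0.6. The temp files and their contents are hypothetical, and so are `parse_table` and the per-file work (row medians). Stage 1 reads everything into memory with `@async` tasks on the main thread; stage 2 does the CPU-bound parsing and medians with `Threads.@threads`, so no I/O happens on worker threads.

```julia
using Statistics  # median

# Hypothetical input: a few small tab-separated numeric files in a temp dir.
dir = mktempdir()
files = [joinpath(dir, "data$k.tsv") for k in 1:4]
for f in files
    open(f, "w") do io
        for _ in 1:100
            println(io, join(rand(8), '\t'))
        end
    end
end

# Stage 1: read every file into memory. The @async tasks interleave while
# waiting on I/O; @sync returns once all of them have finished.
raw = Vector{String}(undef, length(files))
@sync for (i, f) in enumerate(files)
    @async raw[i] = read(f, String)
end

# Stage 2: parse and compute row medians on multiple threads (CPU-bound part).
parse_table(s) = [parse.(Float64, split(l, '\t')) for l in eachline(IOBuffer(s))]
medians = Vector{Vector{Float64}}(undef, length(raw))
Threads.@threads for i in eachindex(raw)
    medians[i] = [median(row) for row in parse_table(raw[i])]
end
```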
I just have a small problem using “readtable” on each worker with this strategy:
$ julia DataAsync.jl
From worker 2: Reading… data.csv
From worker 3: Reading… data2.csv
ERROR: LoadError: On worker 2:
UndefVarError: readtable not defined
using DataFrames
addprocs(2)
# read dataframe
@everywhere function readTable(file, sep, h)
    println("Reading...\t", file)
    x = readtable(file, separator = sep, header = h) # read data file
    return x
end
function main()
    sep = '\t' # table separator
    h = true # table header
    func_future = Array{Future, 1}()
    # process data
    f = ["data.csv", "data2.csv"]
    for file in f
        func_fut = @spawn readTable(file, sep, h)
        push!(func_future, func_fut)
    end
    fetch(func_future[1])
    fetch(func_future[2])
end
##########################################
tic()
main()
toc()
What I planned to do was a little more complex: if I have 50 tables but only 4 fit into memory, my idea was to use 4 simultaneous threads to read and process the tables four at a time.
You can try both these approaches and see which fits your use case better.
Distributed, multi-process: use addprocs(N) and pmap.
Pseudocode:
addprocs()
@everywhere begin
    using DataFrames
    function process_file(fname)
        .......
    end
end
results = pmap(process_file, list_of_files)
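A runnable sketch of the pmap approach in Julia 1.x, where `addprocs` and `pmap` live in the `Distributed` standard library. The per-file work (row medians of a tab-separated numeric file), the two-worker count and the temp files are assumptions for illustration. Note that packages and functions must be loaded on every process with `@everywhere` after `addprocs`: a plain `using` before `addprocs` only loads the package on the master process, which is why worker 2 reported `readtable not defined` above.

```julia
using Distributed

# Start two local worker processes (an arbitrary choice for this sketch).
nprocs() == 1 && addprocs(2)

# Load packages and define functions on all processes, after addprocs.
@everywhere using Statistics
@everywhere function process_file(fname)
    rows = [parse.(Float64, split(l, '\t')) for l in eachline(fname)]
    return [median(r) for r in rows]  # row medians for this file
end

# Hypothetical input files in a temp dir (local workers share the filesystem).
dir = mktempdir()
list_of_files = [joinpath(dir, "data$k.tsv") for k in 1:4]
for f in list_of_files
    write(f, join([join(rand(5), '\t') for _ in 1:50], '\n'))
end

# pmap hands one file at a time to whichever worker is free.
results = pmap(process_file, list_of_files)
```

Each worker is a separate process with its own memory, so the "only 4 tables fit in memory" limit translates into the number of workers.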
Multi-threaded, single process: process the files in sets of N. The pseudocode would be something like this:
const N = 4
const data_chnl = Channel{Any}(N)
@schedule begin
    @sync for f in list_of_files
        @async put!(data_chnl, readTable(f, sep, h))
    end
    close(data_chnl)
end
data = []
for d in data_chnl
    push!(data, d)
    if length(data) == N
        Threads.@threads for d2 in data
            process_read_file(d2)
        end
        empty!(data)
    end
end
if length(data) > 0
    Threads.@threads for d2 in data
        process_read_file(d2)
    end
    empty!(data)
end
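A runnable variation of the batched pipeline for Julia 1.x, where `@schedule` no longer exists. Here `read_table` and `process_read_file` are hypothetical stand-ins (a random matrix and its row medians), and the file names are placeholders. The `Channel{Any}(N) do ch ... end` form runs the producer as a task and closes the channel automatically when it finishes. Unlike the pseudocode, the producer reads files one after another: with one `@async` task per file, every task would run its read before blocking on `put!`, so all files would end up in memory regardless of the channel size.

```julia
using Statistics  # median

const N = 4  # tables processed per batch

# Hypothetical stand-ins: "reading" returns a random matrix, "processing" its row medians.
read_table(f) = rand(200, 10)
process_read_file(m) = [median(view(m, i, :)) for i in axes(m, 1)]

list_of_files = ["file$k.tsv" for k in 1:10]  # hypothetical file names
results = Vector{Vector{Float64}}()

# Producer task: reads sequentially and blocks on put! when N tables are waiting.
data_chnl = Channel{Any}(N) do ch
    for f in list_of_files
        put!(ch, read_table(f))
    end
end

# Process one batch on all threads, keeping results in batch order.
function process_batch!(results, batch)
    out = Vector{Vector{Float64}}(undef, length(batch))
    Threads.@threads for i in eachindex(batch)
        out[i] = process_read_file(batch[i])
    end
    append!(results, out)
    empty!(batch)
end

batch = Any[]
for d in data_chnl
    push!(batch, d)
    length(batch) == N && process_batch!(results, batch)
end
isempty(batch) || process_batch!(results, batch)  # leftover partial batch
```

With this layout, at most about 2N + 1 tables are in memory at once (N buffered in the channel, N in the current batch, one being read), so N should be chosen with that in mind.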