Reading and processing Data files concurrently

Hi,

I have many data files to process (calculate the median of each row and then compare the medians with each other) and I would like to process them in parallel, since the first steps are independent.

I tried the approach shown at the end of this post after running
export JULIA_NUM_THREADS=4
in the Linux terminal,

but I don’t see any speedup, even on big dataframes of 100 MB. So what is the best approach?

Thank you very much !

f = readdir("data/")

Threads.@threads for file in f
    dataFile = "data/" * file
    readTable(dataFile, sep, h)
end
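
A quick check that may be worth running before timing anything is whether the session really started with more than one thread. The sketch below uses only `Threads.nthreads()` and `Sys.CPU_THREADS` and assumes a current Julia 1.x release (on 0.6 the latter was called `Sys.CPU_CORES`). The helper name `thread_report` is made up for illustration.

```julia
# Hypothetical helper: report how many threads this Julia session can use.
function thread_report()
    n = Threads.nthreads()      # fixed at startup by JULIA_NUM_THREADS (or `julia -t N`)
    logical = Sys.CPU_THREADS   # logical CPUs visible to Julia
    return (threads = n, logical_cpus = logical)
end

report = thread_report()
println("Julia threads: ", report.threads, " / logical CPUs: ", report.logical_cpus)
if report.threads == 1
    println("Only one thread available: Threads.@threads will run the loop serially.")
end
```

The variable has to be exported in the same shell that launches `julia`, before Julia starts; setting it afterwards has no effect on a running session.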

Are you sure that disk I/O is not the bottleneck? For a similar problem, which was not CPU-bound, I experienced a slight slowdown from parallelization even with an SSD. HDD could be much worse.
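
One way to answer this question is to time the raw read and the parsing separately. The following is only a sketch under stated assumptions: it targets Julia 1.x, generates its own small tab-separated file, and uses a hand-written parser as a stand-in for the real table reader. The helper name `time_read_and_parse` is hypothetical.

```julia
# Hypothetical helper: time raw reading and in-memory parsing separately.
function time_read_and_parse(path)
    # 1. Raw I/O only: pull the bytes into memory without interpreting them.
    t_read = @elapsed bytes = read(path)
    # 2. Parsing only: work on the in-memory copy, so the disk is not involved.
    total = 0.0
    t_parse = @elapsed for line in Iterators.drop(eachline(IOBuffer(bytes)), 1)  # skip header
        fields = split(line, '\t')
        total += parse(Float64, fields[2]) + parse(Float64, fields[3])
    end
    return (t_read = t_read, t_parse = t_parse, total = total)
end

# Generate a throwaway test file (the contents are illustrative only).
path = tempname()
open(path, "w") do io
    println(io, "id\ts1\ts2")
    for i in 1:100_000
        println(io, "g", i, '\t', rand(), '\t', rand())
    end
end

timing = time_read_and_parse(path)
println("read: ", timing.t_read, " s   parse: ", timing.t_parse, " s")
rm(path)
```

If the parse time dominates, the work is CPU-bound and parallelism can help; if the read time dominates, more threads are unlikely to. Note that a freshly written file is usually still in the operating system's cache, so for a realistic I/O figure the measurement should be made on files that have not been touched recently.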

Thanks for your comments !

Recently I wrote a program in Go to read a FASTA file of 175 MB. Reading this file took less than 1 second.

When I read 2 dataframes of 105 MB (which are of course much more complex than FASTA files) sequentially in Julia, I need 9.5 seconds (excluding precompilation time) and I have 100% CPU load during this time, indicating that the bottleneck is the CPU.

When I tried the code above, the computation time was the same (9.5 seconds) and I had 100% CPU load on one CPU core.

Then, if I export JULIA_NUM_THREADS=4 in the terminal and try again, I get a crash (see the segmentation fault below).

So I have the feeling that my code has a big problem or that my approach is wrong. But I do know that the disk speed is not the bottleneck in my case.

signal (11): Segmentation fault
unknown function (ip: 0xa)
Allocations: 2356870 (Pool: 2355558; Big: 1312); GC: 2
Segmentation fault (core dumped)

Fred, I have to ask. How many CPU cores do you have? Also do you have hyperthreading enabled?

If we guess that you have two cores with HT enabled, how about thinking of only using two threads?
You could either switch off the HT in the BIOS, or in Linux you can set each second CPU to offline.

Actually, this is a case where I would expect HT to be good: if the process is stalled waiting for IO, then the other thread on that core should get some work done.

Hi John,

I have a Core i7-4770 with 4 cores and 8 threads :slight_smile:

You have two issues: speed, and the segmentation fault (which you did not mention before). I would try to deal with the latter first. If you post a MWE, you may get help here.

Thank you Tamas !

This is a working example. I did not manage to attach files; only image files are allowed :frowning:
To see the segmentation fault, try this example before and after typing

export JULIA_NUM_THREADS=2

in the terminal :slight_smile:

using DataFrames

# read dataframe
function readTable(file, sep, h)
    println("Reading...\t", file)
    x = readtable(file , separator = sep, header = h) # read data file
    return x
end

function main()
    sep = '\t'       # table separator
    h = true         # table header
  
    # process data
    f = ["data.csv", "data2.csv"]
    
    Threads.@threads for file in f
        tab = readTable(file, sep, h)
    end
end

##########################################

tic()
main()
toc()

data.csv (tab separator)
id s1 s2 s3 s4
g1 0.1349779443 0.2319120248 0.4795815343 0.1349779443
g2 0.9721584522 0.4378209082 0.8485481786
g3 0.1529253099 0.8485481786 0.1529253099
g4 0.8138636984 0.9721584522 0.9174289651 0.8138636984

data2.csv
id s1 s2 s3 s4
g1 0.2235082715 0.726445808 0.3964289063 0.2169791684
g2 0.6151192371 0.7863019568 0.6236194363
g3 0.9810212048 0.2967554158 0.5556356032
g4 0.0347811024 0.5602313542 0.1317892775 0.4228049423

FWIW, faster CSV readers than readtable are available in packages CSV, CSVFiles and TextParse.

I’m not sure what readtable is calling, but I found that using CSV.read and specifying the column types was the fastest way to read CSVs. It seems readtable also has an option to specify types.

FWIW, I cannot reproduce the segfault.

Hi Jonathan !

I compared the speed of DataFrames and CSV (with packages updated to the latest version) and I observed just the opposite :slight_smile:

$ julia DataThread.jl

Reading… data.csv
Reading… data2.csv
elapsed time: 1.534718347 seconds

$ julia DataThreadCSV.jl
WARNING: Method definition ==(Base.Nullable{S}, Base.Nullable{T}) in module Base at nullable.jl:238 overwritten in module NullableArrays at /home/fred/.julia/v0.6/NullableArrays/src/operators.jl:99.
Reading… data.csv
Reading… data2.csv
elapsed time: 3.875885983 seconds

# DataThreadCSV.jl
using CSV

##########################################
# read dataframe
function readTable(file, sep, h)
    println("Reading...\t", file)
    x = CSV.read(file ; delim = sep, types=Dict(1=>String), header = h, null="NA") # read data file
    return x
end

function main()
    sep = '\t'       # table separator
    h = true         # table header
  
    # process data
    f = ["data.csv", "data2.csv"]
    
    Threads.@threads for file in f
        tab = readTable(file, sep, h)
    end
end

##########################################

tic()
main()
toc()

# DataThread.jl
using DataFrames

# read dataframe
function readTable(file, sep, h)
    println("Reading...\t", file)
    x = readtable(file , separator = sep, header = h) # read data file
    return x
end

function main()
    sep = '\t'       # table separator
    h = true         # table header
  
    # process data
    f = ["data.csv", "data2.csv"]
    
    Threads.@threads for file in f
        tab = readTable(file, sep, h)
    end
end

##########################################

tic()
main()
toc()

versioninfo()
Julia Version 0.6.0
Commit 9036443 (2017-06-19 13:05 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Haswell)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, haswell)

Julia’s multithreading is experimental. There are Base routines that are not thread-safe and IO is AFAIU not safe, e.g.

julia> Threads.@threads for i in 1:10
           println(i)
       end
1
2
signal (11): Segmentation fault
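
A common way to sidestep this is to keep all I/O out of the threaded loop: each iteration writes only to its own pre-allocated slot, and printing happens afterwards from a single thread. This is a minimal sketch assuming Julia 1.x syntax (`Vector{Int}(undef, n)`); the function `squares_threaded` and the squaring itself are placeholders for the real per-item work.

```julia
# Sketch: each iteration writes only to its own slot, so no I/O stream
# (and no other shared mutable state) is touched inside the threaded loop.
function squares_threaded(n)
    results = Vector{Int}(undef, n)   # one pre-allocated slot per iteration
    Threads.@threads for i in 1:n
        results[i] = i^2              # stand-in for the real per-item work
    end
    return results
end

results = squares_threaded(10)

# Printing happens after the loop, from a single thread.
for (i, r) in enumerate(results)
    println(i, " => ", r)
end
```

Because every iteration owns a distinct index, no lock is needed, and the output order no longer depends on thread scheduling.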

Hi Tamas,

I also get the segfault with the CSV package:


 $ julia DataThreadCSV.jl 
WARNING: Method definition ==(Base.Nullable{S}, Base.Nullable{T}) in module Base at nullable.jl:238 overwritten in module NullableArrays at /home/fred/.julia/v0.6/NullableArrays/src/operators.jl:99.
Reading...	data.csv
Reading...	data2.csv
elapsed time: 3.938964452 seconds

$ export JULIA_NUM_THREADS=4

$ julia DataThreadCSV.jl 
WARNING: Method definition ==(Base.Nullable{S}, Base.Nullable{T}) in module Base at nullable.jl:238 overwritten in module NullableArrays at /home/fred/.julia/v0.6/NullableArrays/src/operators.jl:99.
Reading...	Reading...	data2.csv
Error thrown in threaded loop on thread 1: UndefRefError()
Error thrown in threaded loop on thread 0: ErrorException("unlock count must match lock count")elapsed time: ERROR: fatal: error thrown and no exception handler available.
UndefRefError()
rec_backtrace at /home/centos/buildbot/slave/package_tarball64/build/src/stackwalk.c:84
record_backtrace at /home/centos/buildbot/slave/package_tarball64/build/src/task.c:245
jl_throw at /home/centos/buildbot/slave/package_tarball64/build/src/task.c:564
shift! at ./array.jl:788
wait at ./event.jl:223
uv_write at ./stream.jl:811
unsafe_write at ./stream.jl:832
print at ./strings/io.jl:122 [inlined]
#with_output_color#509 at ./util.jl:404
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
jl_apply at /home/centos/buildbot/slave/package_tarball64/build/src/julia.h:1424 [inlined]
jl_f__apply at /home/centos/buildbot/slave/package_tarball64/build/src/builtins.c:426
#with_output_color at ./<missing>:0
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
jl_apply at /home/centos/buildbot/slave/package_tarball64/build/src/julia.h:1424 [inlined]
jl_f__apply at /home/centos/buildbot/slave/package_tarball64/build/src/builtins.c:426
#print_with_color#510 at ./util.jl:417
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
jl_apply at /home/centos/buildbot/slave/package_tarball64/build/src/julia.h:1424 [inlined]
jl_f__apply at /home/centos/buildbot/slave/package_tarball64/build/src/builtins.c:426
#print_with_color at ./<missing>:0
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
display_error at ./client.jl:131
unknown function (ip: 0x7fb5a806cb2d)
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
display_error at ./client.jl:140
unknown function (ip: 0x7fb5a806c596)
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
do_call at /home/centos/buildbot/slave/package_tarball64/build/src/interpreter.c:75
eval at /home/centos/buildbot/slave/package_tarball64/build/src/interpreter.c:242
eval_body at /home/centos/buildbot/slave/package_tarball64/build/src/interpreter.c:539
jl_toplevel_eval_body at /home/centos/buildbot/slave/package_tarball64/build/src/interpreter.c:511
jl_toplevel_eval_flex at /home/centos/buildbot/slave/package_tarball64/build/src/toplevel.c:571
jl_toplevel_eval_in at /home/centos/buildbot/slave/package_tarball64/build/src/builtins.c:496
eval at ./boot.jl:235
unknown function (ip: 0x7fb5c1b7c39f)
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
_start at ./client.jl:417
unknown function (ip: 0x7fb5c1bbe6e8)
jl_call_fptr_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:339 [inlined]
jl_call_method_internal at /home/centos/buildbot/slave/package_tarball64/build/src/julia_internal.h:358 [inlined]
jl_apply_generic at /home/centos/buildbot/slave/package_tarball64/build/src/gf.c:1933
unknown function (ip: 0x401e41)
unknown function (ip: 0x401662)
__libc_start_main at /build/glibc-bfm8X4/glibc-2.23/csu/../csu/libc-start.c:291
unknown function (ip: 0x40170c)

IO is not yet threadsafe. However, if all the read files can fit into memory, you can read files concurrently using @async (in a @sync block) and then process them concurrently on multiple threads.

You are right Amit !

I just have a small problem using “readtable” on each thread with this strategy:

$ julia DataAsync.jl
From worker 2: Reading… data.csv
From worker 3: Reading… data2.csv
ERROR: LoadError: On worker 2:
UndefVarError: readtable not defined

using DataFrames
addprocs(2)

# read dataframe
@everywhere function readTable(file, sep, h)
    println("Reading...\t", file)
    x = readtable(file , separator = sep, header = h) # read data file
    return x
end

function main()
    sep = '\t'       # table separator
    h = true         # table header
    func_future = Array{Future, 1}()
    # process data
    f = ["data.csv", "data2.csv"]
    
    for file in f
        func_fut = @spawn readTable(file, sep, h)
        push!(func_future, func_fut)
    end
    fetch(func_future[1])
    fetch(func_future[2])
end

##########################################

tic()
main()
toc()

If you want to distribute over separate worker processes, call addprocs first and only then run @everywhere using DataFrames, so that the package is loaded on the workers too.
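
The ordering can be sketched as follows. This assumes Julia 1.x, where `addprocs`, `@everywhere` and `pmap` live in the `Distributed` standard library (on 0.6 they were in Base). To keep the sketch self-contained it loads `Statistics` instead of DataFrames, and `row_median` is a made-up name.

```julia
using Distributed                  # stdlib on Julia 1.x

addprocs(2)                        # 1. start the workers first

@everywhere using Statistics       # 2. then load packages on every process

# 3. define the work function on every process as well
@everywhere row_median(v) = median(v)

rows = [[0.1, 0.2, 0.4], [0.9, 0.4, 0.8], [0.3, 0.3, 0.5]]
medians = pmap(row_median, rows)   # each row may be handled by a different worker
println(medians)

rmprocs(workers())                 # shut the workers down again
```

A package loaded before `addprocs` exists only on the master process, which is why a function shipped to a worker then fails with an `UndefVarError` like the one shown above.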

I was talking about using threads in a single process only for the computation part, with IO being done concurrently using @async (not @spawn)

data=[]
@sync for f in list_of_files
   @async push!(data, readTable(f, sep, h))
end

Threads.@threads for d in data
   process_read_file(d)
end

Thank you very much Amit !

What I planned to do was a little more complex: if I have 50 tables but only 4 fit into memory, my idea was to use 4 simultaneous threads to read and process the tables 4 at a time :slight_smile:

You can try both these approaches and see which fits your use case better.

  1. Distributed, multi-process - use addprocs(N) and pmap

pseudocode:

addprocs()
@everywhere begin
   using DataFrames
   function process_file(fname)
   .......
   end
end

results = pmap(process_file, list_of_files)

  2. Multi-threaded, single process. Process in sets of N. Pseudocode would be something like this:

const N = 4
const data_chnl = Channel{Any}(N)
@schedule begin
  @sync for f in list_of_files
    @async put!(data_chnl, readTable(f, sep, h))
  end
  close(data_chnl)
end

data=[]
for d in data_chnl
  push!(data, d)
  if length(data) == N
    Threads.@threads for d2 in data
       process_read_file(d2)
    end
    empty!(data)
  end 
end

if length(data) > 0
 Threads.@threads for d2 in data
   process_read_file(d2)
 end
 empty!(data)
end 
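
For comparison, the same "N at a time" idea can be written without a channel on a current Julia 1.x release. This is only a sketch, not a drop-in replacement for the code above: `load_table` and `row_medians` are hypothetical stand-ins for `readTable` and `process_read_file`, and the "tables" are random vectors rather than files.

```julia
using Statistics

# Hypothetical stand-ins for the thread's readTable / process_read_file.
load_table(i) = [rand(5) for _ in 1:3]               # pretend "file i" has 3 rows of 5 values
row_medians(table) = [median(row) for row in table]

function process_in_batches(ids, batch_size)
    results = Vector{Vector{Float64}}(undef, length(ids))
    for batch in Iterators.partition(1:length(ids), batch_size)
        # Load at most `batch_size` tables, one after another (no threaded I/O).
        tables = [load_table(ids[k]) for k in batch]
        # Process the loaded tables in parallel; each iteration writes its own slot.
        Threads.@threads for j in 1:length(batch)
            results[batch[j]] = row_medians(tables[j])
        end
        # The previous batch is no longer referenced once the next one is loaded,
        # so only about one batch of tables needs to fit in memory at a time.
    end
    return results
end

out = process_in_batches(1:10, 4)
println(length(out), " tables processed")
```

Only the small per-table results are kept across batches, which is what makes this workable when 50 tables exist but only 4 fit into memory at once.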

Thank you very much Amit. I will study your suggestions with great interest ! :wink: