Can Julia efficiently make use of 20+ cores for transforming hundreds of millions of rows for machine learning?

I’ve seen similar questions posted but I wanted to ask my question to see if I can get more precise answers.

I train machine learning models on datasets with hundreds of millions of rows (10^8). On this order of magnitude of data, Python becomes prohibitively slow mostly because (pandas) dataframe operations do not make use of parallel computing efficiently.

I do have access to machines with multiple (20+) cores. As a result the solution I’ve gone with is to use Java’s due to its superior parallel (threading) computing compabilities. So far this solution has worked pretty well (I can transform 10^8 datasets in a reasonable amount of time), however I’ve found Java’s programming paradigm awkward for machine learning and costly in development hours.

I’m considering switching to Julia if it currently (or in the future) will have Python (pandas) like programming style but can also utilize multiple cores efficiently enough to match the speed of Java on a 10^8 dataset.

Has anyone tried using Julia with multiple cores on datasets of this scale or have a reference to relevant benchmarks? Is Julia currently capable for transforming 10^8 datasets efficiently? If not, will it be able to somewhere down the road of its development?

Edit: Ideally I would like to avoid distributed solutions because they are bothersome to nightmarish to debug. I’ve tried Spark and DASK for Python and they’re both cumbersome for my ML workflow.

2 Likes

Welcome, @Kevin_Shen!

Well, I do not train machine learning models, but I do handle some large data sets, though not as big as your problem.

I would argue that Julia is as capable as Java, maybe a tad behind on the threading model (given it is a recent API), but certainly on par on the distributed model.

In these slides I talk about handling a large dataset using the distributed model (this was before Julia v1.0 so the syntax changed a bit). Once the data is on memory, it takes a few seconds to do one simple pass on the data, and it takes more time as the operations on data get more complex.

It might take some engineering, but the main message to you is that if you were able to handle it in Java, then it is most likely that you’ll be able to handle it in Julia. Also, most likely it will take less lines of code when comparing to Java.

As with any programming language, you might need to understand how a few libs are implemented. For instance, don’t expect a plain vanilla Julia matrix (or DataFrames) to support distributed memory mapping with parallel multiplication out of the box. Most likely you’ll need to find a library for that, or write some code. In my case, as I show in the slides, I divided my dataset in chunks.

4 Likes

A curiosity: “transforming” the data here is a technical therm of ML meaning something specific?

Because in principle the answer is yes, Julia can handle large arrays and parallelism, but I am not sure if you are asking something more specific.

It is likely that others will know exactly what you mean, but if that is not very precise, it will be better to post some example.

Hi Felipe, you bring up a great point which is I want to avoid distributed solutions. Spark and Dask for Python exist but they’re very awkward to debug and iterate with in the ML workflow (so much so for me that I’d prefer Java over them). Thanks for your slides.

By “transform” I mean pandas-like transformations like for example,

df[“new_feature”] = df[“age”] * df[“height”]
df[“new_feature2”] = df[df[“new_feature” < 500]]

Specifically I’m thinking of pandas “operations a data scientist would use 98% of the time” (like ones above). As opposed to more advanced pandas features. Does this help (I’m not sure if I can enumerate all these operations)? Or is it still too vague?

p.s. df here would have 10^8 rows and 10^3 columns.

1 Like

I want to avoid distributed solutions.

I’m with you on this one! I’ve been switching my Julia code that was based on a Distributed model to use the new Threading model. It has been working great for me, and a bit faster than the Distributed model since all JIT compilation is done only once.

But sometimes the data just doesn’t fit your RAM on one computer, so you may need Distributed computation. Hopefully not your case.

1 Like

Thanks @felipenoris! Let me take a close look at your slides … your data does seem ~ 1 order of magnitude smaller but it might be close enough. Suppose RAM is not a concern (we always have enough to fit the entire dataset).

I am not a user of data frames (in the Pandas/package sense) in general, but if I can think of those as array operations you might be willing to implement in parallel, I do not see any reason for Julia being worst than other language. The difficulties certainly lie in structuring the data to make parallelism efficient, but if you have already done that in Java, probably you won’t have major difficulties.

The first operation (multiplication) takes less than 0.5 second on a single core for 10^8 rows. So I assume that you have to do hundreds of such transformations. In this case threading in-built into Julia should handle it well (the problem probably will be that with simple transformations and that many cores the whole operation will be RAM speed not CPU speed bound).

The second operation will be done fastest if you make a view of the original data frame (which can be done very fast).

3 Likes

@lmiq @bkamins I might be missing the mark here with my question since I’m a data scientist and know very little about programming languages. Let me make the question more abstract to see if it’ll help,

“Python + pandas dataframes works superbly for 10^5 datatsets but is too slow for 10^8 datasets. Can I program in Julia in a very similar fashion as I program in Python + pandas and have reasonable speed for the 10^8 dataset?”

@bkamins your answers seems to suggest yes.

I think to a data scientist I’m basically asking for a silver bullet here. Distributed solutions like Spark and Dask for Python have existed for a while but people aren’t happy with them. Data scientists want to do plain vanilla python but have it scale up to 10^8+ dataset size by just allocating more cores to computer.

If you are able to provide an example of something that takes a lot of time, even with some random data, sometimes the people here feel challenged :grinning_face_with_smiling_eyes: I have seen codes being accelerated a thousand times…

Concerning the use of something very similar to Pandas, I guess you might try to port some problem to the framework of Data frames.jl and see how it goes.

4 Likes

Great suggestion :joy: Let me put together a minimal example tomorrow…

1 Like

For custom stuff check out transducers.jl by @tkf

Hi, I’m doing analysis of a 23 billion row, 10 column data sample. I’ve only done a simple analysis so far, but I do apply a calibration transformation to one of the columns and then histogram it using one of the other columns as a mask and another as a category variable to decide which histogram to fill. I’m using a very, very big machine - the Cori supercomputer at NERSC. But the point here is that I’ve been quite successful getting this to work with Julia. 100M rows for me easily fits on one Cori node that I can then attack with 64 processes. I’ve tried multithreading on a node and that works well (see A quick introduction to data parallelism in Julia for some good hints and packages that help). Though to get the multi-node processing I need, I’m using MPI. That’s probably overkill for you using one node.

It all really depends on what you are trying to do and how complex your data manipulations are. Your data seems to be much more complex that mine (you have way fewer rows, but many more columns). I haven’t needed to use DataFrames yet, but I suspect I will soon as I do more complex analyses. The great thing I’ve found about Julia is that it gives you tools for evaluating different options and strategies. The @time macro is especially helpful, as it not only tells you the time of an operation, but how much memory was allocated in the process. I need to worry about time and memory for each strategy I try (I’m trying to fill the nodes with as much data as I can, so I need to be careful about operations that make copies). I can read in (using HDF5) and histogram that 23B row dataset using 20 nodes with 64 processes each in under two minutes total (most of that is i/o - the data manipulation and histogramming step takes 5 seconds!!). And the code is straightforward and easy to read. I’m working now to make the i/o faster.

So I think you’ll just have to try stuff and evaluate for yourself. Use views as suggested and watch for functions that make copies. DataFrames are great, though I’ve found the syntax of select/transform/combine to be a bit complex, but it’s very powerful. I’m often finding examples of operations I didn’t realize it could do. Knowing more about the core of Julia (like arrays and views) and not just DataFrames is extremely helpful. Unlike Python (where numpy/pandas really supplant the core Python data structures) and R (where, at least to me, the tidy-verse packages supplant base R), the Julia higher level packages like DataFrames enhance, rather than supersede, the core Julia structures. So the more you know about core Julia, the more options you’ll have and you’ll better understand what’s going on. The julia documentation is fantastic, so learning Julia is not difficult.

And as you see, the community is great. I’ll think you’ll get what you want out of Julia. It’ll take work though (like everything) and be sure to ask for help when you need it. That strategy has worked very well for me.

15 Likes

Not to derail the OP’s original post, but I have a quick question about your setup. How exactly do you read data across multiple nodes so that you don’t use the RAM on any single node? For example, I have access to 18 nodes each with 256gb of memory. I have a fairly large dataset to process and it would be great if I can load up 20% of the data on compute node 1, another 20% on compute node 2, and so on… (ofcourse then there is the challenge of merging together results, but right now I can’t even open my dataset since its too large for any single node).

Have a look here: Database-like ops benchmark in the groupby 5GB section (as this is relevant to your question with 10^8 rows, there are less columns, but number of columns is not relevant here). This is not a definitive benchmark, but gives you the feel of comparison between platforms and timings (actually split-apply-combine is more expensive than ungrouped operations).

In these benchmarks DataFrames.jl runs on a single thread.

4 Likes

I was just curious, and this takes 1s in my laptop:

using DataFrames, BenchmarkTools

# generate a subset of all ages*height less than 5
function getdf(df)
  df.newfeature = df["age"] .* df["height"]
  df2 = df[df.newfeature .< 5, :]
  return df2
end

# generate data
n = 10^8
df = DataFrame(age = rand(0:70,n), height = rand(n))

# multiply the height and age and generate df with subset
@btime getdf($df);

output:

julia> @btime getdf($df);
  1.103 s (33 allocations: 1.54 GiB)

2 Likes

Hi @affans, I use MPI.jl for the parallelism and my files are in the HDF5 format, so I use HDF5.jl as well. You can tell MPI the number of nodes you want to use and the number of processes (called “ranks” in MPI terminology) per node. So you can divide up the reading accordingly among the nodes keeping memory in mind. MPI is built into the batch system on Cori - not sure how to make that work on your system.

My problem now is that the # of ranks per node that is efficient for reading (about 6/node - more is bad due to i/o overhead and contention) is poor for processing (where more is better!). So I’m working on having lots of ranks per node (32 or 64) but designating a small subset for reading (like 6) and read into shared memory so the data is available to all of the ranks on the node. I’m getting close to making this work.

I hope this helps! – Adam

2 Likes

Do you need MPI only for reading? Can you do Threads for processing?
I have a toy example where I use Distributed for reading and threads for processing:
https://github.com/jstrube/Julia_in_HEP_paper/blob/main/julia/FoxWolfram_async_distributed.jl
I don’t use shared memory, but I think that could be made to work.
Hope it’s useful.

1 Like

should be (the syntax you use is not supported on DataFrames.jl 0.22 and it has been deprecated a long time)

df.newfeature = df.age .* df.height

and

df2 = df[df.newfeature .< 5, :]

would be faster as:

df2 = @view df[df.newfeature .< 5, :]
3 Likes