Julia vs Python's Dask: Known speed comparisons?

Hi all,

I’m new to the Julia community and have arrived in hopes of improving the speed of parallelization tasks I’ve been running with Dask (distributed) in Python. I know there are some Julia packages inspired by Dask (e.g., Dagger.jl), but my main question is whether anyone is aware of direct speed comparisons between Dask and equivalent implementations in Julia.

Thanks,
Josh

1 Like

https://diskframe.com/articles/vs-dask-juliadb.html

2 Likes

If you mean the speed of parsing a DAG and generating task graphs, that is mostly negligible compared to the time the tasks themselves take. You can check out Dispatcher.jl as well.

1 Like

Dagger works, but its performance is still not optimal in certain common cases. If your operations can safely be run in true parallel with multithreading, then you can use the Dagger master branch and instruct the scheduler to multithread execution of DAG tasks on each worker process. I haven’t benchmarked the performance gains from this yet, but if you have an MWE that’s close to what you’re working on, I could probably use it to tune Dagger’s scheduler to try to beat Dask (assuming Dagger isn’t already faster, which it probably isn’t right now).
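
For reference, basic usage on recent Dagger versions looks roughly like this (a minimal sketch; simulate_block is just a stand-in for your per-chunk work, and the exact API differs between releases):

using Distributed
addprocs(4)
@everywhere using Dagger

# Stand-in for the real per-chunk computation.
@everywhere simulate_block(n) = sum(rand(n, n))

# Each @spawn call adds a node to the DAG; fetch waits on the scheduler's result.
tasks = [Dagger.@spawn simulate_block(1_000) for _ in 1:32]
results = fetch.(tasks)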

5 Likes

Here’s a general idea of what I’m doing (though not a working example). In my specific case, I’m working with a 12x2000x2000 array (~70 MB) and constructing a 750x2000x2000 array by looping over the second and third dimensions (embarrassingly parallel, in that the result at a given index does not depend on any other index). Using 32 cores, the Dask implementation looks like:


import timeit

import numpy as np
import dask.array
from dask.distributed import LocalCluster, Client

def loop_func(my_array):
    # Get the shape of the input block
    nb, ni, nj = my_array.shape

    new_array = np.full((750, ni, nj), np.nan, dtype=np.float32)

    for i in range(ni):
        for j in range(nj):
            if not np.isnan(my_array[:, i, j]).any():
                mean_params = my_array[:3, i, j]
                cov_mat = my_array[3:, i, j].reshape((3, 3))
                new_array[:, i, j] = do_stuff(mean_params, cov_mat)

    return new_array

## dask
# set up the dask cluster: 32 single-threaded workers on one machine
cluster = LocalCluster(n_workers=32, threads_per_worker=1)
client = Client(cluster)

# my_array is the 12x2000x2000 NumPy input; chunk_size controls the block size
param_array = dask.array.from_array(my_array, chunks=(-1, chunk_size, chunk_size))

def call_dask():
    parm_sims = dask.array.map_blocks(loop_func, param_array,
                                      chunks=(750, chunk_size, chunk_size), dtype=np.float32)
    parm_sims.compute()

print(timeit.Timer(call_dask).repeat(3, 5))

# shut down the dask cluster
client.close()
cluster.close()

I’ve also come up with the following set of Julia modules, which are called from Python using PyJulia and process the same array that is passed to Dask:


module distributed_utils
export loop_func!

using DistributedArrays

function loop_func!(new_array, A)
    A_loc = localpart(A)
    sim_dat = fill(NaN, size(localpart(new_array)))
    
    for j = 1:size(A_loc, 3), i = 1:size(A_loc, 2)
        if !any(isnan, A_loc[:, i, j])
            mean_params = A_loc[1:3, i, j]
            cov_mat = transpose(reshape(A_loc[4:end, i, j], 3, 3))
            cov_mat = convert(Array, cov_mat)  # materialize the transpose as a plain Array
            if isposdef(cov_mat)
                sim_dat[:, i, j] = do_stuff(mean_params, cov_mat)
            end
        end
    end
    new_array[:L] = sim_dat  # write the result into the local part of the DArray
end;

end; #module

module my_module
export python_wrapper

using Distributed
CPU_CORES = length(Sys.cpu_info())
addprocs(CPU_CORES - 1)

module_dir = "/home/jovyan/workdir/lib"
@everywhere push!(LOAD_PATH, $module_dir)
@everywhere using distributed_utils: loop_func!
@everywhere using DistributedArrays

function f_distributed!(new_array, my_array)
    @sync begin
        for p in procs(new_array)
            @async remotecall_wait(loop_func!, p, new_array, my_array)
        end
    end
end;

function python_wrapper(dataset)
    ## NOTE: these arrays are "chunked" automatically (could be done manually though)
    my_array = distribute(dataset)
    new_array = dfill(NaN, 750, size(dataset, 2), size(dataset, 3))  # note: Float64, whereas the Dask version uses float32

    f_distributed!(new_array, my_array)
    
    new_array = convert(Array, new_array)
    return new_array
end;

end; # module


Then in Python


import timeit

import julia
jl = julia.Julia(compiled_modules=False)
jl.include('./lib/my_module.jl')
python_wrapper = julia.Main.my_module.python_wrapper

def call_julia():
    # my_array is the same 12x2000x2000 NumPy array passed to Dask above
    python_wrapper(my_array)

## call once to compile
call_julia()

print(timeit.Timer(call_julia).repeat(3, 5))

Interestingly, the overall process is a bit slower in Julia. The results from Python’s timeit module are

  • Dask: 26 seconds (average)
  • Julia: 32 seconds (average)

I was surprised by these results and have to wonder whether I should be doing something different in the Julia implementation, or whether Dask might really be faster in this case. One thought is that the Julia implementation checks for positive definiteness inside the double for-loop and the Python version does not. However, the matrices being checked are small (3x3), so I’m not sure this would be significant.
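
To get a rough sense of whether the isposdef check matters, I suppose I could benchmark it in isolation (a small sketch; the random matrix below just stands in for a real 3x3 covariance):

using LinearAlgebra, BenchmarkTools

A = rand(3, 3)
C = A' * A + I          # a symmetric positive definite 3x3 matrix
@btime isposdef($C)     # cost of the check alone, per matrix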

Out of curiosity, have you benchmarked this problem with single-threaded Julia, or tried multi-threading rather than DistributedArrays? Since you are on a single 32-core machine (per the LocalCluster import), multi-threading might be faster.
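
Something along these lines (a rough sketch; do_stuff is your placeholder from above, and Julia needs to be started with JULIA_NUM_THREADS set, e.g. to 32):

using Base.Threads, LinearAlgebra

function loop_func_threaded(A)
    nb, ni, nj = size(A)
    new_array = fill(NaN32, 750, ni, nj)
    @threads for j in 1:nj
        for i in 1:ni
            col = @view A[:, i, j]
            if !any(isnan, col)
                mean_params = col[1:3]
                cov_mat = collect(transpose(reshape(col[4:end], 3, 3)))
                if isposdef(cov_mat)
                    new_array[:, i, j] = do_stuff(mean_params, cov_mat)
                end
            end
        end
    end
    return new_array
end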

1 Like

Dask’s distributed scheduler is highly optimized and stress tested, so it could just be that Julia’s needs a bit more work.

Using views or, even better, StaticArrays would likely speed up the code.
If going the StaticArrays route, be sure not to allocate any Array objects through slicing, etc.
I’d also recommend double-checking Julia’s performance tips, e.g. checking for type instabilities. A heuristic I often use is that if a serial version of the code is low on allocations, it’s probably pretty okay and not doing anything unintentionally silly.
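
As a sketch of what the StaticArrays version of the inner-loop slices might look like (sizes assumed from your loop: 3 mean parameters plus 9 covariance entries per column):

using StaticArrays

col = rand(Float32, 12)                             # stand-in for A_loc[:, i, j]

mean_params = SVector{3}(col[1], col[2], col[3])    # stack-allocated, no heap copy
cov_mat     = SMatrix{3,3}(Tuple(@view col[4:12]))  # 3x3 static matrix (transpose it if you need the row-major layout)

Because the sizes are known at compile time, these slices stay on the stack instead of allocating a new Array on every iteration.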

3 Likes

It would be nice if Rocklin would move to the Julia side :wink:

Indeed it would be

I have not tried multi-threading, but will give it a shot. Thanks for the suggestion!

While I am on a single machine for now, this is mainly a proof of concept. If I were to scale this to multiple machines, would you still suggest multi-threading?

Using views or, even better, StaticArrays would likely speed up the code.

Thanks for pointing out the StaticArrays package, I hadn’t come across it until now. I’ll check it out!

It might be useful on a per-node basis, although I don’t know how it composes with operations on DistributedArrays.
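
One way to experiment with that combination would be to start each worker process with several threads and then use Threads.@threads inside loop_func! (a rough sketch; --threads needs Julia 1.5+, the counts are made up, and I haven’t checked how this interacts with DistributedArrays in practice):

using Distributed

# Start a few worker processes, each with several threads.
addprocs(4; exeflags = "--threads=8")

@everywhere using Base.Threads

# Sanity check: each worker should report 8 threads.
fetch.([@spawnat p Threads.nthreads() for p in workers()])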

The reason I asked about single-threaded benchmarks is that I am curious whether you have a baseline for the performance of the Python vs Julia implementations.

1 Like

I’m not sure I understand your question; do you mean benchmarks for serial implementations? For the results above, I was using a single thread per worker in the Dask implementation, and as far as I know Julia would’ve done the same.
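
If a serial baseline would help, I can time a plain single-process version of the loop, e.g. (loop_func_serial being a hypothetical version of the same loop written against ordinary Arrays):

using BenchmarkTools

# Hypothetical serial baseline: same loop body, ordinary Arrays, one process.
@btime loop_func_serial($dataset)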