Here’s a general idea of what I’m doing (though not a working example). In my specific case, I’m working with a 12x2000x2000 array (~70 MB), and constructing a 750x2000x2000 array by looping over the second and third dimensions (embarrassingly parallel, in that the result at a given index does not depend on any other index). Using 32 cores, the Dask implementation looks like:
```python
import timeit

import numpy as np
import dask.array as da
from dask.distributed import LocalCluster, Client


def loop_func(my_array):
    # Shape of this block: nb parameters by an ni x nj spatial window
    nb, ni, nj = my_array.shape
    new_array = np.full((750, ni, nj), np.nan, dtype=np.float32)
    for i in range(ni):
        for j in range(nj):
            if not np.isnan(my_array[:, i, j]).any():
                # First 3 entries are the mean parameters, remaining 9 form the 3x3 covariance
                mean_params = my_array[:3, i, j]
                cov_mat = my_array[3:, i, j].reshape((3, 3))
                # do_stuff is a placeholder for the per-pixel computation
                new_array[:, i, j] = do_stuff(mean_params, cov_mat)
    return new_array


## dask
# setup dask cluster

def call_dask():
    parm_sims = da.map_blocks(loop_func, param_array,
                              chunks=(750, chunk_size, chunk_size),
                              dtype=np.float32)
    parm_sims.compute()


print(timeit.Timer(call_dask).repeat(3, 5))

# shutdown dask cluster
```
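The `# setup dask cluster` and `# shutdown dask cluster` lines are placeholders, and `param_array` / `chunk_size` are created as part of that setup. As a minimal sketch of what that part might look like, assuming a 32-worker `LocalCluster` and an illustrative `chunk_size` (the random array just stands in for my real input):

```python
import numpy as np
import dask.array as da
from dask.distributed import LocalCluster, Client

# Illustrative values: 2000 / 250 = 8 chunks per spatial axis, i.e. 64 blocks over 32 workers
chunk_size = 250
my_array = np.random.rand(12, 2000, 2000).astype(np.float32)  # stand-in for the real data

cluster = LocalCluster(n_workers=32, threads_per_worker=1)
client = Client(cluster)

# Keep the full first dimension in every chunk so each block sees all 12
# parameters for its (i, j) window; only the two spatial axes are split.
param_array = da.from_array(my_array, chunks=(-1, chunk_size, chunk_size))

# ... call_dask() / timeit as above ...

client.close()
cluster.close()
```

`threads_per_worker=1` is just one reasonable choice for this kind of loop-heavy NumPy work; the exact settings aren’t the point here.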
I’ve also come up with the following set of modules in Julia which are called through Python using PyJulia and process the same array that is passed to Dask:
```julia
# distributed_utils.jl -- placed in module_dir so every worker can load it
module distributed_utils

export loop_func!

using DistributedArrays
using LinearAlgebra  # isposdef

function loop_func!(new_array, A)
    A_loc = localpart(A)
    # Local buffer matching this worker's portion of the output
    sim_dat = fill(NaN, size(localpart(new_array)))
    for j = 1:size(A_loc, 3), i = 1:size(A_loc, 2)
        if !any(isnan, A_loc[:, i, j])
            mean_params = A_loc[1:3, i, j]
            # Transpose so the 3x3 matrix matches NumPy's row-major reshape
            cov_mat = transpose(reshape(A_loc[4:end, i, j], 3, 3))
            cov_mat = convert(Array, cov_mat)
            if isposdef(cov_mat)
                sim_dat[:, i, j] = do_stuff(mean_params, cov_mat)
            end
        end
    end
    # Write the simulated values into this worker's local part of the output
    new_array[:L] = sim_dat
end

end # module

# my_module.jl -- included from Python below
module my_module

export python_wrapper

using Distributed

CPU_CORES = length(Sys.cpu_info())
addprocs(CPU_CORES - 1)

module_dir = "/home/jovyan/workdir/lib"
@everywhere push!(LOAD_PATH, $module_dir)
@everywhere using distributed_utils: loop_func!
@everywhere using DistributedArrays

function f_distributed!(new_array, my_array)
    # Run loop_func! on every worker that holds a piece of new_array
    @sync begin
        for p in procs(new_array)
            @async remotecall_wait(loop_func!, p, new_array, my_array)
        end
    end
end

function python_wrapper(dataset)
    ## NOTE: these arrays are "chunked" automatically (the distribution could also be specified manually)
    my_array = distribute(dataset)
    new_array = dfill(NaN, 750, size(dataset, 2), size(dataset, 3))
    f_distributed!(new_array, my_array)
    # Gather the distributed result back into a regular Array for Python
    return convert(Array, new_array)
end

end # module
```
Then, in Python:

```python
import timeit

import julia

jl = julia.Julia(compiled_modules=False)
jl.include('./lib/my_module.jl')
python_wrapper = julia.Main.my_module.python_wrapper


def call_julia():
    python_wrapper(my_array)


## call once so the timings below exclude Julia's JIT compilation
call_julia()

print(timeit.Timer(call_julia).repeat(3, 5))
```
Interestingly, the overall process is a bit slower in Julia. The results from Python’s timeit module are:
- Dask: 26 seconds (average)
- Julia: 32 seconds (average)
I was surprised by these results and have to wonder whether I should be doing something different in the Julia implementation, or whether Dask really might be faster in this case. One thought is that the Julia implementation has a positive-definiteness check inside the double for-loop that the Python version does not. However, the matrices being checked are small (3x3), so I’m not sure this would be significant.
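One quick way to gauge whether that check could matter is to time an equivalent test on a single 3x3 matrix and scale it by the number of pixels. The sketch below is only a rough, single-core estimate on the Python side, and it uses `np.linalg.cholesky` as a stand-in positive-definiteness test (an assumption on my part; the Julia code uses `isposdef`):

```python
import timeit

import numpy as np

# A small symmetric positive definite 3x3 matrix to test against
rng = np.random.default_rng(0)
a = rng.standard_normal((3, 3)).astype(np.float32)
cov_mat = a @ a.T + 3 * np.eye(3, dtype=np.float32)


def posdef_check(m):
    # Cholesky succeeds only for positive definite matrices
    try:
        np.linalg.cholesky(m)
        return True
    except np.linalg.LinAlgError:
        return False


n_calls = 10_000
per_call = min(timeit.repeat(lambda: posdef_check(cov_mat), number=n_calls, repeat=3)) / n_calls
# Scale the per-call cost by the number of (i, j) pixels in one pass
print(f"~{per_call * 2000 * 2000:.1f} s of extra work per 2000x2000 pass (single core)")
```

If that scaled estimate comes out well below the ~6-second gap, especially once the work is spread over the workers, the positive-definiteness check probably isn’t the explanation.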