How to make parallel within the parallel?

Hi Guys,

Right now I am running a model optimization to optimize one set of parameter for several sites (in total 47 sites). Site computation is independent with each other. I am using CMAES to minimise the cost. For now, I saw two places I can do the parallelisation, but I am not sure how to do it…

The first one is to parallelise the site running, this is already done, we are using threads to do it. But for the second one, CMAES cost, I don’t know how to do it. Here is our code:

function optimizeTEM(forcing::NamedTuple, observations, info::NamedTuple)
    opti_helpers = prepOpti(forcing, observations, info, info.optimization.run_options.cost_method)

    optim_para = optimizer(opti_helpers.cost_function, opti_helpers.default_values, opti_helpers.lower_bounds, opti_helpers.upper_bounds, info.optimization.optimizer.options, info.optimization.optimizer.method)

    optim_para = backScaleParameters(optim_para, opti_helpers.parameter_table, info.optimization.run_options.parameter_scaling)

    # update the parameter table with the optimized values
    opti_helpers.parameter_table.optimized .= optim_para
    return opti_helpers.parameter_table
end

function  prepOpti(forcing, observations, info, cost_method::CostModelObs)
    run_helpers = prepTEM(forcing, info)

    parameter_helpers = prepParameters(info.optimization.parameter_table, info.optimization.run_options.parameter_scaling)
    
    parameter_table = parameter_helpers.parameter_table
    default_values = parameter_helpers.default_values
    lower_bounds = parameter_helpers.lower_bounds
    upper_bounds = parameter_helpers.upper_bounds
    cost_options = prepCostOptions(observations, info.optimization.cost_options, cost_method)

    cost_function = x -> cost(x, default_values, info.models.forward, run_helpers.space_forcing, run_helpers.space_spinup_forcing, run_helpers.loc_forcing_t, run_helpers.output_array, run_helpers.space_output, deepcopy(run_helpers.space_land), run_helpers.tem_info, observations, parameter_table, cost_options, info.optimization.run_options.multi_constraint_method, info.optimization.run_options.parameter_scaling, cost_method)

    opti_helpers = (; parameter_table=parameter_table, cost_function=cost_function, cost_options=cost_options, default_values=default_values, lower_bounds=lower_bounds, upper_bounds=upper_bounds, run_helpers=run_helpers)
    return opti_helpers
end

function cost(parameter_vector, _, selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, output_array, space_output, space_land, tem_info, observations, parameter_updater, cost_options, multi_constraint_method, parameter_scaling_type, ::CostModelObs)
    @debug parameter_vector
    updated_models = updateModels(parameter_vector, parameter_updater, parameter_scaling_type, selected_models)
    runTEM!(updated_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output, space_land, tem_info)
    cost_vector = metricVector(output_array, observations, cost_options)
    cost_metric = combineMetric(cost_vector, multi_constraint_method)
    @debug cost_vector, cost_metric
    return cost_metric
end

function runTEM!(selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output, space_land, tem_info::NamedTuple)
    parallelizeTEM!(selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output, space_land, tem_info, tem_info.run.parallelization)
    return nothing
end

function parallelizeTEM!(selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output, space_land, tem_info, ::ThreadsParallelization)
    Threads.@threads for space_index ∈ eachindex(space_forcing)
        coreTEM!(selected_models, space_forcing[space_index], space_spinup_forcing[space_index], loc_forcing_t, space_output[space_index], space_land[space_index], tem_info)
    end
    return nothing
end

If I only run 1 site, i.e. no parallelisation for forward running, I can do the parallelisation by following codes:

function getCostVectorSize(algo_options, parameter_vector, ::CMAEvolutionStrategyCMAES)
    cost_vector_size = Threads.nthreads()
    if hasproperty(algo_options, :multi_threading)
        if algo_options.multi_threading
            if hasproperty(algo_options, :popsize)
                cost_vector_size = algo_options.popsize
            else
                cost_vector_size = 4 + floor(Int, 3 * log(length(parameter_vector)))
            end
        end
    end
    return cost_vector_size
end

function  prepOpti(forcing, observations, info, ::CostModelObsMT; algorithm_info_field=:optimizer)
    algorithm_info = getproperty(info.optimization, algorithm_info_field)
    opti_helpers = prepOpti(forcing, observations, info, CostModelObs())
    run_helpers = opti_helpers.run_helpers
    cost_vector_size = getCostVectorSize(getproperty(algorithm_info, :options), opti_helpers.default_values, getproperty(algorithm_info, :method))
    cost_vector = Vector{eltype(opti_helpers.default_values)}(undef, cost_vector_size)
    
    space_index = 1 # the parallelization of cost computation only runs in single pixel runs

    cost_function = x -> cost(x, opti_helpers.default_values, info.models.forward, run_helpers.space_forcing[space_index], run_helpers.space_spinup_forcing[space_index], run_helpers.loc_forcing_t, run_helpers.output_array, run_helpers.space_output_mt, deepcopy(run_helpers.space_land[space_index]), run_helpers.tem_info, observations, opti_helpers.parameter_table, opti_helpers.cost_options, info.optimization.run_options.multi_constraint_method, info.optimization.run_options.parameter_scaling, cost_vector, info.optimization.run_options.cost_method)

    opti_helpers = (; opti_helpers..., cost_function=cost_function, cost_vector=cost_vector)
    return opti_helpers
end

function cost(parameter_matrix, _, selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, output_array, space_output, space_land, tem_info, observations, parameter_updater, cost_options, multi_constraint_method, parameter_scaling_type, cost_out::Vector, ::CostModelObsMT)
    @info size(parameter_matrix)
    parameter_set_size = size(parameter_matrix, 2)
    done_params=1
    Threads.@threads for parameter_index in eachindex(1:parameter_set_size)
        idx = Threads.threadid()
        parameter_vector = parameter_matrix[:, parameter_index]
        @debug parameter_vector
        updated_models = updateModels(parameter_vector, parameter_updater, parameter_scaling_type, selected_models)
        coreTEM!(updated_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output[idx], space_land, tem_info)
        cost_vector = metricVector(space_output[idx], observations, cost_options)
        cost_metric = combineMetric(cost_vector, multi_constraint_method)
        cost_out[parameter_index] = cost_metric
        @info idx, round(100 * done_params/parameter_set_size,digits=2), parameter_set_size, cost_metric
        @debug idx, round(100 * done_params/parameter_set_size,digits=2), parameter_set_size, cost_metric, cost_vector
        done_params += 1
    end
    return cost_out
end

You could see cost(..., ::CostModelObsMT, ...) arrange and parallelise all CMAES cost evaluations (in total 4 + floor(Int, 3 * log(length(parameter_vector))) evaluations) into different threads (if there is NO OTHER parallization on the coreTEM! or runTEM!)…

so how can I do it? Thanks!