Hi Guys,
Right now I am running a model optimization to optimize one set of parameter for several sites (in total 47 sites). Site computation is independent with each other. I am using CMAES to minimise the cost. For now, I saw two places I can do the parallelisation, but I am not sure how to do it…
The first one is to parallelise the site running, this is already done, we are using threads
to do it. But for the second one, CMAES cost, I don’t know how to do it. Here is our code:
function optimizeTEM(forcing::NamedTuple, observations, info::NamedTuple)
opti_helpers = prepOpti(forcing, observations, info, info.optimization.run_options.cost_method)
optim_para = optimizer(opti_helpers.cost_function, opti_helpers.default_values, opti_helpers.lower_bounds, opti_helpers.upper_bounds, info.optimization.optimizer.options, info.optimization.optimizer.method)
optim_para = backScaleParameters(optim_para, opti_helpers.parameter_table, info.optimization.run_options.parameter_scaling)
# update the parameter table with the optimized values
opti_helpers.parameter_table.optimized .= optim_para
return opti_helpers.parameter_table
end
function prepOpti(forcing, observations, info, cost_method::CostModelObs)
run_helpers = prepTEM(forcing, info)
parameter_helpers = prepParameters(info.optimization.parameter_table, info.optimization.run_options.parameter_scaling)
parameter_table = parameter_helpers.parameter_table
default_values = parameter_helpers.default_values
lower_bounds = parameter_helpers.lower_bounds
upper_bounds = parameter_helpers.upper_bounds
cost_options = prepCostOptions(observations, info.optimization.cost_options, cost_method)
cost_function = x -> cost(x, default_values, info.models.forward, run_helpers.space_forcing, run_helpers.space_spinup_forcing, run_helpers.loc_forcing_t, run_helpers.output_array, run_helpers.space_output, deepcopy(run_helpers.space_land), run_helpers.tem_info, observations, parameter_table, cost_options, info.optimization.run_options.multi_constraint_method, info.optimization.run_options.parameter_scaling, cost_method)
opti_helpers = (; parameter_table=parameter_table, cost_function=cost_function, cost_options=cost_options, default_values=default_values, lower_bounds=lower_bounds, upper_bounds=upper_bounds, run_helpers=run_helpers)
return opti_helpers
end
function cost(parameter_vector, _, selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, output_array, space_output, space_land, tem_info, observations, parameter_updater, cost_options, multi_constraint_method, parameter_scaling_type, ::CostModelObs)
@debug parameter_vector
updated_models = updateModels(parameter_vector, parameter_updater, parameter_scaling_type, selected_models)
runTEM!(updated_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output, space_land, tem_info)
cost_vector = metricVector(output_array, observations, cost_options)
cost_metric = combineMetric(cost_vector, multi_constraint_method)
@debug cost_vector, cost_metric
return cost_metric
end
function runTEM!(selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output, space_land, tem_info::NamedTuple)
parallelizeTEM!(selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output, space_land, tem_info, tem_info.run.parallelization)
return nothing
end
function parallelizeTEM!(selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output, space_land, tem_info, ::ThreadsParallelization)
Threads.@threads for space_index ∈ eachindex(space_forcing)
coreTEM!(selected_models, space_forcing[space_index], space_spinup_forcing[space_index], loc_forcing_t, space_output[space_index], space_land[space_index], tem_info)
end
return nothing
end
If I only run 1 site, i.e. no parallelisation for forward running, I can do the parallelisation by following codes:
function getCostVectorSize(algo_options, parameter_vector, ::CMAEvolutionStrategyCMAES)
cost_vector_size = Threads.nthreads()
if hasproperty(algo_options, :multi_threading)
if algo_options.multi_threading
if hasproperty(algo_options, :popsize)
cost_vector_size = algo_options.popsize
else
cost_vector_size = 4 + floor(Int, 3 * log(length(parameter_vector)))
end
end
end
return cost_vector_size
end
function prepOpti(forcing, observations, info, ::CostModelObsMT; algorithm_info_field=:optimizer)
algorithm_info = getproperty(info.optimization, algorithm_info_field)
opti_helpers = prepOpti(forcing, observations, info, CostModelObs())
run_helpers = opti_helpers.run_helpers
cost_vector_size = getCostVectorSize(getproperty(algorithm_info, :options), opti_helpers.default_values, getproperty(algorithm_info, :method))
cost_vector = Vector{eltype(opti_helpers.default_values)}(undef, cost_vector_size)
space_index = 1 # the parallelization of cost computation only runs in single pixel runs
cost_function = x -> cost(x, opti_helpers.default_values, info.models.forward, run_helpers.space_forcing[space_index], run_helpers.space_spinup_forcing[space_index], run_helpers.loc_forcing_t, run_helpers.output_array, run_helpers.space_output_mt, deepcopy(run_helpers.space_land[space_index]), run_helpers.tem_info, observations, opti_helpers.parameter_table, opti_helpers.cost_options, info.optimization.run_options.multi_constraint_method, info.optimization.run_options.parameter_scaling, cost_vector, info.optimization.run_options.cost_method)
opti_helpers = (; opti_helpers..., cost_function=cost_function, cost_vector=cost_vector)
return opti_helpers
end
function cost(parameter_matrix, _, selected_models, space_forcing, space_spinup_forcing, loc_forcing_t, output_array, space_output, space_land, tem_info, observations, parameter_updater, cost_options, multi_constraint_method, parameter_scaling_type, cost_out::Vector, ::CostModelObsMT)
@info size(parameter_matrix)
parameter_set_size = size(parameter_matrix, 2)
done_params=1
Threads.@threads for parameter_index in eachindex(1:parameter_set_size)
idx = Threads.threadid()
parameter_vector = parameter_matrix[:, parameter_index]
@debug parameter_vector
updated_models = updateModels(parameter_vector, parameter_updater, parameter_scaling_type, selected_models)
coreTEM!(updated_models, space_forcing, space_spinup_forcing, loc_forcing_t, space_output[idx], space_land, tem_info)
cost_vector = metricVector(space_output[idx], observations, cost_options)
cost_metric = combineMetric(cost_vector, multi_constraint_method)
cost_out[parameter_index] = cost_metric
@info idx, round(100 * done_params/parameter_set_size,digits=2), parameter_set_size, cost_metric
@debug idx, round(100 * done_params/parameter_set_size,digits=2), parameter_set_size, cost_metric, cost_vector
done_params += 1
end
return cost_out
end
You could see cost(..., ::CostModelObsMT, ...)
arrange and parallelise all CMAES cost evaluations (in total 4 + floor(Int, 3 * log(length(parameter_vector)))
evaluations) into different threads (if there is NO OTHER parallization on the coreTEM! or runTEM!)…
so how can I do it? Thanks!