Parallelized function returns no output

I am trying to write a function that runs in parallel. It returns no output when run in parallel but works fine when run serially. Being new to Julia, I am sure I am doing something wrong here. The function also throws an error when assigning the num_round variable (num_round = modelparam[:num_round]) near the end of the code. Any help in fixing the code is very much appreciated.

using Printf, Statistics, DecisionTree, StatsBase
using Random, Distributed
using ScikitLearn.CrossValidation: KFold
Random.seed!(123);
using MLBase, XGBoost
using ScikitLearn: @sk_import
@sk_import metrics: (recall_score, roc_auc_score, accuracy_score, classification_report)
using MacroTools

function prockey(key)
    if @capture(key, (a_ : b_) | (a_ => b_) | (a_ = b_))
        return :($(string(a))=>$b)
    end
    error("Invalid json key syntax $key")
end
function procmap(d)
    if @capture(d, f_(xs__))
        return :($f($(map(procmap, xs)...)))
    elseif !@capture(d, {xs__})
        return d
    else
        return :($(Dict{String, Any})($(map(prockey, xs)...)))
    end
end

macro json(ex)
    esc(MacroTools.prewalk(procmap, ex))
end
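# For reference: this @json macro follows the JSON-like-syntax example from the
# MacroTools docs. It turns brace/colon literals into a Dict{String,Any}, e.g.
#   @json {a: 1, b: 2}   # should evaluate to Dict{String,Any}("a" => 1, "b" => 2)
# while a plain variable such as best_param passes through unchanged.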

@everywhere using XGBoost,MLBase,StatsBase
@everywhere begin
    function train_model(train_x,train_y,param_grid;n_iters=1,n_splits=3) 
    
    modelscore::Float64 = 0.0
    score=0.0
    cv_score=Array{Float64,1}(undef,n_splits)
    modelparam=Dict{Symbol,Any}()
    sampler(a) = StatsBase.sample(a)
    xgparam=Dict{Symbol,Any}()
    
    n=size(train_y,1)
    k_fold = KFold(n, n_folds=n_splits)
    @sync @distributed for i in 1:n_iters
      
        for key in keys(param_grid)
            xgparam[key]= sampler(param_grid[key])
        end
        
        num_round= xgparam[:num_round]
        j=1
        
        @sync @distributed for (train_index, test_index) in k_fold
            
            X_train, X_test = train_x[train_index,:], train_x[test_index,:];
            y_train, y_test = train_y[train_index], train_y[test_index];
            model=xgboost(X_train,num_round,label=y_train,param=xgparam,silent=1,objective="binary:logistic")
            y_pred=Array{Int64,1}(map(val->round(val),XGBoost.predict(model,X_test)))
#             cv_score[j]=recall_score(y_test,y_pred)
            cv_score[j]=roc_auc_score(y_test,y_pred)
            j=j+1
        end
        
        score = StatsBase.mean(cv_score)
        if score> modelscore
            modelscore=score
            modelparam=xgparam
        end

    end
    
    #Fit the model: 
    num_round=10 #modelparam[:num_round]
    model=xgboost(train_x,num_round,label=train_y,param=modelparam,silent=1,objective="binary:logistic")
    return (model, modelscore,modelparam)
    end
end
param_grid=Dict{Symbol,Any}(:colsample_bytree=>(0.7:0.0001:1.0),
                                :eta=> (0.02:0.0002:0.2),
                                :max_depth=> (2:2:30),  
                                :subsample=> (0.7:0.001:1.0),
                                :num_round=>(10:5:100));

model,best_score,best_param=train_model(X,y,param_grid,n_iters=10,n_splits=5); #compile the function

# model,best_score,best_param=train_model(X,y,param_grid,n_iters=50,n_splits=10);

@printf("Best Score : %.4f",best_score)
@json(best_param)

output:
Best Score : 0.0000
Dict{Symbol,Any} with 0 entries

It's been a while since I wrote parallel code (none so far on 0.7/1.0), but it feels like maybe cv_score should be a SharedArray? I can't try it out because I can't get XGBoost to install on my system, getting a dependency error…
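Something like this is the pattern I have in mind (a rough, untested sketch, with a dummy value standing in for your scoring call):

using Distributed
addprocs(2)
using SharedArrays, Statistics

# each worker writes its fold's score into the shared array,
# so the main process can read every result after the loop
cv_score = SharedArray{Float64}(3)
@sync @distributed for i in 1:3
    cv_score[i] = i / 10   # stand-in for roc_auc_score(y_test, y_pred)
end
mean(cv_score)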

This link can help you to install XGBoost.
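If the link goes away, the usual route is roughly the following, assuming the binary dependency builds cleanly on your system:

using Pkg
Pkg.add("XGBoost")
Pkg.build("XGBoost")   # fetches/compiles the libxgboost binary dependency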

Cool thanks, got it to work - still not getting anywhere on your code though, as it isn't a MWE.

I've tried just using random X and y, but I end up with a Python error ("continuous format is not supported") from the roc_auc_score function.

Except for X and y, which are arrays of type Float64 and Int64, I have included everything.

You can also use recall_score to test.

Okay, I've had a go at throwing something together that works, in the sense that for the random X and y inputs I get the same result in serial (removing @sync @distributed) and in parallel.

The idea is to keep it simple by only parallelising the inner loop and writing results to a SharedArray. I haven't benchmarked this or checked whether the results make sense, but it's a strategy I've used successfully in other situations before and have found to scale quite well.

Code below:

# Imports
using Printf, Statistics, DecisionTree, StatsBase, Random, MLBase, XGBoost
using Random, Distributed, MacroTools, SharedArrays
using ScikitLearn.CrossValidation: KFold
Random.seed!(123)

# Parallel imports 
addprocs(3)
@everywhere using ScikitLearn: @sk_import
@everywhere @sk_import metrics: (recall_score,roc_auc_score,accuracy_score,classification_report)
@everywhere using XGBoost,MLBase,StatsBase, PyCall

# Define function
@everywhere function train_model(train_x,train_y,param_grid;n_iters=1,n_splits=3)
    modelscore = 0.0; score_now = 0.0
    modelparam = Dict{Symbol,Any}(); xgparam = Dict{Symbol,Any}()

    # cv_score needs to be a SharedArray for each process to write results to
    cv_score = SharedArray{Float64,1}(n_splits)

    k_fold = KFold(size(train_y, 1), n_folds=n_splits)

    for _ ∈ 1:n_iters
        for key in keys(param_grid)
            xgparam[key] = StatsBase.sample(param_grid[key])
        end

        num_round = xgparam[:num_round]

        @sync @distributed for i ∈ 1:n_splits
            train_index, test_index = k_fold[i]
            X_train, X_test = train_x[train_index,:], train_x[test_index,:];
            y_train, y_test = train_y[train_index], train_y[test_index];
            model = xgboost(X_train, num_round, label = y_train, param = xgparam,
                            silent=1,objective="binary:logistic")
            y_pred = Array{Int64,1}(map(val -> round(val),
                                    XGBoost.predict(model, X_test)))

            cv_score[i] = roc_auc_score(y_test,y_pred)
        end

        score_now = StatsBase.mean(cv_score)
        if score_now > modelscore
            println("Success")
            modelscore, modelparam = score_now, xgparam
        end
    end

    #Fit the model:
    num_round=10
    model = xgboost(train_x, num_round, label = train_y, param = modelparam,
                    silent=1, objective="binary:logistic")

    return model, modelscore, modelparam
end

param_grid = Dict{Symbol,Any}(:colsample_bytree => (0.7:0.0001:1.0),
                              :eta =>  (0.02:0.0002:0.2),
                              :max_depth =>  (2:2:30),
                              :subsample =>  (0.7:0.001:1.0),
                              :num_round => (10:5:100));

X = rand(1000, 5)
y = rand(0:1, 1000) # XGBoost complained that labels need to be between 0 and 1 so used this

model, best_score, best_param = train_model(X, y,
    param_grid, n_iters=10, n_splits=5)

@nilshg, many thanks for your help. It works fine now.

However, I had to make one correction to the code. I had to move the xgparam variable inside the for loop, as its first line, since the modelparam variable was not picking up the best overall parameters. When I printed the output of both xgparam and modelparam, I found that their values were not consistent.
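In case it helps others, the change looks roughly like this (only the start of the search loop shown; the rest is unchanged):

    for _ ∈ 1:n_iters
        # a fresh Dict on every iteration, so that modelparam keeps pointing at the
        # parameters of the best iteration instead of a dict that later iterations mutate
        xgparam = Dict{Symbol,Any}()
        for key in keys(param_grid)
            xgparam[key] = StatsBase.sample(param_grid[key])
        end
        # ... rest of the loop as in the code above ...
    end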

This was my first attempt at writing a function to run in parallel in Julia, and your help was great. My next step will be to make it more efficient, where possible.