Parallelization of estimation loop

I have a loop that is embarrassingly parallel and in which there is no data movement. What is the best way nowadays to parallelize it in Julia v0.6?

Assume I have an iterator type:

# Julia v0.6 iteration protocol: start / next / done
struct MyIterator
  length::Int
end
Base.start(itr::MyIterator)       = 1
Base.next(itr::MyIterator, state) = state, state + 1
Base.done(itr::MyIterator, state) = state == itr.length + 1

The loop that I want to parallelize looks as follows:

for x in MyIterator(10)
  y = f(x)
  # save into an array of results
  push!(ys, y)
end

where f is a complicated function. I tried just adding the @parallel macro in front of the loop:

ys = @parallel for x in MyIterator(10)
  f(x)
end

and fetching the results

ys = fetch(ys)

but it throws an error saying that length(::MyIterator) is not defined. That made sense to me: to parallelize, we need to know the length beforehand. So I defined the length:

Base.length(itr::MyIterator) = itr.length

and the loop ran without errors. However, the result is an array of Future, and when I call fetch I get the same array back. I then tried fetch.(ys) with the extra dot and it returned a list of RemoteException, so clearly something is still missing.

I would appreciate it if someone could elaborate on the interface an iterator has to obey in order to be parallelizable. Also, if you have suggestions on how to solve this issue that do not use the @parallel macro, I am willing to learn.

I don’t think @parallel supports sequential iterators, for the simple reason that it doesn’t run the loop sequentially. Instead it expects something indexable (AbstractArray-like), and it is usually used with a literal range (1:niter).
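For reference, here is a rough sketch of that usual pattern (with a throwaway f standing in for the expensive function). The (vcat) reducer is what makes @parallel wait and return the collected values rather than an array of Futures:

addprocs()
@everywhere f(x) = x^2   # stand-in for the expensive function

# with a reducer, @parallel waits for completion and returns the reduction
ys = @parallel (vcat) for x in 1:10
  f(x)
end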

@jameson could you please elaborate on the expected interface? I don’t think it is documented anywhere.

I would like to either 1) have an iterator type that does the parallelization magic for me or 2) know the current constructs in Julia for achieving parallelism in this simple example (@spawn, etc.).

If you have suggestions about the former solution, it would be interesting to document the minimum iterator interface one has to implement to have @parallel working as expected.

Perhaps pmap might work for you.

In general, pmap is a good choice for relatively expensive computations, which you have indicated is the case here.
It has the added benefit of dynamic load balancing (useful for non-uniform calculations and/or non-uniform processors).

struct MyIterator
  length::Int
end
Base.start(itr::MyIterator) = 1
Base.next(itr::MyIterator, state) = state, state + 1
Base.done(itr::MyIterator, state) = state == itr.length + 1
Base.length(itr::MyIterator) = itr.length

addprocs()
@everywhere f(x) = x*x
ys = pmap(f, MyIterator(10))

Thank you @greg_plowman, I am considering pmap, but I am also interested in understanding all the current alternatives in Julia v0.6. We need a place on the web (docs, etc.) that discusses how to parallelize loops like this in Julia. I don’t know the pros and cons of each approach.

Also, I am trying to understand how we as package writers are supposed to use addprocs(). I would like to write my parallel loop with pmap, for instance, but if the user didn’t call addprocs(), I don’t want the extra overhead; I want to fall back to a serial loop.

Could you please elaborate on a general solution with pmap? For your info, my function f is basically solving a linear system: https://github.com/juliohm/GeoStats.jl/blob/master/src/kriging_solver.jl#L133


How about a simple:

if nprocs() == 1
 map(...)
else
  pmap(...)
end

?
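If you want to hide that switch from package users, you could wrap it in a small helper (best_map is just an illustrative name):

# run serially unless the user has already added workers with addprocs();
# note that f still has to be defined with @everywhere for the pmap branch
best_map(f, xs) = nprocs() == 1 ? map(f, xs) : pmap(f, xs)

ys = best_map(f, MyIterator(10))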
How do @parallel, threads, and messing with the GPU compare?
Any links going into depth about the trade-offs/considerations in Julia?


What is the overhead in calling nprocs() inside of a loop? Can we have a compile-time solution for this switch?

one_process = nprocs() == 1
for i = 1:N 
  if one_process
    map(...)
  else
    pmap(...)
  end
end

Also

julia> using BenchmarkTools

julia> @benchmark nprocs()
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     20.693 ns (0.00% GC)
  median time:      20.726 ns (0.00% GC)
  mean time:        21.140 ns (0.00% GC)
  maximum time:     42.319 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     997

julia> one_process = nprocs() == 1
true

julia> @benchmark if $one_process end
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     0.053 ns (0.00% GC)
  median time:      0.054 ns (0.00% GC)
  mean time:        0.054 ns (0.00% GC)
  maximum time:     0.113 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1000

EDIT:
I think it’d generally be better to parallelize the outer loop instead of an inner one, though.
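For instance, something roughly like this instead of calling pmap inside the body of the outer loop (N and f are placeholders for the actual problem):

# parallelize the outer loop once: each outer iteration runs on one worker;
# f must be defined with @everywhere so the workers can call it
results = pmap(1:N) do i
  f(i)
end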


@jameson, how do I make an iterator type indexable in the sense you explained?

Use an indexable type (e.g. an AbstractArray, range, etc.), rather than an iterator.

What is the interface that we have to implement?

How about:

mymap = nprocs() == 1 ? map : pmap
for i = 1:N 
  mymap(...)
end

I believe you have to implement getindex(::MyIterator, i::Int) and then iterate over indices, like this for threaded parallelization:

x = MyIterator(10)
Threads.@threads for i in 1:length(x)
  f(x[i])
end
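To make that concrete, here is a runnable sketch, assuming f returns a Float64 and defining the getindex mentioned above so that x[i] works:

Base.getindex(itr::MyIterator, i::Int) = i   # the i-th "element" is just i

x  = MyIterator(10)
ys = Vector{Float64}(length(x))   # preallocate the results
Threads.@threads for i in 1:length(x)
  ys[i] = f(x[i])   # each thread writes its own slot, so there are no races
end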

But then your iterator starts to look a lot like an AbstractArray…
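At that point it may be simpler to just make it one. A rough sketch (MyIndices is only an illustrative name):

# a read-only AbstractVector needs only size and getindex
struct MyIndices <: AbstractVector{Int}
  length::Int
end
Base.size(v::MyIndices) = (v.length,)
Base.getindex(v::MyIndices, i::Int) = i

With that, length, indexing and iteration all come for free from the AbstractArray fallbacks.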

Also, for threaded parallelization, the number of threads is set with the JULIA_NUM_THREADS environment variable (e.g. set it to Sys.CPU_CORES to use all available cores in the machine) before Julia starts. Unlike processes, which can be added at runtime with addprocs(), threads cannot be added once Julia is running…

If that is the case, then it is not worth it. I was wondering if Julia had a smarter way of defining parallel iterators.

Make sure you use Hwloc.jl for finding the number of cores. Sys.CPU_CORES is misleading and should be renamed to Sys.CPU_LOGICAL_CORES. If you use Sys.CPU_CORES, your performance will be a nightmare on Windows. I had to fix this in my ImageQuilting.jl package.
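Something along these lines (assuming Hwloc.jl is installed and, if I remember the API correctly, exposes num_physical_cores):

using Hwloc
nphys = Hwloc.num_physical_cores()   # physical cores, not hyperthreads
addprocs(nphys)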


Good to know, thanks! I’ve only been using Julia on Linux and Macs and haven’t experienced any problems.
And sorry about talking about threads, I didn’t notice you posted in the “Parallel/Distributed” section.

I had the impression that a key difference between an Iterable and an AbstractVector was that Iterables are always sequential, that is, in order to get, say, the third element, you need to pass through the first and the second.
Of course, if that is the case, then it is indeed not possible to parallelize an Iterable…


Makes total sense @favba, thanks for the insights. After considering your thoughts, perhaps this iterator approach is not adequate for parallelization.

I will try the pmap solution later, but this topic is still open. There is not much documentation about how to parallelize loops in Julia, and the more we talk about it, the more things will standardize.

Looking into type stability…

julia> x = randn(5);

julia> function f(x)
           mymap = nprocs() == 1 ? map : pmap
           y = mymap(abs2, x)
           y .* x
       end
f (generic function with 1 method)

julia> @code_warntype f(x)
Variables:
  #self#::#f
  x::Array{Float64,1}
  mymap::Union{Base.#map, Base.Distributed.#pmap}
  y::Any
  #temp#::Union{Base.#map, Base.Distributed.#pmap}

Body:
  begin 
      NewvarNode(:(mymap::Union{Base.#map, Base.Distributed.#pmap}))
      NewvarNode(:(y::Any))
      SSAValue(0) = $(Expr(:invoke, MethodInstance for nprocs(), :(Main.nprocs)))
      unless (SSAValue(0) === 1)::Bool goto 7
      #temp#::Union{Base.#map, Base.Distributed.#pmap} = Main.map
      goto 9
      7: 
      #temp#::Union{Base.#map, Base.Distributed.#pmap} = Main.pmap
      9: 
      mymap::Union{Base.#map, Base.Distributed.#pmap} = #temp#::Union{Base.#map, Base.Distributed.#pmap} # line 3:
      y::Any = (mymap::Union{Base.#map, Base.Distributed.#pmap})(Main.abs2, x::Array{Float64,1})::Any # line 4:
      return (Base.broadcast)(Main.*, y::Any, x::Array{Float64,1})::Any
  end::Any

julia> function g(x)
           one_process = nprocs() == 1
           one_process ? map(abs2, x) : pmap(abs2, x)
       end
g (generic function with 1 method)

julia> @code_warntype g(x)
Variables:
  #self#::#g
  x::Array{Float64,1}
  one_process::Bool

Body:
  begin 
      SSAValue(0) = $(Expr(:invoke, MethodInstance for nprocs(), :(Main.nprocs)))
      one_process::Bool = (SSAValue(0) === 1)::Bool # line 3:
      unless one_process::Bool goto 6
      return $(Expr(:invoke, MethodInstance for _collect(::Array{Float64,1}, ::Base.Generator{Array{Float64,1},Base.#abs2}, ::Base.EltypeUnknown, ::Base.HasShape), :(Base._collect), :(x), :($(Expr(:new, Base.Generator{Array{Float64,1},Base.#abs2}, :($(QuoteNode(abs2))), :(x)))), :($(QuoteNode(Base.EltypeUnknown()))), :($(QuoteNode(Base.HasShape())))))
      6: 
      return $(Expr(:invoke, MethodInstance for #pmap#213(::Array{Any,1}, ::Function, ::Function, ::Array{Float64,1}), :(Base.Distributed.#pmap#213), :($(Expr(:foreigncall, :(:jl_alloc_array_1d), Array{Any,1}, svec(Any, Int64), Array{Any,1}, 0, 0, 0))), :(Main.pmap), :(Main.abs2), :(x)))
  end::Any

julia> @generated return_mymap() = nprocs() == 1 ? map : pmap
return_mymap (generic function with 1 method)

julia> function h(x)
           mymap = return_mymap()
           mymap(abs2, x)
       end
h (generic function with 1 method)

julia> @code_warntype h(x)
Variables:
  #self#::#h
  x::Array{Float64,1}
  mymap::Base.#map

Body:
  begin  # line 3:
      return $(Expr(:invoke, MethodInstance for _collect(::Array{Float64,1}, ::Base.Generator{Array{Float64,1},Base.#abs2}, ::Base.EltypeUnknown, ::Base.HasShape), :(Base._collect), :(x), :($(Expr(:new, Base.Generator{Array{Float64,1},Base.#abs2}, :($(QuoteNode(abs2))), :(x)))), :($(QuoteNode(Base.EltypeUnknown()))), :($(QuoteNode(Base.HasShape())))))
  end::Array{Float64,1}

EDIT:
Also tried in-place versions, and BenchmarkTools confirmed what @code_warntype suggested: h and h! > g and g! >> f and f!
