@Distributed: On worker 2 UndefVarError {{Module}}

Hello,
i want to run a for loop in parallel but using Threads.threads just made the Programm slower so i wanted to try @distributed

module MyModule
export start, loop_inner, rec
function start()
    return rec(...)
end

@inline function loop_inner(...)
    ...
end
function rec(...)
    ...
    results = zeros(N)
    @sync @distributed for idx in 1::N
        results[idx] = MyModule.loop_inner(...)
    end
    return sum(results)
    ...
end

When i just use@distributed without @sync in front of it - im getting no UndefVarError's but sum(results) will always be 0. When i try to use @sync @distributed im getting:

ERROR: LoadError: TaskFailedException:
On worker 2:
UndefVarError: MyModule not defined

My Main looks something like this:

using Distributed
cwd = pwd()
@Distributed.everywhere push!(LOAD_PATH,"pwd")
@Distributed.everywhere include("MyModule.jl")
@Distributed.everywhere using .MyModule

workers = Threads.nthreads() # = 8
addprocs(workers, 
            #max_parallel = 8,
            restrict=true, 
            enable_threaded_blas=true,
            exeflags=`--optimize=3 --inline=yes --check-bounds=no --math-mode=fast`, 
            topology=:master_worker)

 LangfordParallel.start()

$JULIA_NUM_THREADS=8 julia -L MyModule.jl main.jl -p=8
Code: https://github.com/masterholdy/julia-threading-question

I have downloaded your code into c:\temp and made it run by changing main_langford.jl:

This

cwd = pwd()
@Distributed.everywhere push!(LOAD_PATH,"pwd")

doesn’t make much sense. You don’t want to have the string “pwd” in the LOAD_PATH. I failed for now to do it right, but the following line helped as a workaround for now:

@everywhere cd("c:\\temp")

Which puts every worker into the desired working directory.

I commentet out the topology as it doesn’t work for me:

addprocs(workers, 
            #max_parallel = 8,
            restrict=true, 
            enable_threaded_blas=true,
            exeflags=`--optimize=3 --inline=yes --check-bounds=no --math-mode=fast`, 
            #topology=:master_worker
)

and put it directly after

using Distributed

So the complete changed part of your main_langford.jl is now:

#include("LangfordParallel.jl")
using Distributed
workers = Threads.nthreads()
addprocs(workers, 
            #max_parallel = 8,
            restrict=true, 
            enable_threaded_blas=true,
            exeflags=`--optimize=3 --inline=yes --check-bounds=no --math-mode=fast`, 
            #topology=:master_worker
)

#cwd = pwd()
@everywhere cd("c:\\temp")
#@Distributed.everywhere include("langford.jl")
@Distributed.everywhere include("LangfordParallel.jl")
@Distributed.everywhere using .LangfordParallel
#@Distributed.everywhere include("LangfordBinary.jl")
#@Distributed.everywhere using Distributed
#using Revise, Profile, PProf
#addprocs(8)

Now starting the REPL (1.4.2, Windows 10 64bit):

julia> cd("c:\\temp")

julia> include("main_langford.jl")
L(s, n) = L(2, 12):
-------------------------------
Start w/: 1 Thread(s)
ERGEBNIS-Sequential (Julia) :0.0
Elapsed time in microseconds : 15311.017199 ms
-------------------------------

Running Julia with JULIA_NUM_THREADS=8 and doing the same brings:

julia> cd("c:\\temp")

julia> include("main_langford.jl")
L(s, n) = L(2, 12):
-------------------------------
Start w/: 8 Thread(s)

and still running (takes very long it seems).

So, your code has quite some issues and I don’t have the time now to do it all right. But I hope that the above changes will help you to do the next steps.

1 Like

What I forgot to mention:

It seems you are mixing Threads and multipe CPUs in your code. You may read https://docs.julialang.org/en/v1/manual/parallel-computing/ again. In general: modern CPUs have multiple Cores, you use them with Distributed. A single core does have multiple threads, thats where multi-threading comes into play.

First I would suggest, decide, which parallel paradigm to use first: threads OR cores. If you want to start with threads remove Distributed, @ everywhere and addprocs and so on. Stick on the Threads documentation https://docs.julialang.org/en/v1/manual/parallel-computing/#man-multithreading-1

(Of course your code should produce the desired result without any paralell execution at the very first!)

Yes for the typicall CPU, there may be CPUs with more than 2 threads per core, don’t know, not important for us now.

Yes, thats what I think too.

What I try to say is: Using threads doesn’t need Distributed, everywhere macro, addproc and all those things which belong to multi processes. You want (I guess) multiple threads in a single process (for the start). This is described in
https://docs.julialang.org/en/v1/manual/parallel-computing/#man-multithreading-1
and when you scroll down it stops with the headline

Multi-Core or Distributed Processing

I can’t see a question anymore, but still it seems you are just trying this and that quite randomly.
(e.g. @inbounds should not pop up here).

The docs are quite good to get the basics about parallel execution.

As we were talking about threads and are heading for a 2CPU/72Thread machine I have put up an example using threads. And because I wasn’t sure if threads are meant to be created on every existing core or just on the cores set available (with the -p parameter when calling Julia). I didn’t found a fast answer in the docs so I just put some code together:

using Random, BenchmarkTools

N=10_000_000
NThreads=8
MersenneTwister(0); #seeding the RNG
a=[ rand(Int,N) for x = 1:NThreads ]

function f_threaded(a,N,NThreads)
	s=zeros(NThreads)
	indices=collect(1:(N-1))
	randperm!(indices)
	for k in 1:10
		Threads.@threads for i in 1:NThreads
			for j in indices
				s[i]+=a[i][j]*a[i][j+1]
			end
		end
	end
	s
end
function f(a,N,NThreads)
	s=zeros(NThreads)
	indices=collect(1:(N-1))
	randperm!(indices)
	for k in 1:10
		for i in 1:NThreads
			for j in indices
				s[i]+=a[i][j]*a[i][j+1]
			end
		end
	end
	s
end
@btime f(a,N,NThreads)
@btime f_threaded(a,N,NThreads)

Threads.nthreads()

On a windows CMD:

C:\temp>set JULIA_NUM_THREADS=16

C:\temp>julia.exe

The outcome is:

julia> @btime f(a,N,NThreads)
  11.607 s (3 allocations: 76.29 MiB)
8-element Array{Float64,1}:
 -3.88638726543992e23
 -4.138778296184118e23
 -2.967358655409068e23
  1.5760206471213304e23
 -1.7625091619973682e23
  2.07202393944643e23
  2.6818004920701988e23
 -9.87477346267389e22

julia> @btime f_threaded(a,N,NThreads)
  5.934 s (466 allocations: 76.36 MiB)
8-element Array{Float64,1}:
 -3.8863872654405564e23
 -4.138778296183166e23
 -2.9673586554094026e23
  1.5760206471210798e23
 -1.762509161997028e23
  2.0720239394459922e23
  2.681800492070774e23
 -9.874773462673677e22

julia> Threads.nthreads()
8

Twice as fast using threads. And:

Using set JULIA_NUM_THREADS=16 results in

julia> Threads.nthreads()
8

and all 8 cores are 100% in use. By the way, the little nodge during the 100% usage was taking the screenshot of the window :slight_smile:

So, all you need is Threads.@threads and you need to check for side effects and race conditions, everything very good covered in the docs. Of course your algorithm should be put into good shape for parallelism. I haven’t viewed what you are doing in your code, because you are still at the very beginning to make your code run and produce something and not only errors.

1 Like

You want to fetch after calling sum(b). That is,

function sum_of_sum_parallel_1(a, b)
    taskL = Threads.@spawn sum(a)
    r = sum(b)
    l = fetch(taskL)
    return l + r
end

In parallelSum, you want to put the @threads on the for thread_idx in 1:numberThreads, not the inner loop:

    Threads.@threads for thread_idx in 1:numberThreads
       for idx in thread_idx:numberThreads:problemSize
            results[thread_idx] += a[idx]
        end
    end

Alternatively, you could use:

    @sync for thread_idx in 1:numberThreads
       Threads.@spawn for idx in thread_idx:numberThreads:problemSize
            results[thread_idx] += a[idx]
        end
    end

The @threads macro splits up a for loop to run on multiple threads; @spawn creates an individual task that can run on any thread.

1 Like

Strange, I can’t confirm:

C:\Users>set JULIA_NUM_THREADS=12

julia> versioninfo()
Julia Version 1.4.1
Commit 381693d3df* (2020-04-14 17:20 UTC)
Platform Info:
  OS: Windows (x86_64-w64-mingw32)
  CPU: AMD Ryzen 5 3600 6-Core Processor
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-8.0.1 (ORCJIT, znver1)
Environment:
  JULIA_NUM_THREADS = 12

using Random, BenchmarkTools

N=100_000_000
NThreads=12
MersenneTwister(0); #seeding the RNG
a=rand(Float64,N)

function f_threaded(a, problemSize, numberThreads = Threads.nthreads())
    results = zeros(numberThreads)
    Threads.@threads for thread_idx in 1:numberThreads
        @simd for idx in thread_idx:numberThreads:problemSize
            @inbounds results[thread_idx] += a[idx]
        end
    end
    return sum(results)
end
function f(a, problemSize, numberThreads = Threads.nthreads())
    results = zeros(numberThreads)
    for thread_idx in 1:numberThreads
        for idx in thread_idx:numberThreads:problemSize
            results[thread_idx] += a[idx]
        end
    end
    return sum(results)
end

julia> @btime f(a,N,NThreads)
  434.296 ms (2 allocations: 192 bytes)
4.99935545816905e7

julia> @btime f_threaded(a,N,NThreads)
  113.651 ms (65 allocations: 10.13 KiB)
4.99935545816905e7

How do you benchmark? Can you provide complete MWE(=minimal working example) code?

If you want to beat sum you will have a hard time.
Have a look at

@edit sum(a)

And you may want to read this discussion: Implementing parallel sum
There are others also about sum and performance, just search for sum.