I think the best characterization is mindf…. I am enclosing some code that helped me understand what pmap runs where and when. I don’t fully understand it, or why this has to be done in this mindf… way rather than in a more user-friendly way, but it works.
I am curious whether this code can easily be modified to run on non-local workers (processes on multiple computers), if someone has two computers already set up for this…
## Make sure worker processes exist before anything is defined on them:
## if fewer than 4 processes are present, start 4 more.
if nprocs() < 4
    addprocs(4, topology=:master_slave)
end

## Every process evaluates this itself and keeps its own start timestamp in the
## global `thistimestart`.
@everywhere thistimestart = time()
## Demo workload, defined on every process. It sleeps for roughly one TICK
## (task ids 2 and 8 sleep considerably longer), prints a start marker and an
## end marker, and returns a one-line report string.
##
## `args` is the tuple (core, tid, workqueuecontenti):
##   core              - process id supplied by the caller; only echoed in the report
##   tid               - task id; selects the extra-long sleeps and labels the output
##   workqueuecontenti - the work-queue entry for this task; only echoed in the markers
##
## Reads the global `thistimestart`, which must already be defined on the
## process that executes this function.
@everywhere function sleeper( args::Tuple{ Int, Int, Int } )
    (core::Int, tid::Int, workqueuecontenti::Int) = args

    ## base sleep unit in seconds. Deliberately a plain local: `const` has no
    ## effect on a local variable and is rejected outright by Julia >= 1.0.
    TICK = 0.75

    t = time()
    gstarttime = round(time()-thistimestart,1)   ## start, relative to this process's thistimestart
    print("[$tid:$workqueuecontenti:$gstarttime]");

    sleep( (0.9+rand()/4)*TICK )        ## every task: 0.9 to 1.15 TICKs
    (tid==2) && sleep(8 *TICK)          ## sleep a long time
    (tid==8) && sleep(5 *TICK)          ## sleep a less long time

    tooktime = round(time()-t,2)
    print("[/$tid:$workqueuecontenti:$gstarttime:$tooktime] ")

    return "taskid=$tid. core $core. start@$gstarttime, ran $tooktime. random=$(rand('a':'z'))"
end#function
## Print one trace line: seconds elapsed since `thistimestart` (one decimal),
## a tab, `p` spaces of indentation (none when p is negative), the tag "p<p>",
## and then the message `s`.
function msg(s::String, p::Int)
    elapsed = round(time()-thistimestart,1)
    indent = repeat(" ", max(0,p))
    println("$(elapsed):\t$(indent)p$p: $s")
end
## the workqueue holds one entry per task; each entry is the argument for one call of the function
## Instrumented parallel map: calls `fun` once per entry of `workqueue` on the
## available processes and returns the results in work-queue order.
##
## Arguments
##   fun       - function to call remotely; it receives ONE tuple
##               (process id, task index, workqueue[task index])
##   workqueue - indexable collection with `length`; one entry per task
##   numcores  - how many processes (counting from the front of `procs()`) to
##               consider; defaults to all of them. The calling process itself
##               is skipped unless it is the only process selected.
##
## Returns a Vector{Any} with one result per work-queue entry.
## Throws ArgumentError if `numcores` is smaller than 1.
##
## One feeder task is started per selected process. Each feeder repeatedly
## claims the next unclaimed task index, runs it on its process with
## `remotecall_fetch`, and stores the result. Every step is traced with `msg`.
function pmap(fun::Function, workqueue, numcores::Int=nprocs())
    (numcores >= 1) || throw(ArgumentError("numcores must be at least 1, got $numcores"))

    tasknum = length(workqueue)
    resultcollector = Vector{Any}(tasknum)
    taskidxcounter = 0 ## must be outside! it is shared by all feeder tasks

    ## Use the process ids that really exist rather than assuming they are
    ## 1..numcores: ids are not contiguous once workers have been removed and
    ## re-added, and numcores may exceed the number of processes.
    allpids = procs()
    pids = allpids[1:min(numcores, length(allpids))]
    onlyone = (length(pids) == 1)   ## a single process has to do the work itself

    msg("enter sync pre", -1)
    @sync begin
        msg("enter sync in", -1)
        for p in pids
            msg("in for loop", p)
            if onlyone || (p != myid())
                @async begin
                    msg(" in async. next up will be while", p)
                    ## the check and the increment are adjacent, with nothing
                    ## that could yield in between, so no index is claimed twice
                    while (taskidxcounter < tasknum)
                        taskidxcounter+=1
                        taskid=taskidxcounter ## taskid must be local
                        msg(" next task $taskid in while, run+wait4fetch. tbd", p)
                        resultcollector[taskid] = remotecall_fetch(fun,p, (p,taskid,workqueue[taskid]) )
                        msg(" fetching done: taskid=$taskid. got $(resultcollector[taskid])", p)
                    end#while
                    msg("out of while loop", p)
                end#begin @async
                msg("out of async", p)
            end#if
        end#for
        msg("out of for loop for p's", -1)
    end#begin sync
    msg("out of sync and returning resultcollector", -1)
    return resultcollector
end
## Run the demo: 13 work items (the integers 1..13), default number of processes.
r = pmap(sleeper, collect(1:13))

## Show the collected results below a separator line.
println("\n----\n")
display(r)
println("\n")
This results in the following, somewhat lengthy, output:
0.1: p-1: enter sync pre
0.1: p-1: enter sync in
0.1: p1: in for loop
0.1: p2: in for loop
0.1: p2: out of async
0.1: p3: in for loop
0.1: p3: out of async
0.1: p4: in for loop
0.1: p4: out of async
0.1: p5: in for loop
0.1: p5: out of async
0.1: p-1: out of for loop for p's
0.1: p2: in async. next up will be while
0.1: p2: next task 1 in while, run+wait4fetch. tbd
0.1: p3: in async. next up will be while
0.2: p3: next task 2 in while, run+wait4fetch. tbd
0.1: p4: in async. next up will be while
0.2: p4: next task 3 in while, run+wait4fetch. tbd
0.1: p5: in async. next up will be while
0.2: p5: next task 4 in while, run+wait4fetch. tbd
0.9: p5: fetching done: taskid=4. got taskid=4. core 5. start@0.2, ran 0.7. random=j
0.9: p5: next task 5 in while, run+wait4fetch. tbd
0.9: p2: fetching done: taskid=1. got taskid=1. core 2. start@0.2, ran 0.74. random=r
0.9: p2: next task 6 in while, run+wait4fetch. tbd
1.0: p4: fetching done: taskid=3. got taskid=3. core 4. start@0.2, ran 0.79. random=t
1.0: p4: next task 7 in while, run+wait4fetch. tbd
1.7: p4: fetching done: taskid=7. got taskid=7. core 4. start@1.0, ran 0.72. random=s
1.7: p4: next task 8 in while, run+wait4fetch. tbd
1.7: p5: fetching done: taskid=5. got taskid=5. core 5. start@0.9, ran 0.82. random=l
1.7: p5: next task 9 in while, run+wait4fetch. tbd
1.8: p2: fetching done: taskid=6. got taskid=6. core 2. start@0.9, ran 0.84. random=w
1.8: p2: next task 10 in while, run+wait4fetch. tbd
2.6: p5: fetching done: taskid=9. got taskid=9. core 5. start@1.7, ran 0.83. random=h
2.6: p5: next task 11 in while, run+wait4fetch. tbd
2.6: p2: fetching done: taskid=10. got taskid=10. core 2. start@1.8, ran 0.86. random=o
2.6: p2: next task 12 in while, run+wait4fetch. tbd
3.3: p5: fetching done: taskid=11. got taskid=11. core 5. start@2.5, ran 0.71. random=x
3.3: p5: next task 13 in while, run+wait4fetch. tbd
3.4: p2: fetching done: taskid=12. got taskid=12. core 2. start@2.6, ran 0.75. random=v
3.4: p2: out of while loop
4.0: p5: fetching done: taskid=13. got taskid=13. core 5. start@3.3, ran 0.77. random=v
4.0: p5: out of while loop
6.2: p4: fetching done: taskid=8. got taskid=8. core 4. start@1.7, ran 4.47. random=k
6.2: p4: out of while loop
6.9: p3: fetching done: taskid=2. got taskid=2. core 3. start@0.2, ran 6.72. random=j
6.9: p3: out of while loop
6.9: p-1: out of sync and returning resultcollector
----
13-element Array{Any,1}:
"taskid=1. core 2. start@0.2, ran 0.74. random=r"
"taskid=2. core 3. start@0.2, ran 6.72. random=j"
"taskid=3. core 4. start@0.2, ran 0.79. random=t"
"taskid=4. core 5. start@0.2, ran 0.7. random=j"
"taskid=5. core 5. start@0.9, ran 0.82. random=l"
"taskid=6. core 2. start@0.9, ran 0.84. random=w"
"taskid=7. core 4. start@1.0, ran 0.72. random=s"
"taskid=8. core 4. start@1.7, ran 4.47. random=k"
"taskid=9. core 5. start@1.7, ran 0.83. random=h"
"taskid=10. core 2. start@1.8, ran 0.86. random=o"
"taskid=11. core 5. start@2.5, ran 0.71. random=x"
"taskid=12. core 2. start@2.6, ran 0.75. random=v"
"taskid=13. core 5. start@3.3, ran 0.77. random=v"
From worker 2: [1:1:0.2][/1:1:0.2:0.74] [6:6:0.9][/6:6:0.9:0.84] [10:10:1.8][/10:10:1.8:0.86] [12:12:2.6][/12:12:2.6:0.75]
From worker 5: [4:4:0.2][/4:4:0.2:0.7] [5:5:0.9][/5:5:0.9:0.82] [9:9:1.7][/9:9:1.7:0.83] [11:11:2.5][/11:11:2.5:0.71] [13:13:3.3][/13:13:3.3:0.77]
From worker 3: [2:2:0.2][/2:2:0.2:6.72]
From worker 4: [3:3:0.2][/3:3:0.2:0.79] [7:7:1.0][/7:7:1.0:0.72] [8:8:1.7][/8:8:1.7:4.47]