Decide and queue when a core is available?


#1

Dear Julia experts, I would like to queue tasks onto open CPU cores until one of my tasks happens to find the correct solution; then I want to be done. In particular, I need to know how my master process waits for many workers, processes the results of whichever workers have finished, and then decides what to do next, such as whether to queue more work. Eventually, I would love this to work on server farms, but right now I am starting locally on an 8-core Intel processor.

I hope this is easy, but I find myself clueless about even how to start. There are not many examples in the docs.

Here is the sketch I have been trying to fill in (part code, part pseudocode):

(nworkers() >= 4) || addprocs(4)  ## add workers first, so @everywhere reaches them.  topology=:master_worker

@everywhere n = time()  ## define n on the workers too; sleeper references it

@everywhere function sleeper(tid, attempt)
    t=time(); print("[$tid:$attempt:$(round(time()-n,1))]");
    sleep(0.9+rand()/4)
    print("[/$tid:$attempt:$(round(time()-t,2))]")
    (tid, attempt, rand(1:1000))
end#function


foundsofar = Array{ Tuple{Int,Int,Int} }(0)  ## sleeper returns (tid, attempt, Int)

## set in motion
for tid=1:4; queueandrun( tid, sleeper(tid,tid); timeout=50 ); end#for  ## queueandrun is the primitive I am looking for
attempt=5

while true
    ## not poll, but sleep until any one task returns:
    (tid, attemptdone, r) = ??   ## fetch from whichever sleeper completed (must not clobber the attempt counter)
    if ??   ## tid timed out
        ## warn the user; maybe remove or check the worker or the task
    else
        push!(foundsofar, (tid, attemptdone, r))
        (r == 49) && break   ## we are done.  no more
    end#if
    ## tid has become available again.  or ask julia for open workers??
    queueandrun( tid, sleeper(tid, attempt); timeout=50 )
    attempt += 1
end#while

## afterwards: close the tid that found the result,
## wait until the remaining tids have finished,
## push their valid results onto foundsofar, and close each tid.

I hope this is easy. Advice appreciated.

regards,

/iaw


#2

One approach is to write your own scheduler for parallel workers, along the lines of the pmap implementation listed in the docs. Your scheduler would evaluate the incoming results and break once the answer is found; a sketch follows below. That is better than making sleeper jobs that wait on a shared queue or jobs channel. Been there.

I believe this approach can use local or remote workers. I use a variant of the above on a 32-core machine, and it will max the machine out completely for as long as it takes to get the job done.
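
In rough strokes, the scheduler might look like the sketch below (my sketch, not tested code; Julia 0.6 syntax, modeled on the doc's pmap example, with iscorrect as a made-up predicate for "this result solves the problem"):

## sketch only: a pmap-style scheduler that stops handing out new work
## once any result satisfies iscorrect (a hypothetical predicate)
function pfind(fun, jobs; iscorrect = r -> false)
    n = length(jobs)
    results = Vector{Any}(n)
    i = 1
    done = false                          ## flipped once the answer is found
    nextidx() = (idx = i; i += 1; idx)    ## hand out job indices one at a time
    @sync for p in workers()
        @async begin
            while !done
                idx = nextidx()
                (idx > n) && break
                results[idx] = remotecall_fetch(fun, p, jobs[idx])
                iscorrect(results[idx]) && (done = true)   ## stop feeding every worker
            end
        end
    end
    (results, done)
end

Because the feeder tasks all run on the master process, setting done in one task is immediately visible to the others without any locking; each worker simply stops being handed new jobs once its current one returns.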


#3

Thank you, pasha. Pointing me to this example was exactly what I needed. The logic behind it is quite tricky; I put in a lot of print statements to understand how it proceeds. Now I should be able to modify it.


#4

Yes, the use of @async and that nextidx closure is confusing. I modified it a good bit for my own purposes, but kept the spirit of it, which is to get workers back on the job as soon as they finish. My prior rig had a jobs channel and an emit channel (roughly the sketch below), but that was too much data traffic relative to the work being done. You have to balance these things carefully!
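
For reference, that rig looked roughly like the following sketch (do_the_work is a made-up stand-in for the actual task):

## sketch of the jobs-channel / emit-channel pattern: workers pull from
## a jobs channel and push onto an emit channel; the master blocks on take!
jobs  = RemoteChannel(() -> Channel{Int}(32))   ## master puts work items here
emits = RemoteChannel(() -> Channel{Any}(32))   ## workers emit results here

@everywhere do_the_work(j) = (sleep(rand()); j^2)   ## hypothetical stand-in task

@everywhere function consume(jobs, emits)
    while true
        j = take!(jobs)                            ## blocks until a job arrives
        put!(emits, (myid(), j, do_the_work(j)))
    end
end

for p in workers()
    remote_do(consume, p, jobs, emits)   ## start one consumer loop per worker
end

for j in 1:13; put!(jobs, j); end   ## enqueue the work
firstresult = take!(emits)          ## master sleeps until any worker emits

Every job and every result crosses the wire through those channels, which is exactly the traffic-versus-work overhead I mean.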


#5

I think the best characterization is mindf…. I am enclosing some code that helped me understand what pmap runs where and when. I don't fully understand it, or why it has to be done in this mindf… way rather than a more user-friendly way, but it works.

I am curious whether this code can easily be modified to run on non-local processes (multiple computers). If someone has two computers already set up for this…
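
My understanding is that only the worker setup would have to change, along the lines of the sketch below (untested; the hostnames are made up, and it assumes passwordless ssh plus julia installed on each machine):

## untested sketch: add remote workers over ssh instead of local processes
addprocs([("user@host1", 4), ("user@host2", 4)])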


(nprocs()>=4) || addprocs(4, topology=:master_worker)

@everywhere thistimestart=time()

@everywhere function sleeper( args::Tuple{ Int, Int, Int } )
    (core::Int, tid::Int, workqueuecontenti::Int) = args
    TICK = 0.75   ## (const on a local variable is not supported in later julia versions)
    t=time(); gstarttime=round(time()-thistimestart,1)
    print("[$tid:$workqueuecontenti:$gstarttime]");
    sleep( (0.9+rand()/4)*TICK )
    (tid==2) && sleep(8 *TICK)  ## sleep a long time
    (tid==8) && sleep(5 *TICK)  ## sleep a less long time
    tooktime= round(time()-t,2)
    print("[/$tid:$workqueuecontenti:$gstarttime:$tooktime] ")
    "taskid=$tid.   core $core.   start@$gstarttime, ran $tooktime.   random=$(rand('a':'z'))"
end#function

msg(s::String,p::Int) = println("$(round(time()-thistimestart,1)):\t$(repeat(" ", max(0,p)))p$p: $s")

## the workqueue are the arguments for each desired function

function pmap(fun::Function, workqueue, numcores::Int=nprocs())
    tasknum = length(workqueue)
    resultcollector = Vector{Any}(tasknum)
    taskidxcounter = 0  ## must be outside!

    msg("enter sync pre", -1)
    @sync begin
        msg("enter sync in", -1)
        for p=1:numcores
            msg("in for loop", p)
            if (numcores == 1) || ((numcores>1) && (p != myid()))  ## redundant for clarity
                @async begin
                    msg(" in async. next up will be while", p)
                    while (taskidxcounter < tasknum)
                        taskidxcounter+=1
                        taskid=taskidxcounter                    ## taskid must be local
                        # (taskid > tasknum) && break
                        msg("  next task $taskid in while, run+wait4fetch. tbd", p)
                        resultcollector[taskid] = remotecall_fetch(fun,p,  (p,taskid,workqueue[taskid]) )
                        msg("  fetching done: taskid=$taskid.  got $(resultcollector[taskid])", p)
                    end#while
                    msg("out of while loop", p)
                end#begin @async
                msg("out of async", p)
            end#if
        end#for
        msg("out of for loop for p's", -1)
    end#begin sync
    msg("out of sync and returning resultcollector", -1)
    resultcollector
end


r= pmap( sleeper, [1:13;] )

println("\n----\n"); display(r); println("\n")

which results in the somewhat lengthy output of

0.1:	p-1: enter sync pre
0.1:	p-1: enter sync in
0.1:	 p1: in for loop
0.1:	  p2: in for loop
0.1:	  p2: out of async
0.1:	   p3: in for loop
0.1:	   p3: out of async
0.1:	    p4: in for loop
0.1:	    p4: out of async
0.1:	     p5: in for loop
0.1:	     p5: out of async
0.1:	p-1: out of for loop for p's
0.1:	  p2:  in async. next up will be while
0.1:	  p2:   next task 1 in while, run+wait4fetch. tbd
0.1:	   p3:  in async. next up will be while
0.2:	   p3:   next task 2 in while, run+wait4fetch. tbd
0.1:	    p4:  in async. next up will be while
0.2:	    p4:   next task 3 in while, run+wait4fetch. tbd
0.1:	     p5:  in async. next up will be while
0.2:	     p5:   next task 4 in while, run+wait4fetch. tbd
0.9:	     p5:   fetching done: taskid=4.  got taskid=4.   core 5.   start@0.2, ran 0.7.   random=j
0.9:	     p5:   next task 5 in while, run+wait4fetch. tbd
0.9:	  p2:   fetching done: taskid=1.  got taskid=1.   core 2.   start@0.2, ran 0.74.   random=r
0.9:	  p2:   next task 6 in while, run+wait4fetch. tbd
1.0:	    p4:   fetching done: taskid=3.  got taskid=3.   core 4.   start@0.2, ran 0.79.   random=t
1.0:	    p4:   next task 7 in while, run+wait4fetch. tbd
1.7:	    p4:   fetching done: taskid=7.  got taskid=7.   core 4.   start@1.0, ran 0.72.   random=s
1.7:	    p4:   next task 8 in while, run+wait4fetch. tbd
1.7:	     p5:   fetching done: taskid=5.  got taskid=5.   core 5.   start@0.9, ran 0.82.   random=l
1.7:	     p5:   next task 9 in while, run+wait4fetch. tbd
1.8:	  p2:   fetching done: taskid=6.  got taskid=6.   core 2.   start@0.9, ran 0.84.   random=w
1.8:	  p2:   next task 10 in while, run+wait4fetch. tbd
2.6:	     p5:   fetching done: taskid=9.  got taskid=9.   core 5.   start@1.7, ran 0.83.   random=h
2.6:	     p5:   next task 11 in while, run+wait4fetch. tbd
2.6:	  p2:   fetching done: taskid=10.  got taskid=10.   core 2.   start@1.8, ran 0.86.   random=o
2.6:	  p2:   next task 12 in while, run+wait4fetch. tbd
3.3:	     p5:   fetching done: taskid=11.  got taskid=11.   core 5.   start@2.5, ran 0.71.   random=x
3.3:	     p5:   next task 13 in while, run+wait4fetch. tbd
3.4:	  p2:   fetching done: taskid=12.  got taskid=12.   core 2.   start@2.6, ran 0.75.   random=v
3.4:	  p2: out of while loop
4.0:	     p5:   fetching done: taskid=13.  got taskid=13.   core 5.   start@3.3, ran 0.77.   random=v
4.0:	     p5: out of while loop
6.2:	    p4:   fetching done: taskid=8.  got taskid=8.   core 4.   start@1.7, ran 4.47.   random=k
6.2:	    p4: out of while loop
6.9:	   p3:   fetching done: taskid=2.  got taskid=2.   core 3.   start@0.2, ran 6.72.   random=j
6.9:	   p3: out of while loop
6.9:	p-1: out of sync and returning resultcollector

----

13-element Array{Any,1}:
 "taskid=1.   core 2.   start@0.2, ran 0.74.   random=r"
 "taskid=2.   core 3.   start@0.2, ran 6.72.   random=j"
 "taskid=3.   core 4.   start@0.2, ran 0.79.   random=t"
 "taskid=4.   core 5.   start@0.2, ran 0.7.   random=j"
 "taskid=5.   core 5.   start@0.9, ran 0.82.   random=l"
 "taskid=6.   core 2.   start@0.9, ran 0.84.   random=w"
 "taskid=7.   core 4.   start@1.0, ran 0.72.   random=s"
 "taskid=8.   core 4.   start@1.7, ran 4.47.   random=k"
 "taskid=9.   core 5.   start@1.7, ran 0.83.   random=h"
 "taskid=10.   core 2.   start@1.8, ran 0.86.   random=o"
 "taskid=11.   core 5.   start@2.5, ran 0.71.   random=x"
 "taskid=12.   core 2.   start@2.6, ran 0.75.   random=v"
 "taskid=13.   core 5.   start@3.3, ran 0.77.   random=v"

	From worker 2:	[1:1:0.2][/1:1:0.2:0.74] [6:6:0.9][/6:6:0.9:0.84] [10:10:1.8][/10:10:1.8:0.86] [12:12:2.6][/12:12:2.6:0.75]
	From worker 5:	[4:4:0.2][/4:4:0.2:0.7] [5:5:0.9][/5:5:0.9:0.82] [9:9:1.7][/9:9:1.7:0.83] [11:11:2.5][/11:11:2.5:0.71] [13:13:3.3][/13:13:3.3:0.77]
	From worker 3:	[2:2:0.2][/2:2:0.2:6.72]
	From worker 4:	[3:3:0.2][/3:3:0.2:0.79] [7:7:1.0][/7:7:1.0:0.72] [8:8:1.7][/8:8:1.7:4.47]

#6

Haha, you had me searching the web for a Julia function called mindf(), and, like many Julia searches on Google, it led to lots of strange places unrelated to the Julia language.