Decide and queue when core is available?

dear julia experts—I would like to queue tasks onto open CPU cores until one of the tasks happens to find the correct solution; then I want to be done. In particular, I need to know how the master waits on many slaves, processes the results of whichever slaves finish, and then makes decisions, such as whether to queue more. Eventually I would love this to work on server farms, but right now I am starting locally on an 8-core Intel processor.

I hope this is easy, but I find myself clueless about even how to start. There are not many examples in the docs.

here is the sketch I have been trying to fill in; the plain-English lines are the parts I do not know how to write.

(nworkers()==4) || addprocs(4)  ## topology=:master_slave

@everywhere n=time()  ## shared start time, so every worker can print elapsed time

@everywhere function sleeper(tid, attempt)
    t=time(); print("[$tid:$attempt:$(round(time()-n,1))]")
    sleep(0.9+rand()/4)
    print("[/$tid:$attempt:$(round(time()-t,2))]")
    (tid, attempt, rand(1:1000))
end#function

foundsofar= Array{ Tuple{Int,Int,Int} }(0)  ## matches what sleeper returns

## set in motion.  queueandrun is the imaginary helper I wish existed
for tid=1:4; queueandrun( tid, sleeper, (tid,tid); timeout=50 ); end#for
attempt=5

while (true)
     ## not poll, but sleep until any task returns:
     (tid, attemptdone, r) = fetch from completed sleeper
     if tid timed out
       warn user, maybe remove/check worker or task
     else
       push!(foundsofar, (tid, attemptdone, r) )
       (r==49) && break   ## we are done.  no more
     end#if
     ## tid has become available again.  or ask julia for open workers??
     queueandrun( tid, sleeper, (tid, attempt); timeout=50 )
     attempt+=1
end#while

Then: close the tid that found the result, wait until the remaining tids have finished, push their valid results onto foundsofar, and close each tid.

again, I hope this is easy. advice appreciated.

regards,

/iaw

One approach is to write your own scheduler for parallel workers, along the lines of the pmap implementation listed in the doc. Your scheduler would evaluate the incoming results and break once the result is found. That is better than making sleeper jobs that wait on a shared job queue or jobs channel; been there.

I believe this approach can use local or remote workers. I use a variant of the above on a 32-core machine, and it will max the machine out for as long as it takes to get the job done.
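
Roughly, the idea looks like this (a sketch only: is_solution is a placeholder for your own test, and timeouts are left out):

function searchmap(fun::Function, maxattempts::Int)
    found = Channel{Any}(1)                 ## the first solution lands here
    nextattempt = 0
    @sync for p in workers()
        @async while !isready(found)
            nextattempt += 1                ## safe: tasks only switch at yield points
            nextattempt > maxattempts && break
            r = remotecall_fetch(fun, p, nextattempt)
            if is_solution(r)               ## placeholder predicate
                isready(found) || put!(found, r)
            end
        end
    end
    ## note: workers already inside a remotecall_fetch still finish
    ## their current attempt before @sync returns
    isready(found) ? take!(found) : nothing
end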


thank you, pasha. pointing me to this example was what I needed. the logic behind the example is quite tricky; I put in a lot of print statements to understand how it proceeds, so now I should be able to modify it.

Yes, the use of @async and that nextidx is confusing. I modified it a good bit for my own purposes, but kept the spirit of it, which is to get workers back on the job as soon as they finish. My prior rig had a jobs channel and an emit channel, but that created too much data traffic relative to the work being done. You have to balance these things carefully!
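
For contrast, the channel rig looked roughly like this (a sketch from memory; do_one is a placeholder for the actual work). Every job and every result crosses the wire through a channel, which is where the extra traffic came from:

jobs    = RemoteChannel(() -> Channel{Int}(32))
results = RemoteChannel(() -> Channel{Any}(32))

@everywhere function work(jobs, results)
    while true                                 ## a real rig needs a stop signal
        j = take!(jobs)                        ## blocks until a job arrives
        put!(results, (j, myid(), do_one(j)))  ## do_one is a placeholder
    end
end

for p in workers()
    remote_do(work, p, jobs, results)  ## start a pull loop on each worker
end
for j = 1:13
    put!(jobs, j)
end
## the master then take!(results) as they come in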

I think the best characterization is mindf… . I am enclosing some code that helped me understand what pmap runs where and when. I still don't fully understand why it has to be done in this mindf… way rather than a more user-friendly way, but it works.

I am curious whether this code can easily be modified to run on non-local workers (multiple computers). if someone has two computers already set up for this…
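
(From the docs, I gather the only change would be how the workers are added: addprocs also accepts a vector of (machine, count) tuples over ssh. Something like the line below, with placeholder hostnames and assuming passwordless ssh logins are already configured:)

addprocs([("user@host1", 4), ("user@host2", 4)]; topology=:master_slave)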


(nprocs()>=4) || addprocs(4, topology=:master_slave)

@everywhere thistimestart=time()

@everywhere function sleeper( args::Tuple{ Int, Int, Int } )
    (core::Int, tid::Int, workqueuecontenti::Int) = args
    const TICK=0.75
    t=time(); gstarttime=round(time()-thistimestart,1)
    print("[$tid:$workqueuecontenti:$gstarttime]");
    sleep( (0.9+rand()/4)*TICK )
    (tid==2) && sleep(8 *TICK)  ## sleep a long time
    (tid==8) && sleep(5 *TICK)  ## sleep a less long time
    tooktime= round(time()-t,2)
    print("[/$tid:$workqueuecontenti:$gstarttime:$tooktime] ")
    "taskid=$tid.   core $core.   start@$gstarttime, ran $tooktime.   random=$(rand('a':'z'))"
end#function

msg(s::String,p::Int) = println("$(round(time()-thistimestart,1)):\t$(repeat(" ", max(0,p)))p$p: $s")

## the workqueue are the arguments for each desired function

function pmap(fun::Function, workqueue, numcores::Int=nprocs())  ## deliberately shadows Base.pmap, to trace it
    tasknum = length(workqueue)
    resultcollector = Vector{Any}(tasknum)
    taskidxcounter = 0  ## must be outside!

    msg("enter sync pre", -1)
    @sync begin
        msg("enter sync in", -1)
        for p=1:numcores
            msg("in for loop", p)
            if (numcores == 1) || ((numcores>1) && (p != myid()))  ## redundant for clarity
                @async begin
                    msg(" in async. next up will be while", p)
                    while (taskidxcounter < tasknum)
                        taskidxcounter+=1
                        taskid=taskidxcounter                    ## taskid must be local
                        # (taskid > tasknum) && break
                        msg("  next task $taskid in while, run+wait4fetch. tbd", p)
                        resultcollector[taskid] = remotecall_fetch(fun,p,  (p,taskid,workqueue[taskid]) )
                        msg("  fetching done: taskid=$taskid.  got $(resultcollector[taskid])", p)
                    end#while
                    msg("out of while loop", p)
                end#begin @async
                msg("out of async", p)
            end#if
        end#for
        msg("out of for loop for p's", -1)
    end#begin sync
    msg("out of sync and returning resultcollector", -1)
    resultcollector
end


r= pmap( sleeper, [1:13;] )

println("\n----\n"); display(r); println("\n")

which results in the somewhat lengthy output of:

0.1:	p-1: enter sync pre
0.1:	p-1: enter sync in
0.1:	 p1: in for loop
0.1:	  p2: in for loop
0.1:	  p2: out of async
0.1:	   p3: in for loop
0.1:	   p3: out of async
0.1:	    p4: in for loop
0.1:	    p4: out of async
0.1:	     p5: in for loop
0.1:	     p5: out of async
0.1:	p-1: out of for loop for p's
0.1:	  p2:  in async. next up will be while
0.1:	  p2:   next task 1 in while, run+wait4fetch. tbd
0.1:	   p3:  in async. next up will be while
0.2:	   p3:   next task 2 in while, run+wait4fetch. tbd
0.1:	    p4:  in async. next up will be while
0.2:	    p4:   next task 3 in while, run+wait4fetch. tbd
0.1:	     p5:  in async. next up will be while
0.2:	     p5:   next task 4 in while, run+wait4fetch. tbd
0.9:	     p5:   fetching done: taskid=4.  got taskid=4.   core 5.   start@0.2, ran 0.7.   random=j
0.9:	     p5:   next task 5 in while, run+wait4fetch. tbd
0.9:	  p2:   fetching done: taskid=1.  got taskid=1.   core 2.   start@0.2, ran 0.74.   random=r
0.9:	  p2:   next task 6 in while, run+wait4fetch. tbd
1.0:	    p4:   fetching done: taskid=3.  got taskid=3.   core 4.   start@0.2, ran 0.79.   random=t
1.0:	    p4:   next task 7 in while, run+wait4fetch. tbd
1.7:	    p4:   fetching done: taskid=7.  got taskid=7.   core 4.   start@1.0, ran 0.72.   random=s
1.7:	    p4:   next task 8 in while, run+wait4fetch. tbd
1.7:	     p5:   fetching done: taskid=5.  got taskid=5.   core 5.   start@0.9, ran 0.82.   random=l
1.7:	     p5:   next task 9 in while, run+wait4fetch. tbd
1.8:	  p2:   fetching done: taskid=6.  got taskid=6.   core 2.   start@0.9, ran 0.84.   random=w
1.8:	  p2:   next task 10 in while, run+wait4fetch. tbd
2.6:	     p5:   fetching done: taskid=9.  got taskid=9.   core 5.   start@1.7, ran 0.83.   random=h
2.6:	     p5:   next task 11 in while, run+wait4fetch. tbd
2.6:	  p2:   fetching done: taskid=10.  got taskid=10.   core 2.   start@1.8, ran 0.86.   random=o
2.6:	  p2:   next task 12 in while, run+wait4fetch. tbd
3.3:	     p5:   fetching done: taskid=11.  got taskid=11.   core 5.   start@2.5, ran 0.71.   random=x
3.3:	     p5:   next task 13 in while, run+wait4fetch. tbd
3.4:	  p2:   fetching done: taskid=12.  got taskid=12.   core 2.   start@2.6, ran 0.75.   random=v
3.4:	  p2: out of while loop
4.0:	     p5:   fetching done: taskid=13.  got taskid=13.   core 5.   start@3.3, ran 0.77.   random=v
4.0:	     p5: out of while loop
6.2:	    p4:   fetching done: taskid=8.  got taskid=8.   core 4.   start@1.7, ran 4.47.   random=k
6.2:	    p4: out of while loop
6.9:	   p3:   fetching done: taskid=2.  got taskid=2.   core 3.   start@0.2, ran 6.72.   random=j
6.9:	   p3: out of while loop
6.9:	p-1: out of sync and returning resultcollector

----

13-element Array{Any,1}:
 "taskid=1.   core 2.   start@0.2, ran 0.74.   random=r"
 "taskid=2.   core 3.   start@0.2, ran 6.72.   random=j"
 "taskid=3.   core 4.   start@0.2, ran 0.79.   random=t"
 "taskid=4.   core 5.   start@0.2, ran 0.7.   random=j"
 "taskid=5.   core 5.   start@0.9, ran 0.82.   random=l"
 "taskid=6.   core 2.   start@0.9, ran 0.84.   random=w"
 "taskid=7.   core 4.   start@1.0, ran 0.72.   random=s"
 "taskid=8.   core 4.   start@1.7, ran 4.47.   random=k"
 "taskid=9.   core 5.   start@1.7, ran 0.83.   random=h"
 "taskid=10.   core 2.   start@1.8, ran 0.86.   random=o"
 "taskid=11.   core 5.   start@2.5, ran 0.71.   random=x"
 "taskid=12.   core 2.   start@2.6, ran 0.75.   random=v"
 "taskid=13.   core 5.   start@3.3, ran 0.77.   random=v"

	From worker 2:	[1:1:0.2][/1:1:0.2:0.74] [6:6:0.9][/6:6:0.9:0.84] [10:10:1.8][/10:10:1.8:0.86] [12:12:2.6][/12:12:2.6:0.75]
	From worker 5:	[4:4:0.2][/4:4:0.2:0.7] [5:5:0.9][/5:5:0.9:0.82] [9:9:1.7][/9:9:1.7:0.83] [11:11:2.5][/11:11:2.5:0.71] [13:13:3.3][/13:13:3.3:0.77]
	From worker 3:	[2:2:0.2][/2:2:0.2:6.72]
	From worker 4:	[3:3:0.2][/3:3:0.2:0.79] [7:7:1.0][/7:7:1.0:0.72] [8:8:1.7][/8:8:1.7:4.47]
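
One subtlety I did manage to work out: the taskid = taskidxcounter copy inside the @async block matters. A task yields inside remotecall_fetch, and by the time it resumes, the other tasks have bumped the shared counter, so each task needs its own snapshot. A stripped-down illustration (hypothetical, just to show the capture behavior):

function demo()
    i = 0
    @sync for p = 1:3
        @async begin
            i += 1          ## shared across tasks, like taskidxcounter
            mine = i        ## task-local snapshot, like taskid
            sleep(rand())   ## yield point: the other tasks bump i meanwhile
            println("task $mine now sees i=$i")  ## i has likely moved on
        end
    end
end
demo()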

haha, you had me searching the web for a Julia function called mindf() and, like many Julia searches on Google, it led to lots of strange places unrelated to the Julia language.