Limiting the maximum number of parallel threads with @spawn, as with @threads

I would like to write a parallel for loop over files in a directory tree, open them and do some long computation. I started writing something like this:

@threads for (root, dirs, files) in walkdir(path)
  open_and_process(files)
end

which does not work because length(::Channel{Any}) does not exist (understandable: @threads needs to know the number of iterations up front, and walkdir returns a Channel). It's easy to work around this with @spawn:

tasks = Task[]
for (root, dirs, files) in walkdir(path)
  push!(tasks, @spawn open_and_process(files))
end

and wait for the tasks to complete in a second step. This second solution, however, does not seem to respect the JULIA_NUM_THREADS variable, in contrast to the @threads macro. Limiting the number of parallel threads is essential in my application: I'm running on a cluster with dozens of cores, and using all of them causes "too many open files" errors to pop up.

I see two ways of fixing this: 1) get a list of files beforehand, store it in an array, and go back to using @threads; 2) implement a bit of logic to @spawn part of the tasks and wait for them to complete in an intermediate step before starting to spawn again (a rough sketch of this is below).
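Roughly, I imagine option 2 looking something like this (just a sketch; batchsize is an arbitrary choice):

tasks = Task[]
batchsize = Threads.nthreads()
for (root, dirs, files) in walkdir(path)
  push!(tasks, @spawn open_and_process(files))
  if length(tasks) >= batchsize
    foreach(wait, tasks)  # intermediate sync point
    empty!(tasks)
  end
end
foreach(wait, tasks)  # wait for the final partial batch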

Is there a simpler solution I’m not considering here?

(Julia version 1.4.1)

EDIT: a remark on option 1: since the file list is long and the disk I'm working on is rather slow, gathering the files into a list takes quite a bit of time. Not starting tasks as the files come in makes it a little less efficient from this point of view.

I think this is a good place to use channels, since the overhead of channels will be dwarfed by the disk operations. You can @spawn n worker tasks that take! from the channel, and then replace your current @spawn call with a put!. I think this is basically what ThreadPools.jl does.
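Something along these lines, for instance (just a sketch; nworkers and the channel size are arbitrary choices, and open_and_process stands in for your function):

using Base.Threads: @spawn

function process_tree(path; nworkers = Threads.nthreads())
  jobs = Channel{Vector{String}}(nworkers)
  # fixed pool of workers pulling file lists off the channel
  workers = map(1:nworkers) do _
    @spawn for files in jobs
      open_and_process(files)
    end
  end
  # feed the channel as walkdir produces entries; put! blocks
  # when the buffer is full, which throttles the producer
  for (root, dirs, files) in walkdir(path)
    put!(jobs, files)
  end
  close(jobs)  # workers drain the remaining items, then stop
  foreach(wait, workers)
end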

I believe @spawn is killing you because of how the scheduler is running. @spawn creates a task that will execute until it has to pause for some event, like file IO. At that point one of the other tasks will run until it pauses. I'm not sure exactly how the scheduler schedules, but I'd be willing to bet it's round-robin, which means each of your tasks would open a file, start doing IO, and pause; then the next task would be scheduled, open a file, pause, and so on. Which basically means ALL the tasks would try to have a file open at the same time.

The easiest solution might be to share a single Semaphore among all the tasks.

https://docs.julialang.org/en/v1/base/parallel/#Base.Semaphore

Set the size to something less than ulimit -Hn, and have each task acquire the semaphore before it opens a file and release it when it closes the file. That would keep your total open files below the limit.
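In code, the pattern would be something like this (a sketch; the limit of 100 is arbitrary, and open_one is just an illustrative helper):

const sem = Base.Semaphore(100)  # arbitrary; keep it below ulimit -Hn

function open_one(file)
  Base.acquire(sem)  # block until an open-file slot is free
  try
    open(file) do io
      # ... long computation on io ...
    end
  finally
    Base.release(sem)  # always give the slot back, even on error
  end
end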

Just realize that Julia may have files open, and reading a directory actually involves opening the directory. So give yourself some space.