Threads.spawn - increment/decrement atomic variable

This is my working code, which spawns threads.

while true
    M = pullDataFromQueue()
    if !isnothing(M)
        Threads.@spawn begin
            # Worker
            delay(3)
        end
    end
end

I want to limit how many threads can be spawned, so I need to detect when a spawned thread has finished.

The following code increments and decrements an atomic variable. It increments the variable, but never decrements it.

How can I count how many spawned threads are still running? Why does the following code not work?

activeThreads = Threads.Atomic{Int}(0)

while true
    if activeThreads[] <= 4
        M = pullDataFromQueue()
        if !isnothing(M)
            Threads.@spawn begin
                Threads.atomic_add!(activeThreads,1)

                # Worker
                delay(3)

                Threads.atomic_sub!(activeThreads,1)
            end
        end
    end
    println("Active threads: ", activeThreads[])
end

Where does the delay function come from? The usual Julia function for delaying execution is sleep, so it should be something like this:

activeThreads = Threads.Atomic{Int}(0)

while true
    if activeThreads[] <= 4
        M = pullDataFromQueue()
        if !isnothing(M)
            Threads.@spawn begin
                Threads.atomic_add!(activeThreads,1)

                # Worker
                sleep(3)

                Threads.atomic_sub!(activeThreads,1)
            end
        end
    end
    println("Active threads: ", activeThreads[])
end

Also, you are probably running into a race condition when you read activeThreads[], so the results are more or less random. You can see it more clearly if you run something like this:

activeThreads = Threads.Atomic{Int}(0)

while true
    Threads.@spawn begin
        Threads.atomic_add!(activeThreads,1)

        # Worker
        sleep(0.1)

        Threads.atomic_sub!(activeThreads,1)
    end
    println("Active threads: ", activeThreads[])
end

You’ll see that the number of active threads fluctuates randomly.

Sorry for that mistake (delay → sleep). Now it works. I don’t see random numbers; I see the behavior I was expecting. Why should the results be more or less random?

Because by the time the println function executes, an arbitrary number of threads have started and finished. On my instance at least, I see this:

Active threads: 3476
Active threads: 3477
Active threads: 3478
Active threads: 3444
Active threads: 3433
Active threads: 3434
Active threads: 3435
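Part of the reason the count drifts is that the increment happens inside the spawned task, so the spawning loop can check the counter before tasks it has already launched have registered themselves. A minimal sketch of one possible fix, assuming a finite list of jobs instead of the infinite loop: increment before spawning and decrement in a finally block. The names work and run_limited are hypothetical and not from the original code, and the polling wait is only there to keep the sketch short.

```julia
# Hypothetical stand-in for the real work; not part of the original code.
work(job) = sleep(0.05)

function run_limited(jobs; limit = 4)
    active = Threads.Atomic{Int}(0)   # tasks currently in flight
    peak = 0                          # highest count seen by the spawning loop
    @sync for job in jobs
        # Wait until a slot is free. Only this loop increments the counter,
        # so the check cannot be invalidated by a task that has not started yet.
        while active[] >= limit
            sleep(0.001)
        end
        Threads.atomic_add!(active, 1)        # count the task before it starts
        peak = max(peak, active[])
        Threads.@spawn try
            work(job)
        finally
            Threads.atomic_sub!(active, 1)    # release the slot even if work throws
        end
    end
    return peak, active[]
end

peak, remaining = run_limited(1:20)
println("Peak active tasks: ", peak, ", still active: ", remaining)
```

Because the increment is done by the spawning loop itself, the observed count should never exceed the limit, and it is back at zero once @sync returns.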

Yes, so how can I implement the described behavior? How can I fix the current implementation? Lock the atomic variable? Or something else?

This probably doesn’t answer your original question, but I would recommend rethinking the approach itself. First of all, maybe you can avoid low-level constructs like @spawn. There are several high-level alternatives: Folds.jl, ThreadsX.jl, FLoops.jl.

If you want to control the number of threads, then there is ThreadPools.jl.

If you still want to do it on your own, it may be easier to invert the problem: start the required number of threads up front and then execute the work with the standard producer/consumer pattern. In this approach you create a channel and put! data into it; the consumers take! data and process it until the channel is empty or they receive a stop token. This way the number of threads is guaranteed (since it is fixed from the start), and you also avoid the overhead of spawning new threads.
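A rough sketch of that producer/consumer idea, under a few assumptions: the work items are integers, process is a hypothetical placeholder for the real worker, and closing the channel serves as the stop signal instead of an explicit stop token. The names run_workers, jobs, and results are made up for illustration.

```julia
# Hypothetical stand-in for the real work; not part of the original code.
process(item) = item^2

function run_workers(items; nworkers = 4)
    jobs = Channel{Int}(32)                 # bounded queue of pending work
    results = Channel{Int}(length(items))   # large enough that put! never blocks
    # Start a fixed number of consumers; this caps concurrency up front.
    workers = map(1:nworkers) do _
        Threads.@spawn for item in jobs     # loop ends once jobs is closed and drained
            put!(results, process(item))
        end
    end
    # Producer: feed the queue, then close it to tell the consumers to stop.
    for item in items
        put!(jobs, item)
    end
    close(jobs)
    foreach(wait, workers)                  # block until every consumer has finished
    close(results)
    return sort!(collect(results))          # sorted, since completion order is not fixed
end

println(run_workers(1:10))
```

With this layout there is no counter to maintain at all: the number of concurrent workers is simply nworkers, and put! on the bounded jobs channel provides backpressure when the consumers fall behind.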