Help me learn concurrency: live manipulation of continuous sound

I have a program here that plays a continuous tone. If you press the ‘r’ key, the pitch rises. If you press ‘f’, the pitch falls. If you press ‘q’, the program quits. The first while loop continuously takes in keyboard input and the second one continuously plays a tone.

Questions:

  1. Here is my understanding of how the program is working: The program, running on only 1 thread, is asynchronous but not simultaneous/parallel. The program goes through the entire buffer and then comes back to the Channel to check whether it has anything. So it follows a cycle of play → check keyboard input → play → check keyboard input. I can see this if I change the buffer size to 41_000 (the sample rate) so that the buffer takes 1 second to play. With this longer buffer, if I type rfrf, the sound fluctuates in pitch every second or so. Is my understanding correct?

  2. For some reason, when I run this with 2 threads and with the play() function as a task (i.e. playtone(true) as opposed to playtone(false)), there seems to be a lot of lag. Some of the key presses get ignored. I also don’t need to press the return key for the program to read the keystroke. Why is this happening?

  3. Do I need to put the second while loop in a Task if I want to use multi-threaded parallel execution? I don’t think I need it if I just want asynchronous but not parallel execution.

  4. Do I need to use Tasks at all if I’m not interested in parallel execution?

using PortAudio: PortAudioStream, write

function playtone(play_as_task::Bool)
    # set up a task to take keyboard input
    input_chnl = Channel{Char}(1)
    input_task = Task() do # read keyboard input and store in Channel, stop if 'q' is typed.
        while true
            put!(input_chnl, read(stdin, Char))
            fetch(input_chnl) == 'q' ? break : nothing # quit if `q` is typed
        end
    end
    input_task.sticky = false
    schedule(input_task)

    # set up a task to play sound
    i::UInt64 = 1
    buffer_size::UInt64 = 1_024
    buffer = Vector{Float32}(undef, buffer_size)
    freq = 440
    stream = PortAudioStream(0, 1; warn_xruns=false)
    function play()
        while true
            if isready(input_chnl)
                _input = take!(input_chnl)
                if _input == 'q' # stop playing sound
                    break
                else # change frequency: 'f' -> fall, 'r' -> rise
                    freq = _input == 'f' ? freq - 20 : _input == 'r' ? freq + 20 : freq
                end
            end
            buffer .= sin.(2 * pi * (i .+ (1:buffer_size)) * freq / stream.sample_rate) # crude, I know.
            write(stream, buffer)
            i += buffer_size
        end
        close(stream)
        close(input_chnl)
    end
    if play_as_task
        output_task = Task(play)
        output_task.sticky = false
        schedule(output_task)
    else
        play()
    end
end

# playtone(true)
playtone(false)

Hi,

  1. Yes, this seems correct.
  2. I cannot replicate this: using multiple threads both playtone(true) and playtone(false) feel equally responsive for me, where the responsiveness is determined by buffer_size. The only difference I notice is that, as you also mention, in the playtone(false) case, I need to use enter to register a character. This difference is also present when using a single thread. I’m not sure of the cause, but I’ll note that if you run the input_task in global scope, it also swallows a stdin input, without the need to use enter. Then again, in combination with the code of play in global scope, I do need enter…
  3. Yes, use Tasks with sticky set to false (or @spawn) for parallel execution on different CPU threads. Obviously this requires Threads.nthreads() >= 2.
  4. Yes, if you want asynchronous single-threaded execution. When a Task is not busy, it will yield, allowing other Tasks to execute. Consider for example
task1 = Task() do
    for _ = 1:3
        sleep(1)
        println("a ", Threads.threadid())
    end
end

task2 = Task() do
    for _ = 1:3
        sleep(1)
        println("b ", Threads.threadid())
    end
end

schedule(task1); schedule(task2)

(regardless of Threads.nthreads(), as we keep sticky set to true). The prints will be
a 1
b 1
a 1
b 1
a 1
b 1
where each a-b line pair is printed pretty much simultaneously. This happens because the Tasks will yield during the sleep (when the CPU is not busy), to allow for other Tasks to run. The prints will then end after 3 seconds.

Clearly, this is different behaviour from synchronously executing the tasks, where we would first print “a 1” thrice in a span of 3 seconds, and then print “b 1” thrice during another 3 seconds. It is also different from merging the loops to

for _ = 1:3
    sleep(1)
    println("a ", Threads.threadid())
    sleep(1)
    println("b ", Threads.threadid())
end

which will again take 6 seconds to execute.

In particular, single-threaded Tasks can be useful for IO-bound situations.
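The timing difference can be checked with a small sketch. This is only an illustration: work is a hypothetical helper, the sleeps are shortened to 0.3 seconds, and @async is used as a shorthand for creating and scheduling a sticky Task.

```julia
# Hypothetical helper: three short sleeps standing in for IO-bound work.
function work()
    for _ in 1:3
        sleep(0.3)
    end
end

# Two sticky tasks on the same thread: each yields while sleeping, so the sleeps overlap.
t_concurrent = @elapsed @sync begin
    @async work()
    @async work()
end

# The same work done one call after the other: the sleeps add up.
t_sequential = @elapsed begin
    work()
    work()
end

println("concurrent: ", round(t_concurrent; digits=2), " s")
println("sequential: ", round(t_sequential; digits=2), " s")
```

The concurrent version should take roughly half as long as the sequential one, even with a single thread.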


Regarding the enter-behaviour in 2., the discrepancy does not happen when running the code in a script. So it will be related to using the REPL / interactive contexts.

script
# Run with e.g. 
# julia -t 2 script.jl true

using Pkg
Pkg.activate(temp=true)
Pkg.add("PortAudio")

using PortAudio: PortAudioStream, write

function playtone(play_as_task::Bool)
    # set up a task to take keyboard input
    input_chnl = Channel{Char}(1)
    input_task = Task() do # read keyboard input and store in Channel, stop if 'q' is typed.
        while true
            put!(input_chnl, read(stdin, Char))
            fetch(input_chnl) == 'q' ? break : nothing # quit if `q` is typed
            # Small side-note: you could use   fetch(input_chnl) == 'q' && break   (no need for the nothing)
        end
    end
    input_task.sticky = false
    schedule(input_task)

    # set up a task to play sound
    i::UInt64 = 1
    buffer_size::UInt64 = 1_024
    buffer = Vector{Float32}(undef, buffer_size)
    freq = 440
    stream = PortAudioStream(0, 1; warn_xruns=false)
    function play()
        while true
            if isready(input_chnl)
                _input = take!(input_chnl)
                if _input == 'q' # stop playing sound
                    break
                else # change frequency: 'f' -> fall, 'r' -> rise
                    freq = _input == 'f' ? freq - 20 : _input == 'r' ? freq + 20 : freq
                end
            end
            buffer .= sin.(2 * pi * (i .+ (1:buffer_size)) * freq / stream.sample_rate) # crude, I know.
            write(stream, buffer)
            i += buffer_size
        end
        close(stream)
        close(input_chnl)
    end
    if play_as_task
        output_task = Task(play)
        output_task.sticky = false
        schedule(output_task)
        return output_task
    else
        play()
    end
end


play_as_task = parse(Bool, ARGS[1])

if play_as_task
    output_task = playtone(true)
    wait(output_task)  # Keep the script from terminating immediately
else
    playtone(false)
end

In this case I need to use enter regardless of the value of play_as_task.

Tasks are the recommended way to do asynchronous and/or parallel work in Julia. A sticky task will run on the thread it was created from, so if you launch multiple sticky tasks they will execute asynchronously but not in parallel.

  1. Yes
  2. I was able to reproduce this. I believe that what’s happening is you’re running playtone on the main thread, but since play() is called as a task, it can be paused or put into the background. Adding a wait statement fixes the issue:
    if play_as_task
        output_task = Task(play)
        output_task.sticky = false
        schedule(output_task)
        wait(output_task)
  3. Yes
  4. In general, yes. When you run your code, it runs on the main task by default. If you want to have 2 concurrent units of work, you need to explicitly create a second task. If you want n concurrent units of work, you need to explicitly create n-1 tasks.
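To make the point about sticky tasks concrete, here is a minimal sketch. It is not taken from the code in this thread and the variable names are only for illustration. It relies on Task(...) creating sticky tasks by default and Threads.@spawn creating non-sticky ones.

```julia
creator_thread = Threads.threadid()

# A plain Task is sticky by default, so it runs on the thread that scheduled it.
t = Task(() -> Threads.threadid())
schedule(t)
sticky_thread = fetch(t)

# Threads.@spawn creates a non-sticky task, which may run on any available thread.
s = Threads.@spawn Threads.threadid()
spawned_thread = fetch(s)

println("creator: ", creator_thread, ", sticky: ", sticky_thread, ", spawned: ", spawned_thread)
```

With julia -t 1 all three numbers should match. With more threads, only the sticky task is guaranteed to report the creator's thread.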

There seems to be something weird going on with stdin and VSCode. You might get different behavior if you run the program as a script vs in the REPL. In order to not let that distract us, let's stick with running the program from the REPL using include("script.jl").

Thanks @eldee and @Satvik. Both your responses are helpful. I can confirm that using wait(output_task) with 2 threads, as @Satvik described, helps. I am not really following why it helps though, or what the statement is doing. The documentation of wait says that when called on a Task, the behavior is for the “current task” to wait for the argument Task to “finish”. I’m not sure what “finish” means here. It doesn’t seem like we’re waiting for the function enclosed by the task to return… There’s playtone, input_task, and output_task, so I’m not sure who is waiting for whom.

Replacing wait(output_task) with wait(input_task) seems to work just as well. Why?

If input_task is waiting for output_task or vice versa, does the execution not become sequential? Are they still parallel?

I did not use VS Code for my testing, just the REPL or Windows Terminal. On that note, if you and @Satvik are using Linux or MacOS, that might also explain the difference in responsiveness we observe in 2.

(You have to slightly modify the script I posted, by removing the code involving ARGS , but this is straightforward, of course.)

No, that is exactly what “finish” would mean. In @Satvik’s wait(output_task) you halt execution of the current (main) task until output_task breaks out of its loop and play returns.

So input_task is not waiting for output_task, or vice versa. Both tasks are still running in parallel. But the main task, which created input_task and output_task, is waiting on one of them. It does not matter which one, we just need the main task to be idle.
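As a small illustration of what “finish” means here (a sketch with made-up names, where a short sleep stands in for the play loop):

```julia
t = Threads.@spawn begin
    sleep(0.5)      # stands in for the play loop running until 'q' is typed
    :loop_exited    # the value returned when the task's function returns
end

done_before = istaskdone(t)  # checked right away, so the task should still be sleeping
wait(t)                      # only the current (main) task blocks here, until `t` has returned
done_after = istaskdone(t)
result = fetch(t)            # the return value of the task's function

println((done_before, done_after, result))
```

Only the task that calls wait is paused. Any other tasks that were scheduled keep running in the meantime.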

On my machine this main task is essentially always idle after you ran playtone. This makes sense to me, as you’re not (explicitly) asking this task to do anything anymore. So the lag issue then never occurs. I don’t understand why it would be different for you and @Satvik .


By the way, you chose a very difficult example to learn about concurrency, working with the REPL and stdin :slight_smile: . I think the base thread is fully in charge of managing the REPL. For example,

julia> t1 = Threads.@spawn while true
           sleep(1)
           println(Threads.threadid())
       end;

julia> t2 = Threads.@spawn while true
           sleep(1)
           println(Threads.threadid())
       end;

5
3
5
3
5
5
3
3
(We are getting a continuous stream of prints. The tasks are not on the base thread.)

julia> while true
       end
(No output gets printed any more.)

(CTRL+Cs to stop the loop.)
WARNING: Force throwing a SIGINT
3ERROR:
InterruptException:5

Stacktrace:
 [1] top-level scope
   @ .\REPL[3]:2

3
5
(Prints continue.)

Adding a sleep to the main thread’s while true loop will also allow for prints to appear.

Presumably stdin will then also be coupled to the base thread, complicating matters in the playtone situation.

Thanks @eldee.

I was able to follow along the examples I came across that showed the basics of parallelism for numerical computations. So I wanted to pick an example slightly more involved. Passing data between tasks with a Channel seemed interesting. And I am working on data sonification and wanted to look into some REPL based, non-GUI interactivity. So this example is actually a boiled-down version of a program I would eventually want.

One thing I’m learning is to think more actively about the REPL and the main thread.

It makes sense that the program needs a wait to prevent premature termination. But I thought the point of concurrency was that input_task and output_task should both continue even when playtone returns. But it looks like, when run as a script from the terminal with julia --threads=2 "script.jl", the Julia process terminates at end of file when playtone returns. Another thing I’m learning.

I am on macOS. I don’t know if it is intended or desired for the behavior of Tasks to be so different for different operating systems. Why would the main task be idle in one case and not in another? That means, despite all the abstractions, concurrency in Julia still requires platform-specific code.

Or worse, waiting for output_task inside the playtone function as @Satvik suggested, or outside it as you (@eldee) suggested, both seem to work when the file is run with include from the REPL. But when called from the terminal with julia --threads=2 "script.jl", it works for the first few keyboard inputs only. Eventually, the program stops responding to the inputs.

I appreciate all the help. But I remain confused.

One thing that’s going on is that you’re using a Channel with size 1. So if the Channel is full, input_task will block on put!(input_chnl, read(stdin, Char)). The elements in the channel only get processed by play, which takes some time to process. So you’ll probably see more responsiveness if you make the Channel size bigger.
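A related detail in the original input loop, offered as a sketch and not as a confirmed diagnosis: fetch on a buffered Channel waits until an item is available and returns it without removing it. So if play has already taken the character, fetch(input_chnl) waits for the next put!, which only that same task performs. Keeping the character in a local variable avoids depending on the channel contents. forward_chars is a hypothetical helper, and an IOBuffer stands in for stdin so the example runs without a keyboard.

```julia
# Hypothetical helper: forwards characters from `io` to `ch` until 'q' or end of input.
function forward_chars(io::IO, ch::Channel{Char})
    while !eof(io)
        c = read(io, Char)   # keep a local copy of the character
        put!(ch, c)          # blocks while the channel is full
        c == 'q' && break    # test the local copy, not the channel contents
    end
end

ch = Channel{Char}(1)
producer = Threads.@spawn forward_chars(IOBuffer("rfq"), ch)

received = [take!(ch) for _ in 1:3]  # the consumer may take each item before the producer looks again
wait(producer)                       # the producer still terminates after forwarding 'q'
println(received)
```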

But it looks like, when run as a script from the terminal with julia --threads=2 "script.jl" , the Julia process terminates at end of file when playtone returns. Another thing I’m learning.

That’s precisely it – the process will terminate, unless you keep it alive with something like a while true; end. And when the process dies it kills all the related tasks.

Here’s a simple program that captures every character of stdin. Because there’s a buffer, the delay from the other task doesn’t stop it:

function read_char_async()
    ch = Channel(1024)
    Threads.@spawn while !eof(stdin)
        put!(ch, read(stdin, Char))
    end
    return ch
end

ch = read_char_async()

while true
    char = take!(ch)
    # Process the character here
    println("Processed character: ", char)
    
    # Simulate some processing time
    sleep(0.1)
end

That makes sense. At the same time, I thought put! would pick up where it left off. So if I type abcd, the size-1 channel would hold 'a' first, and when that gets eventually removed with take!, put! will unblock and put 'b' into the channel. And so on until the end of the character sequence.

In order to test my theory, I took your example code, changed the channel to size 1 and the sleep time to 4 seconds:

function read_char_async()
    ch = Channel(1)
    Threads.@spawn while !eof(stdin)
        put!(ch, read(stdin, Char))
    end
    return ch
end

ch = read_char_async()

while true
    char = take!(ch)
    # Process the character here
    println("Processed character: ", char)
    
    # Simulate some processing time
    sleep(4)
end

I entered the code in a fresh single-threaded REPL and typed abcd and the return key (the characters were not getting read in without the return key). And I do in fact get the following (one line for the return character as well):

Processed character: a
Processed character: b
Processed character: c
Processed character: d
Processed character: 
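The same behaviour can be reproduced without stdin. This is a minimal sketch in which a fixed string stands in for the typed characters and a short sleep stands in for the 4-second processing time:

```julia
ch = Channel{Char}(1)

producer = Threads.@spawn begin
    for c in "abcd"
        put!(ch, c)   # blocks whenever the single slot is occupied
    end
    close(ch)         # lets the consumer loop end once the channel is drained
end

received = Char[]
for c in ch           # iterating a Channel takes items until it is closed and empty
    push!(received, c)
    sleep(0.05)       # slow consumer
end

println(join(received))
```

No character is dropped: put! simply waits for a free slot, so the order is preserved however slow the consumer is.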

Well, in that case it’s a very good example! :slight_smile:

I would say it’s not intended or desired, but also not unexpected, as to my understanding task scheduling (as well as IO for that matter) is one of the responsibilities of the operating system.
By the way, I also tested playtone(true) on a computer running Ubuntu, and can indeed notice some unresponsiveness there. In any case there’s no harm in adding a wait. Then your code would work on all platforms.