Task scheduling and Priority

I have a package that both collects data and analyzes the data. These parts are separated, with the data acquisition function periodically collecting data and writing this data to a file. The analysis function looks for a new data file, and when it exists, then it reads and processes the data file. Both functions run continuously, with the acquire function running in a separate thread using the @spawn command.

These tasks are run on separate threads using the following code:

function main()::Cint
    try
        #separate thread for acquiring data
        Threads.@spawn acquire()
        # main thread to process the files acquired
        autorun()
    catch
        Base.invokelatest(Base.display_error, Base.catch_stack())
        return 1
    end
    return 0
end

The function is written with the intent to use it for the PackageCompiler, but that has not been tested yet.

The machine is a PI 4 with 8 GB of RAM. The acquire portion only takes a couple percent of the CPU. The main work of acquisition is performed by PI HATS and the main bottleneck in this is reading the data from the HAT and writing it to a file, in my case an Arrow file. If this fails, then there is a buffer overrun error message.

With some testing, the acquire function running on its own works well. However when it is acquiring data at the same time that the PI is processing another data file, I can get the buffer overrun error and in one case the program hung.

Linux has a way of setting process priority, is there a way I can do this in Julia for tasks? Or is there something else that I am missing?

How many threads are you starting julia with? Usually you need to pass a -t argument.

Also, note that Threads.@spawn doesn’t itself spawn an OS thread. It just spawns a julia Task, which will be scheduled on any available OS thread (which are currently created during startup). There is no priority setting for Tasks, the scheduler is cooperative; there is no preemption.

1 Like

The PI has 4 cores and I am starting it with julia -t3 --project=.

Is there another way to set priorities in Julia?

As I said, there is no Task-priority mechanism. Julia Tasks are cooperative, which means they run until they yield to the scheduler.

Can you share a bit more of your code? You mentioned that you get a buffer overrun error, do you have a stacktrace/concrete error message?

1 Like

The HAT is an MCC172 HAT for the PI. It has a C developed .so library to access its functions. I ccall into that code from julia to access its functions using the MccDaqHats package. The complete code I am using is very similar to the example file given in the MccDaqHats package. The initial code sets up the HATS and then when it is acquiring the data it calls the following code snippet:

while total_samples_read < totalsamplesperchan
            
    # read and process data a HAT at a time
    for hu in hatuse
        resultcode, statuscode, result, samples_read = 
            mcc172_a_in_scan_read(hu.address, readrequestsize, hu.numchanused, timeout)
        # Can do a check on result_code
        @debug "resultcode is $resultcode, statuscode is $statuscode, samples_read is $samples_read"
        # Check for an overrun error
        status = mcc172_status_decode(statuscode)
        if status.hardwareoverrun
            error("Hardware overrun")
        elseif status.bufferoverrun
            error("Buffer overrun")
        elseif !status.triggered
            error("Measurement not triggered")
        elseif !status.running
            error("Measurement not running")
        elseif samples_read ≠ readrequestsize
            error("Samples read was $samples_read and requested size is $readrequestsize")
        end

        # Get the right column(s) for the channel(s) on this hat
        if hu.chanmask == 0x01
            chan = hu.measchannel1
        elseif hu.chanmask == 0x02
            chan = hu.measchannel2
        elseif hu.chanmask == 0x03
            chan = [hu.measchannel1 hu.measchannel2]
        else
            error("Channel mask is incorrect")
        end
        @debug "On hat $(hu.adress) the channels used are $chan"

        # deinterleave the data and put in temporary matrix or hdf dataset
        scanresult[1:readrequestsize,chan] = deinterleave(result, hu.numchanused)
    end

    # convert matrix to a Table and write to Arrow formatted Data
    write && Arrow.write(writer, Tables.table(scanresult))
    total_samples_read += readrequestsize
    @debug "Total samples read for scan: $total_samples_read"

    # process some of the data

end

The first line of code after the main for loop in the shown code ccalls the function to read the data from the HAT. This function also returns a status code to determine if it was successful or not. When this code is running without other CPU loads, the CPU load is normally about 1%, and reaches no higher than 5% and the code runs reliably. When the CPU is doing other work and is at say 40% load, then sometimes a buffer overrun occurs.

I recall seeing a package some time ago, which I cannot find anymore, that is somewhat similar to DataFlowTasks.jl but could also set code execution priority. So I thought this might help, but there may be other better ideas.

1 Like

Is your autorun() optimized? From the looks of it, it may just not be able to cope with the rate data is coming in.

In this scenario, the reading task might be blocked in a garbage collection because the processing task is busy on the cpu and can’t serve a garbage collection.

Compute-bound, non-memory-allocating tasks can prevent garbage collection from running in other threads that are allocating memory. In these cases it may be necessary to insert a manual call to GC.safepoint() to allow GC to run. This limitation will be removed in the future.
Multi-Threading · The Julia Language

In general, julia is not a good choice for realtime applications. I would consider creating a simple C-program that runs with linux realtime priority and just dumps raw data into files.

So far, there is not enough information to conclude GC is to blame. You can certainly write real-time code in Julia, it just requires the same kind of mindset as if you’d write the same code in C - that is, don’t allocate heap memory in your hot loop, make your code as deterministic as possible, remove problematic type instabilities etc.

3 Likes

Both the acquire() and the autorun() functions spend a lot of time sleeping. The problem with buffer overrun seems to occur when both are active (not sleeping) at the same time. Then there is periodic data acquisition and then processing the acquired file.

Not sure what you mean by optimized? The code does some work including FFT’s and some other heavy lifting that we tried to make efficient. Maybe we should spend more time looking to reduce allocations in autorun()?

Would it be beneficial to replace Threads.@spawn acquire() with Threads.@threads acquire() or is that even possible?

I have not tried to change the number of threads that I start Julia with. Are there any suggestions on this front?

My guess is that the error is occurring in the C code that controls the HAT, and not in Julia. However it is the additional CPU and memory resources when autorun() is busy and not sleeping that results in the buffer overrun error being issued by the C code.

I have the HAT set to collect 1s worth of data for each scan. The scan rate is set to 51,200 samples per second. And I am collecting 3 channels of data. This is 1.23 MB/s. And my development is on an SD card, hopefully will transition to a PI 5 with a NVMe drive.

My initial thought was to up the priority of the acquire() code to give the C code more resources. Sukera tells me that is not possible. Now I wonder if the code that calls the C function can have a priority set for the C function. This code is given below. If it is possible how would I do this?

function mcc172_a_in_scan_read(address::Integer, samples_per_channel::Integer, mcc172_num_channels::Integer, timeout::Real)
	# Reads status and number of available samples from an analog input scan.
	# Status is an || combination of flags:
	# STATUS_HW_OVERRUN (0x0001) A hardware overrun occurred.
	# STATUS_BUFFER_OVERRUN (0x0002) A scan buffer overrun occurred.
	# STATUS_TRIGGERED (0x0004) The trigger event occurred.
	# STATUS_RUNNING   (0x0008) The scan is running (actively acquiring data.)
	
	status = Ref{UInt16}()					# Initialize
	buffer_size_samples::Int32 = samples_per_channel * mcc172_num_channels
	buffer = Vector{Float64}(undef, buffer_size_samples)
	samples_read_per_channel = Ref{UInt32}() # Initialize
	resultcode = ccall((:mcc172_a_in_scan_read, libdaqhats),
	Cint, (UInt8, Ref{UInt16}, UInt32, Cdouble, Ptr{Cdouble}, UInt32, Ref{UInt32}), 
	address, status, samples_per_channel, timeout, buffer, buffer_size_samples, samples_read_per_channel)
	printError(resultcode)
	return resultcode, status[], buffer, Int(samples_read_per_channel[])
end

Do you think you can write two separate programs? one to acquire() and one to autorun()

From your description, it sounds like you want to perform some form of realtime computation. For that context, it’s usually required to heavily optimize the computation portion of your code, to keep up with the demands for processing.

Some questions for you to ponder:

  • How have you tried to optimize autorun() so far?
    • Are there repeated allocations in autorun(), is it possible to reuse that memory?
    • Are there type instabilities in autorun()?
  • How are you sharing data between acquire() and autorun(), and could that introduce some unwanted overhead?
    • You mention that both autorun and acquire sleep - could this be the source of why autorun can’t keep up with acquire?
  • What are you using for FFT, and if it’s FFTW.jl, are you using a pre-planned FFT & reusing its buffers?

No, and no - Threads.@threads is for (somewhat automatically) threading for loops. It too spawns Julia tasks, not OS threads.

Julia uses an n:m threading model, that is, n Julia tasks are scheduled over m OS threads. You’re spawning Julia with 3 OS threads, so m=3. Any spawning mechanism available in Julia (at the time of writing) only affects n, i.e. the number of Julia tasks scheduled on those 3 OS threads. I dislike the naming of the Threads stdlib for that reason - it’s unnecessarily confusing.

So far, you’ve shown that you have two tasks running in parallel; one running acquire() (the one from Threads.@spawn) and one running autorun() (your main Task). With that setup, starting julia with 3 OS threads (-t 3) should be sufficient.

This is a symptom indicative of autorun not being able to keep with the rate data is collected by the C library. All the C library does is notify you of that fact through the buffer overrun error, I doubt the C library itself is the root cause of that error.

Again, there are no priorities or preferenced scheduling options for Julia tasks. ccall is no different here, there is no “priority” you can give because it’s just a regular function call, as if you had written that same call in C.

1 Like

I thought that’s what the separate :interactive threadpool does. I don’t think this would help this particular situation, though.

FWIW, before we ended up with OhMyThreads.jl, we considered naming the package OhMyTasks. But mentioning threads in some way seemed important for discoverability and communicating that we’re focused on multihreading. What we did though is name our macro API @tasks. Because I dislike @threads (the name and also it’s lack of features).

Perhaps also worth mentioning: OpenMP, a very popular framework that is mostly used for multihreading code in C/Fortran, stands for open multi-processing. Apparently, naming things is hard :slight_smile:

2 Likes

Well… :interactive itself is only a prioritization if nothing else runs on that threadpool. In essence, it just confines tasks to another set of threads, so you don’t get contention across pools. You can still get contention within the :interactive pool, which is why the manual recommends only putting very short lived tasks on it, to keep it responsive for other code.

The data acquisition of acquire collects 1.23 MB per 1 second scan and then writes this to an Arrow file. I added some timing information and the write time is typically about 0.02 s and the other processing time < 0.002 s with two instances of Julia open. With just acquire() running the write time is less than half. I also looked up the PI 4 process specs, it is a Cortex A72 and it has 32 kB L1 data cache per core and and 512 kB, 1MB, 2MB, or 4MB L2 cache shared among cores depending up configuration.

They are actually two separate programs, one that creates the data and writes it to a file, and the second processes the data once the file is written. My perception was that if I only have one instance of Julia running that less memory is used and thus preferred. That is why the main() function in the first post @spawns acquire() and then runs autorun().
Based on your suggestion I just tried running the acquire() in one julia process in one terminal and the autorun() program in another julia process in a second terminal and there were no buffer overruns. So this seems like the simplest solution. Thanks for the suggestion. In each case I started Julia with julia -t1 --project=..

Blockquote
How have you tried to optimize autorun() so far?
Are there repeated allocations in autorun(), is it possible to reuse that memory?
Are there type instabilities in autorun()?

We tried to take care in how we wrote the code, though are by no means experts on non-allocating code. We have tried to use some tools like JET.jl and Cthulu.jl, but that was a while back.

Blockquote
How are you sharing data between acquire() and autorun() , and could that introduce some unwanted overhead?

As mentioned above it is through writing files. This was for historical reasons, which allows for non Julia programs to acquire data as well as the acquire() program we wrote.

Blockquote
What are you using for FFT, and if it’s FFTW.jl, are you using a pre-planned FFT & reusing its buffers?

We are using a planned FFT with FFTW. Not sure if I remember what or how to reuse its buffers. We do check the data size and if it is the same as last time we keep the same plan if the number of data points are different we recalculate the plan.

Edit:
I tried ThreadPinning and it seems to work about the same as running two separate instances of Julia.

using ThreadPinning
ThreadPinning.@spawnat 1 acquire()

I can also suggest starting julia with

julia -p 1 -t 2,1

I would be interested to know if playing around with these three numbers makes your experience better or worse