Help implementing task parallelism in Julia

Dear fellow Julians,

I’m currently working on a project that uses computer vision to monitor vast areas for wildfires. We currently have a working version of the software implemented in Python. The code runs on our server, which receives video streams from various cameras and feeds them to our fire-detection code, which is based on the YOLO (You Only Look Once) algorithm.

One of the goals of the project is to scale this to hundreds or even thousands of cameras monitoring many areas of interest. To do that, I’m working on a concurrent version of the algorithm in Julia. Since I’m pretty new to concurrent / parallel programming, I’m not sure I’m approaching it the best way, so I would like to share what I have right now in the hope of getting some useful feedback.

After reading the Parallel Computing section of the Julia manual, this is what I came up with so far:

Functions:

Function            Description
poll_camera()       polls a particular camera
detect_features()   detects features in a single video frame
notify_detection()  sends a notification to the appropriate service

Channels:

Channel             Description
frame_channel       carries frames from poll_camera() to detect_features()
notifier_channel    carries YOLO results from detect_features() to notify_detection()

The system actually has many more functions besides the ones listed above, but for the sake of this message the ones above are enough.

As far as I understand, my scenario is one of “task parallelism”, as opposed to “data parallelism”. The code is shown below:

using Base.Threads: @spawn
import VideoIO

# Channels
const frame_channel    = Channel{Tuple}(128)
const notifier_channel = Channel{Tuple}(128)

function poll_camera(camera_id)

	# Open the camera stream and allocate a frame buffer from the first frame
	strm = VideoIO.openvideo(camera_id)
	video_frame = read(strm)

	while !eof(strm)

		read!(strm, video_frame)

		# Feed a copy of the frame (read! reuses the buffer) into 'frame_channel',
		# tagged with the camera id so downstream tasks know where it came from
		put!(frame_channel, (camera_id, copy(video_frame)))

	end

end

function detect_features()

	# Loop over items in 'frame_channel' (blocks until a frame is available)
	for (camera_id, video_frame) in frame_channel

		# Run the model
		yolo_results = yolo_model(...)

		# If a detection occurs in the current frame, send the yolo results
		# to the notify_detection() tasks
		if !isempty(yolo_results)

			put!(notifier_channel, (camera_id, video_frame, yolo_results))

		end

	end

end

function notify_detection()

	# Loop over items in 'notifier_channel'
	for item in notifier_channel
	
		# Code comes here...

	end
end

After having defined the functions above, I spawn multiple tasks for each of them as shown below:

# Spawn one poll_camera() task per camera ('camera_ids' holds the stream ids)
for camera_id in camera_ids
	@spawn poll_camera(camera_id)
end

# Spawn 'n_tasks_2' detect_features() tasks
for i in 1:n_tasks_2
	@spawn detect_features()
end

# Spawn 'n_tasks_3' notify_detection() tasks
for i in 1:n_tasks_3
	@spawn notify_detection()
end

When searching this forum for parallel programming posts, I stumbled upon the following tutorial written by @tkf:

https://juliafolds.github.io/data-parallelism/tutorials/concurrency-patterns/

Reading this tutorial made me wonder if I’m doing the right thing. Of the patterns listed in the tutorial, it seems to me that what I’m doing is most similar to the “Task farm” pattern, although I’m not sure (I noticed, for example, that my implementation is missing the @sync construct in the outer loop).

I would really appreciate it if someone could give me feedback regarding my implementation. Am I going in the right direction? Are there any obvious mistakes in the way I’m approaching this? Any pointers would be greatly appreciated.

Thanks a lot!


Do you have a clear idea where the bottlenecks will be? How much IO blocking is involved with reading from camera streams and writing notifications?

I suppose the simplest approach would be to use a single thread per camera and read, detect, and notify in a loop that blocks on reading from each camera. If there’s too much overhead from running that many threads, then using a few worker threads or processes with something like channels seems appropriate.
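Something like this, per camera (untested sketch; camera_ids, yolo_model, and send_notification stand in for your actual code):

using Base.Threads: @spawn
import VideoIO

function monitor_camera(camera_id)
    strm = VideoIO.openvideo(camera_id)
    video_frame = read(strm)      # allocate a frame buffer from the first frame
    while !eof(strm)
        read!(strm, video_frame)  # blocks until the next frame arrives
        yolo_results = yolo_model(video_frame)
        if !isempty(yolo_results)
            send_notification(camera_id, video_frame, yolo_results)
        end
    end
end

tasks = [@spawn monitor_camera(id) for id in camera_ids]
foreach(wait, tasks)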

The high-level description looks good to me. You’d need to wait for something in the script, though (otherwise, Julia exits as soon as the script finishes). The best way to do it is to wrap all three spawn loops in a single @sync begin ... end block. An important benefit is that this makes sure you notice errors from these tasks.
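For example (a sketch reusing the spawn loops from your post; camera_ids stands in for the list of camera streams):

using Base.Threads: @spawn

# @sync blocks until every task spawned inside it has finished and
# rethrows any error raised in those tasks.
@sync begin
    for camera_id in camera_ids
        @spawn poll_camera(camera_id)
    end
    for _ in 1:n_tasks_2
        @spawn detect_features()
    end
    for _ in 1:n_tasks_3
        @spawn notify_detection()
    end
end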


I am going to be really dumb here. How far do you want this solution to scale?
Really dumb thought - for each camera run a separate Julia script. Have the actual image recognition part running in Daemon mode
[ANN] DaemonMode.jl: a package to run faster scripts in Julia

Thanks for all the answers!

The idea to have a separate Julia script, or a separate thread, per camera is tempting because I think that would simplify things quite a bit. In particular, I think this would avoid having to deal with shared memory among the threads.

I know most cameras output 2 frames/second, whereas some of them output 15 fps. These rates seem low, but they’re actually enough for our purposes (fire detection). Also, these rates are not set by the cameras themselves, but rather by a streaming service on our server (https://zoneminder.com/) that receives the various cameras’ streams and normalizes them to a common format. I also know that the current Python detection code processes about 35 frames/second. The detection itself is performed on a GPU using CUDA. When scaling this to a higher number of cameras, I know the GPU will become a bottleneck, and then I suspect I may have to use one of @tkf’s concurrency patterns for controlled parallelism:

https://juliafolds.github.io/data-parallelism/tutorials/concurrency-patterns/

However, since @tkf seems to think there’s nothing too wrong with my current approach, I would like to keep working on it. In that case, one thing I’ll need to do is deal with shared data. For example, I need to buffer the last 30 seconds (or whatever) worth of frames from all cameras in order to be able to create a video, for visualization purposes, in case a detection happens on one of them.

What’s the best way of doing that in the general case (i.e., when I have a shared matrix or dictionary)? After reading this section of the manual, I thought I would have to use locks. However, in this introduction, @tkf warns that locks and atomics should be avoided. I’m not sure whether this advice is only valid for data-parallelism scenarios or whether it also applies to task parallelism (which I understand to be my case).
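To make the question concrete, this is the kind of thing I have in mind (untested sketch using CircularBuffer from DataStructures.jl; the frame type, frame rate, and n_cameras are just placeholders):

using DataStructures: CircularBuffer

const FPS        = 2          # placeholder frame rate
const BUFFER_LEN = 30 * FPS   # ~30 seconds of frames per camera

# One rolling buffer and one lock per camera, so cameras never contend
# with each other; push! on a full CircularBuffer evicts the oldest frame.
const frame_buffers = [CircularBuffer{Matrix{UInt8}}(BUFFER_LEN) for _ in 1:n_cameras]
const buffer_locks  = [ReentrantLock() for _ in 1:n_cameras]

function store_frame!(camera_idx, video_frame)
    lock(buffer_locks[camera_idx]) do
        push!(frame_buffers[camera_idx], copy(video_frame))
    end
end

function snapshot_buffer(camera_idx)  # e.g. to render a detection video
    lock(buffer_locks[camera_idx]) do
        collect(frame_buffers[camera_idx])
    end
end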


Remember that you can have systems with more than 1 GPU!
Also with Nvidia MIG you can partition GPUs into slices.

I work for Dell and we have a very interesting platform if you want to store a high amount of video data and have GPUs attached locally to the storage. I don’t want this to turn into a puff for my employer - please contact me by a message?

Hi John,

As far as I know, our server has one Nvidia Quadro P5000 GPU.

I’m not familiar at all with GPU programming (neither in Julia nor in any other language). Also, I’m not really familiar with the Python code that is being used at the moment, but I do know from other people in the project that they are running multiple instances of the detection code at once on the GPU. So maybe they’re doing the GPU partitioning you mentioned.

This is a research project in my university in Brazil and, because of all the fires everywhere (Amazon, Cerrado, Pantanal, etc.), it’s been attracting quite some attention from the media. The Python code we have right now works fine, but we’re having some problems with its multi-threaded implementation. Julia’s multi-threading model just seems superior to Python’s.

Channel-based design is good for something like this, but the reasoning behind it is a bit nuanced. A Channel (queue) can become a bottleneck under high contention, so I usually recommend against it when you can use the divide-and-conquer pattern. It’s also possible to introduce deadlocks if you are not careful (e.g., when there’s some feedback loop). However, Channel is great in terms of hackability, since you can isolate parallelizable components and tweak each part without breaking the others. It’s also easy to tweak the scheduling specifics. There’s also RemoteChannel in Distributed.jl, so scaling out by adding multiple machines is conceptually straightforward. This can help even on a single machine if GC becomes the bottleneck.
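For example (sketch; the worker setup is only for illustration):

using Distributed
addprocs(2)  # two local worker processes, for illustration

# RemoteChannel is a near drop-in replacement for the local Channel, so
# producers and consumers can live on different processes (or machines).
const frame_channel = RemoteChannel(() -> Channel{Tuple}(128))

put!(frame_channel, (1, :frame))                    # from the main process
fetch(@spawnat workers()[1] take!(frame_channel))   # consumed on a worker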

If all tasks are updating the matrix and dictionary all the time, it’s a bad idea. But if they are accessed somewhat rarely, a lock is totally OK. I’m also working on concurrent data structures like a lock-free ConcurrentDict, which can be useful for sharing data efficiently. But concurrent data sharing is tricky to do correctly, and remembering “Don’t communicate by sharing memory; share memory by communicating.” is always useful.


Hi @tkf,

Thank you so much for all your input on this. I’m a total beginner on parallel programming, and the more I talk to you the more I realize how ignorant I am on this subject :frowning:

As for the data sharing part, I’m afraid some of my data structures would have to be updated pretty much all the time. In that case, since locks are a bad idea, I guess what you recommend is the ConcurrentCollections.jl package you mentioned?

I have also thought more about having one thread per camera for as much work as possible, as suggested by @robsmith11. This could significantly reduce my need for shared data structures. However, if I use one thread per camera, I know I won’t be able to have the detection function running in every thread because the YOLO model takes too much memory and would saturate the GPU. In that case, I guess I may need to use your “task farm” pattern.

I got quite intrigued by the “Don’t communicate by sharing memory, share memory by communicating.” proverb. Is there any reading material about this specific proverb that you recommend?

I really appreciate all your help with this. Domo arigato gozaimasu!

The difficulty of using something like ConcurrentDict is that it’s not possible to get the “whole picture” of the collection. For a dictionary that may be OK, but for a matrix, I don’t think element-wise thread-safe access is (typically) enough.

There are also tricks to avoid accessing shared data all the time. For example, if there is only one task updating the matrix, you can use a simple pattern like this to avoid holding the lock for a long period of time:

const LOCK = ReentrantLock()
const MATRIX = Ref(zeros(n, n))  # 'n' is assumed to be defined elsewhere

function getmatrix()  # called from many tasks
    return lock(LOCK) do
        MATRIX[]
    end
end

function update_matrix()  # called from only one task
    newmatrix = copy(getmatrix())
    compute!(newmatrix)  # lengthy computation
    lock(LOCK) do
        MATRIX[] = newmatrix
    end
end

Anyway, I think it’s better to start with a correct Channel- and/or lock-based program and measure what the bottleneck really is. If you find something, you can then reduce the critical section by, e.g., copying things into task-local variables, as illustrated in the example above.

Good question… Off the top of my head, I can’t recall any good reading material. I think I learned the concept by watching a bunch of Rob Pike’s talks online. If you click the line on the proverb page, it jumps to the part of the talk where Rob Pike discusses the concept for a few minutes.

どういたしまして! :slight_smile:

I have thought more about this, and I think I can solve my data-sharing problem by using more than one channel to communicate between a pair of functions. To be exact, I would need as many channels as there are cameras in my system.

So, instead of doing

const frame_channel = Channel{Tuple}(128)

I would do something like this:

channel_list = Vector{Channel{Tuple}}(undef, n_cameras)  # concretely typed to avoid a Vector{Any}

for i in 1:n_cameras

	channel_list[i] = Channel{Tuple}(128)

end

However, the use of const in the original definition makes me wonder whether I should use a tuple or an SVector, instead of a normal vector, to store the various channels. Any thoughts?
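For instance, would a comprehension under a const binding be enough (assuming n_cameras is known when the script loads)?

# const fixes the binding (and its type), not the contents, so perhaps a
# concretely typed Vector is already enough and a Tuple/SVector is unnecessary?
const channel_list = [Channel{Tuple}(128) for _ in 1:n_cameras]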