Allegro.cc - Online Community

Allegro.cc Forums » Programming Questions » Synchronizing Threads

This thread is locked; no one can reply to it. rss feed Print
Synchronizing Threads
tiesuerioto
Member #16,863
June 2018

Hello, I read the wiki tutorial on threads, and I think I understood it, but I'm having trouble synchronizing threads. I want to have two threads perform calculations and have the results managed in main(). The way I tried was to have the threads wait for main:

#SelectExpand
1al_lock_mutex(data->mutex); 2while (!data->ready_main){ 3 al_wait_cond(data->cond, data->mutex); 4} 5data->ready_main = false; 6al_unlock_mutex(data->mutex);

then have main signal them and wait:

#SelectExpand
1al_lock_mutex(data->mutex); 2data->ready_main = true; 3al_broadcast_cond(data->cond); 4al_unlock_mutex(data->mutex); 5 6al_lock_mutex(data->mutex); 7while (!data->ready_thread){ 8 al_wait_cond(data->cond, data->mutex); 9} 10data->ready_thread = false; 11al_unlock_mutex(data->mutex);

then have the threads signal main after they're done:

#SelectExpand
1al_lock_mutex(data->mutex); 2calculations(); 3data->ready_thread = true; 4al_broadcast_cond(data->cond); 5al_unlock_mutex(data->mutex);

Is this correct? When I try a more complicated version of this, where there's more going on, it crashes after a while, but for simpler ones, it seems not to have problems (though I haven't tested those as much). Still, I'm guessing at some point, it crashes because the threads and main both get stuck waiting for each other or something.

Edgar Reynaldo
Member #8,592
May 2007
avatar

Do yourself a favor and use a queue instead. It's already synchronized for you.

Otherwise, threads shouldn't signal main. Main should signal threads. Main runs everything. Threads are there to perform work and to stop when requested.

Example (psuedo code) :

Worker threads :

#SelectExpand
1void* thread_main(ALLEGRO_THREAD* t , void* arg) { 2 MyData* dat = (MyData*)arg; 3 4 /// Calculate digits of pi or something 5 while (!al_thread_should_stop(t)) { 6 /// calc a little more 7 dat->CalcNext(); 8 } 9 10 ALLEGRO_USER_EVENT ev; 11 ev.type = MY_SPECIAL_SIGNAL; 12 ev.user.data1 = (uintptr_t)dat->data; 13 al_emit_user_event(dat-queue , dat->ev_source , &ev); 14 15 return arg; 16}

Then main runs as many threads as it wants, and simply waits on an event queue for all the threads to finish.

Makes your life much simpler my friend.

EDIT

Some warnings. When you use a condition variable, there's a lot to worry about that could go wrong. For instance, say you have thread a and thread main. When thread a signals the condition, main may or may not be waiting on it. If main is not already waiting on the condition, then it will be missed. Then main will go on to wait for a condition that will never be signaled.

Elias
Member #358
May 2000

I agree with Edgar, in many cases some higher level constructs like event queues can be easier than very low-level condition variables and mutexes.

Having said that, it looks like you use the same mutex and condition variable to wait in each worker thread as well as the main thread. Without seeing more code this could be fine - but usually you want two separate condition variables. That is - there is no need to wake up all your sibling threads when you tell the main thread you are done.

Further, it seems that you call your calculations() function while the mutex is locked. That completely defeats the purpose - it means while a thread is blocked doing calculations neither the main thread nor any other thread can do anything - in particular once the first thread starts doing work no other thread can start doing work until the first one has completed. The point of threads is that calculations can be done in parallel, so you want to do them outside of any lock.

--
"Either help out or stop whining" - Evert

Edgar Reynaldo
Member #8,592
May 2007
avatar

Some notes on multi threading and sharing data. If it's possible for more than one thread to access the same data at the same time, then you need to use a mutex to prevent race conditions. However then you have to be aware of other issues like deadlock, which happens when one thread is waiting on another thread which is waiting on the first.

I made utility classes in my eagle library for this purpose. It simplifies threads, mutexes, and event handling. All the threads in my library (like the window manager thread, event handler threads, and the input thread) use event queues to manage signalling each other. That way there's never a missed event. All my threads run without causing any issues. I have thread safe logging as well as many other features. Feel free to peruse the source code for ideas if you like. You can find it on GitHub here : https://github.com/EdgarReynaldo/EagleGUI

tiesuerioto
Member #16,863
June 2018

@Edgar Reynaldo
Okay, so if I want to have the threads loop, would it look like this? (Psuedo code)

#SelectExpand
1#define MY_SPECIAL_SIGNAL ALLEGRO_GET_EVENT_TYPE('M','I','N','E') 2 3class MyData{ 4 public: 5 ALLEGRO_MUTEX *mutex; 6 ALLEGRO_COND *cond; 7 ALLEGRO_EVENT_SOURCE event_source; 8 int result; 9 bool ready_M; 10 bool finish; 11 MyData(){ 12 mutex = al_create_mutex(); 13 cond = al_create_cond(); 14 al_init_user_event_source(&event_source); 15 result = 0; 16 ready_M = false; 17 finish = false; 18 } 19//... 20 } 21};

Main:

#SelectExpand
1 ALLEGRO_EVENT_QUEUE *event_queue = NULL; 2 ALLEGRO_THREAD *thread_1 = NULL; 3 ALLEGRO_THREAD *thread_2 = NULL; 4//... 5 MyData data1, data2; 6 al_register_event_source(event_queue, &data1.event_source); 7 al_register_event_source(event_queue, &data2.event_source); 8 thread_1 = al_create_thread(thread_main, &data1); 9 al_start_thread(thread_1); 10 thread_2 = al_create_thread(thread_main, &data2); 11 al_start_thread(thread_2); 12 13 int result1, result2; 14 15 while(1) 16 { 17 ALLEGRO_EVENT ev; 18 al_wait_for_event(event_queue, &ev); 19 20 if (ev.type == SOME_INPUT){ 21 send_input_to_thread(&data1, input); 22 send_input_to_thread(&data2, input); 23 } 24 else if(ev.type == MY_SPECIAL_SIGNAL){ 25 result1 = ev.user.data1; 26 result2 = ev.user.data2; 27 } 28//...

Threads:

#SelectExpand
1 MyData *data = (MyData*) arg; 2 3 while(1){ 4 5 al_lock_mutex(data->mutex); 6 while (!data->ready_M){ 7 al_wait_cond(data->cond, data->mutex); 8 } 9 data->ready_M = false; 10 al_unlock_mutex(data->mutex); 11 12 al_lock_mutex(data->mutex); 13 data->result = calculation(input); 14 al_unlock_mutex(data->mutex); 15 16 ALLEGRO_EVENT ev; 17 18 ev.user.type = MY_SPECIAL_SIGNAL; 19 ev.user.data1 = (intptr_t)data->result; 20 al_emit_user_event(&data->event_source, &ev, NULL);

As for your library, I might try it out if I'm unable to figure this out, but I want to see if I can learn to get this to work before that if possible.

@Elias
It wasn't apparent from my first post, but there are supposed to be two pieces of data, so each thread shouldn't interfere with the other's work (I think).

Edgar Reynaldo
Member #8,592
May 2007
avatar

That's pretty good. I might do a few things a little different though.

1. A single thread does a single task. Generally they do their work and then stop. If you really need them to loop, then you need to check for exit conditions. Your code is missing return statements from the thread functions, but that's probably because it was just pseudocode.

2. Looping can be easily achieved by starting another thread.

3. The wait and condition variables in your code are unnecessary. Just don't start the thread until it's ready to do work.

But otherwise, your code is pretty much what I would do. The only things I would do different is get rid of the cond var, and start the threads on the SOME_INPUT signal.

tiesuerioto
Member #16,863
June 2018

So, just to make sure, I should write a separate thread to handle each relevant event, and instead of looping them, I should create and start new threads each time?

relpatseht
Member #5,034
September 2004
avatar

I see a lot of bad advice. Functional, but ineffective.

Don't use conditional variables. Don't use mutexes. Don't give a thread a single task. Don't spin up an arbitrary number of threads.

Starting a thread has a significantly variable cost, from a few microseconds to multiple milliseconds depending on a variety of factors which aren't all under your control.
Conditional variables and mutexes, even when correctly placed underneath atomic variables, incur a context switch on any contention. A context switch has to record the full register state and switch to/from OS mode. They have a high cost and will destroy any perf gains in a multi-threaded environment.
Similarly, an arbitrary number of threads will likely incur more context switches. Your machine only has a fixed number of logical cores (typically core count * 2 because of hyper threading, but retrievable via std::thread::hardware_concurrency()), having more than this number of threads running is going to harm performance, rather than help, as the scheduler will try to give each equal time. Every time it needs to switch which thread is running on a core, you get a context switch. The exception to this rule is if the thread is going to block (typically on I/O (network, file system, etc)). While a thread is blocking, it is put to sleep while whatever operation it is waiting in is streaming in. So do have an arbitrarily large number of threads opening/reading from files, for example.

To properly use multiple threads, first you want a decent thread pool. At minimum, a thread pool should start hardware_concurrency() threads and have each of them spin on a task queue (that had better be single producer, single consumer, lock free). There should only be one thread pool in the application that every system shares. The systems can post tasks which will be added to the task queues (round robin scheduling is the simplest). Tasks should be as large as possible.
If you want to go further, you can make your thread pool monitor CPU usage etc and start up more/shutdown threads to maximize CPU time.

For data sharing between threads, the best answer is to not do it. Organize your data/tasks such that there won't be any contention. If you need to pass data between threads, use memory barriers to ensure the data is visible to the other thread (flushed from cache). If you need multiple threads to write to data simultaneously, use an atomic (which can probably only be up to 64 bit). If you can't do any of these, then you need to use a mutex (or, preferably, redesign your algorithm).

A single producer, single consumer lock free queue is a queue (typically ring buffer) which only one thread is allowed to write to (the producer) and one thread is allowed to read from (the consumer) which uses memory fences rather than mutexes to protect against race conditions (no locks, lock free). The reason you want a single producer, single consumer is they can be made much more efficiently than multi producer/multi consumer, or any combination of those words. This queue is going to be passing tasks to your threads, you need that to happen as fast as possible, because you need your threads to be fed. Here's a simple implementation:

#SelectExpand
1template<class T, size_t N> 2class spsc_queue 3{ 4 private: 5 static_assert(std::is_trivial<T>, "Only supports trivial types."); 6 static const size_t capacity = N + 1; 7 8 T ringBuffer[capacity]; 9 alignas(64) std::atomic_size_t head; // Put on separate cache line as these are flushed by atomic ops 10 alignas(64) std::atomic_size_t tail; 11 12 public: 13 spsc_queue() : head(0), tail(0){} 14 15 bool try_push(const T& val) 16 { 17 size_t curTail = tail.load(std::memory_order_relaxed); // relaxed: no memory barrier 18 size_t nextTail = (curTail + 1) % capacity; 19 20 if(nextTail != head.load(std::memory_order_acquire)) 21 { 22 ringBuffer[curTail] = val; 23 tail.store(nextTail, std::memory_order_relaxed); // can be relaxed since followed by thread fence, otherwise, need release 24 std::atomic_thread_fence(std::memory_order_release); // release tail write and ringBuffer writer so visible to acquiring threads 25 return true; 26 } 27 28 return false; // full 29 } 30 31 bool try_pop(T *val) 32 { 33 size_t curHead = head.load(std::memory_order_relaxed); // relaxed: no barriers 34 35 std::atomic_thread_fence(std::memory_order_acquire); // Acquire tail and ringBuffer writes 36 if(curHead == tail.load(std::memory_order_relaxed)) // can be relaxed since preceeded by thread fence, otherwise, need acquire 37 return false; // empty 38 39 *item = ringBuffer[curHead]; 40 41 head.store((curHead + 1) % capacity, std::memory_order_release); // release head write to other thread 42 return true; 43 } 44};

Concurrency is hard. It's easy to throw threads and mutexes at a problem, but if you profile your results, you'll see the numbers don't add up and moving from single threaded to using all 36 cores of your processor (you lucky dog) only gave you a 2-3x performance improvement. Take the time to learn this stuff. It's very rarely done well.

Edgar Reynaldo
Member #8,592
May 2007
avatar

Relpatseht said:

I see a lot of bad advice. Functional, but ineffective.

Don't use conditional variables. Don't use mutexes. Don't give a thread a single task. Don't spin up an arbitrary number of threads.

I see a lot of bad advice too. :/ You're taking what I said out of context in addition to that.

I agree that you shouldn't use condition vars, if only because they are not fool proof, and it's easy to shoot yourself in the foot using one.

DO use mutexes, WHEN appropriate. Atomic operations are better, but not always possible to use. When data is shared between threads, and there is the possibility of more than one thread writing to that shared data or a thread reading during a write, then you MUST use a mutex.

A thread by its very nature is suited only for a single task at a time. That's not to say you can't re-task it, or give it another task when its finished with the previous one.

I never said to use an arbitrary number of threads. Use as many as you need, observing the limits of the OS and CPU.

Relpatseht said:

a thread pool should start hardware_concurrency() threads and have each of them spin on a task queue

Perhaps you meant block. You don't ever want your threads to spin when they don't have anything to do. That's just 100% CPU for nothing.

To properly use multiple threads, first you want a decent thread pool.
...
The systems can post tasks which will be added to the task queues (round robin scheduling is the simplest). Tasks should be as large as possible.
If you want to go further, you can make your thread pool monitor CPU usage etc and start up more/shutdown threads to maximize CPU time.

Why don't you just re-implement the CPU all over again? Task scheduling, thread pools, all this is taking something that should be simple and making it more complicated than it needs to be. A pool can be replaced by an array. For each cpu you will always get the same number of threads, so there's no point implementing a dynamically sized set of threads. Your cpu will schedule the task for you as soon as you start them. All you need to do is keep track of which threads are free to do work, and assign an available thread when necessary.

Relpatseht said:

For data sharing between threads, the best answer is to not do it.

The answer is to avoid it as much as possible, not to never do it.

I have multiple examples of data that needs to be shared between threads. I have a Window Manager thread that keeps track of which window is active. At any time from any thread, this data needs to be read. At any time from any thread the active window may be written to. To properly prevent data races, a mutex is used. Maybe I could have gotten away with an atomic data type, but that doesn't make mutexes pointless or bad design.

I have an input thread that reads all input and queues it for later. The queues are protected by a mutex to prevent data races. Yet another use case for mutexes.

#SelectExpand
1void* A5InputThreadProcess(EagleThread* thread , void* input_handler) { 2 EagleInputHandler* handler = (EagleInputHandler*)input_handler; 3 Allegro5InputHandler* a5_input_handler = dynamic_cast<Allegro5InputHandler*>(handler); 4 5 if (!a5_input_handler) { 6 throw EagleException("A5InputThreadProcess : Input handler is invalid."); 7 } 8 9 while (!thread->ShouldStop()) { 10 ALLEGRO_EVENT ev; 11 al_wait_for_event(a5_input_handler->input_queue , &ev); 12 13 if (ev.any.source == &a5_input_handler->input_extra_event_source && 14 ev.type == EAGLE_EVENT_USER_START && 15 A5INPUT_THREAD_SHOULD_STOP == (A5INPUT_THREAD_MESSAGE)ev.user.data1) { 16 break; 17 } 18 EagleEvent ee = GetEagleInputEvent(ev); 19 20 ee.window = Eagle::EagleLibrary::System("Allegro5")->GetWindowManager()->GetActiveWindow();///Allegro5GraphicsContext::GetActiveWindow(); 21 ee.source = a5_input_handler; 22 23 EagleInfo() << "Input Event #" << ee.type << " received." << std::endl; 24 25 if (IsKeyboardEvent(ee)) { 26 a5_input_handler->keyboard_event_handler.RespondToEvent(ee , thread); 27 a5_input_handler->keyboard_event_handler.TakeNextEvent(thread);/// discard event after emitting 28 } 29 else if (IsMouseEvent(ee)) { 30 a5_input_handler->mouse_event_handler.RespondToEvent(ee , thread); 31 a5_input_handler->mouse_event_handler.TakeNextEvent(thread);/// discard event after emitting 32 } 33 else if (IsTouchEvent(ee)) { 34 a5_input_handler->touch_event_handler.RespondToEvent(ee , thread); 35 a5_input_handler->touch_event_handler.TakeNextEvent(thread);/// discard event after emitting 36 } 37 else if (IsJoystickEvent(ee)) { 38 a5_input_handler->joystick_event_handler.RespondToEvent(ee , thread); 39 a5_input_handler->joystick_event_handler.TakeNextEvent(thread);/// discard event after emitting 40 } 41 } 42 return input_handler; 43}

Relpahtseht,

I think we both have very different use cases for our threads. Most of what you said makes sense, but doesn't apply very well to my situation. Likewise, I gave sound advice as well, but perhaps only for my situation.

relpatseht
Member #5,034
September 2004
avatar

Perhaps you meant block. You don't ever want your threads to spin when they don't have anything to do. That's just 100% CPU for nothing.

I meant spin. There's no way to communicate a block without a mutex/conditional var etc. That said, a spin doesn't mean 100% CPU (although, if you're not planning to use 100% CPU, why are you threading to begin with?)

#SelectExpand
1void ThreadTaskExecutor(void *userData) 2{ 3 spsc_queue<TaskFn> *taskQueue = reintperpret_cast<spsc_queue<TaskFn>*>(userData); 4 TaskFn task; 5 6 for(;;) 7 { 8 while(taskQueue->try_pop(&task)) 9 task(); 10 11 // Our task queue is empty. Spin on yields until something shows up 12 do { 13 std::thread::yield(); 14 } while(taskQueue->empty()); 15 } 16}

It's not hard to see how such a system could then be extended without using any locking mechanisms to shutting down threads that aren't in use for a while, potentially dropping below hardware_concurrency. Something somewhat similar could be done on the task management thread to spin up more threads as necessary for full CPU utilization. Though, this setup is designed for applications wanting to use 100% cpu, which means erring on the side of too much, rather than to little. If you want to minimize CPU use, use a different threading system (or, preferably, no threading at all).

Quote:

A pool can be replaced by an array. For each cpu you will always get the same number of threads, so there's no point implementing a dynamically sized set of threads. Your cpu will schedule the task for you as soon as you start them. All you need to do is keep track of which threads are free to do work, and assign an available thread when necessary.

I think you misunderstood me. A basic thread pool is just an array of hardware_concurrency threads, and you keep track of which is free to do work with an (spsc, lock free) task queue, because that removes as much contention as possible between the thread assigning tasks and the threads executing tasks.
That said, there are cases where you want more threads than logical cores. As I mentioned, I/O operations will cause threads to block. At that point, you're trying to maximize your I/O bandwidth, not CPU usage. This is a more extreme case, granted, but if you're trying to load a bunch of files, likely from different sources (some over the network, some from different local disks, etc), then you'll almost certainly need more than logical core count threads to cap out your machines I/O pipes. Anytime you're handing tasks to your thread pool that can potentially block, there's the potential for needing more threads.
That said, if you're making a game, you should first preprocess your assets into one large, contiguous file, so you can load them as fast as possible from a single thread. Remove all the waits associated with opening/closing files, potential for disk seeks, etc.

Quote:

I have a Window Manager thread that keeps track of which window is active. At any time from any thread, this data needs to be read. At any time from any thread the active window may be written to. To properly prevent data races, a mutex is used. Maybe I could have gotten away with an atomic data type, but that doesn't make mutexes pointless or bad design.

Using a mutex is bad design because they are slow with no way to recover the lost time elsewhere. They are the easy way out of a difficult problem; the the whole application suffers as a result. You can just about always design a system so they aren't necessary, the cases you mentioned are no exception.

This is engineering, we make bad design decision all the time. They are trade offs. We have limited resources, so we measure costs and make decisions as a result. These decisions can be to make bad design choices, and that doesn't make them bad decisions.

tiesuerioto
Member #16,863
June 2018

@relpatseht
I had a feeling starting threads constantly would be costly. Having said that, I think the method you provided might be more complex than necessary for a relatively simple game on a machine without many cores.

I've been trying to make it so no conditions or mutexes are used. The simplest way I can think of doing this without going outside of the Allegro library would be to use events to communicate between main and the threads.

relpatseht
Member #5,034
September 2004
avatar

There are plenty of thread pool libraries out there. TBB is common, as is PPL. A queue as I provided is also a great way of passing information between threads.

Something to keep in mind is the default crt is single threaded. If you allocate regularly (ie: use cpp containers) you'll notice a lot of time waiting on mutexes inside of new. TBB provides a decent multi threaded allocator, but a custom scheme is best.

If you want a simpler threading model, such as a thread for each system, I recommend having copies of the data relevant for each system, and syncing that data once per frame with a simpler pointer swap (which can be done with atomics). This means all your systems are a frame behind. That's acceptable for most things, but probably not input, so you'll want some mitigation there.

tiesuerioto
Member #16,863
June 2018

I took another look at the queue (I assumed line 5 is supposed to be std::is_trivial<T>::value, and line 39 is supposed to be *val), but it still seems like it might be overkill for what I'm trying to do.

To go back to the best I could come up with, it's not that different from what I wrote before.

#SelectExpand
1#define MY_SPECIAL_SIGNAL ALLEGRO_GET_EVENT_TYPE('M','I','N','E') 2 3class MyData{ 4 public: 5 ALLEGRO_EVENT_QUEUE *event_queue; 6 ALLEGRO_EVENT_SOURCE event_source; 7 int result; 8 9 MyData(ALLEGRO_EVENT_SOURCE &esource){ 10 event_queue = al_create_event_queue(); 11 al_init_user_event_source(&event_source); 12 al_register_event_source(event_queue, &esource); 13 result = 0; 14 } 15//...

Main:

#SelectExpand
1 ALLEGRO_EVENT_SOURCE event_source; 2 ALLEGRO_EVENT_QUEUE *event_queue = NULL; 3 ALLEGRO_THREAD *thread_1 = NULL; 4 ALLEGRO_THREAD *thread_2 = NULL; 5//... 6 al_init_user_event_source(&event_source); 7 MyData data1(event_source); 8 MyData data2(event_source); 9 10 thread_1 = al_create_thread(thread_main, &data1); 11 thread_2 = al_create_thread(thread_main, &data2); 12 al_start_thread(thread_1); 13 al_start_thread(thread_2); 14 15 al_register_event_source(event_queue, &data1.event_source); 16 al_register_event_source(event_queue, &data2.event_source); 17 18 int result1, result2; 19 20 while(1) 21 { 22 ALLEGRO_EVENT ev; 23 al_wait_for_event(event_queue, &ev); 24 25 if(ev.type == SOME_INPUT) { 26 ALLEGRO_EVENT ev_input; 27 ev_input.user.data1 = input; 28 ev_input.user.data2 = button; 29 al_emit_user_event(&event_source, &ev_input, NULL); 30 else if(ev.type == MY_SPECIAL_SIGNAL){ 31 result1 = ev.user.data1; 32//...

Threads:

#SelectExpand
1 MyData *data = (MyData*) arg; 2 3 while(1){ 4 5 ALLEGRO_EVENT ev; 6 7 al_wait_for_event(data->event_queue, &ev); 8 9 if (ev.user.data1 == input) 10 data->result = calculation(ev.user.data2); 11 12 ALLEGRO_EVENT ev; 13 14 ev.user.type = MY_SPECIAL_SIGNAL; 15 ev.user.data1 = data->result; 16 al_emit_user_event(&data->event_source, &ev, NULL);

Elias
Member #358
May 2000

That looks like it will work - but note that events never expire. So when you call al_emit_user_event on line 28 of the main thread both threads will see that event in their queue. You will have to add some code to check if another thread started working on it already (or even completed it already).

--
"Either help out or stop whining" - Evert

Go to: