Allegro.cc - Online Community

Allegro.cc Forums » Off-Topic Ordeals » Working Thread Pool

This thread is locked; no one can reply to it. rss feed Print
Working Thread Pool
Edgar Reynaldo
Major Reynaldo
May 2007
avatar

Hello peeps!

I created a ThreadPool in response to Alex Glez's thread here : https://www.allegro.cc/forums/thread/617498

You can set the number of threads to use, and add jobs to the queue. It can be paused and resumed. If you need to cancel the work, you can kill it.

You can view and clone the repository from here :

https://github.com/EdgarReynaldo/AllegroThreadPool

Example code is given here : https://www.allegro.cc/forums/thread/617498/1038506#target

The example program main.cpp that comes with the repo lets you test saving bitmaps using multiple threads. I attached a static win32 binary you can try out for yourself. Run the program with -h to see a list of command options. It's meant to be run from the console.

Increasing the number of jobs from 1 to 8 makes everything faster by a factor of between 2 and 4 or more. Approximate time saved is output by the program.

Attached is a zip of the latest repo plus a static win32 binary.
ThreadPool.zip

Enjoy!

Edgar

EDIT
If for some reason you need more than 32 jobs you're welcome to increase that number, but if you use more than 853 threads, you may run into the same problem I did on Windows, where thread 854 hangs. Don't ask me why I don't know. Must be an OS limitation somehow, or else a bug somewhere. More than 8 or 16 threads is probably a waste of time though, as they will be competing for resources.

bamccaig
Member #7,536
July 2006
avatar

I have little experience with threads, let alone thread pools, but I did a quick study of the code to try to understand it and these are my thoughts:

  • While I suppose it's valid to have a Finish() for the pool, it seems counter-intuitive to the general concept of a pool (there may be a separate concept of related jobs that you can finish to ensure all work is done before going on, but the pool itself is generally unrelated I think). That said, EnqueueJob() and FinishJob() (despite the inconsistent naming) seems to accomplish what I would imagine a pool to be so it's just nitpicking (with almost zero experience working with thread pools, at that). Those are protected though so I'm lost again.


  • Having a single mutex to manage the entire mechanism seems somewhat wasteful, meaning that I think it may block parts of the program needlessly. But I've rarely worked with mutexes so if I think too hard about it I'll probably get confused. It seems like a working solution, and best of all simple which means easier to understand. Whether or not it's ideal is debatable, but working is sufficient for now.


  • `Finish` doesn't seem to actually wait for the thread to finish. It appears to add it to a new collection/container, but doesn't seem to do anything else with it.

I'm exhausted and a bit drunk so maybe I'm missing some things (and threads tend to be a brainfuck in general anyway). Just looking to try to understand what's going on, and if I catch a mistake even better.

relpatseht
Member #5,034
September 2004
avatar

grumble grumblelock freegrumble grumbleperfgrumble grumble

Starting base case... 1536000000
Starting AL case..... 1536000000
Starting Rel case.... 1536000000
Base time 6648345614
             |     Sequential | AL Thread Pool | Rel ThreadPool
---------------------------------------------------------------
Total time   |     6648345614 |     1473114439 |      660840522
Per job      |    4.32835e+06 |         959059 |         430235
Overhead     |              0 |    8.68719e+08 |    5.64455e+07
Overhead/job |              0 |     6.2213e+06 |         404232
Overhead %   |              0 |        58.9716 |        8.54147

Peter Hull
Member #1,136
March 2001

I'm prepared to be wrong, but it looks like every job submitted via ThreadPool::AddJob gets its own Thread object, and the pool schedules their start so that no more than a certain number are running at once.

As I understood it, a Thread Pool contains a number of threads that outlive any one job; they process one job then go back to the queue for the next one. The benefit is they avoid the overhead of thread creation/destruction if you have may small jobs.

https://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
https://msdn.microsoft.com/en-us/library/ms973903.aspx

Edgar Reynaldo
Major Reynaldo
May 2007
avatar

Thanks for the replies! ;)

bamccaig said:

That said, EnqueueJob() and FinishJob() (despite the inconsistent naming) seems to accomplish what I would imagine a pool to be so it's just nitpicking (with almost zero experience working with thread pools, at that). Those are protected though so I'm lost again.

They're protected because they're only supposed to be called by the MasterThreadProc. There's a separate master thread that runs all the jobs.

bamccaig said:

Having a single mutex to manage the entire mechanism seems somewhat wasteful, meaning that I think it may block parts of the program needlessly.

Anything accessible through the public interface that the MasterThread needs to access has to be guarded. There are ways to avoid locking altogether, but I don't know how to do that.

bamccaig said:

Finish doesn't seem to actually wait for the thread to finish. It appears to add it to a new collection/container, but doesn't seem to do anything else with it.

Are you referring to Thread::Finish, ThreadPool::Finish, or ThreadPool::FinishJob? FinishJob removes the worker from the work set, and adds it to the list of complete jobs. The complete_jobs list is to safely allow deletion of finished jobs.

Relpatseht said:

grumble grumblelock freegrumble grumbleperfgrumble grumble

Use your words man!

I don't know how to program it lock free. Never used atomic variables or other such things before.

Relpatseht said:

Starting base case... 1536000000
Starting AL case..... 1536000000
Starting Rel case.... 1536000000
Base time 6648345614
             |     Sequential | AL Thread Pool | Rel ThreadPool
---------------------------------------------------------------
Total time   |     6648345614 |     1473114439 |      660840522
Per job      |    4.32835e+06 |         959059 |         430235
Overhead     |              0 |    8.68719e+08 |    5.64455e+07
Overhead/job |              0 |     6.2213e+06 |         404232
Overhead %   |              0 |        58.9716 |        8.54147

Could you explain what this chart is? I have no idea what it means.

I'm prepared to be wrong, but it looks like every job submitted via ThreadPool::AddJob gets its own Thread object, and the pool schedules their start so that no more than a certain number are running at once.

As I understood it, a Thread Pool contains a number of threads that outlive any one job; they process one job then go back to the queue for the next one. The benefit is they avoid the overhead of thread creation/destruction if you have may small jobs.

A Thread object doesn't have an actual thread until Thread::Start is called. Allegro starts a thread as soon as you call al_create_thread, which means there's a limit on how many you can create. To alleviate this, I only create a thread when it's job is actually scheduled to run.

I could certainly do it that way. It would require a bit of a redesign. I understand what you're saying though. Have a set of working threads and recycle them by sending jobs to them. That's doable. I'd have to think about how to go about it properly though.

Again, thanks for the feedback folks!!! Keep it comin!

relpatseht
Member #5,034
September 2004
avatar

Sorry, I was inebriated slightly. I threw together a lock free thread pool, ran perf comparisons, and pasted the results. The code is all attached to that post.

The important number is overhead percent. O world be perfect scaling (running exactly num threads faster than base). Your thread pool has about 60% overhead, the lock free implementation has about 9%. Sequential is base times, running there jobs in one thread. Everything else is time. Smaller numbers are better.

Edgar Reynaldo
Major Reynaldo
May 2007
avatar

I'm going to rework the pool to recycle threads, and then I'll rerun the tests and see how we do. I expect it to improve, but not surpass your code. I'll remove as many locks as I can.

As for your code, just looking at it makes me C-sick, get it? (Yes I know it's C++ don't be pedantic). It's way over my head. All those different types I've never heard of before. And I claim to know C++ 11. Right.

The comparison may not be fair though, if it's Allegro and pthreads vs C++11 et al. If I made it entirely from other C++11 code, do you think it would compare?

relpatseht
Member #5,034
September 2004
avatar

How you start a thread is completely irrelevant. std::thread, al_create_thread, pthread_create, etc. On Windows, they'll all eventually call CreateThread and then they'll just be another thread to the OS.

The timing code does matter somewhat. std::chrono::high_resolution_clock goes through Windows's QueryPerformanceCounter, which is the highest precision clock available to Windows. At the end of the day, that's all it's doing. Getting a time stamp, comparing time stamps. The values aren't in any units.

The std::atomic_* types just map their ops to atomic intrinsics. At least, for the sizes I use. If you use an atomic of a size bigger than the hardware supports atomics for, then it will fall back to mutexes. See std::atomic_is_lock_free. Be careful with that.

I align the atomic types to 64 because a cache line is typically 64 bytes. Under the hood, a CPU can't synchronize at lower level than a cache line. If two atomics share a cache line, they share contention. Slinging cache lines around between processors isn't free.

In the timing code, the work function is using avx512. Not many processors support that. It melts CPUs, which is why I chose it. Probably need to switch to something else on your end.

Anyway, like I said in some other thread, threading is all about perf, which means it all about removing contention and never locking. If I have time tomorrow, I'll add comments, etc. Then probably extend it with real c++11 so I can have nice syntax like:

#SelectExpand
1std::vector<Foo> bar; 2parallel_for(0, bar.size(), [&](unsigned threadIndex, unsigned barIndex) 3{ 4 // do parallel work... 5}); 6 7parallel_for_each(bar.begin(), bar.end(), [&](unsigned threadIndex, Foo& f) 8{ 9 // do parallel work... 10}); 11 12parallel_for(0, bar.size(), [&](unsigned barIndex) 13{ 14 // do parallel work... 15}); 16 17parallel_for_each(bar.begin(), bar.end(), [&](Foo& f) 18{ 19 // do parallel work... 20});

[Edit]

I started writing how everything works, then I ran out of time/got bored. I got as far as mostly explaining the SPSC queue, which is the only important bit anyway.

...

Simple Lock Free Thread Pool

Background

The most important performance characteristic of a multithreaded algorithm
is scalability. A perfectly scaling algorithm will run NUM_THREADS as fast
as the single threaded version. Poorly scaling algorithms will usually
have a logarithmic graph as number of threads increase. Not to say that's
never useful, but it is wasteful of resources, which usually doesn't go
well with game programming.

The number one murderer of scalability is resource contention. All memory
reads can occur in parallel, but writes will need to be done sequentially,
meaning someone is going to wait.

Mutexes, semaphores, and other threading primitives exist to solve the
problem of deciding who is going to wait and how long they are going to
wait for. This is inherently a bad solution. First of all, these are all
implemented in software, in kernel space. Software is already slow, but
the transition to kernel space (especially in the post SPECTER/MELTDOWN
world) is deathly slow. Second of all, we don't want to wait, so picking
a solution centered around waiting/sleeping seems rather foolish.

In comes lock-free and wait-free algorithms. A lock-free algorithm is
one that completes without any threading primitive locking. A wait-free
algorithm is one that completes without any waiting at all (every
thread in contention for the same resource helps the others finish).
These things are hard to do, but we have some hardware instructions to
help us out.

But first, a small digression. Your code is a lie. The machine code
generated by the compiler is a lie. For decades now, most processors
have had out of order execution capabilities. It means what it sounds
like. The order the opcodes are passed to the processor is not
necessairily the other they are executed. As long as the intent is
preserved, compilers are free to reorder your code however they and
processors will further reorder instructions however they'd like. This
effectively makes lock-free and wait-free algorithms impossible without
fences, which are barriers around which the compiler and the processor
cannot reorder code.

For all intents and purposes, you only need to know about three kinds of
fences. The strongest fence is sequentially consistent. Like it sounds,
sequentially consistent fences/operations are guarenteed to be seen across
all processors in the order they occur in in code. This is a very nice,
strong guarentee, but (on some hardware) it is slow. The two, weaker,
faster fences work together; acquire and release. The relese fence
prevents any reads or writes to memory from being moved after the fence.
Furthermore, any write before the release fence is guarenteed to be
visible to another thread after a corresponding acquire fence. The
acquire fence also prevents memory reads/writes from being moved to before
the fence.

Now, that's all not actually very useful information in practical terms,
since, on x64, all fences are sequentially consistent, but, if you write
code for other platforms (at least ARM, PPC is dead), then using the
acquire and release fences will net you performance wins. Phones are real
things, I guess, so lets use acquire/release fences where we can.

So, that's all great, but there is still one piece missing: atomic
operations. Atomic operations are operations which are guarenteed to be
performed uninterrupted. So another thread won't split your operation
down the middle. For example, something as simple as i++ is actually
three operations. We need to read i's value from memory, modify it by
incrementing, then write the value back to i. If the operation isn't
atomic, then there is the potential for another thread to change
the value of i in memory after it has been read by your thread but
before your thread writes it back to memory. The other thread's
modification would be erased.

The Job Queue

The basis of our lock-free thread pool is the job queue. That's where
all the possibility for contention exists in a thread pool--the mechanism
that passes jobs from the main thread to the pool threads. We're going
to go as simple as possible (which also means fast), so our lock-free
queue will only allow one thread to write to it, and one (different)
thread to read from it. Multiple writing threads will break it.
Multiple reading threads will break it. This will be a very limited queue,
but it has exactly the features we need.

The queue will be implmented as a ring buffer, so the data for the queue
looks like this:

struct job
{
  void (*callback)(int threadIndex, void *userData);
  void *userData;
};

struct sqsc_queue
{
  static const size_t capacity = 31;

  alignas(64) std::atomic_size_t head = 0;
  alignas(64) job ringBuffer[capacity];
  alignas(64) std::atomic_size_t tail = 0;
};

As an aside, everything in the spsc_queue is aligned to 64 because
64 is the typical cache line size. CPUs access memory in cache line
sized chunks. Making memory visible to other cores on the CPU means
copying that memory to their local cache. If multiple atomic values
share the same cahce line, they are effectively sharing their contention
as well, which hurts performance.

As you can see, the ring buffer's data is pretty simple. There is a
head index and a tail index, which are both atomic, and a flat buffer
of memory. As a ring buffer, when we add something we advance the tail
index, and when we remove something, we advance the head index. The
indices are stored as size_t types (where size_t is the platforms standard
width, so uint64_t on a 64 bit processor).

The code to add to the queue is quite simple (read, fast):

bool try_push(spsc_queue *queue, const job &newJob)
{
  size_t curTail = queue->tail.load(std::memory_order_relaxed); // relaxed: no memory barrier
  size_t nextTail = (curTail + 1) & capacity;

  if (nextTail != queue->head.load(std::memory_order_acquire))
  {
    queue->ringBuffer[curTail] = newJob;
    queue->tail.store(nextTail, std::memory_order_release); // release tail write and ringBuffer writer so visible to acquiring threads
    return true;
  }

  return false; // full
}

So to break this down a bit, the function is called "try_push" because
it can fail. If the queue is full, we don't allocate a bigger
queue. That would cost time and add a lot of complexity to the
algorithm. We'll just design whatever is using the queue to
tolerate failure.

The first step is to get the current tail index and determine the
next tail index. Because we've written it in stone that only one
thread will ever write to this queue, we know we are the only
thread that ever writes to the tail index. Thus, we don't need
any memory barrier at all to read from it. And, again, this is a
ring buffer, so if the next tail index would go out of bounds, we
wrap it around to 0.

Next we check if the queue is full, which is true if our next tail
index would overrwrite the current first item in the queue. We
need to put an acquire fence on reading the head index, since
we need to make sure we are looking at the most recent value
written by our reading thread.

So, if our queue is full, we return false, but otherwise, we
first store the new job in the ring buffer. Here's where things
get tricky, and why ordering matters heavily. We write in the new
job to the buffer first. If you note in the try_pop code below,
this value can't be read until the tail pointer is advanced. As
I was mentioning earlier, the compiler lies and the processor lies,
so we have no idea when this write is actually going to occur. Thus,
the storing of the nextTail value requires a release fence. This
release fence prevent the store of newJob from being moved after
the tail store, so we can rest easy knowing that if the the
incremented tail value is seen on the read thread, the write to
the ring buffer has already finished.

That's all there is to writing to the ring buffer. Reading isn't much
more complex:

bool try_pop(spsc_queue *queue, job *outJob)
{
  size_t curHead = queue->head.load(std::memory_order_relaxed); // relaxed: no barriers

  if (curHead == queue->tail.load(std::memory_order_acquire)) // Acquire tail and ringBuffer writes    
    return false; // empty

  *outJob = queue->ringBuffer[curHead];

  queue->head.store((curHead + 1) % capacity, std::memory_order_release); // release head write to other thread
  return true;
}

bamccaig
Member #7,536
July 2006
avatar

Side note: is it by design that the writer uses bitwise-AND, but the reader uses modulus for the wrap around?

relpatseht
Member #5,034
September 2004
avatar

No, I was editing from a template size to 31 (2^n - 1), so it made sense to switch to an and mask, which is marginally more performance. I didn't finish, so the second place wasn't modified.

Was anything I wrote useful? The world of performance multi threading is sorely lacking in the industry, so if it so happens I've explained things in a way that makes sense/there's interest, I'll write more maybe. Though, I've also been wanting to do a write up on a performant rendering pipeline in software from scratch. Knowing what the GPU is doing makes graphics a lot easier/intuitive, in my experience.

bamccaig
Member #7,536
July 2006
avatar

I definitely found it useful, but also it seems like something that is very subject to the inner workings of the hardware it runs on. Which I'm unlikely to understand or keep up on. Certainly I wouldn't be able to implement this myself because I don't understand the technology well enough to back up these assertions if a colleague were to question it. But it's interesting nonetheless. I think where it gets flaky is the alignment to prevent "contention". That sounds like something where it will only work on the machines that behave this way. Would it still work on 32-bit software/hardware? I assume some future 128-bit bus will require 128-bit alignment, but again I don't really understand that fully. If you have more to write then write then I'll definitely read it. I can't promise that I'll be able to actually use it though, but I might learn something anyway (I think I already have).

relpatseht
Member #5,034
September 2004
avatar

The idea of atomic operations is not hardware specific. Any modern hardware will support at least an atomic compare and swap operation as well as memory fences. Just about all hardware now (all processors, mobile or otherwise) support these primitives in a common enough manner for it to be standardized in the STL, as I was using. Just about all 32 bit hardware also supports/supported atomic operations and fences since the advent of multi core processors. Of course, most 32 bit processors only supported atomic up to 32 bits wide. Some 64 bit processors support 128 bit atomics, but I wouldn't build an algorithm reliant on it.

As for 64 bit alignment, that's cache line width, not bus width. This is much less consistent, but is typically 64 bytes. Just about all processors have multiple cache levels. The closer the cache to the core, the faster and smaller it is. Most multi core processors will give each core their own L1 cache, then maybe pairs will share an L2, and the whole processors will share L3. These synchronization primitives are concerned with, as their name suggests, how memory in cache and in ram is synchronized across the processors.

So, if every core had their own L1 cache, and they are all looking at the same memory address, then every core had a unique copy of that memory. If one core updates that memory with no fences or atomic instructions, the other cores won't see the update until it is (eventually, but who knows when) written from cache back to ram them read from ram back to the other cores' cache.

Again, processors can only reason about memory in cache line sized chunks. Even if the operations in memory are atomic with the right fences, the processor still needs to copy that memory around between all the cores' cache at all the correct levels to ensure the update is seen by the reading cores. All that copying around of cache lines isn't free. So you see why keeping atomics on separate cache lines is good for keeping contention down? It prevents false sharing by ensuring an atomic is only in the cache of a core actually using it.

Typing on a phone is hard and I'm sure autocorrect had made a fool of me somewhere. TLDR is this stuff is everywhere and hardware is mostly converged. If you want to make things that go fast, you need to learn it.

bamccaig
Member #7,536
July 2006
avatar

The atomics didn't concern me. Especially the C++ types/library stuff. I'd expect that to work. It's predominantly the alignment which I feel like I cannot rely on or defend. Mind you, I predominantly develop C# applications so professionally this is too low-level to be of use anyway. I doubt it translates directly to "managed" code. But I still love C and C++ and am interested in learning what I can.

Side note: typing on phones sucks. I hate using my phone for anything other than texting and calling (and gaming while I poop). It's terrible at Web stuff.

Edgar Reynaldo
Major Reynaldo
May 2007
avatar

bambams - learn to use the microphone on your phone, it can read speech just fine.

Relpatseht
This is great stuff, please continue. I just haven't had much time to reply, don't think I didn't see your edit or your reply.

What I don't understand are fences and the .load function. I also don't quite understand how you manage to implement an 'spsc' queue using atomic operations.

bamccaig
Member #7,536
July 2006
avatar

bambams - learn to use the microphone on your phone, it can read speech just fine.

Yes, it's quite good. I use it sometimes. But it's not private unless you're in private, and it's a bit slower to get to.

Agreed, the conversation here is awesome right now. Stay on topic for once!

relpatseht
Member #5,034
September 2004
avatar

I realize my earlier characterization of acquire/release being the same as sequentially consistent on x64/x64 was incorrect.

On x86/x64 systems, there are very strong cache coherency rules. A memory write on any core must be visible to all other cores. As a result, the acquire and release fences are only fences the compiler cannot reorder loads and store around respectively. They are implemented via simple MOV instructions. Sequentially consistent, on the other hand, still requires the heavier XCHG instruction. Simplified, only one XCHG instruction is executed at a time across the whole processor (thus, sequentially consistent).

ARM, for instance, doesn't have strong cache coherency rules, so the barriers are actual instructions emit by the compiler to instruct the processor a particular address needs to be made coherent across cores.

As for atomics, they are not a special type. They just instruct the compiler to handle the value specially in that they are never stored in registers. Atomics are always accessed via memory. You don't need to use .load and .store with atomics, you can just use the = operator, but then all operations will be sequentially consistent, which is slower.

For instance, consider the following:

void ThreadProc(void *userData)
{
    uint32_t * const foo = reinterpret_cast<uint32_t*>(userData);

    while(*foo == 0)
        std::this_thread::yield();
}

int main()
{
    uint32_t bar = 0;
    std::thread waitThead(ThreadProc, &bar);

    bar = 1;
    waitThread.join();
}

It is very possible for this code to never terminate. What could easily happen is foo is stored in a register and the register is never updated, making that while loop go forever. Foo here is just a plain variable, it isn't special to the compiler in any way, so there is no reason for the compiler to emit code that says to re-read the value from memory every time. To the compiler, without the atomic instrinsics, there is no such thing as other threads or processes magically updating the memory behind the scenes.
Were foo to be an atomic, however, that lets the compiler know it needs to handle it specially, and always get the latest value.

"But isn't that what the 'volatile' keyword is for?"
Yes and no. On x86/64, again, with the strong cache coherency model, using volatile will probably work if your algorithm is designed to be used with acquire release semantics. However, it's not portably correct. On other architectures, cache coherency requires different instructions. Even x86/64 requires different instructions for sequentially consistent operations. Effectively, this also means volatile would work across all architectures if you are on a single core processor. Single core processors don't even exist anymore though, so that's pretty much a moot point.

As for the SPSC queue, remember, it only works because it's SPSC, single producer, single consumer. There can only be 1 thread calling try_pop, and only 1 thread calling try_push. Every acquire operation is paired up with a release operation. In try_pop when we acquire the tail, we're guaranteed the memory stores up to and including writing nextTail to tail in try_push are finished. Vice-versa is true for head.

Go to: