cleanup and small improvements to JobSystem

- add missing includes
- wake-up one or several threads based on the number of jobs available
This commit is contained in:
Mathias Agopian
2024-04-15 15:42:59 -07:00
committed by Mathias Agopian
parent 785592293c
commit 8af2d7512d
2 changed files with 73 additions and 55 deletions

View File

@@ -31,12 +31,25 @@ static constexpr bool DEBUG_FINISH_HANGS = false;
#include <utils/compiler.h>
#include <utils/Log.h>
#include <utils/memalign.h>
#include <utils/ostream.h>
#include <utils/Panic.h>
#include <utils/Systrace.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <iterator>
#include <mutex>
#include <random>
#include <thread>
#include <assert.h>
#include <math.h>
#include <stddef.h>
#include <stdint.h>
#if defined(WIN32)
# define NOMINMAX
@@ -141,10 +154,10 @@ JobSystem::JobSystem(const size_t userThreadCount, const size_t adoptableThreads
{
SYSTRACE_ENABLE();
int threadPoolCount = userThreadCount;
unsigned int threadPoolCount = userThreadCount;
if (threadPoolCount == 0) {
// default value, system dependant
int hwThreads = std::thread::hardware_concurrency();
unsigned int hwThreads = std::thread::hardware_concurrency();
if (UTILS_HAS_HYPER_THREADING) {
// For now we avoid using HT, this simplifies profiling.
// TODO: figure-out what to do with Hyper-threading
@@ -155,9 +168,9 @@ JobSystem::JobSystem(const size_t userThreadCount, const size_t adoptableThreads
threadPoolCount = hwThreads - 1;
}
// make sure we have at least one thread in the thread pool
threadPoolCount = std::max(1, threadPoolCount);
threadPoolCount = std::max(1u, threadPoolCount);
// and also limit the pool to 32 threads
threadPoolCount = std::min(UTILS_HAS_THREADING ? 32 : 0, threadPoolCount);
threadPoolCount = std::min(UTILS_HAS_THREADING ? 32u : 0u, threadPoolCount);
mThreadStates = aligned_vector<ThreadState>(threadPoolCount + adoptableThreadsCount);
mThreadCount = uint16_t(threadPoolCount);
@@ -174,7 +187,6 @@ JobSystem::JobSystem(const size_t userThreadCount, const size_t adoptableThreads
for (size_t i = 0, n = states.size(); i < n; i++) {
auto& state = states[i];
state.rndGen = default_random_engine(rd());
state.id = (uint32_t)i;
state.js = this;
if (i < hardwareThreadCount) {
// don't start a thread of adoptable thread slots
@@ -222,7 +234,7 @@ void JobSystem::decRef(Job const* job) noexcept {
void JobSystem::requestExit() noexcept {
mExitRequested.store(true);
std::lock_guard<Mutex> lock(mWaiterLock);
std::lock_guard<Mutex> const lock(mWaiterLock);
mWaiterCondition.notify_all();
}
@@ -254,23 +266,23 @@ void JobSystem::wait(std::unique_lock<Mutex>& lock, Job* job) noexcept {
// hang debugging...
// we check of we had active jobs or if the job we're waiting on had completed already.
// there is the possibility of a race condition, but our long timeout gives us some
// We check of we had active jobs or if the job we're waiting on had completed already.
// There is the possibility of a race condition, but our long timeout gives us some
// confidence that we're in an incorrect state.
auto id = getState().id;
size_t const id = std::distance(mThreadStates.data(), &getState());
auto activeJobs = mActiveJobs.load();
if (job) {
auto runningJobCount = job->runningJobCount.load();
ASSERT_POSTCONDITION(runningJobCount > 0,
"JobSystem(%p, %d): waiting while job %p has completed and %d jobs are active!",
this, id, job, activeJobs);
"JobSystem(%p, %u): waiting while job %p has completed and %d jobs are active!",
this, unsigned(id), job, activeJobs);
}
ASSERT_POSTCONDITION(activeJobs <= 0,
"JobSystem(%p, %d): waiting while %d jobs are active!",
this, id, activeJobs);
"JobSystem(%p, %u): waiting while %d jobs are active!",
this, unsigned(id), activeJobs);
} while (true);
}
@@ -278,22 +290,26 @@ void JobSystem::wait(std::unique_lock<Mutex>& lock, Job* job) noexcept {
void JobSystem::wakeAll() noexcept {
HEAVY_SYSTRACE_CALL();
std::lock_guard<Mutex> lock(mWaiterLock);
std::lock_guard<Mutex> const lock(mWaiterLock);
// this empty critical section is needed -- it guarantees that notify_all() happens
// after the condition's variables are set.
mWaiterCondition.notify_all();
}
void JobSystem::wakeOne() noexcept {
void JobSystem::wake(size_t hint) noexcept {
HEAVY_SYSTRACE_CALL();
std::lock_guard<Mutex> lock(mWaiterLock);
// this empty critical section is needed -- it guarantees that notify_one() happens
std::lock_guard<Mutex> const lock(mWaiterLock);
// this empty critical section is needed -- it guarantees that notify_all() happens
// after the condition's variables are set.
mWaiterCondition.notify_one();
if (hint == 1) {
mWaiterCondition.notify_one();
} else {
mWaiterCondition.notify_all();
}
}
inline JobSystem::ThreadState& JobSystem::getState() noexcept {
std::lock_guard<utils::Mutex> lock(mThreadMapLock);
std::lock_guard<utils::Mutex> const lock(mThreadMapLock);
auto iter = mThreadMap.find(std::this_thread::get_id());
ASSERT_PRECONDITION(iter != mThreadMap.end(), "This thread has not been adopted.");
return *iter->second;
@@ -305,17 +321,17 @@ JobSystem::Job* JobSystem::allocateJob() noexcept {
void JobSystem::put(WorkQueue& workQueue, Job* job) noexcept {
assert(job);
size_t index = job - mJobStorageBase;
size_t const index = job - mJobStorageBase;
assert(index >= 0 && index < MAX_JOB_COUNT);
// put the job into the queue first
workQueue.push(uint16_t(index + 1));
// then increase our active job count
uint32_t oldActiveJobs = mActiveJobs.fetch_add(1, std::memory_order_relaxed);
// but it's possible that the job has already been picked-up, so oldActiveJobs could be
int32_t const oldActiveJobs = mActiveJobs.fetch_add(1, std::memory_order_relaxed);
// But it's possible that the job has already been picked-up, so oldActiveJobs could be
// negative for instance. We signal only if that's not the case.
if (oldActiveJobs >= 0) {
wakeOne(); // wake-up a thread if needed...
wake(oldActiveJobs + 1); // wake-up a thread if needed...
}
}
@@ -324,18 +340,19 @@ JobSystem::Job* JobSystem::pop(WorkQueue& workQueue) noexcept {
// (and we're about to pick it up), other threads don't loop trying to do the same.
mActiveJobs.fetch_sub(1, std::memory_order_relaxed);
size_t index = workQueue.pop();
size_t const index = workQueue.pop();
assert(index <= MAX_JOB_COUNT);
Job* job = !index ? nullptr : &mJobStorageBase[index - 1];
Job* const job = !index ? nullptr : &mJobStorageBase[index - 1];
// if our guess was wrong, i.e. we couldn't pick-up a job (b/c our queue was empty), we
// If our guess was wrong, i.e. we couldn't pick up a job (b/c our queue was empty), we
// need to correct mActiveJobs.
if (!job) {
if (mActiveJobs.fetch_add(1, std::memory_order_relaxed) >= 0) {
// and if there are some active jobs, then we need to wake someone up. We know it
int32_t const oldActiveJobs = mActiveJobs.fetch_add(1, std::memory_order_relaxed);
if (oldActiveJobs >= 0) {
// And if there are some active jobs, then we need to wake someone up. We know it
// can't be us, because we failed taking a job and we know another thread can't
// have added one in our queue.
wakeOne();
wake(oldActiveJobs + 1); // wake-up a thread if needed...
}
}
return job;
@@ -346,17 +363,18 @@ JobSystem::Job* JobSystem::steal(WorkQueue& workQueue) noexcept {
// (and we're about to pick it up), other threads don't loop trying to do the same.
mActiveJobs.fetch_sub(1, std::memory_order_relaxed);
size_t index = workQueue.steal();
size_t const index = workQueue.steal();
assert(index <= MAX_JOB_COUNT);
Job* job = !index ? nullptr : &mJobStorageBase[index - 1];
Job* const job = !index ? nullptr : &mJobStorageBase[index - 1];
// if we failed taking a job, we need to correct mActiveJobs
// If we failed taking a job, we need to correct mActiveJobs.
if (!job) {
if (mActiveJobs.fetch_add(1, std::memory_order_relaxed) >= 0) {
// and if there are some active jobs, then we need to wake someone up. We know it
int32_t const oldActiveJobs = mActiveJobs.fetch_add(1, std::memory_order_relaxed);
if (oldActiveJobs >= 0) {
// And if there are some active jobs, then we need to wake someone up. We know it
// can't be us, because we failed taking a job and we know another thread can't
// have added one in our queue.
wakeOne();
wake(oldActiveJobs + 1); // wake-up a thread if needed...
}
}
return job;
@@ -366,7 +384,7 @@ inline JobSystem::ThreadState* JobSystem::getStateToStealFrom(JobSystem::ThreadS
auto& threadStates = mThreadStates;
// memory_order_relaxed is okay because we don't take any action that has data dependency
// on this value (in particular mThreadStates, is always initialized properly).
uint16_t adopted = mAdoptedThreads.load(std::memory_order_relaxed);
uint16_t const adopted = mAdoptedThreads.load(std::memory_order_relaxed);
uint16_t const threadCount = mThreadCount + adopted;
JobSystem::ThreadState* stateToStealFrom = nullptr;
@@ -374,8 +392,8 @@ inline JobSystem::ThreadState* JobSystem::getStateToStealFrom(JobSystem::ThreadS
// don't try to steal from someone else if we're the only thread (infinite loop)
if (threadCount >= 2) {
do {
// this is biased, but frankly, we don't care. it's fast.
uint16_t index = uint16_t(state.rndGen() % threadCount);
// This is biased, but frankly, we don't care. It's fast.
uint16_t const index = uint16_t(state.rndGen() % threadCount);
assert(index < threadStates.size());
stateToStealFrom = &threadStates[index];
// don't steal from our own queue
@@ -594,8 +612,8 @@ void JobSystem::adopt() {
}
// memory_order_relaxed is safe because we don't take action on this value.
uint16_t adopted = mAdoptedThreads.fetch_add(1, std::memory_order_relaxed);
size_t index = mThreadCount + adopted;
uint16_t const adopted = mAdoptedThreads.fetch_add(1, std::memory_order_relaxed);
size_t const index = mThreadCount + adopted;
ASSERT_POSTCONDITION(index < mThreadStates.size(),
"Too many calls to adopt(). No more adoptable threads!");
@@ -613,7 +631,7 @@ void JobSystem::adopt() {
void JobSystem::emancipate() {
const auto tid = std::this_thread::get_id();
std::lock_guard<utils::Mutex> lock(mThreadMapLock);
std::lock_guard<utils::Mutex> const lock(mThreadMapLock);
auto iter = mThreadMap.find(tid);
ThreadState* const state = iter == mThreadMap.end() ? nullptr : iter->second;
ASSERT_PRECONDITION(state, "this thread is not an adopted thread");
@@ -623,7 +641,8 @@ void JobSystem::emancipate() {
io::ostream& operator<<(io::ostream& out, JobSystem const& js) {
for (auto const& item : js.mThreadStates) {
out << size_t(item.id) << ": " << item.workQueue.getCount() << io::endl;
size_t const id = std::distance(js.mThreadStates.data(), &item);
out << id << ": " << item.workQueue.getCount() << io::endl;
}
return out;
}