fix a race in jobsystem (2nd attempt)

We were decrementing activeJobCount after removing the job from the
queue, which could cause other threads in the pool to preempt us before
the decrement, causing them to spin forever trying to get a non-existant
job, until the decrement actually happened.

Now we always decrement first and fix-up the count if we couldn't get
a job from the queues. The race is inverted, and doesn't cause threads
to spin a long time.


fixes b/201100123
This commit is contained in:
Mathias Agopian
2021-09-13 14:20:08 -07:00
committed by Mathias Agopian
parent 6e7af103d3
commit 19b0ad2605
2 changed files with 64 additions and 33 deletions

View File

@@ -15,8 +15,10 @@
*/
// Note: The overhead of SYSTRACE_TAG_JOBSYSTEM is not negligible especially with parallel_for().
#ifndef SYSTRACE_TAG
//#define SYSTRACE_TAG SYSTRACE_TAG_JOBSYSTEM
#define SYSTRACE_TAG SYSTRACE_TAG_NEVER
#endif
// when SYSTRACE_TAG_JOBSYSTEM is used, enables even heavier systraces
#define HEAVY_SYSTRACE 0
@@ -275,6 +277,65 @@ JobSystem::Job* JobSystem::allocateJob() noexcept {
return mJobPool.make<Job>();
}
void JobSystem::put(WorkQueue& workQueue, Job* job) noexcept {
assert(job);
size_t index = job - mJobStorageBase;
assert(index >= 0 && index < MAX_JOB_COUNT);
// put the job into the queue first
workQueue.push(uint16_t(index + 1));
// then increase our active job count
uint32_t oldActiveJobs = mActiveJobs.fetch_add(1, std::memory_order_relaxed);
// but it's possible that the job has already been picked-up, so oldActiveJobs could be
// negative for instance. We signal only if that's not the case.
if (oldActiveJobs >= 0) {
wakeOne(); // wake-up a thread if needed...
}
}
JobSystem::Job* JobSystem::pop(WorkQueue& workQueue) noexcept {
// decrement mActiveJobs first, this is to ensure that if there is only a single job left
// (and we're about to pick it up), other threads don't loop trying to do the same.
mActiveJobs.fetch_sub(1, std::memory_order_relaxed);
size_t index = workQueue.pop();
assert(index <= MAX_JOB_COUNT);
Job* job = !index ? nullptr : &mJobStorageBase[index - 1];
// if our guess was wrong, i.e. we couldn't pick-up a job (b/c our queue was empty), we
// need to correct mActiveJobs.
if (!job) {
if (mActiveJobs.fetch_add(1, std::memory_order_relaxed) >= 0) {
// and if there are some active jobs, then we need to wake someone up. We know it
// can't be us, because we failed taking a job and we know another thread can't
// have added one in our queue.
wakeOne();
}
}
return job;
}
JobSystem::Job* JobSystem::steal(WorkQueue& workQueue) noexcept {
// decrement mActiveJobs first, this is to ensure that if there is only a single job left
// (and we're about to pick it up), other threads don't loop trying to do the same.
mActiveJobs.fetch_sub(1, std::memory_order_relaxed);
size_t index = workQueue.steal();
assert(index <= MAX_JOB_COUNT);
Job* job = !index ? nullptr : &mJobStorageBase[index - 1];
// if we failed taking a job, we need to correct mActiveJobs
if (!job) {
if (mActiveJobs.fetch_add(1, std::memory_order_relaxed) >= 0) {
// and if there are some active jobs, then we need to wake someone up. We know it
// can't be us, because we failed taking a job and we know another thread can't
// have added one in our queue.
wakeOne();
}
}
return job;
}
inline JobSystem::ThreadState* JobSystem::getStateToStealFrom(JobSystem::ThreadState& state) noexcept {
auto& threadStates = mThreadStates;
// memory_order_relaxed is okay because we don't take any action that has data dependency
@@ -323,11 +384,6 @@ bool JobSystem::execute(JobSystem::ThreadState& state) noexcept {
if (job) {
assert(job->runningJobCount.load(std::memory_order_relaxed) >= 1);
UTILS_UNUSED_IN_RELEASE
uint32_t activeJobs = mActiveJobs.fetch_sub(1, std::memory_order_relaxed);
assert(activeJobs); // whoops, we were already at 0
HEAVY_SYSTRACE_VALUE32("JobSystem::activeJobs", activeJobs - 1);
if (UTILS_LIKELY(job->function)) {
HEAVY_SYSTRACE_NAME("job->function");
job->function(job->storage, *this, job);
@@ -446,18 +502,8 @@ void JobSystem::run(Job*& job) noexcept {
ThreadState& state(getState());
// increase the active job count before we add the job to the queue, because otherwise
// the job could run and finish before the counter is incremented, which would trigger
// an assert() in execute(). Either way, it's not "wrong", but the assert() is useful.
uint32_t activeJobs = mActiveJobs.fetch_add(1, std::memory_order_relaxed);
put(state.workQueue, job);
HEAVY_SYSTRACE_VALUE32("JobSystem::activeJobs", activeJobs + 1);
// wake-up a thread if needed...
wakeOne();
// after run() returns, the job is virtually invalid (it'll die on its own)
job = nullptr;
}