Revert "fix a race in jobsystem"

This reverts commit 2feb0ad325.
This commit is contained in:
Benjamin Doherty
2021-09-24 11:24:44 -07:00
parent 891ffabd11
commit 2e4936afc4
2 changed files with 17 additions and 39 deletions

View File

@@ -354,56 +354,19 @@ private:
assert(job);
size_t index = job - mJobStorageBase;
assert(index >= 0 && index < MAX_JOB_COUNT);
// put the job into the queue first
workQueue.push(uint16_t(index + 1));
// then increase our active job count
uint32_t oldActiveJobs = mActiveJobs.fetch_add(1, std::memory_order_relaxed);
// but it's possible that the job has already been picked-up, so oldActiveJobs could be
// negative for instance. We signal only if that's not the case.
if (oldActiveJobs >= 0) {
wakeOne(); // wake-up a thread if needed...
}
}
Job* pop(WorkQueue& workQueue) noexcept {
// decrement mActiveJobs first, this is to ensure that if there is only a single job left
// (and we're about to pick it up), other threads don't loop trying to do the same.
mActiveJobs.fetch_sub(1, std::memory_order_relaxed);
size_t index = workQueue.pop();
assert(index <= MAX_JOB_COUNT);
Job* job = !index ? nullptr : &mJobStorageBase[index - 1];
// if our guess was wrong, i.e. we couldn't pick-up a job (b/c our queue was empty), we
// need to correct mActiveJobs.
if (!job) {
if (mActiveJobs.fetch_add(1, std::memory_order_relaxed) >= 0) {
// and if there are some active jobs, then we need to wake someone up. We know it
// can't be us, because we failed taking a job and we know another thread can't
// have added one in our queue.
wakeOne();
}
}
return job;
return !index ? nullptr : &mJobStorageBase[index - 1];
}
Job* steal(WorkQueue& workQueue) noexcept {
// decrement mActiveJobs first, this is to ensure that if there is only a single job left
// (and we're about to pick it up), other threads don't loop trying to do the same.
mActiveJobs.fetch_sub(1, std::memory_order_relaxed);
size_t index = workQueue.steal();
assert(index <= MAX_JOB_COUNT);
Job* job = !index ? nullptr : &mJobStorageBase[index - 1];
// if we failed taking a job, we need to correct mActiveJobs
if (!job) {
mActiveJobs.fetch_add(1, std::memory_order_relaxed);
// we don't need to signal here, because we're going to loop back into this thread to
// try again.
}
return job;
return !index ? nullptr : &mJobStorageBase[index - 1];
}
void wait(std::unique_lock<Mutex>& lock, Job* job = nullptr) noexcept;

View File

@@ -323,6 +323,11 @@ bool JobSystem::execute(JobSystem::ThreadState& state) noexcept {
if (job) {
assert(job->runningJobCount.load(std::memory_order_relaxed) >= 1);
UTILS_UNUSED_IN_RELEASE
uint32_t activeJobs = mActiveJobs.fetch_sub(1, std::memory_order_relaxed);
assert(activeJobs); // whoops, we were already at 0
HEAVY_SYSTRACE_VALUE32("JobSystem::activeJobs", activeJobs - 1);
if (UTILS_LIKELY(job->function)) {
HEAVY_SYSTRACE_NAME("job->function");
job->function(job->storage, *this, job);
@@ -441,8 +446,18 @@ void JobSystem::run(Job*& job) noexcept {
ThreadState& state(getState());
// increase the active job count before we add the job to the queue, because otherwise
// the job could run and finish before the counter is incremented, which would trigger
// an assert() in execute(). Either way, it's not "wrong", but the assert() is useful.
uint32_t activeJobs = mActiveJobs.fetch_add(1, std::memory_order_relaxed);
put(state.workQueue, job);
HEAVY_SYSTRACE_VALUE32("JobSystem::activeJobs", activeJobs + 1);
// wake-up a thread if needed...
wakeOne();
// after run() returns, the job is virtually invalid (it'll die on its own)
job = nullptr;
}