JobSystem now automatically frees Jobs (#91)

* JobSystem now automatically frees Jobs

Until now, Job allocation used a linear allocator
strategy, which required the JobSystem to be “reset”
periodically — typically once per frame in
filament.

This is no longer required. We use a pool allocator
now, which doesn’t add much overhead. It does
use a spin-lock for thread-safety, though; since
we assume very little contention, this shouldn’t
be a problem.

* Thread Safe Object Pool Allocator

A lock-less, thread-safe object pool allocator,
now used for storing JobSystem’s job allocations.
This gets rid of the spin-lock introduced in the
previous CL.
This commit is contained in:
Mathias Agopian
2018-08-13 22:25:15 -07:00
committed by Romain Guy
parent bb553bf53d
commit d9ba3998d2
4 changed files with 140 additions and 97 deletions

View File

@@ -103,12 +103,12 @@ void JobSystem::setThreadAffinity(uint32_t mask) noexcept {
#endif
}
JobSystem::JobSystem(size_t threadCount, size_t adoptableThreadsCount) noexcept {
JobSystem::JobSystem(size_t threadCount, size_t adoptableThreadsCount) noexcept
: mJobPool("JobSystem Job pool", MAX_JOB_COUNT * sizeof(Job)),
mJobStorageBase(static_cast<Job *>(mJobPool.getAllocator().getCurrent()))
{
SYSTRACE_ENABLE();
void* p = aligned_alloc(MAX_JOB_COUNT * sizeof(Job), CACHELINE_SIZE);
mJobStorage = (Job*)p;
if (threadCount == 0) {
// default value, system dependent
size_t hwThreads = std::thread::hardware_concurrency();
@@ -128,7 +128,6 @@ JobSystem::JobSystem(size_t threadCount, size_t adoptableThreadsCount) noexcept
// it's a pity these are not compile-time checks (C++17 supports it apparently)
assert(mExitRequested.is_lock_free());
assert(mNextJobIndex.is_lock_free());
assert(Job().runningJobCount.is_lock_free());
std::random_device rd;
@@ -158,8 +157,6 @@ JobSystem::~JobSystem() {
state.thread.join();
}
}
aligned_free(mJobStorage);
}
JobSystem* JobSystem::getJobSystem() noexcept {
@@ -194,12 +191,7 @@ inline JobSystem::ThreadState& JobSystem::getState() noexcept {
}
JobSystem::Job* JobSystem::allocateJob() noexcept {
size_t index = mNextJobIndex.fetch_add(1, std::memory_order_relaxed);
if (UTILS_UNLIKELY(index >= MAX_JOB_COUNT)) {
mNextJobIndex.fetch_sub(1, std::memory_order_relaxed);
return nullptr;
}
return new(&mJobStorage[index]) Job();
return mJobPool.make<Job>();
}
inline JobSystem::ThreadState& JobSystem::getStateToStealFrom(JobSystem::ThreadState& state) noexcept {
@@ -270,8 +262,9 @@ JobSystem::Job* JobSystem::create(JobSystem::Job* parent, JobFunc func) noexcept
if (parent) {
// can't create a child job of a terminated parent
assert(parent->runningJobCount.load(std::memory_order_relaxed) > 0);
parent->runningJobCount.fetch_add(1, std::memory_order_relaxed);
index = parent - &mJobStorage[0];
index = parent - mJobStorageBase;
assert(index < MAX_JOB_COUNT);
}
job->function = func;
@@ -281,20 +274,12 @@ JobSystem::Job* JobSystem::create(JobSystem::Job* parent, JobFunc func) noexcept
return job;
}
void JobSystem::reset(JobSystem::Job* job) noexcept {
JobSystem::Job* parent = job->parent ? &mJobStorage[job->parent] : mMasterJob;
if (parent) {
assert(parent->runningJobCount.load(std::memory_order_relaxed) > 0);
parent->runningJobCount.fetch_add(1, std::memory_order_relaxed);
}
job->runningJobCount.store(1, std::memory_order_relaxed);
}
void JobSystem::finish(Job* job) noexcept {
SYSTRACE_CALL();
// terminate this job and notify its parent
Job* const storage = mJobStorage;
auto& jobPool = mJobPool;
Job* const storage = mJobStorageBase;
do {
// std::memory_order_release here is needed to synchronize with JobSystem::wait()
// which needs to "see" all changes that happened before the job terminated.
@@ -304,13 +289,17 @@ void JobSystem::finish(Job* job) noexcept {
// there is still work (e.g.: children), we're done.
break;
}
job = job->parent == 0x7FFF ? nullptr : &storage[job->parent];
Job* const parent = job->parent == 0x7FFF ? nullptr : &storage[job->parent];
// destroy this job...
jobPool.destroy(job);
// ... and check the parent
job = parent;
} while (job);
#if __ARM_ARCH_7A__
// on ARMv7a SEL is needed
__dsb(0xA); // ISHST = 0xA (b1010)
UTILS_SIGNAL_EVENT();
__dsb(0xA); // ISHST = 0xA (b1010)
UTILS_SIGNAL_EVENT();
#endif
}
@@ -392,13 +381,6 @@ void JobSystem::emancipate() {
sThreadState = nullptr;
}
void JobSystem::reset() noexcept {
assert(!mActiveJobs.load(std::memory_order_relaxed));
mJobWaterMark = std::max(mJobWaterMark, (size_t)mNextJobIndex.load(std::memory_order_relaxed));
mNextJobIndex.store(0, std::memory_order_relaxed);
mMasterJob = nullptr;
}
io::ostream& operator<<(io::ostream& out, JobSystem const& js) {
for (auto const& item : js.mThreadStates) {
out << size_t(std::log2f(item.mask)) << ": " << item.workQueue.getCount() << io::endl;