diff options
Diffstat (limited to 'eigen/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h')
-rw-r--r-- | eigen/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h | 210 |
1 files changed, 0 insertions, 210 deletions
diff --git a/eigen/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/eigen/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h deleted file mode 100644 index 05ed76c..0000000 --- a/eigen/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h +++ /dev/null @@ -1,210 +0,0 @@ -// This file is part of Eigen, a lightweight C++ template library -// for linear algebra. -// -// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com> -// -// This Source Code Form is subject to the terms of the Mozilla -// Public License v. 2.0. If a copy of the MPL was not distributed -// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. - -#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ -#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ - - -namespace Eigen { - -// RunQueue is a fixed-size, partially non-blocking deque or Work items. -// Operations on front of the queue must be done by a single thread (owner), -// operations on back of the queue can be done by multiple threads concurrently. -// -// Algorithm outline: -// All remote threads operating on the queue back are serialized by a mutex. -// This ensures that at most two threads access state: owner and one remote -// thread (Size aside). The algorithm ensures that the occupied region of the -// underlying array is logically continuous (can wraparound, but no stray -// occupied elements). Owner operates on one end of this region, remote thread -// operates on the other end. Synchronization between these threads -// (potential consumption of the last element and take up of the last empty -// element) happens by means of state variable in each element. States are: -// empty, busy (in process of insertion of removal) and ready. Threads claim -// elements (empty->busy and ready->busy transitions) by means of a CAS -// operation. The finishing transition (busy->empty and busy->ready) are done -// with plain store as the element is exclusively owned by the current thread. -// -// Note: we could permit only pointers as elements, then we would not need -// separate state variable as null/non-null pointer value would serve as state, -// but that would require malloc/free per operation for large, complex values -// (and this is designed to store std::function<()>). -template <typename Work, unsigned kSize> -class RunQueue { - public: - RunQueue() : front_(0), back_(0) { - // require power-of-two for fast masking - eigen_assert((kSize & (kSize - 1)) == 0); - eigen_assert(kSize > 2); // why would you do this? - eigen_assert(kSize <= (64 << 10)); // leave enough space for counter - for (unsigned i = 0; i < kSize; i++) - array_[i].state.store(kEmpty, std::memory_order_relaxed); - } - - ~RunQueue() { eigen_assert(Size() == 0); } - - // PushFront inserts w at the beginning of the queue. - // If queue is full returns w, otherwise returns default-constructed Work. - Work PushFront(Work w) { - unsigned front = front_.load(std::memory_order_relaxed); - Elem* e = &array_[front & kMask]; - uint8_t s = e->state.load(std::memory_order_relaxed); - if (s != kEmpty || - !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) - return w; - front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed); - e->w = std::move(w); - e->state.store(kReady, std::memory_order_release); - return Work(); - } - - // PopFront removes and returns the first element in the queue. - // If the queue was empty returns default-constructed Work. - Work PopFront() { - unsigned front = front_.load(std::memory_order_relaxed); - Elem* e = &array_[(front - 1) & kMask]; - uint8_t s = e->state.load(std::memory_order_relaxed); - if (s != kReady || - !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) - return Work(); - Work w = std::move(e->w); - e->state.store(kEmpty, std::memory_order_release); - front = ((front - 1) & kMask2) | (front & ~kMask2); - front_.store(front, std::memory_order_relaxed); - return w; - } - - // PushBack adds w at the end of the queue. - // If queue is full returns w, otherwise returns default-constructed Work. - Work PushBack(Work w) { - std::unique_lock<std::mutex> lock(mutex_); - unsigned back = back_.load(std::memory_order_relaxed); - Elem* e = &array_[(back - 1) & kMask]; - uint8_t s = e->state.load(std::memory_order_relaxed); - if (s != kEmpty || - !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) - return w; - back = ((back - 1) & kMask2) | (back & ~kMask2); - back_.store(back, std::memory_order_relaxed); - e->w = std::move(w); - e->state.store(kReady, std::memory_order_release); - return Work(); - } - - // PopBack removes and returns the last elements in the queue. - // Can fail spuriously. - Work PopBack() { - if (Empty()) return Work(); - std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock); - if (!lock) return Work(); - unsigned back = back_.load(std::memory_order_relaxed); - Elem* e = &array_[back & kMask]; - uint8_t s = e->state.load(std::memory_order_relaxed); - if (s != kReady || - !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) - return Work(); - Work w = std::move(e->w); - e->state.store(kEmpty, std::memory_order_release); - back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed); - return w; - } - - // PopBackHalf removes and returns half last elements in the queue. - // Returns number of elements removed. But can also fail spuriously. - unsigned PopBackHalf(std::vector<Work>* result) { - if (Empty()) return 0; - std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock); - if (!lock) return 0; - unsigned back = back_.load(std::memory_order_relaxed); - unsigned size = Size(); - unsigned mid = back; - if (size > 1) mid = back + (size - 1) / 2; - unsigned n = 0; - unsigned start = 0; - for (; static_cast<int>(mid - back) >= 0; mid--) { - Elem* e = &array_[mid & kMask]; - uint8_t s = e->state.load(std::memory_order_relaxed); - if (n == 0) { - if (s != kReady || - !e->state.compare_exchange_strong(s, kBusy, - std::memory_order_acquire)) - continue; - start = mid; - } else { - // Note: no need to store temporal kBusy, we exclusively own these - // elements. - eigen_assert(s == kReady); - } - result->push_back(std::move(e->w)); - e->state.store(kEmpty, std::memory_order_release); - n++; - } - if (n != 0) - back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed); - return n; - } - - // Size returns current queue size. - // Can be called by any thread at any time. - unsigned Size() const { - // Emptiness plays critical role in thread pool blocking. So we go to great - // effort to not produce false positives (claim non-empty queue as empty). - for (;;) { - // Capture a consistent snapshot of front/tail. - unsigned front = front_.load(std::memory_order_acquire); - unsigned back = back_.load(std::memory_order_acquire); - unsigned front1 = front_.load(std::memory_order_relaxed); - if (front != front1) continue; - int size = (front & kMask2) - (back & kMask2); - // Fix overflow. - if (size < 0) size += 2 * kSize; - // Order of modification in push/pop is crafted to make the queue look - // larger than it is during concurrent modifications. E.g. pop can - // decrement size before the corresponding push has incremented it. - // So the computed size can be up to kSize + 1, fix it. - if (size > static_cast<int>(kSize)) size = kSize; - return size; - } - } - - // Empty tests whether container is empty. - // Can be called by any thread at any time. - bool Empty() const { return Size() == 0; } - - private: - static const unsigned kMask = kSize - 1; - static const unsigned kMask2 = (kSize << 1) - 1; - struct Elem { - std::atomic<uint8_t> state; - Work w; - }; - enum { - kEmpty, - kBusy, - kReady, - }; - std::mutex mutex_; - // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of - // front/back, repsectively. The remaining bits contain modification counters - // that are incremented on Push operations. This allows us to (1) distinguish - // between empty and full conditions (if we would use log(kSize) bits for - // position, these conditions would be indistinguishable); (2) obtain - // consistent snapshot of front_/back_ for Size operation using the - // modification counters. - std::atomic<unsigned> front_; - std::atomic<unsigned> back_; - Elem array_[kSize]; - - RunQueue(const RunQueue&) = delete; - void operator=(const RunQueue&) = delete; -}; - -} // namespace Eigen - -#endif // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ |