diff options
Diffstat (limited to 'src/common/threadsafe_queue.h')
-rw-r--r-- | src/common/threadsafe_queue.h | 55 |
1 files changed, 35 insertions, 20 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index edf13bc49..e714ba5b3 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -7,17 +7,17 @@ // a simple lockless thread-safe, // single reader, single writer queue -#include <algorithm> #include <atomic> +#include <condition_variable> #include <cstddef> #include <mutex> -#include "common/common_types.h" +#include <utility> namespace Common { -template <typename T, bool NeedSize = true> +template <typename T> class SPSCQueue { public: - SPSCQueue() : size(0) { + SPSCQueue() { write_ptr = read_ptr = new ElementPtr(); } ~SPSCQueue() { @@ -25,13 +25,12 @@ public: delete read_ptr; } - u32 Size() const { - static_assert(NeedSize, "using Size() on FifoQueue without NeedSize"); + std::size_t Size() const { return size.load(); } bool Empty() const { - return !read_ptr->next.load(); + return Size() == 0; } T& Front() const { @@ -47,13 +46,14 @@ public: ElementPtr* new_ptr = new ElementPtr(); write_ptr->next.store(new_ptr, std::memory_order_release); write_ptr = new_ptr; - if (NeedSize) - size++; + cv.notify_one(); + + ++size; } void Pop() { - if (NeedSize) - size--; + --size; + ElementPtr* tmpptr = read_ptr; // advance the read pointer read_ptr = tmpptr->next.load(); @@ -66,8 +66,7 @@ public: if (Empty()) return false; - if (NeedSize) - size--; + --size; ElementPtr* tmpptr = read_ptr; read_ptr = tmpptr->next.load(std::memory_order_acquire); @@ -77,6 +76,16 @@ public: return true; } + T PopWait() { + if (Empty()) { + std::unique_lock lock{cv_mutex}; + cv.wait(lock, [this]() { return !Empty(); }); + } + T t; + Pop(t); + return t; + } + // not thread-safe void Clear() { size.store(0); @@ -89,7 +98,7 @@ private: // and a pointer to the next ElementPtr class ElementPtr { public: - ElementPtr() : next(nullptr) {} + ElementPtr() {} ~ElementPtr() { ElementPtr* next_ptr = next.load(); @@ -98,21 +107,23 @@ private: } T current; - std::atomic<ElementPtr*> next; + std::atomic<ElementPtr*> next{nullptr}; }; ElementPtr* write_ptr; ElementPtr* read_ptr; - std::atomic<u32> size; + std::atomic_size_t size{0}; + std::mutex cv_mutex; + std::condition_variable cv; }; // a simple thread-safe, // single reader, multiple writer queue -template <typename T, bool NeedSize = true> +template <typename T> class MPSCQueue { public: - u32 Size() const { + std::size_t Size() const { return spsc_queue.Size(); } @@ -126,7 +137,7 @@ public: template <typename Arg> void Push(Arg&& t) { - std::lock_guard<std::mutex> lock(write_lock); + std::lock_guard lock{write_lock}; spsc_queue.Push(t); } @@ -138,13 +149,17 @@ public: return spsc_queue.Pop(t); } + T PopWait() { + return spsc_queue.PopWait(); + } + // not thread-safe void Clear() { spsc_queue.Clear(); } private: - SPSCQueue<T, NeedSize> spsc_queue; + SPSCQueue<T> spsc_queue; std::mutex write_lock; }; } // namespace Common |