diff options
Diffstat (limited to 'src/common/threadsafe_queue.h')
-rw-r--r-- | src/common/threadsafe_queue.h | 37 |
1 files changed, 26 insertions, 11 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index ad04df8ca..2c8c2b90e 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -14,7 +14,7 @@ #include <utility> namespace Common { -template <typename T> +template <typename T, bool with_stop_token = false> class SPSCQueue { public: SPSCQueue() { @@ -46,15 +46,13 @@ public: ElementPtr* new_ptr = new ElementPtr(); write_ptr->next.store(new_ptr, std::memory_order_release); write_ptr = new_ptr; + ++size; - const size_t previous_size{size++}; - - // Acquire the mutex and then immediately release it as a fence. + // cv_mutex must be held or else there will be a missed wakeup if the other thread is in the + // line before cv.wait // TODO(bunnei): This can be replaced with C++20 waitable atomics when properly supported. // See discussion on https://github.com/yuzu-emu/yuzu/pull/3173 for details. - if (previous_size == 0) { - std::lock_guard lock{cv_mutex}; - } + std::lock_guard lock{cv_mutex}; cv.notify_one(); } @@ -86,7 +84,7 @@ public: void Wait() { if (Empty()) { std::unique_lock lock{cv_mutex}; - cv.wait(lock, [this]() { return !Empty(); }); + cv.wait(lock, [this] { return !Empty(); }); } } @@ -97,6 +95,19 @@ public: return t; } + T PopWait(std::stop_token stop_token) { + if (Empty()) { + std::unique_lock lock{cv_mutex}; + cv.wait(lock, stop_token, [this] { return !Empty(); }); + } + if (stop_token.stop_requested()) { + return T{}; + } + T t; + Pop(t); + return t; + } + // not thread-safe void Clear() { size.store(0); @@ -125,13 +136,13 @@ private: ElementPtr* read_ptr; std::atomic_size_t size{0}; std::mutex cv_mutex; - std::condition_variable cv; + std::conditional_t<with_stop_token, std::condition_variable_any, std::condition_variable> cv; }; // a simple thread-safe, // single reader, multiple writer queue -template <typename T> +template <typename T, bool with_stop_token = false> class MPSCQueue { public: [[nodiscard]] std::size_t Size() const { @@ -168,13 +179,17 @@ public: return spsc_queue.PopWait(); } + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); + } + // not thread-safe void Clear() { spsc_queue.Clear(); } private: - SPSCQueue<T> spsc_queue; + SPSCQueue<T, with_stop_token> spsc_queue; std::mutex write_lock; }; } // namespace Common |