diff options
author | bunnei <bunneidev@gmail.com> | 2021-09-18 20:37:40 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-18 20:37:40 -0700 |
commit | a9c3619d26beaaae338c5e902635147b9237350a (patch) | |
tree | b86f29a288fa5bc5a337ba61a6833fa1c89c2270 /src/common/threadsafe_queue.h | |
parent | 6e376c27a327d8e123efafe49370a304cec20221 (diff) | |
parent | 84f7e7e91c441636b93accae6f7bd52f70a8ab99 (diff) |
Merge pull request #7019 from ameerj/videocore-jthread
videocore: Use std::jthread for worker threads
Diffstat (limited to 'src/common/threadsafe_queue.h')
-rw-r--r-- | src/common/threadsafe_queue.h | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index 8430b9778..2c8c2b90e 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -14,7 +14,7 @@ #include <utility> namespace Common { -template <typename T> +template <typename T, bool with_stop_token = false> class SPSCQueue { public: SPSCQueue() { @@ -84,7 +84,7 @@ public: void Wait() { if (Empty()) { std::unique_lock lock{cv_mutex}; - cv.wait(lock, [this]() { return !Empty(); }); + cv.wait(lock, [this] { return !Empty(); }); } } @@ -95,6 +95,19 @@ public: return t; } + T PopWait(std::stop_token stop_token) { + if (Empty()) { + std::unique_lock lock{cv_mutex}; + cv.wait(lock, stop_token, [this] { return !Empty(); }); + } + if (stop_token.stop_requested()) { + return T{}; + } + T t; + Pop(t); + return t; + } + // not thread-safe void Clear() { size.store(0); @@ -123,13 +136,13 @@ private: ElementPtr* read_ptr; std::atomic_size_t size{0}; std::mutex cv_mutex; - std::condition_variable cv; + std::conditional_t<with_stop_token, std::condition_variable_any, std::condition_variable> cv; }; // a simple thread-safe, // single reader, multiple writer queue -template <typename T> +template <typename T, bool with_stop_token = false> class MPSCQueue { public: [[nodiscard]] std::size_t Size() const { @@ -166,13 +179,17 @@ public: return spsc_queue.PopWait(); } + T PopWait(std::stop_token stop_token) { + return spsc_queue.PopWait(stop_token); + } + // not thread-safe void Clear() { spsc_queue.Clear(); } private: - SPSCQueue<T> spsc_queue; + SPSCQueue<T, with_stop_token> spsc_queue; std::mutex write_lock; }; } // namespace Common |