diff options
| author | Morph <39850852+Morph1984@users.noreply.github.com> | 2023-03-19 04:01:47 -0400 | 
|---|---|---|
| committer | Morph <39850852+Morph1984@users.noreply.github.com> | 2023-03-21 19:17:38 -0400 | 
| commit | 15d573194c95b95ccf4a5480d8e40a7765a00929 (patch) | |
| tree | 11d3f37d6e90467e4d2f41f0f315958b6edf2b75 | |
| parent | f28ca5361f5242f5b65a5c54bdde55639fba71f5 (diff) | |
bounded_threadsafe_queue: Add TryPush
| -rw-r--r-- | src/common/bounded_threadsafe_queue.h | 71 | 
1 files changed, 71 insertions, 0 deletions
| diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index e03427539..eb88cc1d1 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -22,6 +22,55 @@ class SPSCQueue {      static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two.");  public: +    bool TryPush(T&& t) { +        const size_t write_index = m_write_index.load(); + +        // Check if we have free slots to write to. +        if ((write_index - m_read_index.load()) == Capacity) { +            return false; +        } + +        // Determine the position to write to. +        const size_t pos = write_index % Capacity; + +        // Push into the queue. +        m_data[pos] = std::move(t); + +        // Increment the write index. +        ++m_write_index; + +        // Notify the consumer that we have pushed into the queue. +        std::scoped_lock lock{cv_mutex}; +        cv.notify_one(); + +        return true; +    } + +    template <typename... Args> +    bool TryPush(Args&&... args) { +        const size_t write_index = m_write_index.load(); + +        // Check if we have free slots to write to. +        if ((write_index - m_read_index.load()) == Capacity) { +            return false; +        } + +        // Determine the position to write to. +        const size_t pos = write_index % Capacity; + +        // Emplace into the queue. +        std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...); + +        // Increment the write index. +        ++m_write_index; + +        // Notify the consumer that we have pushed into the queue. +        std::scoped_lock lock{cv_mutex}; +        cv.notify_one(); + +        return true; +    } +      void Push(T&& t) {          const size_t write_index = m_write_index.load(); @@ -153,6 +202,17 @@ private:  template <typename T, size_t Capacity = detail::DefaultCapacity>  class MPSCQueue {  public: +    bool TryPush(T&& t) { +        std::scoped_lock lock{write_mutex}; +        return spsc_queue.TryPush(std::move(t)); +    } + +    template <typename... Args> +    bool TryPush(Args&&... args) { +        std::scoped_lock lock{write_mutex}; +        return spsc_queue.TryPush(std::forward<Args>(args)...); +    } +      void Push(T&& t) {          std::scoped_lock lock{write_mutex};          spsc_queue.Push(std::move(t)); @@ -196,6 +256,17 @@ private:  template <typename T, size_t Capacity = detail::DefaultCapacity>  class MPMCQueue {  public: +    bool TryPush(T&& t) { +        std::scoped_lock lock{write_mutex}; +        return spsc_queue.TryPush(std::move(t)); +    } + +    template <typename... Args> +    bool TryPush(Args&&... args) { +        std::scoped_lock lock{write_mutex}; +        return spsc_queue.TryPush(std::forward<Args>(args)...); +    } +      void Push(T&& t) {          std::scoped_lock lock{write_mutex};          spsc_queue.Push(std::move(t)); | 
