diff options
| -rw-r--r-- | src/common/bounded_threadsafe_queue.h | 71 | 
1 files changed, 71 insertions, 0 deletions
| diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index e03427539..eb88cc1d1 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -22,6 +22,55 @@ class SPSCQueue {      static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two.");  public: +    bool TryPush(T&& t) { +        const size_t write_index = m_write_index.load(); + +        // Check if we have free slots to write to. +        if ((write_index - m_read_index.load()) == Capacity) { +            return false; +        } + +        // Determine the position to write to. +        const size_t pos = write_index % Capacity; + +        // Push into the queue. +        m_data[pos] = std::move(t); + +        // Increment the write index. +        ++m_write_index; + +        // Notify the consumer that we have pushed into the queue. +        std::scoped_lock lock{cv_mutex}; +        cv.notify_one(); + +        return true; +    } + +    template <typename... Args> +    bool TryPush(Args&&... args) { +        const size_t write_index = m_write_index.load(); + +        // Check if we have free slots to write to. +        if ((write_index - m_read_index.load()) == Capacity) { +            return false; +        } + +        // Determine the position to write to. +        const size_t pos = write_index % Capacity; + +        // Emplace into the queue. +        std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...); + +        // Increment the write index. +        ++m_write_index; + +        // Notify the consumer that we have pushed into the queue. +        std::scoped_lock lock{cv_mutex}; +        cv.notify_one(); + +        return true; +    } +      void Push(T&& t) {          const size_t write_index = m_write_index.load(); @@ -153,6 +202,17 @@ private:  template <typename T, size_t Capacity = detail::DefaultCapacity>  class MPSCQueue {  public: +    bool TryPush(T&& t) { +        std::scoped_lock lock{write_mutex}; +        return spsc_queue.TryPush(std::move(t)); +    } + +    template <typename... Args> +    bool TryPush(Args&&... args) { +        std::scoped_lock lock{write_mutex}; +        return spsc_queue.TryPush(std::forward<Args>(args)...); +    } +      void Push(T&& t) {          std::scoped_lock lock{write_mutex};          spsc_queue.Push(std::move(t)); @@ -196,6 +256,17 @@ private:  template <typename T, size_t Capacity = detail::DefaultCapacity>  class MPMCQueue {  public: +    bool TryPush(T&& t) { +        std::scoped_lock lock{write_mutex}; +        return spsc_queue.TryPush(std::move(t)); +    } + +    template <typename... Args> +    bool TryPush(Args&&... args) { +        std::scoped_lock lock{write_mutex}; +        return spsc_queue.TryPush(std::forward<Args>(args)...); +    } +      void Push(T&& t) {          std::scoped_lock lock{write_mutex};          spsc_queue.Push(std::move(t)); | 
