diff options
Diffstat (limited to 'src/common/bounded_threadsafe_queue.h')
-rw-r--r-- | src/common/bounded_threadsafe_queue.h | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h new file mode 100644 index 000000000..7e465549b --- /dev/null +++ b/src/common/bounded_threadsafe_queue.h @@ -0,0 +1,167 @@ +// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se> +// SPDX-License-Identifier: MIT + +#pragma once + +#include <atomic> +#include <bit> +#include <condition_variable> +#include <memory> +#include <mutex> +#include <new> +#include <stop_token> +#include <type_traits> +#include <utility> + +namespace Common { + +#if defined(__cpp_lib_hardware_interference_size) +constexpr size_t hardware_interference_size = std::hardware_destructive_interference_size; +#else +constexpr size_t hardware_interference_size = 64; +#endif + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4324) +#endif + +template <typename T, size_t capacity = 0x400> +class MPSCQueue { +public: + explicit MPSCQueue() : allocator{std::allocator<Slot<T>>()} { + // Allocate one extra slot to prevent false sharing on the last slot + slots = allocator.allocate(capacity + 1); + // Allocators are not required to honor alignment for over-aligned types + // (see http://eel.is/c++draft/allocator.requirements#10) so we verify + // alignment here + if (reinterpret_cast<uintptr_t>(slots) % alignof(Slot<T>) != 0) { + allocator.deallocate(slots, capacity + 1); + throw std::bad_alloc(); + } + for (size_t i = 0; i < capacity; ++i) { + std::construct_at(&slots[i]); + } + static_assert(std::has_single_bit(capacity), "capacity must be an integer power of 2"); + static_assert(alignof(Slot<T>) == hardware_interference_size, + "Slot must be aligned to cache line boundary to prevent false sharing"); + static_assert(sizeof(Slot<T>) % hardware_interference_size == 0, + "Slot size must be a multiple of cache line size to prevent " + "false sharing between adjacent slots"); + static_assert(sizeof(MPSCQueue) % hardware_interference_size == 0, + "Queue size must be a multiple of cache line size to " + "prevent false sharing between adjacent queues"); + } + + ~MPSCQueue() noexcept { + for (size_t i = 0; i < capacity; ++i) { + std::destroy_at(&slots[i]); + } + allocator.deallocate(slots, capacity + 1); + } + + // The queue must be both non-copyable and non-movable + MPSCQueue(const MPSCQueue&) = delete; + MPSCQueue& operator=(const MPSCQueue&) = delete; + + MPSCQueue(MPSCQueue&&) = delete; + MPSCQueue& operator=(MPSCQueue&&) = delete; + + void Push(const T& v) noexcept { + static_assert(std::is_nothrow_copy_constructible_v<T>, + "T must be nothrow copy constructible"); + emplace(v); + } + + template <typename P, typename = std::enable_if_t<std::is_nothrow_constructible_v<T, P&&>>> + void Push(P&& v) noexcept { + emplace(std::forward<P>(v)); + } + + void Pop(T& v, std::stop_token stop) noexcept { + auto const tail = tail_.fetch_add(1); + auto& slot = slots[idx(tail)]; + if (!slot.turn.test()) { + std::unique_lock lock{cv_mutex}; + cv.wait(lock, stop, [&slot] { return slot.turn.test(); }); + } + v = slot.move(); + slot.destroy(); + slot.turn.clear(); + slot.turn.notify_one(); + } + +private: + template <typename U = T> + struct Slot { + ~Slot() noexcept { + if (turn.test()) { + destroy(); + } + } + + template <typename... Args> + void construct(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible_v<U, Args&&...>, + "T must be nothrow constructible with Args&&..."); + std::construct_at(reinterpret_cast<U*>(&storage), std::forward<Args>(args)...); + } + + void destroy() noexcept { + static_assert(std::is_nothrow_destructible_v<U>, "T must be nothrow destructible"); + std::destroy_at(reinterpret_cast<U*>(&storage)); + } + + U&& move() noexcept { + return reinterpret_cast<U&&>(storage); + } + + // Align to avoid false sharing between adjacent slots + alignas(hardware_interference_size) std::atomic_flag turn{}; + struct aligned_store { + struct type { + alignas(U) unsigned char data[sizeof(U)]; + }; + }; + typename aligned_store::type storage; + }; + + template <typename... Args> + void emplace(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible_v<T, Args&&...>, + "T must be nothrow constructible with Args&&..."); + auto const head = head_.fetch_add(1); + auto& slot = slots[idx(head)]; + slot.turn.wait(true); + slot.construct(std::forward<Args>(args)...); + slot.turn.test_and_set(); + cv.notify_one(); + } + + constexpr size_t idx(size_t i) const noexcept { + return i & mask; + } + + static constexpr size_t mask = capacity - 1; + + // Align to avoid false sharing between head_ and tail_ + alignas(hardware_interference_size) std::atomic<size_t> head_{0}; + alignas(hardware_interference_size) std::atomic<size_t> tail_{0}; + + std::mutex cv_mutex; + std::condition_variable_any cv; + + Slot<T>* slots; + [[no_unique_address]] std::allocator<Slot<T>> allocator; + + static_assert(std::is_nothrow_copy_assignable_v<T> || std::is_nothrow_move_assignable_v<T>, + "T must be nothrow copy or move assignable"); + + static_assert(std::is_nothrow_destructible_v<T>, "T must be nothrow destructible"); +}; + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +} // namespace Common |