From f28dd32275c1feba4854abad30ff5e21a7b39440 Mon Sep 17 00:00:00 2001 From: ReinUsesLisp Date: Mon, 22 Mar 2021 21:00:48 -0300 Subject: common/thread_worker: Add wait for requests method --- src/common/thread_worker.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'src/common/thread_worker.cpp') diff --git a/src/common/thread_worker.cpp b/src/common/thread_worker.cpp index 8f9bf447a..745918c7e 100644 --- a/src/common/thread_worker.cpp +++ b/src/common/thread_worker.cpp @@ -29,6 +29,10 @@ ThreadWorker::ThreadWorker(std::size_t num_workers, const std::string& name) { } task = std::move(requests.front()); requests.pop(); + + if (requests.empty()) { + wait_condition.notify_one(); + } } task(); @@ -55,4 +59,9 @@ void ThreadWorker::QueueWork(std::function&& work) { condition.notify_one(); } +void ThreadWorker::WaitForRequests() { + std::unique_lock lock{queue_mutex}; + wait_condition.wait(lock, [this] { return stop || requests.empty(); }); +} + } // namespace Common -- cgit v1.2.3 From bf5b5c1bf43946039d91f78253599c9996f86057 Mon Sep 17 00:00:00 2001 From: ReinUsesLisp Date: Thu, 1 Apr 2021 01:05:45 -0300 Subject: common/thread_worker: Use unique function --- src/common/thread_worker.cpp | 46 +++++++++++++++++++------------------------- 1 file changed, 20 insertions(+), 26 deletions(-) (limited to 'src/common/thread_worker.cpp') diff --git a/src/common/thread_worker.cpp b/src/common/thread_worker.cpp index 745918c7e..f4d8bb0f0 100644 --- a/src/common/thread_worker.cpp +++ b/src/common/thread_worker.cpp @@ -8,36 +8,30 @@ namespace Common { ThreadWorker::ThreadWorker(std::size_t num_workers, const std::string& name) { - for (std::size_t i = 0; i < num_workers; ++i) - threads.emplace_back([this, thread_name{std::string{name}}] { - Common::SetCurrentThreadName(thread_name.c_str()); + const auto lambda = [this, thread_name{std::string{name}}] { + Common::SetCurrentThreadName(thread_name.c_str()); - // Wait for first request + while (!stop) { + UniqueFunction task; { std::unique_lock lock{queue_mutex}; + if (requests.empty()) { + wait_condition.notify_all(); + } condition.wait(lock, [this] { return stop || !requests.empty(); }); - } - - while (true) { - std::function task; - - { - std::unique_lock lock{queue_mutex}; - condition.wait(lock, [this] { return stop || !requests.empty(); }); - if (stop || requests.empty()) { - return; - } - task = std::move(requests.front()); - requests.pop(); - - if (requests.empty()) { - wait_condition.notify_one(); - } + if (stop || requests.empty()) { + break; } - - task(); + task = std::move(requests.front()); + requests.pop(); } - }); + task(); + } + wait_condition.notify_all(); + }; + for (size_t i = 0; i < num_workers; ++i) { + threads.emplace_back(lambda); + } } ThreadWorker::~ThreadWorker() { @@ -51,10 +45,10 @@ ThreadWorker::~ThreadWorker() { } } -void ThreadWorker::QueueWork(std::function&& work) { +void ThreadWorker::QueueWork(UniqueFunction work) { { std::unique_lock lock{queue_mutex}; - requests.emplace(work); + requests.emplace(std::move(work)); } condition.notify_one(); } -- cgit v1.2.3 From a10e112e6436b30c9eb5ca2a82c94f83205bbc34 Mon Sep 17 00:00:00 2001 From: FernandoS27 Date: Tue, 6 Apr 2021 04:23:02 +0200 Subject: common/thread_worker: Fix data race --- src/common/thread_worker.cpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'src/common/thread_worker.cpp') diff --git a/src/common/thread_worker.cpp b/src/common/thread_worker.cpp index f4d8bb0f0..fd130dfb4 100644 --- a/src/common/thread_worker.cpp +++ b/src/common/thread_worker.cpp @@ -8,9 +8,17 @@ namespace Common { ThreadWorker::ThreadWorker(std::size_t num_workers, const std::string& name) { + workers_queued.store(static_cast(num_workers), std::memory_order_release); const auto lambda = [this, thread_name{std::string{name}}] { Common::SetCurrentThreadName(thread_name.c_str()); + // TODO(Blinkhawk): Change the design, this is very prone to data races + // Wait for first request + { + std::unique_lock lock{queue_mutex}; + condition.wait(lock, [this] { return stop || !requests.empty(); }); + } + while (!stop) { UniqueFunction task; { @@ -26,7 +34,9 @@ ThreadWorker::ThreadWorker(std::size_t num_workers, const std::string& name) { requests.pop(); } task(); + work_done++; } + workers_stopped++; wait_condition.notify_all(); }; for (size_t i = 0; i < num_workers; ++i) { @@ -49,13 +59,15 @@ void ThreadWorker::QueueWork(UniqueFunction work) { { std::unique_lock lock{queue_mutex}; requests.emplace(std::move(work)); + work_scheduled++; } condition.notify_one(); } void ThreadWorker::WaitForRequests() { std::unique_lock lock{queue_mutex}; - wait_condition.wait(lock, [this] { return stop || requests.empty(); }); + wait_condition.wait( + lock, [this] { return workers_stopped >= workers_queued || work_done >= work_scheduled; }); } } // namespace Common -- cgit v1.2.3 From c147e9a90e92ceec17d778d3c6e5cf6f028109b3 Mon Sep 17 00:00:00 2001 From: FernandoS27 Date: Tue, 6 Apr 2021 06:02:44 +0200 Subject: common/thread_worker: Simplify logic --- src/common/thread_worker.cpp | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) (limited to 'src/common/thread_worker.cpp') diff --git a/src/common/thread_worker.cpp b/src/common/thread_worker.cpp index fd130dfb4..32be49b15 100644 --- a/src/common/thread_worker.cpp +++ b/src/common/thread_worker.cpp @@ -12,13 +12,6 @@ ThreadWorker::ThreadWorker(std::size_t num_workers, const std::string& name) { const auto lambda = [this, thread_name{std::string{name}}] { Common::SetCurrentThreadName(thread_name.c_str()); - // TODO(Blinkhawk): Change the design, this is very prone to data races - // Wait for first request - { - std::unique_lock lock{queue_mutex}; - condition.wait(lock, [this] { return stop || !requests.empty(); }); - } - while (!stop) { UniqueFunction task; { @@ -27,7 +20,7 @@ ThreadWorker::ThreadWorker(std::size_t num_workers, const std::string& name) { wait_condition.notify_all(); } condition.wait(lock, [this] { return stop || !requests.empty(); }); - if (stop || requests.empty()) { + if (stop) { break; } task = std::move(requests.front()); -- cgit v1.2.3 From da34d3704405665b68d3d992f37a7eeb541238af Mon Sep 17 00:00:00 2001 From: ReinUsesLisp Date: Tue, 25 May 2021 20:37:06 -0300 Subject: common/thread_worker: Add support for stateful threads --- src/common/thread_worker.cpp | 66 -------------------------------------------- 1 file changed, 66 deletions(-) delete mode 100644 src/common/thread_worker.cpp (limited to 'src/common/thread_worker.cpp') diff --git a/src/common/thread_worker.cpp b/src/common/thread_worker.cpp deleted file mode 100644 index 32be49b15..000000000 --- a/src/common/thread_worker.cpp +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2020 yuzu emulator team -// Licensed under GPLv2 or any later version -// Refer to the license.txt file included. - -#include "common/thread.h" -#include "common/thread_worker.h" - -namespace Common { - -ThreadWorker::ThreadWorker(std::size_t num_workers, const std::string& name) { - workers_queued.store(static_cast(num_workers), std::memory_order_release); - const auto lambda = [this, thread_name{std::string{name}}] { - Common::SetCurrentThreadName(thread_name.c_str()); - - while (!stop) { - UniqueFunction task; - { - std::unique_lock lock{queue_mutex}; - if (requests.empty()) { - wait_condition.notify_all(); - } - condition.wait(lock, [this] { return stop || !requests.empty(); }); - if (stop) { - break; - } - task = std::move(requests.front()); - requests.pop(); - } - task(); - work_done++; - } - workers_stopped++; - wait_condition.notify_all(); - }; - for (size_t i = 0; i < num_workers; ++i) { - threads.emplace_back(lambda); - } -} - -ThreadWorker::~ThreadWorker() { - { - std::unique_lock lock{queue_mutex}; - stop = true; - } - condition.notify_all(); - for (std::thread& thread : threads) { - thread.join(); - } -} - -void ThreadWorker::QueueWork(UniqueFunction work) { - { - std::unique_lock lock{queue_mutex}; - requests.emplace(std::move(work)); - work_scheduled++; - } - condition.notify_one(); -} - -void ThreadWorker::WaitForRequests() { - std::unique_lock lock{queue_mutex}; - wait_condition.wait( - lock, [this] { return workers_stopped >= workers_queued || work_done >= work_scheduled; }); -} - -} // namespace Common -- cgit v1.2.3