1
0
Fork 0

threadsafe_queue: Add std::stop_token overload to PopWait

Useful for jthreads which make use of the threadsafe queues.
This commit is contained in:
ameerj 2023-06-24 10:11:54 +03:00 committed by GPUCode
parent aa39430e2c
commit a1443356f1
2 changed files with 25 additions and 6 deletions

View File

@ -27,13 +27,13 @@
#include "common/common_paths.h"
#include "common/file_util.h"
#include "common/literals.h"
#include "common/settings.h"
#include "common/thread.h"
#include "common/logging/backend.h"
#include "common/logging/log.h"
#include "common/logging/log_entry.h"
#include "common/logging/text_formatter.h"
#include "common/settings.h"
#include "common/string_util.h"
#include "common/thread.h"
#include "common/threadsafe_queue.h"
namespace Common::Log {

View File

@ -13,8 +13,10 @@
#include <mutex>
#include <utility>
#include "common/polyfill_thread.h"
namespace Common {
template <typename T>
template <typename T, bool with_stop_token = false>
class SPSCQueue {
public:
SPSCQueue() {
@ -93,6 +95,19 @@ public:
return t;
}
T PopWait(std::stop_token stop_token) {
if (Empty()) {
std::unique_lock lock{cv_mutex};
CondvarWait(cv, lock, stop_token, [this] { return !Empty(); });
}
if (stop_token.stop_requested()) {
return T{};
}
T t;
Pop(t);
return t;
}
// not thread-safe
void Clear() {
size.store(0);
@ -121,13 +136,13 @@ private:
ElementPtr* read_ptr;
std::atomic_size_t size{0};
std::mutex cv_mutex;
std::condition_variable cv;
std::conditional_t<with_stop_token, std::condition_variable_any, std::condition_variable> cv;
};
// a simple thread-safe,
// single reader, multiple writer queue
template <typename T>
template <typename T, bool with_stop_token = false>
class MPSCQueue {
public:
[[nodiscard]] std::size_t Size() const {
@ -160,13 +175,17 @@ public:
return spsc_queue.PopWait();
}
T PopWait(std::stop_token stop_token) {
return spsc_queue.PopWait(stop_token);
}
// not thread-safe
void Clear() {
spsc_queue.Clear();
}
private:
SPSCQueue<T> spsc_queue;
SPSCQueue<T, with_stop_token> spsc_queue;
std::mutex write_lock;
};
} // namespace Common