bounded_threadsafe_queue: Deduplicate and add PushModes
Adds the PushModes Try and Wait to allow producers to specify how they want to push their data to the queue if the queue is full. If the queue is full: - Try will fail to push to the queue, returning false. Try only returns true if it successfully pushes to the queue. This may result in items not being pushed into the queue. - Wait will wait until a slot is available to push to the queue, resulting in potential for deadlock if a consumer is not running.
This commit is contained in:
parent
15d573194c
commit
407dc917f1
|
@ -23,97 +23,21 @@ class SPSCQueue {
|
||||||
|
|
||||||
public:
|
public:
|
||||||
bool TryPush(T&& t) {
|
bool TryPush(T&& t) {
|
||||||
const size_t write_index = m_write_index.load();
|
return Push<PushMode::Try>(std::move(t));
|
||||||
|
|
||||||
// Check if we have free slots to write to.
|
|
||||||
if ((write_index - m_read_index.load()) == Capacity) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine the position to write to.
|
|
||||||
const size_t pos = write_index % Capacity;
|
|
||||||
|
|
||||||
// Push into the queue.
|
|
||||||
m_data[pos] = std::move(t);
|
|
||||||
|
|
||||||
// Increment the write index.
|
|
||||||
++m_write_index;
|
|
||||||
|
|
||||||
// Notify the consumer that we have pushed into the queue.
|
|
||||||
std::scoped_lock lock{cv_mutex};
|
|
||||||
cv.notify_one();
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
bool TryPush(Args&&... args) {
|
bool TryEmplace(Args&&... args) {
|
||||||
const size_t write_index = m_write_index.load();
|
return Emplace<PushMode::Try>(std::forward<Args>(args)...);
|
||||||
|
|
||||||
// Check if we have free slots to write to.
|
|
||||||
if ((write_index - m_read_index.load()) == Capacity) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine the position to write to.
|
|
||||||
const size_t pos = write_index % Capacity;
|
|
||||||
|
|
||||||
// Emplace into the queue.
|
|
||||||
std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...);
|
|
||||||
|
|
||||||
// Increment the write index.
|
|
||||||
++m_write_index;
|
|
||||||
|
|
||||||
// Notify the consumer that we have pushed into the queue.
|
|
||||||
std::scoped_lock lock{cv_mutex};
|
|
||||||
cv.notify_one();
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Push(T&& t) {
|
void PushWait(T&& t) {
|
||||||
const size_t write_index = m_write_index.load();
|
Push<PushMode::Wait>(std::move(t));
|
||||||
|
|
||||||
// Wait until we have free slots to write to.
|
|
||||||
while ((write_index - m_read_index.load()) == Capacity) {
|
|
||||||
std::this_thread::yield();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine the position to write to.
|
|
||||||
const size_t pos = write_index % Capacity;
|
|
||||||
|
|
||||||
// Push into the queue.
|
|
||||||
m_data[pos] = std::move(t);
|
|
||||||
|
|
||||||
// Increment the write index.
|
|
||||||
++m_write_index;
|
|
||||||
|
|
||||||
// Notify the consumer that we have pushed into the queue.
|
|
||||||
std::scoped_lock lock{cv_mutex};
|
|
||||||
cv.notify_one();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
void Push(Args&&... args) {
|
void EmplaceWait(Args&&... args) {
|
||||||
const size_t write_index = m_write_index.load();
|
Emplace<PushMode::Wait>(std::forward<Args>(args)...);
|
||||||
|
|
||||||
// Wait until we have free slots to write to.
|
|
||||||
while ((write_index - m_read_index.load()) == Capacity) {
|
|
||||||
std::this_thread::yield();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine the position to write to.
|
|
||||||
const size_t pos = write_index % Capacity;
|
|
||||||
|
|
||||||
// Emplace into the queue.
|
|
||||||
std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...);
|
|
||||||
|
|
||||||
// Increment the write index.
|
|
||||||
++m_write_index;
|
|
||||||
|
|
||||||
// Notify the consumer that we have pushed into the queue.
|
|
||||||
std::scoped_lock lock{cv_mutex};
|
|
||||||
cv.notify_one();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool TryPop(T& t) {
|
bool TryPop(T& t) {
|
||||||
|
@ -147,6 +71,80 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
enum class PushMode {
|
||||||
|
Try,
|
||||||
|
Wait,
|
||||||
|
Count,
|
||||||
|
};
|
||||||
|
|
||||||
|
template <PushMode Mode>
|
||||||
|
bool Push(T&& t) {
|
||||||
|
const size_t write_index = m_write_index.load();
|
||||||
|
|
||||||
|
if constexpr (Mode == PushMode::Try) {
|
||||||
|
// Check if we have free slots to write to.
|
||||||
|
if ((write_index - m_read_index.load()) == Capacity) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else if constexpr (Mode == PushMode::Wait) {
|
||||||
|
// Wait until we have free slots to write to.
|
||||||
|
while ((write_index - m_read_index.load()) == Capacity) {
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
static_assert(Mode < PushMode::Count, "Invalid PushMode.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine the position to write to.
|
||||||
|
const size_t pos = write_index % Capacity;
|
||||||
|
|
||||||
|
// Push into the queue.
|
||||||
|
m_data[pos] = std::move(t);
|
||||||
|
|
||||||
|
// Increment the write index.
|
||||||
|
++m_write_index;
|
||||||
|
|
||||||
|
// Notify the consumer that we have pushed into the queue.
|
||||||
|
std::scoped_lock lock{cv_mutex};
|
||||||
|
cv.notify_one();
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <PushMode Mode, typename... Args>
|
||||||
|
bool Emplace(Args&&... args) {
|
||||||
|
const size_t write_index = m_write_index.load();
|
||||||
|
|
||||||
|
if constexpr (Mode == PushMode::Try) {
|
||||||
|
// Check if we have free slots to write to.
|
||||||
|
if ((write_index - m_read_index.load()) == Capacity) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else if constexpr (Mode == PushMode::Wait) {
|
||||||
|
// Wait until we have free slots to write to.
|
||||||
|
while ((write_index - m_read_index.load()) == Capacity) {
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
static_assert(Mode < PushMode::Count, "Invalid PushMode.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine the position to write to.
|
||||||
|
const size_t pos = write_index % Capacity;
|
||||||
|
|
||||||
|
// Emplace into the queue.
|
||||||
|
std::construct_at(std::addressof(m_data[pos]), std::forward<Args>(args)...);
|
||||||
|
|
||||||
|
// Increment the write index.
|
||||||
|
++m_write_index;
|
||||||
|
|
||||||
|
// Notify the consumer that we have pushed into the queue.
|
||||||
|
std::scoped_lock lock{cv_mutex};
|
||||||
|
cv.notify_one();
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
void Pop() {
|
void Pop() {
|
||||||
const size_t read_index = m_read_index.load();
|
const size_t read_index = m_read_index.load();
|
||||||
|
|
||||||
|
@ -208,20 +206,20 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
bool TryPush(Args&&... args) {
|
bool TryEmplace(Args&&... args) {
|
||||||
std::scoped_lock lock{write_mutex};
|
std::scoped_lock lock{write_mutex};
|
||||||
return spsc_queue.TryPush(std::forward<Args>(args)...);
|
return spsc_queue.TryEmplace(std::forward<Args>(args)...);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Push(T&& t) {
|
void PushWait(T&& t) {
|
||||||
std::scoped_lock lock{write_mutex};
|
std::scoped_lock lock{write_mutex};
|
||||||
spsc_queue.Push(std::move(t));
|
spsc_queue.PushWait(std::move(t));
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
void Push(Args&&... args) {
|
void EmplaceWait(Args&&... args) {
|
||||||
std::scoped_lock lock{write_mutex};
|
std::scoped_lock lock{write_mutex};
|
||||||
spsc_queue.Push(std::forward<Args>(args)...);
|
spsc_queue.EmplaceWait(std::forward<Args>(args)...);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool TryPop(T& t) {
|
bool TryPop(T& t) {
|
||||||
|
@ -262,20 +260,20 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
bool TryPush(Args&&... args) {
|
bool TryEmplace(Args&&... args) {
|
||||||
std::scoped_lock lock{write_mutex};
|
std::scoped_lock lock{write_mutex};
|
||||||
return spsc_queue.TryPush(std::forward<Args>(args)...);
|
return spsc_queue.TryEmplace(std::forward<Args>(args)...);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Push(T&& t) {
|
void PushWait(T&& t) {
|
||||||
std::scoped_lock lock{write_mutex};
|
std::scoped_lock lock{write_mutex};
|
||||||
spsc_queue.Push(std::move(t));
|
spsc_queue.PushWait(std::move(t));
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
void Push(Args&&... args) {
|
void EmplaceWait(Args&&... args) {
|
||||||
std::scoped_lock lock{write_mutex};
|
std::scoped_lock lock{write_mutex};
|
||||||
spsc_queue.Push(std::forward<Args>(args)...);
|
spsc_queue.EmplaceWait(std::forward<Args>(args)...);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool TryPop(T& t) {
|
bool TryPop(T& t) {
|
||||||
|
|
|
@ -207,7 +207,7 @@ public:
|
||||||
if (!filter.CheckMessage(log_class, log_level)) {
|
if (!filter.CheckMessage(log_class, log_level)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
message_queue.Push(
|
message_queue.EmplaceWait(
|
||||||
CreateEntry(log_class, log_level, filename, line_num, function, std::move(message)));
|
CreateEntry(log_class, log_level, filename, line_num, function, std::move(message)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -118,7 +118,7 @@ u64 ThreadManager::PushCommand(CommandData&& command_data, bool block) {
|
||||||
|
|
||||||
std::unique_lock lk(state.write_lock);
|
std::unique_lock lk(state.write_lock);
|
||||||
const u64 fence{++state.last_fence};
|
const u64 fence{++state.last_fence};
|
||||||
state.queue.Push(std::move(command_data), fence, block);
|
state.queue.EmplaceWait(std::move(command_data), fence, block);
|
||||||
|
|
||||||
if (block) {
|
if (block) {
|
||||||
Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] {
|
Common::CondvarWait(state.cv, lk, thread.get_stop_token(), [this, fence] {
|
||||||
|
|
Reference in New Issue