Program Listing for File thread_manager.h
↰ Return to documentation for file (include/thread_manager.h)
#pragma once
#include <rte_eal.h>
#include <rte_lcore.h>
#include <atomic>
#include <functional>
#include <iostream>
#include <latch>
#include <memory>
#include <stdexcept>
#include <stop_token>
#include <tuple>
#include <vector>
/// Satisfied when a decayed `F` can be invoked, with a result convertible to `int`, in at
/// least one of two ways:
///   1. with the decayed argument types `Ts...` only, or
///   2. with a leading `std::stop_token` followed by the decayed argument types `Ts...`.
template <typename F, typename... Ts>
concept IsValidArguments =
    (std::is_invocable_r<int, std::decay_t<F>, std::decay_t<Ts>...>::value ||
     std::is_invocable_r<int, std::decay_t<F>, std::stop_token, std::decay_t<Ts>...>::value);
class ThreadManager {
public:
enum class LaunchThreadResult {
Success,
MaxLCoreReached,
NotMainLCore,
RemoteLaunchFail,
};
ThreadManager() : _stop_source{} {
// Indicate that main lcore thread is the only one running
running_threads.push_back({rte_lcore_id()});
}
~ThreadManager() {
request_stop();
join();
}
void join() {
rte_eal_mp_wait_lcore();
}
ThreadManager(const ThreadManager&) = delete;
ThreadManager(const ThreadManager&&) = delete;
template <typename F, typename... Ts>
requires IsValidArguments<F, Ts...>
[[nodiscard]] LaunchThreadResult LaunchThread(F&& f, Ts&&... args);
unsigned GetTotalThreads() const {
// Subtract one for the main lcore, this thread cannot be used
return rte_lcore_count() - 1;
}
unsigned GetUnusedThreads() const {
return rte_lcore_count() - running_threads.size();
}
[[nodiscard]] std::stop_source get_stop_source() {
return _stop_source;
};
[[nodiscard]] std::stop_token get_stop_token() const {
return _stop_source.get_token();
};
bool request_stop() noexcept {
return get_stop_source().request_stop();
}
private:
template <typename F, typename... Ts>
struct LCoreParams;
template <typename F, typename... Ts>
static int LCoreReceiver(void* arg);
std::vector<unsigned> running_threads;
std::stop_source _stop_source;
};
// Everything a launched lcore needs in order to run the user's callable: the callable
// itself, its arguments, a start latch and access to the stop source.
template <typename F, typename... Ts>
struct ThreadManager::LCoreParams {
    // The callable, stored with exactly the type F this template was instantiated with
    F f;

    // Arguments are kept by value (decayed) and are moved in whenever the caller passed
    // them as rvalues
    std::tuple<std::decay_t<Ts>...> m_args;

    // Rendezvous point between the launching thread and LCoreReceiver: each side arrives
    // once, hence the expected count of 2. The launcher only returns after both arrived.
    std::latch start_thread;

    // Held by constant reference rather than copied; the referenced stop source has to
    // outlive the launched threads (the original design relies on the owning object
    // guaranteeing that)
    const std::stop_source& _stop_source;

    LCoreParams(F&& callable, const std::stop_source& source, Ts&&... arguments)
        : f(std::forward<F>(callable)),
          m_args(std::forward<Ts>(arguments)...),
          start_thread(2),
          _stop_source(source) { }
};
// Launches `f` on the next lcore after the most recently used one.
//
// `f` and `args` are taken by forwarding reference; both are stored BY VALUE (decayed) in
// the parameter bundle handed to the lcore, so the caller's objects do not need to stay
// alive after this function returns. Rvalues are moved, lvalues are copied.
//
// Returns:
//   NotMainLCore     - the calling thread is not the main lcore
//   MaxLCoreReached  - rte_get_next_lcore() had no further lcore to offer
//   RemoteLaunchFail - rte_eal_remote_launch() returned a non-zero code
//   Success          - the lcore picked up its parameters and is about to run `f`
template <typename F, typename... Ts>
    requires IsValidArguments<F, Ts...>
ThreadManager::LaunchThreadResult ThreadManager::LaunchThread(F&& f, Ts&&... args) {
    // Threads can only be launched from the main lcore
    if (rte_lcore_id() != rte_get_main_lcore())
        return ThreadManager::LaunchThreadResult::NotMainLCore;

    // Get the id of the next lcore
    const unsigned lcore_id_to_run = rte_get_next_lcore(running_threads.back(), true, false);

    // rte_get_next_lcore returns RTE_MAX_LCORE when there are no hardware threads available
    // anymore
    if (lcore_id_to_run == RTE_MAX_LCORE)
        return ThreadManager::LaunchThreadResult::MaxLCoreReached;

    // Add the new lcore id to the list of running lcores
    running_threads.push_back(lcore_id_to_run);

    // Instantiate the bundle with the DECAYED callable type. With the deduced F an lvalue
    // callable would make F a reference type, and the lcore would then run through a
    // reference to the caller's object, which may already be destroyed at that point.
    using Callable = std::decay_t<F>;
    using Params = LCoreParams<Callable, Ts...>;

    // Copy (lvalue) or move (rvalue) the callable into an owned object first so it can be
    // handed to the bundle's constructor as an rvalue.
    Callable owned_callable(std::forward<F>(f));

    // Wrap the bundle in a shared pointer to safely pass it to the other thread
    std::shared_ptr<Params> lcore_params = std::make_shared<Params>(
        std::move(owned_callable), _stop_source, std::forward<Ts>(args)...);

    // The receiver gets the address of this local shared pointer and copies it before it
    // arrives at the latch; the wait below keeps the local alive until that has happened.
    const int res = rte_eal_remote_launch(LCoreReceiver<Callable, Ts...>,
                                          static_cast<void*>(&lcore_params), lcore_id_to_run);
    if (res != 0)
        return ThreadManager::LaunchThreadResult::RemoteLaunchFail;

    // Wait until the LCoreReceiver is ready
    lcore_params->start_thread.arrive_and_wait();
    return ThreadManager::LaunchThreadResult::Success;
}
template <typename F, typename... Ts>
int ThreadManager::LCoreReceiver(void* arg) {
// Make a copy of the shared pointer to guarantee access to data, reference count will be
// incremented
std::shared_ptr<LCoreParams<F, Ts...>> params_wrapper =
*((std::shared_ptr<LCoreParams<F, Ts...>>*) (arg));
params_wrapper->start_thread.arrive_and_wait();
int retval;
if constexpr (std::is_invocable_r_v<int, std::decay_t<F>, std::stop_token,
std::decay_t<Ts>...>) {
// Obtain a stop token from the stop source
std::stop_token _stop_token = params_wrapper->_stop_source.get_token();
// The actual function arguments are only available as a tuple, construct a lambda
// that invokes the function with the stop token. The function arguments in the
// original can be unpacked by calling this lambda with std::apply
auto lambda_with_token = [_stop_token = std::move(_stop_token),
f = std::forward<F>(params_wrapper->f)](
auto&&... args) -> int {
return std::invoke(f, std::move(_stop_token),
std::forward<decltype(args)>(args)...);
};
// Apply original function arguments to lambda, original function will be called
// with a movable stop token
retval =
std::apply(std::move(lambda_with_token), std::move(params_wrapper->m_args));
} else {
retval = std::apply(std::forward<F>(params_wrapper->f),
std::move(params_wrapper->m_args));
}
return retval;
}