Program Listing for File thread_manager.h

Return to documentation for file (include/thread_manager.h)

#pragma once

#include <rte_eal.h>
#include <rte_lcore.h>

#include <atomic>
#include <functional>
#include <iostream>
#include <latch>
#include <memory>
#include <stdexcept>
#include <stop_token>
#include <tuple>
#include <vector>

template <typename F, typename... Ts>
concept IsValidArguments =
    (std::is_invocable_r_v<int, std::decay_t<F>, std::decay_t<Ts>...> ||
    std::is_invocable_r_v<int, std::decay_t<F>, std::stop_token, std::decay_t<Ts>...>);

class ThreadManager {
public:
    enum class LaunchThreadResult {
        Success,
        MaxLCoreReached,
        NotMainLCore,
        RemoteLaunchFail,
    };

    ThreadManager() : _stop_source{} {
        // Indicate that main lcore thread is the only one running
        running_threads.push_back({rte_lcore_id()});
    }

    ~ThreadManager() {
        request_stop();
        join();
    }

    void join() {
        rte_eal_mp_wait_lcore();
    }

    ThreadManager(const ThreadManager&) = delete;
    ThreadManager(const ThreadManager&&) = delete;

    template <typename F, typename... Ts>
    requires IsValidArguments<F, Ts...>
    [[nodiscard]] LaunchThreadResult LaunchThread(F&& f, Ts&&... args);

    unsigned GetTotalThreads() const {
        // Subtract one for the main lcore, this thread cannot be used
        return rte_lcore_count() - 1;
    }

    unsigned GetUnusedThreads() const {
        return rte_lcore_count() - running_threads.size();
    }

    [[nodiscard]] std::stop_source get_stop_source() {
        return _stop_source;
    };

    [[nodiscard]] std::stop_token get_stop_token() const {
        return _stop_source.get_token();
    };

    bool request_stop() noexcept {
        return get_stop_source().request_stop();
    }

private:
    template <typename F, typename... Ts>
    struct LCoreParams;

    template <typename F, typename... Ts>
    static int LCoreReceiver(void* arg);

    std::vector<unsigned> running_threads;

    std::stop_source _stop_source;
};

template <typename F, typename... Ts>
struct ThreadManager::LCoreParams {
    LCoreParams(F&& f, const std::stop_source& _stop_source, Ts&&... args)
        : f(std::forward<F>(f)),
          m_args(std::forward<Ts>(args)...),
          start_thread(2),
          _stop_source(_stop_source) { }

    F f;

    // Store the arguments in the tuple, move if possible
    std::tuple<std::decay_t<Ts>...> m_args;

    // Use an std::latch to synchronize the exit of this function to when the LCoreReceiver is
    // ready, both the calling thread and the callee should be ready. Therefore the latch is
    // initialized to 2
    std::latch start_thread;

    // The stop source can be passed by constant reference since the original stop source object
    // is guaranteed to outlive the threads
    const std::stop_source& _stop_source;
};

// The lambda and function arguments are forwarded using rvalue references
template <typename F, typename... Ts>
requires IsValidArguments<F, Ts...>
ThreadManager::LaunchThreadResult ThreadManager::LaunchThread(F&& f, Ts&&... args) {
    // Threads can only be launched from the main lcore
    if (rte_lcore_id() != rte_get_main_lcore())
        return ThreadManager::LaunchThreadResult::NotMainLCore;

    // Get the id of the next lcore
    unsigned lcore_id_to_run = rte_get_next_lcore(running_threads.back(), true, false);

    // rte_get_next_lcore returns RTE_MAX_LCORE when there are no hardware threads available
    // anymore
    if (lcore_id_to_run == RTE_MAX_LCORE)
        return ThreadManager::LaunchThreadResult::MaxLCoreReached;

    // Add the new lcore id to the list of running lcores
    running_threads.push_back(lcore_id_to_run);

    // Wrap an LCoreParams object in a shared pointer to safely pass it to the other thread
    std::shared_ptr<LCoreParams<F, Ts...>> lcore_params =
        std::make_shared<LCoreParams<F, Ts...>>(std::forward<F>(f), _stop_source,
        std::forward<Ts>(args)...);

    int res = rte_eal_remote_launch(LCoreReceiver<F, Ts...>, (void*) (&lcore_params),
        lcore_id_to_run);

    if (res)
        return ThreadManager::LaunchThreadResult::RemoteLaunchFail;

    // Wait until the LCoreReceiver is ready
    lcore_params->start_thread.arrive_and_wait();

    return ThreadManager::LaunchThreadResult::Success;
}

template <typename F, typename... Ts>
int ThreadManager::LCoreReceiver(void* arg) {
    // Make a copy of the shared pointer to guarantee access to data, reference count will be
    // incremented
    std::shared_ptr<LCoreParams<F, Ts...>> params_wrapper =
        *((std::shared_ptr<LCoreParams<F, Ts...>>*) (arg));

    params_wrapper->start_thread.arrive_and_wait();

    int retval;

    if constexpr (std::is_invocable_r_v<int, std::decay_t<F>, std::stop_token,
              std::decay_t<Ts>...>) {
        // Obtain a stop token from the stop source
        std::stop_token _stop_token = params_wrapper->_stop_source.get_token();

        // The actual function arguments are only available as a tuple, construct a lambda
        // that invokes the function with the stop token. The function arguments in the
        // original can be unpacked by calling this lambda with std::apply
        auto lambda_with_token = [_stop_token = std::move(_stop_token),
                         f = std::forward<F>(params_wrapper->f)](
                         auto&&... args) -> int {
            return std::invoke(f, std::move(_stop_token),
                std::forward<decltype(args)>(args)...);
        };

        // Apply original function arguments to lambda, original function will be called
        // with a movable stop token
        retval =
            std::apply(std::move(lambda_with_token), std::move(params_wrapper->m_args));

    } else {
        retval = std::apply(std::forward<F>(params_wrapper->f),
            std::move(params_wrapper->m_args));
    }

    return retval;
}