Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a simple thread pool #1746

Merged
merged 2 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/samplereader/SampleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "utils/CryptoUtils.h"
#include "utils/ThreadPool.h"

#include <bento4/Ap4.h>

Expand Down Expand Up @@ -96,7 +97,8 @@ class ATTR_DLL_LOCAL ISampleReader
*/
void ReadSampleAsync()
{
m_readSampleAsyncState = std::async(std::launch::async, &ISampleReader::ReadSample, this);
m_readSampleAsyncState =
UTILS::THREAD::GlobalThreadPool.Execute(&ISampleReader::ReadSample, this);
}

/*!
Expand Down
2 changes: 2 additions & 0 deletions src/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ set(SOURCES
FileUtils.cpp
JsonUtils.cpp
StringUtils.cpp
ThreadPool.cpp
UrlUtils.cpp
Utils.cpp
XMLUtils.cpp
Expand All @@ -21,6 +22,7 @@ set(HEADERS
JsonUtils.h
log.h
StringUtils.h
ThreadPool.h
UrlUtils.h
Utils.h
XMLUtils.h
Expand Down
72 changes: 72 additions & 0 deletions src/utils/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2025 Team Kodi
* This file is part of Kodi - https://kodi.tv
*
* SPDX-License-Identifier: GPL-2.0-or-later
* See LICENSES/README.md for more information.
*/

#include "ThreadPool.h"
neo1973 marked this conversation as resolved.
Show resolved Hide resolved

UTILS::THREAD::ThreadPool::~ThreadPool()
{
Stop();
}

void UTILS::THREAD::ThreadPool::Stop()
{
{
std::lock_guard lock(m_mutex);
m_isStopped = true;
}

m_condVar.notify_all();

for (const auto& executor : m_executors)
executor->Join();
}

std::optional<std::function<void()>> UTILS::THREAD::ThreadPool::TakeTask()
{
std::unique_lock lock(m_mutex);

m_condVar.wait(lock, [this]() { return !m_taskQueue.empty() || m_isStopped; });

if (m_isStopped)
return {};

++m_activeExecutors;
auto func = std::move(m_taskQueue.front());
m_taskQueue.pop();
return {std::move(func)};
}

void UTILS::THREAD::ThreadPool::TaskFinished()
{
std::lock_guard lock(m_mutex);
--m_activeExecutors;
}

UTILS::THREAD::ThreadPool::Executor::Executor(ThreadPool& threadPool) : m_threadPool(&threadPool)
{
m_thread = std::thread(&Executor::Run, this);
}

void UTILS::THREAD::ThreadPool::Executor::Join()
{
m_thread.join();
}

void UTILS::THREAD::ThreadPool::Executor::Run()
{
while (true)
{
auto f = m_threadPool->TakeTask();
if (!f)
return;
(*f)();
m_threadPool->TaskFinished();
}
}

UTILS::THREAD::ThreadPool UTILS::THREAD::GlobalThreadPool;
117 changes: 117 additions & 0 deletions src/utils/ThreadPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (C) 2025 Team Kodi
* This file is part of Kodi - https://kodi.tv
*
* SPDX-License-Identifier: GPL-2.0-or-later
* See LICENSES/README.md for more information.
*/

#include <condition_variable>
neo1973 marked this conversation as resolved.
Show resolved Hide resolved
#include <exception>
#include <functional>
#include <future>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

#pragma once

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add appropriate namespaces such as the others utils files
at least the "utils"

namespace UTILS
{

namespace UTILS::THREAD
{

/*!
* \brief A simple thread pool
*
* The thread pool automatically grows if there are more concurrent tasks then
* threads. Automatic shrinking is not implemented but this could easily be
* added if desired.
*/
class ThreadPool
{
public:
ThreadPool() = default;
~ThreadPool();

/*!
* \brief Execute a callable on a thread pool thread
*
* \attention If the `std::future` obtained from this function is not moved
* from or bound to a reference, the destructor of the `std::future` will
* block at the end of the full expression until the asynchronous operation
* completes, essentially making the call synchronous.
*/
template<class F, class... Args>
[[nodiscard]] auto Execute(F&& f, Args&&... args) -> auto
{
using return_type = decltype(std::invoke(f, args...));

std::future<return_type> future;

{
std::lock_guard lock(m_mutex);

if (m_isStopped)
{
std::promise<return_type> p;
p.set_exception(
std::make_exception_ptr(std::runtime_error("ThreadPool has already been stopped")));
return p.get_future();
}

auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
future = task->get_future();

m_taskQueue.emplace([task = std::move(task)]() { (*task)(); });

// Check if there are enough executors for the number of task, if not create more
if (m_executors.size() - m_activeExecutors < m_taskQueue.size())
m_executors.emplace_back(std::make_unique<Executor>(*this));
}

m_condVar.notify_one();

return future;
}

/*!
* \brief Don't allow execution of new tasks and block until all running tasks completed
*/
void Stop();

private:
class Executor
{
public:
Executor(ThreadPool& threadPool);
void Join();

private:
ThreadPool* m_threadPool;
std::thread m_thread;
void Run();
};

/*!
* \brief Returns a task or nothing, if nothing is returned the Executor should exit
*/
std::optional<std::function<void()>> TakeTask();
/*!
* \brief Informs the thread pool that a task is done and the Executor is available again
*/
void TaskFinished();

// Executors need to be on the heap for a stable `this` pointer
std::vector<std::unique_ptr<Executor>> m_executors;
std::queue<std::function<void()>> m_taskQueue;
std::mutex m_mutex;
std::condition_variable m_condVar;
size_t m_activeExecutors{0};
bool m_isStopped{false};
};

extern ThreadPool GlobalThreadPool;

} // namespace UTILS::THREAD
Loading