Skip to content

Commit

Permalink
Initial implementation of NUMA-aware threading
Browse files Browse the repository at this point in the history
  • Loading branch information
Vika-F committed Jan 30, 2025
1 parent 96370cc commit 07a8b54
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 33 deletions.
3 changes: 2 additions & 1 deletion cpp/daal/include/services/daal_defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ struct IsSameType<U, U>
static const bool value = true;
};

const size_t DAAL_MALLOC_DEFAULT_ALIGNMENT = 64;
constexpr size_t DAAL_MALLOC_DEFAULT_ALIGNMENT = 64;
constexpr size_t DAAL_MAX_NUMA_COUNT = 8;

const int SERIALIZATION_HOMOGEN_NT_ID = 1000;
const int SERIALIZATION_AOS_NT_ID = 3000;
Expand Down
42 changes: 22 additions & 20 deletions cpp/daal/src/algorithms/covariance/covariance_impl.i
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include "src/threading/threading.h"
#include "src/externals/service_profiler.h"

#include <iostream>

using namespace daal::internal;
using namespace daal::services::internal;

Expand Down Expand Up @@ -170,28 +172,26 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu
services::Status status = hyperparameter->find(denseUpdateStepBlockSize, numRowsInBlock);
DAAL_CHECK_STATUS_VAR(status);
}
size_t numBlocks = nVectors / numRowsInBlock;
if (numBlocks * numRowsInBlock < nVectors)
{
numBlocks++;
}
size_t numRowsInLastBlock = numRowsInBlock + (nVectors - numBlocks * numRowsInBlock);

/* TLS data initialization */
SafeStatus safeStat;
daal::static_tls<tls_data_t<algorithmFPType, cpu> *> tls_data([=, &safeStat]() {
daal::tls<tls_data_t<algorithmFPType, cpu> *> tls_data([=, &safeStat]() {
auto tlsData = tls_data_t<algorithmFPType, cpu>::create(isNormalized, nFeatures);
if (!tlsData)
{
safeStat.add(services::ErrorMemoryAllocationFailed);
}
return tlsData;
});
DAAL_CHECK_SAFE_STATUS();

/* Threaded loop with syrk seq calls */
daal::static_threader_for(numBlocks, [&](int iBlock, size_t tid) {
struct tls_data_t<algorithmFPType, cpu> * tls_data_local = tls_data.local(tid);
daal::numa_threader_for(nVectors, numRowsInBlock, [&](size_t startRow, size_t endRow) {
size_t nRows = endRow - startRow;
if (startRow < 0 || endRow < 0 || endRow <= startRow || endRow > nVectors)
{
return;
}
struct tls_data_t<algorithmFPType, cpu> * tls_data_local = tls_data.local();
if (!tls_data_local)
{
return;
Expand All @@ -202,21 +202,23 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu
algorithmFPType alpha = 1.0;
algorithmFPType beta = 1.0;

size_t nRows = (iBlock < (numBlocks - 1)) ? numRowsInBlock : numRowsInLastBlock;
size_t startRow = iBlock * numRowsInBlock;

ReadRows<algorithmFPType, cpu, NumericTable> dataTableBD(dataTable, startRow, nRows);
ReadRows<algorithmFPType, cpu, NumericTable> dataTableBD(dataTable, size_t(startRow), size_t(nRows));
DAAL_CHECK_BLOCK_STATUS_THR(dataTableBD);
algorithmFPType * dataBlock_local = const_cast<algorithmFPType *>(dataTableBD.get());
algorithmFPType * dataBlockLocal = const_cast<algorithmFPType *>(dataTableBD.get());
if (!dataBlockLocal)
{
safeStat.add(services::ErrorMemoryAllocationFailed);
}

DAAL_INT nFeatures_local = nFeatures;
DAAL_INT nFeaturesLocal = nFeatures;
DAAL_INT nRowsLocal = nRows;
algorithmFPType * crossProduct_local = tls_data_local->crossProduct;
algorithmFPType * sums_local = tls_data_local->sums;

{
DAAL_ITTNOTIFY_SCOPED_TASK(gemmData);
BlasInst<algorithmFPType, cpu>::xxsyrk(&uplo, &trans, (DAAL_INT *)&nFeatures_local, (DAAL_INT *)&nRows, &alpha, dataBlock_local,
(DAAL_INT *)&nFeatures_local, &beta, crossProduct_local, (DAAL_INT *)&nFeatures_local);
BlasInst<algorithmFPType, cpu>::xxsyrk(&uplo, &trans, (DAAL_INT *)&nFeaturesLocal, (DAAL_INT *)&nRowsLocal, &alpha, dataBlockLocal,
(DAAL_INT *)&nFeaturesLocal, &beta, crossProduct_local, (DAAL_INT *)&nFeaturesLocal);
}

if (!isNormalized && (method == defaultDense) && !assumeCentered)
Expand All @@ -227,9 +229,9 @@ services::Status updateDenseCrossProductAndSums(bool isNormalized, size_t nFeatu
{
PRAGMA_IVDEP
PRAGMA_VECTOR_ALWAYS
for (DAAL_INT j = 0; j < nFeatures_local; j++)
for (DAAL_INT j = 0; j < nFeaturesLocal; j++)
{
sums_local[j] += dataBlock_local[i * nFeatures_local + j];
sums_local[j] += dataBlockLocal[i * nFeaturesLocal + j];
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/daal/src/algorithms/naivebayes/naivebayes_train_impl.i
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Status collectCounters(const Parameter * nbPar, NumericTable * ntData, NumericTa
daal::tls<algorithmFPType *> tls_n_ci([=]() -> algorithmFPType * { return _CALLOC_<algorithmFPType, cpu>(p * c); });

SafeStatus safeStat;
daal::threader_for_blocked(n, n, [=, &tls_n_ci, &safeStat](algorithmFPType j0, algorithmFPType jn) {
daal::threader_for_blocked(n, 1, [=, &tls_n_ci, &safeStat](algorithmFPType j0, algorithmFPType jn) {
algorithmFPType * local_n_ci = tls_n_ci.local();
DAAL_CHECK_THR(local_n_ci, ErrorMemoryAllocationFailed);

Expand Down
86 changes: 80 additions & 6 deletions cpp/daal/src/threading/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
#include "services/daal_memory.h"
#include "src/algorithms/service_qsort.h"

#define TBB_PREVIEW_GLOBAL_CONTROL 1
#define TBB_PREVIEW_TASK_ARENA 1
/// #define TBB_PREVIEW_GLOBAL_CONTROL 1
/// #define TBB_PREVIEW_TASK_ARENA 1

#include <algorithm> // std::min
#include <vector> // std::vector
#include <stdlib.h> // malloc and free
#include <tbb/tbb.h>
#include <tbb/spin_mutex.h>
Expand All @@ -37,6 +38,8 @@
#include <tbb/task_arena.h>
#include "services/daal_atomic_int.h"

#include <iostream>

#if defined(TBB_INTERFACE_VERSION) && TBB_INTERFACE_VERSION >= 12002
#include <tbb/task.h>
#endif
Expand Down Expand Up @@ -75,6 +78,35 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle)
// #endif
}

DAAL_EXPORT size_t _initArenas()
{
#if defined(TARGET_X86_64)
size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes();
if (nNUMA > daal::DAAL_MAX_NUMA_COUNT)
{
return -1;
}
if (nNUMA > 1)
{
std::vector<tbb::numa_node_id> numa_indexes = tbb::info::numa_nodes();
for (size_t i = 0; i < nNUMA; ++i)
{
tbb::task_arena * arena = new tbb::task_arena();
arena->initialize(tbb::task_arena::constraints(numa_indexes[i]));
daal::threader_env()->setArena(i, arena);
}
}
else
{
tbb::task_arena * arena = new tbb::task_arena(tbb::task_arena::constraints {}.set_max_threads_per_core(1));
arena->initialize();
daal::threader_env()->setArena(0, arena);
}

#endif
return 0;
}

DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle)
{
#if defined(TARGET_X86_64)
Expand All @@ -84,7 +116,7 @@ DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle)
*schedulerHandle = reinterpret_cast<void *>(new tbb::task_scheduler_handle(tbb::attach {}));
#endif
// It is necessary for initializing tbb in cases where DAAL does not use it.
tbb::task_arena {}.initialize();
_initArenas();
#endif
return 0;
}
Expand Down Expand Up @@ -161,6 +193,38 @@ DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const v
}
}

DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const void * a, daal::functype_blocked_size func)
{
if (daal::threader_env()->getNumberOfThreads() > 1)
{
const size_t nNUMA = daal::threader_env()->getNumberOfNUMANodes();
if (nNUMA > 1 && n > nNUMA * block * 2)
{
const size_t nPerNUMA = (n + nNUMA - 1) / nNUMA;
for (size_t i = 0; i < nNUMA; ++i)
{
tbb::task_arena * arena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getArena(i));
const size_t startIter = i * nPerNUMA;
const size_t endIter = std::min(startIter + nPerNUMA, n);

arena->execute([&]() {
tbb::parallel_for(tbb::blocked_range<size_t>(startIter, endIter, block * 2),
[=](tbb::blocked_range<size_t> r) -> void { return func(r.begin(), r.end(), a); });
});
}
}
else
{
tbb::parallel_for(tbb::blocked_range<size_t>(0ul, n, block * 2),
[=](tbb::blocked_range<size_t> r) -> void { return func(r.begin(), r.end(), a); });
}
}
else
{
func(0ul, n, a);
}
}

DAAL_EXPORT void _daal_threader_for_simple(int n, int reserved, const void * a, daal::functype func)
{
if (daal::threader_env()->getNumberOfThreads() > 1)
Expand Down Expand Up @@ -319,11 +383,11 @@ DAAL_PARALLEL_SORT_IMPL(daal::IdxValType<double>, pair_fp64_uint64)

#undef DAAL_PARALLEL_SORT_IMPL

DAAL_EXPORT void _daal_threader_for_blocked(int n, int reserved, const void * a, daal::functype2 func)
DAAL_EXPORT void _daal_threader_for_blocked(int n, size_t grainsize, const void * a, daal::functype2 func)
{
if (daal::threader_env()->getNumberOfThreads() > 1)
{
tbb::parallel_for(tbb::blocked_range<int>(0, n, 1), [&](tbb::blocked_range<int> r) { func(r.begin(), r.end() - r.begin(), a); });
tbb::parallel_for(tbb::blocked_range<int>(0, n, grainsize * 2), [&](tbb::blocked_range<int> r) { func(r.begin(), r.end() - r.begin(), a); });
}
else
{
Expand Down Expand Up @@ -836,4 +900,14 @@ DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr)
}

namespace daal
{}
{
ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(_daal_threader_get_max_threads())
{
#if defined(TARGET_X86_64)
_numberOfNUMANodes = tbb::info::numa_nodes().size();
#else
_numberOfNUMANodes = 1;
#endif
}

} // namespace daal
48 changes: 43 additions & 5 deletions cpp/daal/src/threading/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ typedef void (*functype_int32ptr)(const int * i, const void * a);
typedef void (*functype_static)(size_t i, size_t tid, const void * a);
typedef void (*functype2)(int i, int n, const void * a);
typedef void (*functype_blocked_size)(size_t first, size_t last, const void * a);
typedef void (*functype)(int i, const void * a);
typedef void * (*tls_functype)(const void * a);
typedef void (*tls_reduce_functype)(void * p, const void * a);
typedef void (*functype_break)(int i, bool & needBreak, const void * a);
Expand All @@ -65,8 +66,9 @@ extern "C"
DAAL_EXPORT void _daal_threader_for_simple(int n, int threads_request, const void * a, daal::functype func);
DAAL_EXPORT void _daal_threader_for_int32ptr(const int * begin, const int * end, const void * a, daal::functype_int32ptr func);
DAAL_EXPORT void _daal_static_threader_for(size_t n, const void * a, daal::functype_static func);
DAAL_EXPORT void _daal_threader_for_blocked(int n, int threads_request, const void * a, daal::functype2 func);
DAAL_EXPORT void _daal_threader_for_blocked(int n, size_t grainsize, const void * a, daal::functype2 func);
DAAL_EXPORT void _daal_threader_for_blocked_size(size_t n, size_t block, const void * a, daal::functype_blocked_size func);
DAAL_EXPORT void _daal_threader_for_blocked_numa(size_t n, size_t block, const void * a, daal::functype_blocked_size func);
DAAL_EXPORT void _daal_threader_for_optional(int n, int threads_request, const void * a, daal::functype func);
DAAL_EXPORT void _daal_threader_for_break(int n, int threads_request, const void * a, daal::functype_break func);

Expand Down Expand Up @@ -105,6 +107,7 @@ extern "C"
DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle);
DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalControl);
DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle);
DAAL_EXPORT size_t _initArenas();

DAAL_EXPORT void * _daal_threader_env();

Expand Down Expand Up @@ -167,12 +170,25 @@ inline void threaded_scalable_free(void * ptr)
class ThreaderEnvironment
{
public:
ThreaderEnvironment() : _numberOfThreads(_daal_threader_get_max_threads()) {}
ThreaderEnvironment();
size_t getNumberOfThreads() const { return _numberOfThreads; }
void setNumberOfThreads(size_t value) { _numberOfThreads = value; }
size_t getNumberOfNUMANodes() const { return _numberOfNUMANodes; }
void * getArena(size_t i) const
{
if (i >= _numberOfNUMANodes) return nullptr;
return _arenas[i];
}

void setArena(size_t i, void * arena)
{
if (i < _numberOfNUMANodes) _arenas[i] = arena;
}

private:
size_t _numberOfThreads;
size_t _numberOfNUMANodes;
void * _arenas[DAAL_MAX_NUMA_COUNT];
};

inline ThreaderEnvironment * threader_env()
Expand All @@ -185,9 +201,16 @@ inline size_t threader_get_threads_number()
return threader_env()->getNumberOfThreads();
}

inline size_t threader_get_numa_number()
{
return threader_env()->getNumberOfNUMANodes();
}

inline size_t setSchedulerHandle(void ** schedulerHandle)
{
return _setSchedulerHandle(schedulerHandle);
size_t status = _setSchedulerHandle(schedulerHandle);
if (!status) return status;
return _initArenas();
}

inline size_t setNumberOfThreads(const size_t numThreads, void ** globalControl)
Expand Down Expand Up @@ -216,6 +239,13 @@ inline void threader_func_b(int i0, int in, const void * a)
func(i0, in);
}

template <typename F>
inline void threader_func_b_size_t(size_t i0, size_t in, const void * a)
{
const F & func = *static_cast<const F *>(a);
func(i0, in);
}

template <typename F>
inline void threader_func_break(int i, bool & needBreak, const void * a)
{
Expand Down Expand Up @@ -244,6 +274,14 @@ inline void threader_for(int n, int reserved, const F & func)
_daal_threader_for(n, reserved, a, threader_func<F>);
}

template <typename F>
inline void numa_threader_for(int n, int block, const F & func)
{
const void * a = static_cast<const void *>(&func);

_daal_threader_for_blocked_numa(n, block, a, threader_func_b_size_t<F>);
}

/// Pass a function to be executed in a for loop to the threading layer.
/// The maximal number of iterations in the loop is `2^63 - 1 (INT64_MAX)`.
/// The default scheduling of the threading layer is used to assign
Expand Down Expand Up @@ -353,11 +391,11 @@ inline void static_threader_for(size_t n, const F & func)
/// @param[in] func Callable object that processes the block of loop's iterations
/// `[beginRange, endRange)`.
template <typename F>
inline void threader_for_blocked(int n, int reserved, const F & func)
inline void threader_for_blocked(int n, size_t grainsize, const F & func)
{
const void * a = static_cast<const void *>(&func);

_daal_threader_for_blocked(n, reserved, a, threader_func_b<F>);
_daal_threader_for_blocked(n, grainsize, a, threader_func_b<F>);
}

template <typename F>
Expand Down

0 comments on commit 07a8b54

Please sign in to comment.