Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve scalability in task_group #1310

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion include/oneapi/tbb/detail/_task.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2020-2023 Intel Corporation
Copyright (c) 2020-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,7 @@ TBB_EXPORT void __TBB_EXPORTED_FUNC execute_and_wait(d1::task& t, d1::task_group
TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::wait_context&, d1::task_group_context& ctx);
TBB_EXPORT d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_data*);
TBB_EXPORT d1::task_group_context* __TBB_EXPORTED_FUNC current_context();
TBB_EXPORT d1::task* __TBB_EXPORTED_FUNC current_task();

// Do not place under __TBB_RESUMABLE_TASKS. It is a stub for unsupported platforms.
struct suspend_point_type;
Expand Down Expand Up @@ -147,6 +148,68 @@ class wait_context {
}
};

//! Self-destroying reference counter that reports completion to a parent counter or to a wait_context.
/** When the count drops to zero the counter releases one reference on its parent (if it has one)
    or on the associated wait_context (otherwise), then destroys itself and returns its memory
    through the allocator copy it was constructed with. */
class alignas(max_nfs_size) distributed_reference_counter {
public:
    // The internal count is std::uint64_t, but the public interface only accepts std::uint32_t
    // so that a part of the internal reference count stays reserved for special needs.
    distributed_reference_counter(std::uint32_t ref_count, distributed_reference_counter* parent,
                                  wait_context& wo, small_object_allocator& allocator)
        : m_ref_count{ref_count}
        , m_parent_ref(parent)
        , m_wait_ctx(wo)
        , m_allocator(allocator)
    {
        // The field is not read anywhere in this class; silence the unused-member warning.
        suppress_unused_warning(m_version_and_traits);
    }

    distributed_reference_counter(const distributed_reference_counter&) = delete;

    //! The counter is expected to be destroyed only after the count has reached zero.
    virtual ~distributed_reference_counter() {
        __TBB_ASSERT(m_ref_count.load(std::memory_order_relaxed) == 0, nullptr);
    }

    //! Adds delta references.
    void reserve(std::uint32_t delta = 1) {
        add_reference(static_cast<std::int64_t>(delta));
    }

    //! Removes delta references; removing the last one destroys the counter.
    void release(std::uint32_t delta = 1) {
        add_reference(-static_cast<std::int64_t>(delta));
    }

protected:
    //! Passes one release up the chain: to the parent counter if set, otherwise to the wait_context.
    void release_parent() {
        if (m_parent_ref == nullptr) {
            m_wait_ctx.release();
            return;
        }
        m_parent_ref->release();
    }

    // Virtual so that new behaviors can be created by overriding this method,
    // e.g., invoking a user's callback once the counter reaches 0 (formally a continuation).
    virtual void add_reference(std::int64_t delta) {
        const std::uint64_t step = static_cast<std::uint64_t>(delta);
        const std::uint64_t updated = m_ref_count.fetch_add(step) + step;

        // Any bit set in the upper half means the 32-bit user range was exceeded
        // (this includes wrapping below zero).
        __TBB_ASSERT_EX((updated & overflow_mask) == 0, "Overflow is detected");

        if (updated != 0) {
            return;
        }

        release_parent();

        // Copy the allocator to the stack: members must not be touched after the destructor runs.
        small_object_allocator allocator_copy = m_allocator;
        this->~distributed_reference_counter();
        allocator_copy.deallocate(this);
    }

    // NOTE(review): presumably reserved for versioning/extension purposes - confirm with the authors.
    std::uint64_t m_version_and_traits{};

    // Selects the upper 32 bits of the internal count.
    static constexpr std::uint64_t overflow_mask = ~((std::uint64_t{1} << 32) - 1);
    std::atomic<std::uint64_t> m_ref_count;
    distributed_reference_counter* m_parent_ref{nullptr};
    wait_context& m_wait_ctx;
    small_object_allocator m_allocator;
};

struct execution_data {
task_group_context* context{};
slot_id original_slot{};
Expand Down Expand Up @@ -199,6 +262,7 @@ inline void wait(wait_context& wait_ctx, task_group_context& ctx) {
call_itt_task_notify(destroy, &wait_ctx);
}

using r1::current_task;
using r1::current_context;

class task_traits {
Expand Down
77 changes: 57 additions & 20 deletions include/oneapi/tbb/task_group.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -431,23 +431,53 @@ class structured_task_group;
class isolated_task_group;
#endif

template<typename F>
class function_task : public task {
const F m_func;
//! Common base of the tasks created by a task_group.
/** Stores the group's wait_context, a copy of the allocator passed to the constructor and an
    optional parent distributed_reference_counter. A distributed_reference_counter owned by the
    task itself is created lazily by get_ref_counter(). */
class base_task_group_task : public task {
public:
    base_task_group_task(wait_context& wo, small_object_allocator& alloc, distributed_reference_counter* p = nullptr)
        : m_wait_ctx(wo), m_allocator(alloc), m_parent(p)
    {}

    //! Returns the reference counter of this task, creating it on the first call.
    /** The new counter takes the task's current parent as its own parent, and the task is then
        re-parented to the new counter. */
    distributed_reference_counter* get_ref_counter() {
        if (m_ref_counter == nullptr) {
            small_object_allocator alloc{};
            // Original task holds implicit reference to ensure the reference_counter would not be destroyed
            // after completing one or several nested stolen tasks.
            m_ref_counter = alloc.new_object<distributed_reference_counter>(1, m_parent, m_wait_ctx, alloc);
            m_parent = m_ref_counter;
        }
        return m_ref_counter;
    }

    //! Returns true if wo is the wait_context this task was created with.
    bool is_same_task_group(wait_context* wo) const {
        return &m_wait_ctx == wo;
    }
protected:
    wait_context& m_wait_ctx;
    small_object_allocator m_allocator;
    // Counter (if any) that is released when the task finishes; nullptr means none was set.
    distributed_reference_counter* m_parent{nullptr};
    // Counter created by get_ref_counter(); nullptr until the first call.
    distributed_reference_counter* m_ref_counter{nullptr};
};

template<typename F>
class function_task : public base_task_group_task {
const F m_func;

void finalize(const execution_data& ed) {
// Make a local reference not to access this after destruction.
wait_context& wo = m_wait_ctx;
// Copy allocator to the stack
auto allocator = m_allocator;
// Destroy user functor before release wait.
wait_context& wo = m_wait_ctx;
auto parent = m_parent;

this->~function_task();
wo.release();

if (parent) {
parent->release();
} else {
wo.release();
}

allocator.deallocate(this, ed);
}

task* execute(execution_data& ed) override {
task* res = d2::task_ptr_or_nullptr(m_func);
finalize(ed);
Expand All @@ -458,15 +488,13 @@ class function_task : public task {
return nullptr;
}
public:
function_task(const F& f, wait_context& wo, small_object_allocator& alloc)
: m_func(f)
, m_wait_ctx(wo)
, m_allocator(alloc) {}

function_task(F&& f, wait_context& wo, small_object_allocator& alloc)
: m_func(std::move(f))
, m_wait_ctx(wo)
, m_allocator(alloc) {}
function_task(const F& f, wait_context& wo, small_object_allocator& alloc, distributed_reference_counter* p = nullptr)
: base_task_group_task(wo, alloc, p), m_func(f)
{}

function_task(F&& f, wait_context& wo, small_object_allocator& alloc, distributed_reference_counter* p = nullptr)
: base_task_group_task(wo, alloc, p), m_func(std::move(f))
{}
};

template <typename F>
Expand Down Expand Up @@ -529,9 +557,18 @@ class task_group_base : no_copy {

//! Allocates a function_task that wraps f and registers it with this task_group.
/** If the caller is currently executing a task that belongs to the same task_group, the
    reservation is made on that task's distributed_reference_counter; otherwise it is made on
    the group's wait_context. Exactly one reservation is made per prepared task. */
template<typename F>
task* prepare_task(F&& f) {
    // current_task() may return a task of any kind (or nullptr); only tasks derived from
    // base_task_group_task can serve as a parent.
    base_task_group_task* parent_task = dynamic_cast<base_task_group_task*>(current_task());
    distributed_reference_counter* ref_counter = nullptr;

    if (parent_task && parent_task->is_same_task_group(&m_wait_ctx)) {
        ref_counter = parent_task->get_ref_counter();
        ref_counter->reserve();
    } else {
        m_wait_ctx.reserve();
    }

    small_object_allocator alloc{};
    // ref_counter stays nullptr on the non-nested path.
    return alloc.new_object<function_task<typename std::decay<F>::type>>(std::forward<F>(f), m_wait_ctx, alloc, ref_counter);
}

task_group_context& context() noexcept {
Expand Down
3 changes: 2 additions & 1 deletion src/tbb/def/lin32-tbb.def
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,6 +77,7 @@ _ZN3tbb6detail2r17suspendEPFvPvPNS1_18suspend_point_typeEES2_;
_ZN3tbb6detail2r16resumeEPNS1_18suspend_point_typeE;
_ZN3tbb6detail2r121current_suspend_pointEv;
_ZN3tbb6detail2r114notify_waitersEj;
_ZN3tbb6detail2r112current_taskEv;

/* Task dispatcher (task_dispatcher.cpp) */
_ZN3tbb6detail2r114execution_slotEPKNS0_2d114execution_dataE;
Expand Down
3 changes: 2 additions & 1 deletion src/tbb/def/lin64-tbb.def
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,6 +77,7 @@ _ZN3tbb6detail2r17suspendEPFvPvPNS1_18suspend_point_typeEES2_;
_ZN3tbb6detail2r16resumeEPNS1_18suspend_point_typeE;
_ZN3tbb6detail2r121current_suspend_pointEv;
_ZN3tbb6detail2r114notify_waitersEm;
_ZN3tbb6detail2r112current_taskEv;

/* Task dispatcher (task_dispatcher.cpp) */
_ZN3tbb6detail2r114execution_slotEPKNS0_2d114execution_dataE;
Expand Down
3 changes: 2 additions & 1 deletion src/tbb/def/mac64-tbb.def
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2005-2021 Intel Corporation
# Copyright (c) 2005-2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,6 +79,7 @@ __ZN3tbb6detail2r17suspendEPFvPvPNS1_18suspend_point_typeEES2_
__ZN3tbb6detail2r16resumeEPNS1_18suspend_point_typeE
__ZN3tbb6detail2r121current_suspend_pointEv
__ZN3tbb6detail2r114notify_waitersEm
__ZN3tbb6detail2r112current_taskEv

# Task dispatcher (task_dispatcher.cpp)
__ZN3tbb6detail2r114execution_slotEPKNS0_2d114execution_dataE
Expand Down
3 changes: 2 additions & 1 deletion src/tbb/def/win32-tbb.def
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
; Copyright (c) 2005-2021 Intel Corporation
; Copyright (c) 2005-2024 Intel Corporation
;
; Licensed under the Apache License, Version 2.0 (the "License");
; you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,6 +71,7 @@ EXPORTS
?resume@r1@detail@tbb@@YAXPAUsuspend_point_type@123@@Z
?suspend@r1@detail@tbb@@YAXP6AXPAXPAUsuspend_point_type@123@@Z0@Z
?notify_waiters@r1@detail@tbb@@YAXI@Z
?current_task@r1@detail@tbb@@YAPAVtask@d1@23@XZ

; Task dispatcher (task_dispatcher.cpp)
?spawn@r1@detail@tbb@@YAXAAVtask@d1@23@AAVtask_group_context@523@G@Z
Expand Down
3 changes: 2 additions & 1 deletion src/tbb/def/win64-tbb.def
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
; Copyright (c) 2005-2021 Intel Corporation
; Copyright (c) 2005-2024 Intel Corporation
;
; Licensed under the Apache License, Version 2.0 (the "License");
; you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,6 +71,7 @@ EXPORTS
?resume@r1@detail@tbb@@YAXPEAUsuspend_point_type@123@@Z
?current_suspend_point@r1@detail@tbb@@YAPEAUsuspend_point_type@123@XZ
?notify_waiters@r1@detail@tbb@@YAX_K@Z
?current_task@r1@detail@tbb@@YAPEAVtask@d1@23@XZ

; Task dispatcher (task_dispatcher.cpp)
?spawn@r1@detail@tbb@@YAXAEAVtask@d1@23@AEAVtask_group_context@523@@Z
Expand Down
5 changes: 4 additions & 1 deletion src/tbb/scheduler_common.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -479,6 +479,9 @@ class alignas (max_nfs_size) task_dispatcher {
//! Suspend point (null if this task dispatcher has been never suspended)
suspend_point_type* m_suspend_point{ nullptr };

//! Innermost task whose task::execute() is running. A nullptr on the outermost level.
d1::task* m_innermost_running_task{ nullptr };

//! Attempt to get a task from the mailbox.
/** Gets a task only if it has not been executed by its sender or a thief
that has stolen it from the sender's task pool. Otherwise returns nullptr.
Expand Down
3 changes: 2 additions & 1 deletion src/tbb/small_object_pool.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2020-2021 Intel Corporation
Copyright (c) 2020-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,7 @@ void* __TBB_EXPORTED_FUNC allocate(d1::small_object_pool*& allocator, std::size_
// TODO: optimize if the allocator contains a valid pool.
auto tls = governor::get_thread_data();
auto pool = tls->my_small_object_pool;
__TBB_ASSERT(allocator == nullptr || pool == allocator, "An attempt was made to allocate using another thread's small memory pool");
return pool->allocate_impl(allocator, number_of_bytes);
}

Expand Down
6 changes: 5 additions & 1 deletion src/tbb/task.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -221,6 +221,10 @@ void notify_waiters(std::uintptr_t wait_ctx_addr) {
governor::get_thread_data()->my_arena->get_waiting_threads_monitor().notify(is_related_wait_ctx);
}

//! Returns whatever the calling thread's thread_data reports as its current task.
d1::task* current_task() {
    auto td = governor::get_thread_data();
    return td->get_current_task();
}

} // namespace r1
} // namespace detail
} // namespace tbb
Expand Down
7 changes: 5 additions & 2 deletions src/tbb/task_dispatcher.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2020-2023 Intel Corporation
Copyright (c) 2020-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -249,15 +249,17 @@ d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter ) {
task_dispatcher& task_disp;
execution_data_ext old_execute_data_ext;
properties old_properties;
d1::task* old_innermost_running_task;

~dispatch_loop_guard() {
task_disp.m_execute_data_ext = old_execute_data_ext;
task_disp.m_properties = old_properties;
task_disp.m_innermost_running_task = old_innermost_running_task;

__TBB_ASSERT(task_disp.m_thread_data && governor::is_thread_data_set(task_disp.m_thread_data), nullptr);
__TBB_ASSERT(task_disp.m_thread_data->my_task_dispatcher == &task_disp, nullptr);
}
} dl_guard{ *this, m_execute_data_ext, m_properties };
} dl_guard{ *this, m_execute_data_ext, m_properties, m_innermost_running_task };

// The context guard to track fp setting and itt tasks.
context_guard_helper</*report_tasks=*/ITTPossible> context_guard;
Expand Down Expand Up @@ -317,6 +319,7 @@ d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter ) {

ITT_CALLEE_ENTER(ITTPossible, t, itt_caller);

m_innermost_running_task = t;
if (ed.context->is_group_execution_cancelled()) {
t = t->cancel(ed);
} else {
Expand Down
8 changes: 7 additions & 1 deletion src/tbb/thread_data.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2020-2023 Intel Corporation
Copyright (c) 2020-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -139,6 +139,8 @@ class thread_data : public ::rml::job
void leave_task_dispatcher();
void propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::* mptr_state, d1::task_group_context& src, uint32_t new_state);

d1::task* get_current_task();

//! Index of the arena slot the scheduler occupies now, or occupied last time
unsigned short my_arena_index;

Expand Down Expand Up @@ -251,6 +253,10 @@ inline void thread_data::propagate_task_group_state(std::atomic<std::uint32_t> d
my_context_list->epoch.store(the_context_state_propagation_epoch.load(std::memory_order_relaxed), std::memory_order_release);
}

//! Returns the m_innermost_running_task value of the task dispatcher attached to this thread_data.
inline d1::task* thread_data::get_current_task() {
    auto& dispatcher = *my_task_dispatcher;
    return dispatcher.m_innermost_running_task;
}

} // namespace r1
} // namespace detail
} // namespace tbb
Expand Down
Loading
Loading