Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace hpx::threads::policies {
// The background_scheduler_scheduler runs only background work
HPX_CXX_CORE_EXPORT template <typename Mutex = std::mutex,
typename PendingQueuing = lockfree_fifo,
typename StagedQueuing = lockfree_fifo,
typename StagedQueuing = concurrentqueue_fifo,
typename TerminatedQueuing =
default_background_scheduler_terminated_queue>
class background_scheduler final
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace hpx::threads::policies {
/// whenever no other work is available.
HPX_CXX_CORE_EXPORT template <typename Mutex = std::mutex,
typename PendingQueuing = lockfree_fifo,
typename StagedQueuing = lockfree_fifo,
typename StagedQueuing = concurrentqueue_fifo,
typename TerminatedQueuing =
default_local_priority_queue_scheduler_terminated_queue>
class local_priority_queue_scheduler : public scheduler_base
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace hpx::threads::policies {
/// from.
HPX_CXX_CORE_EXPORT template <typename Mutex = std::mutex,
typename PendingQueuing = lockfree_fifo,
typename StagedQueuing = lockfree_fifo,
typename StagedQueuing = concurrentqueue_fifo,
typename TerminatedQueuing =
default_local_queue_scheduler_terminated_queue>
class local_queue_scheduler : public scheduler_base
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ namespace hpx::threads::policies {
// from.
HPX_CXX_CORE_EXPORT template <typename Mutex = std::mutex,
typename PendingQueuing = lockfree_fifo,
typename StagedQueuing = lockfree_fifo,
typename StagedQueuing = concurrentqueue_fifo,
typename TerminatedQueuing =
default_local_workrequesting_scheduler_terminated_queue>
class local_workrequesting_scheduler final : public scheduler_base
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace hpx::threads::policies {
// any work stealing.
HPX_CXX_CORE_EXPORT template <typename Mutex = std::mutex,
typename PendingQueuing = lockfree_fifo,
typename StagedQueuing = lockfree_fifo,
typename StagedQueuing = concurrentqueue_fifo,
typename TerminatedQueuing =
default_static_priority_queue_scheduler_terminated_queue>
class static_priority_queue_scheduler final
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace hpx::threads::policies {
/// from.
HPX_CXX_CORE_EXPORT template <typename Mutex = std::mutex,
typename PendingQueuing = lockfree_fifo,
typename StagedQueuing = lockfree_fifo,
typename StagedQueuing = concurrentqueue_fifo,
typename TerminatedQueuing =
default_static_queue_scheduler_terminated_queue>
class static_queue_scheduler final
Expand Down
37 changes: 11 additions & 26 deletions libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ namespace hpx::threads::policies {
thread_description_ptr>::type;

using task_items_type =
typename StagedQueuing::template apply<task_description*>::type;
typename StagedQueuing::template apply<task_description>::type;

using terminated_items_type =
typename TerminatedQueuing::template apply<thread_data*>::type;
Expand Down Expand Up @@ -203,9 +203,6 @@ namespace hpx::threads::policies {
}
}

static util::internal_allocator<task_description>
task_description_alloc_;

///////////////////////////////////////////////////////////////////////
// add new threads if there is some amount of work available
std::size_t add_new(std::int64_t add_count, thread_queue* addfrom,
Expand All @@ -219,30 +216,27 @@ namespace hpx::threads::policies {
}

std::size_t added = 0;
task_description* task = nullptr;
task_description task;
while (add_count-- && addfrom->new_tasks_.pop(task, steal))
{
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
if (get_maintain_queue_wait_times_enabled())
{
addfrom->new_tasks_wait_ +=
hpx::chrono::high_resolution_clock::now() -
task->waittime;
task.waittime;
++addfrom->new_tasks_wait_count_;
}
#endif
// create the new thread
threads::thread_init_data& data = task->data;
threads::thread_init_data& data = task.data;

[[maybe_unused]] bool const schedule_now =
data.initial_state == thread_schedule_state::pending;

threads::thread_id_ref_type thrd;
create_thread_object(thrd, data, lk);

std::destroy_at(task);
task_description_alloc_.deallocate(task, 1);

// add the new entry to the map of all threads
std::pair<thread_map_type::iterator, bool> const p =
thread_map_.emplace(thrd.noref());
Expand Down Expand Up @@ -773,14 +767,12 @@ namespace hpx::threads::policies {
// later thread creation
++new_tasks_count_.data_;

task_description* td = task_description_alloc_.allocate(1);
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
new (td) task_description{
HPX_MOVE(data), hpx::chrono::high_resolution_clock::now()};
new_tasks_.push(task_description{
HPX_MOVE(data), hpx::chrono::high_resolution_clock::now()});
#else
new (td) task_description{HPX_MOVE(data)}; //-V106
new_tasks_.push(task_description{HPX_MOVE(data)}); //-V106
#endif
new_tasks_.push(td);
if (&ec != &throws)
ec = make_success_code();
}
Expand Down Expand Up @@ -812,17 +804,17 @@ namespace hpx::threads::policies {

void move_task_items_from(thread_queue* src, std::int64_t count)
{
task_description* task = nullptr;
task_description task;
while (src->new_tasks_.pop(task))
{
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
if (get_maintain_queue_wait_times_enabled())
{
std::int64_t now =
hpx::chrono::high_resolution_clock::now();
src->new_tasks_wait_ += now - task->waittime;
src->new_tasks_wait_ += now - task.waittime;
++src->new_tasks_wait_count_;
task->waittime = now;
task.waittime = now;
}
#endif

Expand All @@ -832,7 +824,7 @@ namespace hpx::threads::policies {
// been incremented
--src->new_tasks_count_.data_;

if (new_tasks_.push(task))
if (new_tasks_.push(HPX_MOVE(task)))
{
if (finish)
break;
Expand Down Expand Up @@ -1345,13 +1337,6 @@ namespace hpx::threads::policies {
util::cache_line_data<std::atomic<std::int64_t>> work_items_count_;
};

///////////////////////////////////////////////////////////////////////////
template <typename Mutex, typename PendingQueuing, typename StagedQueuing,
typename TerminatedQueuing>
util::internal_allocator<typename thread_queue<Mutex, PendingQueuing,
StagedQueuing, TerminatedQueuing>::task_description>
thread_queue<Mutex, PendingQueuing, StagedQueuing,
TerminatedQueuing>::task_description_alloc_;
} // namespace hpx::threads::policies

#include <hpx/config/warnings_suffix.hpp>
Loading