15 #include "spdlog/fmt/ostr.h"
16 #include "spdlog/spdlog.h"
17 #include <parcer/Communicator.h>
25 namespace SamplingAlgorithms {
43 std::shared_ptr<muq::Utilities::OTF2TracerBase>
tracer = std::make_shared<muq::Utilities::OTF2TracerDummy>())
59 for (
auto &indexWorkerListPair :
phonebook ) {
60 std::shared_ptr<MultiIndex> index = indexWorkerListPair.first;
61 WorkerList& workerList = indexWorkerListPair.second;
79 spdlog::debug(
"Timer triggered for {}, idle fraction {}, {} workers on that model, work run out {}", *indexWorkerListPair.first, workerList.
GetIdleFraction(), workerList.
NumWorkers(), work_run_out);
85 double largest_others_load = .0;
86 std::shared_ptr<MultiIndex> most_loaded_index =
nullptr;
87 for (
auto &indexWorkerListPair :
phonebook ) {
88 std::shared_ptr<MultiIndex> index = indexWorkerListPair.first;
89 WorkerList& workerList = indexWorkerListPair.second;
91 spdlog::debug(
"Load on model {}: {}", *index, others_load);
92 if (others_load > largest_others_load) {
93 largest_others_load = others_load;
94 most_loaded_index = index;
97 assert (most_loaded_index !=
nullptr);
99 if (*index == *most_loaded_index)
102 if (work_run_out || my_load + .5 / (
double)workerList.
NumWorkers() + .01 < largest_others_load) {
104 spdlog::debug(
"Reassigning from model {} to model {}, {} ready here", *index, *most_loaded_index, workerList.
NumWorkersReady());
106 const int RootNode = 0;
128 for (
auto &indexWorkerListPair :
phonebook ) {
129 WorkerList& workerList = indexWorkerListPair.second;
138 auto requested_sample_index = std::make_shared<MultiIndex>(
comm->Recv<MultiIndex>(status.MPI_SOURCE,
ControlTag));
139 auto request_source_index = std::make_shared<MultiIndex>(
comm->Recv<MultiIndex>(status.MPI_SOURCE,
ControlTag));
140 bool high_priority =
comm->Recv<
bool>(status.MPI_SOURCE,
ControlTag);
150 auto index = std::make_shared<MultiIndex>(
comm->Recv<MultiIndex>(status.MPI_SOURCE,
ControlTag));
151 int rank =
comm->Recv<
int>(status.MPI_SOURCE,
ControlTag, &status);
158 spdlog::trace(
"Phonebook entry for {} set", *index);
160 auto index = std::make_shared<MultiIndex>(
comm->Recv<MultiIndex>(status.MPI_SOURCE,
ControlTag));
161 int rank =
comm->Recv<
int>(status.MPI_SOURCE,
ControlTag, &status);
165 auto index = std::make_shared<MultiIndex>(
comm->Recv<MultiIndex>(status.MPI_SOURCE,
ControlTag));
168 std::cerr <<
"getting workers for nonexistent model!" << std::endl;
170 spdlog::trace(
"Getting workers from phonebook map");
171 std::vector<int> sendvec =
phonebook[index].GetWorkers();
172 spdlog::trace(
"Sending {} workgroups", sendvec.size());
177 spdlog::trace(
"Sent empty largest index");
180 spdlog::trace(
"Sent largest index {} unset", *
phonebook.rbegin()->first);
184 auto index = std::make_shared<MultiIndex>(
comm->Recv<MultiIndex>(status.MPI_SOURCE,
ControlTag));
185 int rank =
comm->Recv<
int>(status.MPI_SOURCE,
ControlTag, &status);
187 std::cerr <<
"setting ready for nonexistent model!" << std::endl;
193 spdlog::trace(
"Rank {} quit",
comm->GetRank());
197 for (
auto request_iter =
requests.begin(); request_iter !=
requests.end();) {
198 if (!
phonebook.count(request_iter->requestedSampleIndex)) {
199 std::cerr <<
"checking request for nonexistent model!" << std::endl;
201 int workerRank =
phonebook[request_iter->requestedSampleIndex].NextWorker();
202 if (workerRank == -1) {
206 request_iter =
requests.erase(request_iter);
241 void UnRegister(std::shared_ptr<MultiIndex> modelIndex,
int rank) {
245 phonebook[modelIndex].RemoveWorker(rank);
246 if (
phonebook[modelIndex].NumWorkers() == 0) {
247 spdlog::debug(
"Phonebook erasing modelIndex {}", *modelIndex); // reached when NumWorkers() for this index is 0; "{}" is required for the index to appear in the message
251 spdlog::trace(
"Phonebook entry for {} unset", *modelIndex);
282 spdlog::trace(
"Removed worker {}, {} remaining", worker,
NumWorkers());
304 if (
total_time == std::chrono::nanoseconds::zero())
315 idle_time = std::chrono::nanoseconds::zero();
316 total_time = std::chrono::nanoseconds::zero();
317 begin_tick = std::chrono::high_resolution_clock::now();
321 std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
323 std::chrono::nanoseconds measurement_period = now -
last_tick;
345 for (
auto request_iter =
requests.begin(); request_iter !=
requests.end(); request_iter++) {
346 if (*index == *(request_iter->requestedSampleIndex))
347 in_queue += request_iter->highPriority ? 1.0 : .1;
353 int numQueuedTasks = 0;
355 if (request.highPriority)
356 numQueuedTasks += *(request.sourceModelIndex) == *index;
358 return numQueuedTasks;
381 std::shared_ptr<parcer::Communicator>
comm;
383 std::shared_ptr<muq::Utilities::OTF2TracerBase>
tracer;
402 int Query(std::shared_ptr<MultiIndex> remoteIndex, std::shared_ptr<MultiIndex> sourceIndex,
bool high_priority) {
411 spdlog::debug(
"GetWorkgroups call for model {}", *modelIndex);
414 spdlog::debug(
"GetWorkgroups call for model {}, retrieving", *modelIndex);
416 spdlog::debug(
"GetWorkgroups call for model {}, returning", *modelIndex);
425 void Register(std::shared_ptr<MultiIndex> modelIndex,
int rank) {
431 void UnRegister(std::shared_ptr<MultiIndex> modelIndex,
int rank) {
453 std::shared_ptr<parcer::Communicator>
comm;
High-level wrapper for communicating with the phonebook process.
void SetWorkerReady(std::shared_ptr< MultiIndex > modelIndex, int rank)
PhonebookClient(std::shared_ptr< parcer::Communicator > comm, int phonebookRank)
void UnRegister(std::shared_ptr< MultiIndex > modelIndex, int rank)
std::vector< int > GetWorkgroups(std::shared_ptr< MultiIndex > modelIndex)
std::shared_ptr< MultiIndex > LargestIndex()
void Register(std::shared_ptr< MultiIndex > modelIndex, int rank)
std::shared_ptr< parcer::Communicator > comm
int Query(std::shared_ptr< MultiIndex > remoteIndex, std::shared_ptr< MultiIndex > sourceIndex, bool high_priority)
void SetWorkerReady(int worker)
std::deque< int > GetWorkersReady()
std::chrono::high_resolution_clock::time_point last_tick
std::chrono::high_resolution_clock::time_point begin_tick
void RemoveWorker(int worker)
int registeredReadyCounter
void AddWorker(int worker)
std::chrono::nanoseconds idle_time
std::deque< int > workers_ready
std::chrono::nanoseconds total_time
int NumWorkersReady() const
std::vector< int > workers
double NormalizedRegisteredReadyCounter()
std::vector< int > GetWorkers()
Phonebook implementation facilitating communication between different roles in a parallel MIMCMC type...
int getNumTasksQueuedTasksFromIndex(std::shared_ptr< MultiIndex > index)
std::shared_ptr< muq::Utilities::OTF2TracerBase > tracer
std::map< std::shared_ptr< MultiIndex >, WorkerList, MultiPtrComp > phonebook
bool rescheduling_in_progress
double getLoadFactor(std::shared_ptr< MultiIndex > index, WorkerList &worker_list)
PhonebookServer(std::shared_ptr< parcer::Communicator > comm, bool scheduling_active=true, std::shared_ptr< muq::Utilities::OTF2TracerBase > tracer=std::make_shared< muq::Utilities::OTF2TracerDummy >())
std::deque< SampleRequest > requests
double getNumQueuedTasksForIndex(std::shared_ptr< MultiIndex > index, WorkerList &worker_list)
std::shared_ptr< parcer::Communicator > comm
void UnRegister(std::shared_ptr< MultiIndex > modelIndex, int rank)
ControlFlag
Flags used by parallel MCMC/MIMCMC type methods.
const int ControlTag
Tags separating MPI communication between control level processes and internal work group communicati...
std::shared_ptr< MultiIndex > sourceModelIndex
std::shared_ptr< MultiIndex > requestedSampleIndex