MUQ  0.4.3
Phonebook.h
Go to the documentation of this file.
1 #ifndef PHONEBOOK_H_
2 #define PHONEBOOK_H_
3 
4 #include "MUQ/config.h"
5 
6 #if MUQ_HAS_MPI
7 
8 
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <inttypes.h>
12 #include <string>
13 #include <map>
14 #include <mpi.h>
15 #include "spdlog/fmt/ostr.h"
16 #include "spdlog/spdlog.h"
17 #include <parcer/Communicator.h>
20 #include <deque>
21 #include <chrono>
23 
24 namespace muq {
25  namespace SamplingAlgorithms {
26 
39  public:
40 
// Constructs the phonebook server process.
//   comm              - communicator on which all control messages (tag ControlTag) are
//                       received and answered.
//   scheduling_active - NOTE(review): presumably gates the worker-rescheduling logic at the
//                       top of Run(); the member-initializer line (original line 44) is not
//                       visible in this listing, so how it is stored is unconfirmed.
//   tracer            - OTF2 tracer used to mark PhonebookBusy regions; defaults to an
//                       OTF2TracerDummy instance.
41  PhonebookServer(std::shared_ptr<parcer::Communicator> comm,
42  bool scheduling_active = true,
43  std::shared_ptr<muq::Utilities::OTF2TracerBase> tracer = std::make_shared<muq::Utilities::OTF2TracerDummy>())
45  {
46  }
47 
// Main event loop of the phonebook process; returns only after a ControlFlag::QUIT message.
// Each iteration:
//   1. (optionally) checks every model's worker list and may ask the root rank (0) to move one
//      ready worker from this model to the currently most loaded model;
//   2. blocks on a ControlFlag from any rank (tag ControlTag) and advances all idle-time timers;
//   3. dispatches on the flag (register/unregister workers, mark ready, answer queries, ...);
//   4. tries to hand ready workers to queued sample requests, front of the queue first.
48  void Run() {
49  //Dune::Timer timer_idle;
50  //Dune::Timer timer_full;
51 
52 
53  while (true) {
54  MPI_Status status;
55  //timer_idle.start();
56 
57 
// --- Rescheduling check. NOTE(review): the enclosing condition (original line 58) is not
// visible in this listing; judging by the two closing braces after this loop it is an `if`
// presumably involving scheduling_active / rescheduling_in_progress — confirm.
59  for ( auto &indexWorkerListPair : phonebook ) {
60  std::shared_ptr<MultiIndex> index = indexWorkerListPair.first;
61  WorkerList& workerList = indexWorkerListPair.second;
// "Work run out": every worker of this model is ready and nothing is queued for it.
62  bool work_run_out = workerList.NumWorkersReady() == workerList.NumWorkers() && getNumQueuedTasksForIndex(index, workerList) == 0.0;
63 
// Trigger a rebalancing check when the normalized ready counter exceeds 0.5, when the model
// ran out of work, or when an earlier check was deferred (recheck).
64  if (workerList.NormalizedRegisteredReadyCounter() > .5 || work_run_out || workerList.recheck) {
65 
66  if (workerList.NormalizedRegisteredReadyCounter() > .5) {
67  workerList.ResetTimer();
68  }
69 
70  if (workerList.NumWorkers() <= 1) // Only reschedule if we have more than 1 worker on this model
71  continue;
72 
// No ready worker to move right now: remember to look again on a later iteration.
73  if (workerList.NumWorkersReady() == 0) {
74  workerList.recheck = true;
75  continue;
76  }
77  workerList.recheck = false;
78 
79  spdlog::debug("Timer triggered for {}, idle fraction {}, {} workers on that model, work run out {}", *indexWorkerListPair.first, workerList.GetIdleFraction(), workerList.NumWorkers(), work_run_out);
80 
81  //if (index->GetValue(0) == 1) // FIXME: Fix rescheduling model 1
82  // continue;
83 
// Find the model with the largest load factor (strictly > 0). Note that the inner loop
// deliberately shadows `indexWorkerListPair`, `index` and `workerList`.
84  double my_load = getLoadFactor(index, workerList);
85  double largest_others_load = .0;
86  std::shared_ptr<MultiIndex> most_loaded_index = nullptr;
87  for ( auto &indexWorkerListPair : phonebook ) {
88  std::shared_ptr<MultiIndex> index = indexWorkerListPair.first;
89  WorkerList& workerList = indexWorkerListPair.second;
90  double others_load = getLoadFactor(index, workerList);
91  spdlog::debug("Load on model {}: {}", *index, others_load);
92  if (others_load > largest_others_load) {
93  largest_others_load = others_load;
94  most_loaded_index = index;
95  }
96  }
// NOTE(review): this assert fires (debug builds) if every load factor is <= 0; in release
// builds the dereference below would then be a null dereference.
97  assert (most_loaded_index != nullptr);
98 
99  if (*index == *most_loaded_index)
100  continue;
101 
// Move a worker if this model ran out of work, or if the busiest model's load exceeds ours
// by more than half a worker's share plus a 0.01 margin.
102  if (work_run_out || my_load + .5 / (double)workerList.NumWorkers() + .01 < largest_others_load) {
103 
104  spdlog::debug("Reassigning from model {} to model {}, {} ready here", *index, *most_loaded_index, workerList.NumWorkersReady());
105 
106  const int RootNode = 0; // Send to root
107 
108  int rescheduleRank = workerList.GetWorkersReady()[0];
// Safe inside the range-for: NumWorkers() >= 2 here, so UnRegister cannot erase this entry.
109  UnRegister(index, rescheduleRank); // Already unregister this rank so it won't be offered for sampling anymore while being rescheduled!
110 
// Payload sent to rank 0: old model index, worker rank, target model index. The preceding
// message (original line 111) and the follow-up (line 116) are not visible in this listing.
112  comm->Send(*index, RootNode, ControlTag);
113  comm->Send(rescheduleRank, RootNode, ControlTag);
114  comm->Send(*most_loaded_index, RootNode, ControlTag);
115 
117  break; // Phonebook needs to be ready for further communication after rescheduling a process, so don't reschedule another one right now
118  }
119 
120  }
121 
122  }
123  }
124 
// Block until any rank sends a control flag.
125  ControlFlag command = comm->Recv<ControlFlag>(MPI_ANY_SOURCE, ControlTag, &status);
126  //timer_idle.stop();
127 
// Accumulate idle/total worker time for every model since the previous tick.
128  for ( auto &indexWorkerListPair : phonebook ) {
129  WorkerList& workerList = indexWorkerListPair.second;
130  workerList.tick();
131  }
132 
133 
134 
135  tracer->enterRegion(TracerRegions::PhonebookBusy);
136 
// Request for a worker of `requested_sample_index`; queued and answered in the matching loop below.
137  if (command == ControlFlag::GET_WORKGROUP) {
138  auto requested_sample_index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
139  auto request_source_index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
140  bool high_priority = comm->Recv<bool>(status.MPI_SOURCE, ControlTag);
// NOTE(review): both branches are identical (push_front). Low-priority requests were
// presumably meant to go to the back of the queue (push_back) — confirm intent.
141  if (high_priority)
142  requests.push_front(SampleRequest{.requestedSampleIndex = requested_sample_index, .sourceModelIndex = request_source_index, .sourceMPIRank = status.MPI_SOURCE, .highPriority = high_priority});
143  else
144  requests.push_front(SampleRequest{.requestedSampleIndex = requested_sample_index, .sourceModelIndex = request_source_index, .sourceMPIRank = status.MPI_SOURCE, .highPriority = high_priority});
145  } else if (command == ControlFlag::SCHEDULING_DONE) {
146  rescheduling_in_progress = false;
// SCHEDULING_STOP: its body (original line 148) is not visible in this listing.
147  } else if (command == ControlFlag::SCHEDULING_STOP) {
149  } else if (command == ControlFlag::SET_WORKGROUP) {
// Register `rank` as a worker of model `index`, creating the entry if needed.
150  auto index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
151  int rank = comm->Recv<int>(status.MPI_SOURCE, ControlTag, &status);
152 
153  if (!phonebook.count(index)) {
154  phonebook[index] = WorkerList();
155  }
156  phonebook[index].AddWorker(rank);
157  //comm->Send(ControlFlag::HANDSHAKE, status.MPI_SOURCE, ControlTag);
158  spdlog::trace("Phonebook entry for {} set", *index);
159  } else if (command == ControlFlag::UNSET_WORKGROUP) {
160  auto index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
161  int rank = comm->Recv<int>(status.MPI_SOURCE, ControlTag, &status);
162 
163  UnRegister(index, rank);
164  } else if (command == ControlFlag::GET_WORKGROUPS) {
// Reply with the list of all worker ranks of model `index`.
165  auto index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
166 
// NOTE(review): after this warning, phonebook[index] below default-inserts an empty
// WorkerList for the unknown model (which later takes part in load balancing).
167  if (!phonebook.count(index)) {
168  std::cerr << "getting workers for nonexistent model!" << std::endl;
169  }
170  spdlog::trace("Getting workers from phonebook map");
171  std::vector<int> sendvec = phonebook[index].GetWorkers();
172  spdlog::trace("Sending {} workgroups", sendvec.size());
173  comm->Send(sendvec, status.MPI_SOURCE, ControlTag);
174  } else if (command == ControlFlag::GET_LARGEST_INDEX) {
// Reply with the largest registered model index (last key under MultiPtrComp ordering).
// NOTE(review): the empty case sends an int (-1) while PhonebookClient::LargestIndex()
// receives a MultiIndex — confirm the wire formats agree.
175  if (phonebook.empty()) {
176  comm->Send(-1, status.MPI_SOURCE, ControlTag);
177  spdlog::trace("Sent empty largest index");
178  } else {
179  comm->Send(*phonebook.rbegin()->first, status.MPI_SOURCE, ControlTag);
180  spdlog::trace("Sent largest index {} unset", *phonebook.rbegin()->first);
181  }
182  } else if (command == ControlFlag::SET_WORKER_READY) {
183 
184  auto index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
185  int rank = comm->Recv<int>(status.MPI_SOURCE, ControlTag, &status);
// NOTE(review): this `continue` skips both tracer->leaveRegion(PhonebookBusy) at the end of
// the loop body (unbalanced trace region) and the request-matching loop.
186  if (!phonebook.count(index)) {
187  std::cerr << "setting ready for nonexistent model!" << std::endl;
188  continue;
189  }
190  phonebook[index].SetWorkerReady(rank);
191  } else if (command == ControlFlag::QUIT) {
192  tracer->leaveRegion(TracerRegions::PhonebookBusy);
193  spdlog::trace("Rank {} quit", comm->GetRank());
194  break;
195  }
196 
// Match queued requests (front to back) with ready workers. A matched request is answered with
// the worker's rank and removed; unmatched ones stay queued for a later iteration.
// NOTE(review): for an unknown model, phonebook[...] default-inserts an empty entry.
197  for (auto request_iter = requests.begin(); request_iter != requests.end();) {
198  if (!phonebook.count(request_iter->requestedSampleIndex)) {
199  std::cerr << "checking request for nonexistent model!" << std::endl;
200  }
201  int workerRank = phonebook[request_iter->requestedSampleIndex].NextWorker();
202  if (workerRank == -1) {
203  request_iter++;
204  } else {
205  comm->Send(workerRank, request_iter->sourceMPIRank, ControlTag);
206  request_iter = requests.erase(request_iter);
207  }
208  }
209 
210 
211 
// Earlier tuple-based version of the matching loop (skipped high-priority requests); unused.
212  /*for (auto request_iter = requests.begin(); request_iter != requests.end();) {
213  std::shared_ptr<MultiIndex> index = std::get<0>(*request_iter);
214  int sender = std::get<1>(*request_iter);
215  bool high_priority = std::get<2>(*request_iter);
216  if (high_priority) {
217  request_iter++;
218  continue;
219  }
220 
221  if (!phonebook.count(index)) {
222  std::cerr << "checking request for nonexistent model!" << std::endl;
223  }
224  int workerRank = phonebook[index].NextWorker();
225  if (workerRank == -1) {
226  request_iter++;
227  } else {
228  comm->Send(workerRank, sender, ControlTag);
229  request_iter = requests.erase(request_iter);
230  }
231  }*/
232 
233  tracer->leaveRegion(TracerRegions::PhonebookBusy);
234  }
235  //std::cout << "Phonebook " << comm->GetRank() << " idle time:\t" << timer_idle.elapsed() << " of:\t" << timer_full.elapsed() << std::endl;
236 
237  }
238 
239  private:
240 
241  void UnRegister(std::shared_ptr<MultiIndex> modelIndex, int rank) {
242  /*if (!phonebook.count(modelIndex)) {
243  std::cerr << "unsetting nonexistent entry!" << std::endl;
244  }*/
245  phonebook[modelIndex].RemoveWorker(rank);
246  if (phonebook[modelIndex].NumWorkers() == 0) {
247  spdlog::debug("Phonebook erasing modelIndex", *modelIndex);
248  phonebook.erase(modelIndex);
249  }
250  //comm->Send(ControlFlag::HANDSHAKE, status.MPI_SOURCE, ControlTag);
251  spdlog::trace("Phonebook entry for {} unset", *modelIndex);
252  }
253 
// Bookkeeping for the workers (MPI ranks) registered for one model:
//   - `workers`: all registered ranks;
//   - `workers_ready`: FIFO of ranks currently ready to take work;
//   - idle/total time accumulated by tick(), used for GetIdleFraction().
254  class WorkerList {
255  public:
256 
// Constructor (its header, original line 257, is not visible in this listing): starts the timers.
258  ResetTimer();
259  }
260 
// Pop the next ready worker (FIFO). Returns -1 if no worker is registered or none is ready.
261  int NextWorker() {
262  //assert(workers.size() > 0);
263  if (workers.size() == 0)
264  return -1;
265 
266  if (workers_ready.size() == 0)
267  return -1;
268 
269  //tick();
270  int worker = workers_ready.front();
271  workers_ready.pop_front();
272  return worker;
273  }
// Register a worker rank (no duplicate check).
274  void AddWorker(int worker) {
275  //tick();
276  workers.push_back(worker);
277  }
// Remove every occurrence of `worker` from both the ready queue and the worker list.
278  void RemoveWorker(int worker) {
279  //tick();
280  workers_ready.erase(std::remove(workers_ready.begin(), workers_ready.end(), worker), workers_ready.end());
281  workers.erase(std::remove(workers.begin(), workers.end(), worker), workers.end());
282  spdlog::trace("Removed worker {}, {} remaining", worker, NumWorkers());
283  }
284  int NumWorkers() const {
285  return workers.size();
286  }
287  int NumWorkersReady() const {
288  return workers_ready.size();
289  }
// Append `worker` to the ready queue (no membership check against `workers`).
// NOTE(review): original line 292 is not visible; presumably it updates registeredReadyCounter.
290  void SetWorkerReady(int worker) {
291  //tick();
293 
294  workers_ready.push_back(worker);
295  }
// Returns a copy of the registered worker ranks.
296  std::vector<int> GetWorkers() {
297  return workers;
298  }
// Returns a copy of the ready queue.
299  std::deque<int> GetWorkersReady() {
300  return workers_ready;
301  }
302 
// Fraction of accumulated worker-time spent ready (idle); 0 before any time was accumulated.
303  double GetIdleFraction() {
304  if (total_time == std::chrono::nanoseconds::zero())
305  return .0;
306  return ((double)idle_time.count()) / (double)(total_time.count());
307  }
// The function header (original line 308) is not visible here; presumably this is
// NormalizedRegisteredReadyCounter() as called from PhonebookServer::Run — confirm.
// Returns -1 with no workers, otherwise registeredReadyCounter divided by the worker count.
309  if (workers.size() == 0)
310  return -1;
311  return (double)registeredReadyCounter / (double)workers.size();
312  }
// Zero the accumulated idle/total times and restart the measurement window.
// NOTE(review): original lines 314 and 318 are not visible (possibly resetting the ready
// counter and last_tick) — confirm.
313  void ResetTimer() {
315  idle_time = std::chrono::nanoseconds::zero();
316  total_time = std::chrono::nanoseconds::zero();
317  begin_tick = std::chrono::high_resolution_clock::now();
319  }
// Accumulate time since the previous tick: weighted by the number of ready workers into
// idle_time and by the number of registered workers into total_time.
// NOTE(review): the duration is multiplied by an unsigned size(); if high_resolution_clock is
// not steady and the period is negative, the product wraps — consider steady_clock.
320  void tick() {
321  std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
322 
323  std::chrono::nanoseconds measurement_period = now - last_tick;
324  idle_time += measurement_period * workers_ready.size();
325  total_time += measurement_period * workers.size();
326 
327  last_tick = now;
328  }
329  public:
// Set by PhonebookServer::Run when a rebalancing check had to be deferred (no ready worker).
330  bool recheck = false;
331 
332  private:
// (Original line 333 is not visible; presumably declares registeredReadyCounter.)
334  std::chrono::nanoseconds idle_time;
335  std::chrono::nanoseconds total_time;
336  std::chrono::high_resolution_clock::time_point begin_tick, last_tick;
337 
338  std::vector<int> workers;
339  std::deque<int> workers_ready;
340  };
341 
342 
343  double getNumQueuedTasksForIndex (std::shared_ptr<MultiIndex> index, WorkerList& worker_list) {
344  double in_queue = 0;
345  for (auto request_iter = requests.begin(); request_iter != requests.end(); request_iter++) {
346  if (*index == *(request_iter->requestedSampleIndex))
347  in_queue += request_iter->highPriority ? 1.0 : .1;
348  }
349  return in_queue;
350  }
351 
352  int getNumTasksQueuedTasksFromIndex (std::shared_ptr<MultiIndex> index) {
353  int numQueuedTasks = 0;
354  for (SampleRequest& request : requests) {
355  if (request.highPriority)
356  numQueuedTasks += *(request.sourceModelIndex) == *index;
357  }
358  return numQueuedTasks;
359  }
360 
361 
362  double getLoadFactor (std::shared_ptr<MultiIndex> index, WorkerList& worker_list) {
363  double load_factor = 1.0 - worker_list.GetIdleFraction() - (double)getNumTasksQueuedTasksFromIndex(index) / worker_list.NumWorkers();
364 
365  load_factor += getNumQueuedTasksForIndex(index, worker_list) / worker_list.NumWorkers();
366  return load_factor;
367  }
368 
// A pending request for a worker, queued by PhonebookServer::Run on GET_WORKGROUP.
369  struct SampleRequest {
// Model the requester wants a worker of.
370  std::shared_ptr<MultiIndex> requestedSampleIndex;
// Model on whose behalf the request was made (used by getNumTasksQueuedTasksFromIndex).
371  std::shared_ptr<MultiIndex> sourceModelIndex;
// (Original lines 372-373 are not visible; Run() also sets .sourceMPIRank and .highPriority.)
374  };
375 
376 
// Requests not yet matched with a ready worker; Run() scans this front to back.
377  std::deque< SampleRequest > requests;
378 
379 
// Model index -> worker bookkeeping. NOTE(review): MultiPtrComp presumably orders the
// shared_ptr keys by the pointed-to MultiIndex value rather than by address — confirm.
380  std::map<std::shared_ptr<MultiIndex>, WorkerList, MultiPtrComp> phonebook;
// Communicator for all control traffic (tag ControlTag).
381  std::shared_ptr<parcer::Communicator> comm;
// Tracer marking PhonebookBusy regions in Run().
383  std::shared_ptr<muq::Utilities::OTF2TracerBase> tracer;
// (Original lines 382 and 384 are not visible; presumably scheduling_active and
// rescheduling_in_progress, both referenced elsewhere in the class.)
385  };
386 
394 
395  public:
396 
// Client-side wrapper around the control protocol spoken by PhonebookServer::Run.
//   comm          - communicator shared with the phonebook process.
//   phonebookRank - rank of the process running PhonebookServer.
// (The member-initializer line, original line 398, is not visible in this listing.)
397  PhonebookClient(std::shared_ptr<parcer::Communicator> comm, int phonebookRank)
399  {
400  }
401 
// Ask the phonebook for a ready worker of model `remoteIndex` and return its rank.
// Sends: requested index, source index, priority flag; then blocks on an int reply.
// The server only replies once a ready worker was found, so this call can block for a while.
// (The preceding control-flag send, original line 403, is not visible; presumably
// ControlFlag::GET_WORKGROUP, matching the server's receive order.)
402  int Query(std::shared_ptr<MultiIndex> remoteIndex, std::shared_ptr<MultiIndex> sourceIndex, bool high_priority) {
404  comm->Send<MultiIndex>(*remoteIndex, phonebookRank, ControlTag);
405  comm->Send<MultiIndex>(*sourceIndex, phonebookRank, ControlTag);
406  comm->Send(high_priority, phonebookRank, ControlTag);
407  return comm->Recv<int>(phonebookRank, ControlTag);
408  }
409 
// Return all worker ranks registered for `modelIndex` (blocking round trip).
// (The control-flag send, original line 412, is not visible; presumably GET_WORKGROUPS.)
410  std::vector<int> GetWorkgroups(std::shared_ptr<MultiIndex> modelIndex) {
411  spdlog::debug("GetWorkgroups call for model {}", *modelIndex);
413  comm->Send(*modelIndex, phonebookRank, ControlTag);
414  spdlog::debug("GetWorkgroups call for model {}, retrieving", *modelIndex);
415  std::vector<int> ret = comm->Recv<std::vector<int>>(phonebookRank, ControlTag);
416  spdlog::debug("GetWorkgroups call for model {}, returning", *modelIndex);
417  return ret;
418  }
419 
// Return the largest model index currently registered in the phonebook.
// (The request send, original line 421, is not visible; presumably GET_LARGEST_INDEX.)
// NOTE(review): when the phonebook is empty the server replies with an int (-1), but this
// receives a MultiIndex — the two sides disagree on the message type in that case.
420  std::shared_ptr<MultiIndex> LargestIndex() {
422  return std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(phonebookRank, ControlTag));
423  }
424 
// Register `rank` as a worker of `modelIndex`.
// The rank is sent with Ssend (synchronous send), so the call returns only once the
// phonebook has started receiving it.
// (The control-flag send, original line 426, is not visible; presumably SET_WORKGROUP.)
425  void Register(std::shared_ptr<MultiIndex> modelIndex, int rank) {
427  comm->Send(*modelIndex, phonebookRank, ControlTag);
428  comm->Ssend(rank, phonebookRank, ControlTag);
429  }
430 
// Remove `rank` from the workers of `modelIndex`; the rank is sent synchronously (Ssend).
// (The control-flag send, original line 432, is not visible; presumably UNSET_WORKGROUP.)
431  void UnRegister(std::shared_ptr<MultiIndex> modelIndex, int rank) {
433  comm->Send(*modelIndex, phonebookRank, ControlTag);
434  comm->Ssend(rank, phonebookRank, ControlTag);
435  //if (comm->Recv<ControlFlag>(phonebookRank, ControlTag) != ControlFlag::HANDSHAKE)
436  // std::cerr << "Failed handshake in UnRegister()!" << std::endl;
437  }
438 
// Tell the phonebook that worker `rank` of `modelIndex` is ready for new work (no reply).
// (The control-flag send, original line 440, is not visible; presumably SET_WORKER_READY.)
439  void SetWorkerReady(std::shared_ptr<MultiIndex> modelIndex, int rank) {
441  comm->Send(*modelIndex, phonebookRank, ControlTag);
442  comm->Send(rank, phonebookRank, ControlTag);
443  }
444 
// Notify the phonebook that a rescheduling has completed. The body (original line 446) is not
// visible; presumably it sends ControlFlag::SCHEDULING_DONE, which clears
// rescheduling_in_progress on the server — confirm.
445  void SchedulingDone() {
447  }
// The body (original line 449) is not visible; presumably it sends ControlFlag::SCHEDULING_STOP
// to the phonebook — confirm.
448  void SchedulingStop() {
450  }
451 
452  private:
// Communicator shared with the phonebook process.
453  std::shared_ptr<parcer::Communicator> comm;
// (Original line 454 is not visible; presumably declares phonebookRank, used by every method.)
455  };
456  }
457 }
458 
459 #endif
460 
461 #endif
High-level wrapper for communicating with the phonebook process.
Definition: Phonebook.h:393
void SetWorkerReady(std::shared_ptr< MultiIndex > modelIndex, int rank)
Definition: Phonebook.h:439
PhonebookClient(std::shared_ptr< parcer::Communicator > comm, int phonebookRank)
Definition: Phonebook.h:397
void UnRegister(std::shared_ptr< MultiIndex > modelIndex, int rank)
Definition: Phonebook.h:431
std::vector< int > GetWorkgroups(std::shared_ptr< MultiIndex > modelIndex)
Definition: Phonebook.h:410
std::shared_ptr< MultiIndex > LargestIndex()
Definition: Phonebook.h:420
void Register(std::shared_ptr< MultiIndex > modelIndex, int rank)
Definition: Phonebook.h:425
std::shared_ptr< parcer::Communicator > comm
Definition: Phonebook.h:453
int Query(std::shared_ptr< MultiIndex > remoteIndex, std::shared_ptr< MultiIndex > sourceIndex, bool high_priority)
Definition: Phonebook.h:402
std::chrono::high_resolution_clock::time_point last_tick
Definition: Phonebook.h:336
std::chrono::high_resolution_clock::time_point begin_tick
Definition: Phonebook.h:336
Phonebook implementation facilitating communication between different roles in a parallel MIMCMC type...
Definition: Phonebook.h:38
int getNumTasksQueuedTasksFromIndex(std::shared_ptr< MultiIndex > index)
Definition: Phonebook.h:352
std::shared_ptr< muq::Utilities::OTF2TracerBase > tracer
Definition: Phonebook.h:383
std::map< std::shared_ptr< MultiIndex >, WorkerList, MultiPtrComp > phonebook
Definition: Phonebook.h:380
double getLoadFactor(std::shared_ptr< MultiIndex > index, WorkerList &worker_list)
Definition: Phonebook.h:362
PhonebookServer(std::shared_ptr< parcer::Communicator > comm, bool scheduling_active=true, std::shared_ptr< muq::Utilities::OTF2TracerBase > tracer=std::make_shared< muq::Utilities::OTF2TracerDummy >())
Definition: Phonebook.h:41
std::deque< SampleRequest > requests
Definition: Phonebook.h:377
double getNumQueuedTasksForIndex(std::shared_ptr< MultiIndex > index, WorkerList &worker_list)
Definition: Phonebook.h:343
std::shared_ptr< parcer::Communicator > comm
Definition: Phonebook.h:381
void UnRegister(std::shared_ptr< MultiIndex > modelIndex, int rank)
Definition: Phonebook.h:241
ControlFlag
Flags used by parallel MCMC/MIMCMC type methods.
Definition: ParallelFlags.h:23
const int ControlTag
Tags separating MPI communication between control level processes and internal work group communicati...
Definition: ParallelFlags.h:15
std::shared_ptr< MultiIndex > requestedSampleIndex
Definition: Phonebook.h:370