MUQ  0.4.3
ParallelMIMCMCWorker.cpp
#include "MUQ/SamplingAlgorithms/ParallelMIMCMCWorker.h" // declares CollectorClient, WorkerClient and WorkerServer

#include <chrono>
#include <list>
#include <thread>

#include "spdlog/spdlog.h"
#include "spdlog/fmt/ostr.h"

using namespace muq::SamplingAlgorithms;

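// CollectorClient drives a group of collector ranks from the control process:
// it assigns the group to a Multiindex box, requests sample collection, triggers
// mean computation and file output, and receives the results. All communication
// uses ControlFlag messages sent with ControlTag (see ParallelFlags.h).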
CollectorClient::CollectorClient(std::shared_ptr<parcer::Communicator> comm,
                                 std::vector<int> subgroup,
                                 std::shared_ptr<MultiIndex> modelindex)
 : comm(comm), subgroup(subgroup), boxHighestIndex(modelindex)
{
  for (int dest : subgroup) {
    comm->Send(ControlFlag::ASSIGN_COLLECTOR, dest, ControlTag); // matched by the ASSIGN_COLLECTOR branch in WorkerServer below
    comm->Send(subgroup, dest, ControlTag);
    comm->Send(*boxHighestIndex, dest, ControlTag);
  }

  // Set up Multiindex box
  boxLowestIndex = MultiIndex::Copy(boxHighestIndex);
  --(*boxLowestIndex);

  std::shared_ptr<MultiIndex> boxSize = std::make_shared<MultiIndex>(*boxHighestIndex - *boxLowestIndex);
  boxIndices = MultiIndexFactory::CreateFullTensor(boxSize->GetVector());
}

std::shared_ptr<MultiIndex> CollectorClient::GetModelIndex() const
{
  return MultiIndex::Copy(boxHighestIndex);
}

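// Ask the collector group to gather numSamples sample boxes; the requested total
// is split as evenly as possible across the group members.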
void CollectorClient::CollectSamples(int numSamples)
{
  spdlog::debug("Kick off collection of {} samples", numSamples);
  sampling = true;
  int samplesAssigned = 0;

  for (int i = 0; i < subgroup.size(); i++) {
    int dest = subgroup[i];
    comm->Send(ControlFlag::SAMPLE_BOX, dest, ControlTag); // matched by the SAMPLE_BOX branch of the collector loop below
    int assigning = (numSamples - samplesAssigned) / (subgroup.size() - i); // Ensure mostly even assignment of samples
    comm->Send(assigning, dest, ControlTag);
    samplesAssigned += assigning;
  }
  spdlog::debug("Collection kick off done");
}

void CollectorClient::ComputeMeans()
{
  computingMeans = true;
  for (int dest : subgroup) {
    comm->Send(ControlFlag::MEANS, dest, ControlTag);
  }
}

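// Ask every group member to write its collections to file; the boolean receive
// acts as an acknowledgement, so the call only returns once each rank is done.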
void CollectorClient::WriteToFile(std::string filename)
{
  for (int dest : subgroup) {
    comm->Send(ControlFlag::WRITE_TO_FILE, dest, ControlTag); // matched by the WRITE_TO_FILE branch of the collector loop below
    comm->Send(filename, dest, ControlTag);
    bool sync = comm->Recv<bool>(dest, ControlTag); // wait for acknowledgement before contacting the next rank
  }
}

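// Handle a control message addressed to this collector group; returns false for
// messages not sent by the group's root rank. MEANS_DONE is followed by one sample
// mean and one QOI mean per box index (the sample means are received but currently
// unused). The QOI means are combined into the telescoping difference for this box:
// indices whose distance from boxHighestIndex has even sum are added, the others
// subtracted.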
bool CollectorClient::Receive (ControlFlag command, const MPI_Status& status)
{
  if (status.MPI_SOURCE != subgroup[0])
    return false;

  if (command == ControlFlag::SAMPLE_BOX_DONE) {
    sampling = false;
  } else if (command == ControlFlag::MEANS_DONE) {

    for (uint i = 0; i < boxIndices->Size(); i++) {
      std::shared_ptr<MultiIndex> boxIndex = (*boxIndices)[i];

      Eigen::VectorXd chainSampleMean = comm->Recv<Eigen::VectorXd>(status.MPI_SOURCE, ControlTag);
      Eigen::VectorXd chainQOIMean = comm->Recv<Eigen::VectorXd>(status.MPI_SOURCE, ControlTag);

      std::shared_ptr<MultiIndex> index = std::make_shared<MultiIndex>(*boxLowestIndex + *boxIndex);
      auto indexDiffFromTop = std::make_shared<MultiIndex>(*boxHighestIndex - *index);

      //std::cout << "Contribution of model " << *index << ": " << chainQOIMean << std::endl;
      if (i == 0) {
        if (indexDiffFromTop->Sum() % 2 == 0) {
          boxQOIMean = chainQOIMean;
        } else {
          boxQOIMean = -chainQOIMean;
        }
      } else {
        if (indexDiffFromTop->Sum() % 2 == 0) {
          boxQOIMean += chainQOIMean;
        } else {
          boxQOIMean -= chainQOIMean;
        }
      }
    }
    computingMeans = false;
  } else {
    std::cerr << "Unexpected command!" << std::endl;
    exit(43);
  }

  return true;
}

Eigen::VectorXd CollectorClient::GetQOIMean() {
  return boxQOIMean;
}

void CollectorClient::Unassign() {
  for (int dest : subgroup) {
    comm->Send(ControlFlag::UNASSIGN, dest, ControlTag);
  }
}

bool CollectorClient::Sampling() {
  return sampling;
}

bool CollectorClient::ComputingMeans() {
  return computingMeans;
}


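// WorkerClient is the control-process handle for sampling work groups: it assigns
// a group of ranks to a model index (registering the group with the phonebook),
// unassigns groups again, and finally shuts all worker ranks down.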
WorkerClient::WorkerClient(std::shared_ptr<parcer::Communicator> comm,
                           std::shared_ptr<PhonebookClient> phonebookClient,
                           int RootRank) : comm(comm), phonebookClient(phonebookClient)
{
}


void WorkerClient::assignGroup (std::vector<int> subgroup, std::shared_ptr<MultiIndex> modelindex)
{
  for (int dest : subgroup) {
    comm->Send(ControlFlag::ASSIGN, dest, ControlTag);
    comm->Send(subgroup, dest, ControlTag);
    comm->Send(*modelindex, dest, ControlTag);
  }
  phonebookClient->Register(modelindex, subgroup[0]);
}

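// Unassign a single work group: remove its phonebook entry, then synchronously tell
// the group root to stop; the root replies with the list of its members, which is
// returned to the caller.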
std::vector<int> WorkerClient::UnassignGroup (std::shared_ptr<MultiIndex> modelIndex, int groupRootRank)
{
  spdlog::trace("UnRegister {}", groupRootRank);
  phonebookClient->UnRegister(modelIndex, groupRootRank);
  spdlog::trace("Sending unassign to {}", groupRootRank);
  comm->Ssend(ControlFlag::UNASSIGN, groupRootRank, ControlTag);
  std::vector<int> groupMembers = comm->Recv<std::vector<int>>(groupRootRank, ControlTag);
  return groupMembers;
}

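// Unassign every work group, starting from the largest model index in the phonebook
// and working down; the loop ends once the zero index itself has been unassigned.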
void WorkerClient::UnassignAll()
{
  std::shared_ptr<MultiIndex> largest = nullptr;
  do {
    largest = phonebookClient->LargestIndex();
    spdlog::trace("Unassigning model {}", *largest);
    std::vector<int> ranks = phonebookClient->GetWorkgroups(largest);
    for (int rank : ranks) {
      UnassignGroup(largest, rank);
    }
  } while (largest->Max() != 0);
}

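// Tell every rank except the root to quit its WorkerServer loop.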
void WorkerClient::Finalize() {
  for (int dest = 1; dest < comm->GetSize(); dest++)
    comm->Send(ControlFlag::QUIT, dest, ControlTag);
}


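// WorkerServer is the event loop run by every non-root rank. The constructor blocks,
// receiving ControlFlag commands from the root: ASSIGN makes the rank part of a
// sampling work group, ASSIGN_COLLECTOR makes it part of a collector group, and QUIT
// ends the loop.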
WorkerServer::WorkerServer(boost::property_tree::ptree const& pt,
                           std::shared_ptr<parcer::Communicator> comm,
                           std::shared_ptr<PhonebookClient> phonebookClient,
                           int RootRank,
                           std::shared_ptr<ParallelizableMIComponentFactory> componentFactory,
                           std::shared_ptr<muq::Utilities::OTF2TracerBase> tracer)
{

  while (true) {
    ControlFlag command = comm->Recv<ControlFlag>(RootRank, ControlTag);

    if (command == ControlFlag::ASSIGN) {
      std::vector<int> subgroup_proc = comm->Recv<std::vector<int>>(0, ControlTag);
      auto samplingProblemIndex = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(0, ControlTag));

      spdlog::trace("Setting up MPI subcommunicator for group");
      MPI_Group world_group;
      MPI_Comm_group (MPI_COMM_WORLD, &world_group);
      MPI_Group subgroup;
      MPI_Group_incl (world_group, subgroup_proc.size(), &subgroup_proc[0], &subgroup);

      MPI_Comm subcomm_raw;
      MPI_Comm_create_group(MPI_COMM_WORLD, subgroup, ControlTag, &subcomm_raw);
      auto subcomm = std::make_shared<parcer::Communicator>(subcomm_raw);

      componentFactory->SetComm(subcomm);
      spdlog::trace("Setting up ParallelMIComponentFactory");
      auto parallelComponentFactory = std::make_shared<ParallelMIComponentFactory>(subcomm, comm, componentFactory);

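      // Only the subgroup root (rank 0 of the subcommunicator) sets up the MIMCMC box
      // and answers sample requests below; every group member calls finalize() on the
      // parallel component factory once the group has been unassigned.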
      if (subcomm->GetRank() == 0) {
        std::cout << "Subgroup root is global " << comm->GetRank() << std::endl;
        auto finestProblem = parallelComponentFactory->SamplingProblem(parallelComponentFactory->FinestIndex());

        spdlog::trace("Setting up ParallelMIMCMCBox");
        auto box = std::make_shared<ParallelMIMCMCBox>(pt, parallelComponentFactory, samplingProblemIndex, comm, phonebookClient, tracer);

        spdlog::debug("Rank {} begins sampling", comm->GetRank());
        const int subsampling = pt.get<int>("MLMCMC.Subsampling" + multiindexToConfigString(samplingProblemIndex));
        tracer->enterRegion(TracerRegions::Sampling);
        for (int i = 0; i < 2 + subsampling; i++) // TODO: Really subsampling on every level? Maybe subsample when requesting samples?
          box->Sample();

        tracer->leaveRegion(TracerRegions::Sampling);
        phonebookClient->SetWorkerReady(samplingProblemIndex, comm->GetRank());
        spdlog::trace("Awaiting instructions");

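        // Serve requests until the group is unassigned: SAMPLE returns the latest state
        // of the finest chain (state vector, log target and QOI), SAMPLE_BOX returns the
        // latest state of every chain in the box plus the latest QOI difference. After
        // answering, the box is advanced by 1 + subsampling steps and the worker
        // re-registers as ready with the phonebook.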
        //Dune::Timer timer_idle;
        //Dune::Timer timer_full;
        while (true) {
          MPI_Status status;
          //timer_idle.start();
          command = comm->Recv<ControlFlag>(MPI_ANY_SOURCE, ControlTag, &status);
          //timer_idle.stop();
          if (command == ControlFlag::UNASSIGN) {
            comm->Send<std::vector<int>>(subgroup_proc, status.MPI_SOURCE, ControlTag);
            break;
          } else if (command == ControlFlag::SAMPLE) {
            spdlog::trace("Send sample from {} to rank {}", comm->GetRank(), status.MPI_SOURCE);

            auto sampleCollection = box->FinestChain()->GetSamples(); // TODO: last() function for collection? // TODO: Do not store chains here
            auto latestSample = sampleCollection->at(sampleCollection->size()-1);
            // TODO: Send "full" sample via parcer?
            comm->Send<Eigen::VectorXd>(latestSample->state[0], status.MPI_SOURCE, ControlTag);
            comm->Send<double>(AnyCast(latestSample->meta["LogTarget"]), status.MPI_SOURCE, ControlTag);
            if (latestSample->HasMeta("QOI")) {
              std::shared_ptr<SamplingState> qoi = AnyCast(latestSample->meta["QOI"]);
              comm->Send<Eigen::VectorXd>(qoi->state[0], status.MPI_SOURCE, ControlTag);
            } else {
              spdlog::error("No QOI!");
              exit(-1);
            }

            spdlog::trace("Sampling");
            tracer->enterRegion(TracerRegions::Sampling);
            for (int i = 0; i < 1 + subsampling; i++) // TODO: Really subsampling on every level? Maybe subsample when requesting samples?
              box->Sample();
            tracer->leaveRegion(TracerRegions::Sampling);
            phonebookClient->SetWorkerReady(samplingProblemIndex, comm->GetRank());
          } else if (command == ControlFlag::SAMPLE_BOX) {
            spdlog::trace("Send box from {} to rank {}", comm->GetRank(), status.MPI_SOURCE);

            for (int i = 0; i < box->NumChains(); i++) {
              auto sampleCollection = box->GetChain(i)->GetSamples(); // TODO: Do not store chains here
              auto latestSample = sampleCollection->back();
              // TODO: Send "full" sample via parcer?
              comm->Send<Eigen::VectorXd>(latestSample->state[0], status.MPI_SOURCE, ControlTag);
              comm->Send<double>(AnyCast(latestSample->meta["LogTarget"]), status.MPI_SOURCE, ControlTag);
              if (latestSample->HasMeta("QOI")) {
                std::shared_ptr<SamplingState> qoi = AnyCast(latestSample->meta["QOI"]);
                comm->Send<Eigen::VectorXd>(qoi->state[0], status.MPI_SOURCE, ControlTag);
              } else {
                spdlog::error("No QOI!");
                exit(-1);
              }
            }

            assert(box->GetQOIDiff()->size() > 0);
            auto latestDiffSample = box->GetQOIDiff()->back();
            comm->Send<Eigen::VectorXd>(latestDiffSample->state[0], status.MPI_SOURCE, ControlTag);

            tracer->enterRegion(TracerRegions::Sampling);
            for (int i = 0; i < 1 + subsampling; i++) // TODO: Really subsampling on every level? Maybe subsample when requesting samples?
              box->Sample();
            tracer->leaveRegion(TracerRegions::Sampling);
            phonebookClient->SetWorkerReady(samplingProblemIndex, comm->GetRank());
          } else {
            spdlog::error("Controller received unexpected command!");
            exit(-1);
          }

        }
        //std::cout << "Worker Controller " << comm->GetRank() << " idle time:\t" << timer_idle.elapsed() << " of:\t" << timer_full.elapsed() << std::endl;

      }

      parallelComponentFactory->finalize();
      spdlog::trace("Rank {} finalized", comm->GetRank());

    } else if (command == ControlFlag::ASSIGN_COLLECTOR) {

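      // A collector group keeps one distributed sample collection and one QOI collection
      // per index of the Multiindex box, plus a collection of QOI differences. It fills
      // them by issuing SAMPLE_BOX requests to sampling work groups found via the
      // phonebook, and answers MEANS and WRITE_TO_FILE requests from the control process.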
      std::vector<int> subgroup_proc = comm->Recv<std::vector<int>>(RootRank, ControlTag);
      auto boxHighestIndex = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(RootRank, ControlTag));

      // Set up subcommunicator
      MPI_Group world_group;
      MPI_Comm_group (MPI_COMM_WORLD, &world_group);
      MPI_Group subgroup;
      MPI_Group_incl (world_group, subgroup_proc.size(), &subgroup_proc[0], &subgroup);

      MPI_Comm subcomm_raw;
      MPI_Comm_create_group(MPI_COMM_WORLD, subgroup, ControlTag, &subcomm_raw);
      auto subcomm = std::make_shared<parcer::Communicator>(subcomm_raw);


      // Set up Multiindex box
      auto boxLowestIndex = MultiIndex::Copy(boxHighestIndex);
      --(*boxLowestIndex);
      std::shared_ptr<MultiIndex> boxSize = std::make_shared<MultiIndex>(*boxHighestIndex - *boxLowestIndex);
      std::shared_ptr<MultiIndexSet> boxIndices = MultiIndexFactory::CreateFullTensor(boxSize->GetVector());

      std::vector<std::shared_ptr<DistributedCollection>> sampleCollections(boxIndices->Size());
      std::vector<std::shared_ptr<DistributedCollection>> qoiCollections(boxIndices->Size());
      std::shared_ptr<DistributedCollection> qoiDiffCollection = std::make_shared<DistributedCollection>(std::make_shared<MarkovChain>(), subcomm);
      for (uint i = 0; i < boxIndices->Size(); i++) {
        auto sampleCollection = std::make_shared<MarkovChain>();
        sampleCollections[i] = std::make_shared<DistributedCollection>(sampleCollection, subcomm);
        auto qoiCollection = std::make_shared<MarkovChain>();
        qoiCollections[i] = std::make_shared<DistributedCollection>(qoiCollection, subcomm);
      }


      //Dune::Timer timer_idle;
      //Dune::Timer timer_full;
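      // Collector event loop: SAMPLE_BOX pulls the requested number of sample boxes from
      // sampling groups located via the phonebook, MEANS sends per-index sample and QOI
      // means back to the root, WRITE_TO_FILE dumps all collections, and UNASSIGN leaves
      // the loop.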
      while (true) {
        MPI_Status status;
        //timer_idle.start();
        command = comm->Recv<ControlFlag>(MPI_ANY_SOURCE, ControlTag, &status);
        //timer_idle.stop();
        if (command == ControlFlag::UNASSIGN)
          break;
        tracer->enterRegion(TracerRegions::CollectorBusy);
        if (command == ControlFlag::SAMPLE_BOX) {

          int numSamples = comm->Recv<int>(0, ControlTag);
          spdlog::debug("Collecting {} samples for box {}", numSamples, *boxHighestIndex);

          for (int i = 0; i < numSamples; i++) {
            spdlog::trace("Requesting sample box for model {}", *boxHighestIndex);
            int remoteRank = phonebookClient->Query(boxHighestIndex, boxHighestIndex, false);
            comm->Send(ControlFlag::SAMPLE_BOX, remoteRank, ControlTag); // TODO: Receive sample in one piece?
            for (uint j = 0; j < boxIndices->Size(); j++) {
              auto new_state = std::make_shared<SamplingState>(comm->Recv<Eigen::VectorXd>(remoteRank, ControlTag));
              new_state->meta["LogTarget"] = comm->Recv<double>(remoteRank, ControlTag);
              sampleCollections[j]->Add(new_state);
              qoiCollections[j]->Add(std::make_shared<SamplingState>(comm->Recv<Eigen::VectorXd>(remoteRank, ControlTag)));
            }
            qoiDiffCollection->Add(std::make_shared<SamplingState>(comm->Recv<Eigen::VectorXd>(remoteRank, ControlTag)));
            if ((i+1) % std::max(1,numSamples / 10) == 0)
              spdlog::debug("Collected {} out of {} samples for model {}", i+1, numSamples, *boxHighestIndex);
          }
          if (subcomm->GetRank() == 0)
            comm->Send(ControlFlag::SAMPLE_BOX_DONE, RootRank, ControlTag); // TODO: Receive sample in one piece?
          // box->Sample();
        } else if (command == ControlFlag::MEANS) {
          std::list<Eigen::VectorXd> sampleMeans;
          std::list<Eigen::VectorXd> qoiMeans;
          for (uint i = 0; i < boxIndices->Size(); i++) {
            sampleMeans.push_back(sampleCollections[i]->GlobalMean());
            qoiMeans.push_back(qoiCollections[i]->GlobalMean());
          }
          if (subcomm->GetRank() == 0) {
            comm->Send(ControlFlag::MEANS_DONE, RootRank, ControlTag);
            auto qoiMean = qoiMeans.begin();
            for (auto sampleMean = sampleMeans.begin(); sampleMean != sampleMeans.end(); sampleMean++) {
              comm->Send(*sampleMean, RootRank, ControlTag);
              comm->Send(*qoiMean, RootRank, ControlTag);
              qoiMean++;
            }
          }
        } else if (command == ControlFlag::WRITE_TO_FILE) {
          std::string filename = comm->Recv<std::string>(status.MPI_SOURCE, ControlTag);
          for (uint i = 0; i < boxIndices->Size(); i++) {
            std::shared_ptr<MultiIndex> boxIndex = (*boxIndices)[i];
            sampleCollections[i]->WriteToFile(filename, "/Collector_model" + boxHighestIndex->ToString() + "_subchain_" + boxIndex->ToString() + "_samples_rank_" + std::to_string(subcomm->GetRank()));
            qoiCollections[i]->WriteToFile(filename, "/Collector_model" + boxHighestIndex->ToString() + "_subchain_" + boxIndex->ToString() + "_qois_rank_" + std::to_string(subcomm->GetRank()));
          }
          qoiDiffCollection->WriteToFile(filename, "/Collector_model" + boxHighestIndex->ToString() + "_qoi_diff_rank_" + std::to_string(subcomm->GetRank()));
          comm->Send(true, status.MPI_SOURCE, ControlTag);
        } else {
          std::cerr << "Unexpected command!" << std::endl;
          exit(43);
        }
        tracer->leaveRegion(TracerRegions::CollectorBusy);

      }

    } else if (command == ControlFlag::QUIT) {
      spdlog::trace("Rank {} quit", comm->GetRank());
      break;
    } else {
      std::cerr << "Unexpected command!" << std::endl;
      exit(42);
    }
  }
}


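// Turn a multiindex into a suffix for property tree keys, e.g. the index (1,0) becomes
// "_1_0", so the subsampling above is read from a key such as "MLMCMC.Subsampling_1_0".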
std::string WorkerServer::multiindexToConfigString (std::shared_ptr<MultiIndex> index)
{
  std::stringstream strs;
  for (int i = 0; i < index->GetLength(); i++) {
    strs << "_" << index->GetValue(i);
  }
  return strs.str();
}