MUQ  0.4.3
ParallelFixedSamplesMIMCMC.cpp
Go to the documentation of this file.
2 
4 
5 using namespace muq::SamplingAlgorithms;
6 
7 
8 void RoundRobinStaticLoadBalancer::setup(std::shared_ptr<ParallelizableMIComponentFactory> componentFactory, uint availableRanks)
9 {
10  ranks_remaining = availableRanks;
11  spdlog::info("Balancing load across {} ranks", availableRanks);
12  auto indices = MultiIndexFactory::CreateFullTensor(componentFactory->FinestIndex()->GetVector());
13  models_remaining = indices->Size();
14 }
15 
16 int RoundRobinStaticLoadBalancer::numCollectors(std::shared_ptr<MultiIndex> modelIndex)
17 {
19  return 1;
20 }
21 
// Number of worker groups and workers per group for a given model index
// (see the declaration: WorkerAssignment numWorkers(std::shared_ptr<MultiIndex>) override).
// NOTE(review): this listing is missing the function's signature line (doxygen
// line 22) — confirm against the repository.
23 {
24  WorkerAssignment assignment;
// Round-robin policy: one worker per group.
25  assignment.numWorkersPerGroup = 1;
// NOTE(review): doxygen line 26 is missing from this listing; it presumably
// computes assignment.numGroups (e.g. from ranks_remaining and
// models_remaining) — confirm against the repository.
27 
28  spdlog::debug("Of {}, assigning {} to model {}", ranks_remaining, assignment.numGroups * assignment.numWorkersPerGroup, *modelIndex);
29 
// Every model must receive at least one worker group.
30  assert (assignment.numGroups * assignment.numWorkersPerGroup > 0);
31 
// NOTE(review): doxygen line 32 is missing from this listing — likely
// book-keeping such as decrementing models_remaining; confirm.
// Deduct the ranks consumed by this assignment from the remaining budget.
33  ranks_remaining -= assignment.numGroups * assignment.numWorkersPerGroup;
34 
35  return assignment;
36 }
37 
// Constructor: splits the communicator's ranks into roles. Rank rootRank acts
// as the controller (assigns collector and worker ranks), rank phonebookRank
// runs the phonebook server, and all remaining ranks fall into the final
// else-branch below.
// NOTE(review): this listing is missing the constructor's first line (doxygen
// line 38); per the declaration it begins:
//   StaticLoadBalancingMIMCMC::StaticLoadBalancingMIMCMC(pt::ptree pt,
39  std::shared_ptr<ParallelizableMIComponentFactory> componentFactory,
40  std::shared_ptr<StaticLoadBalancer> loadBalancing,
41  std::shared_ptr<parcer::Communicator> comm,
42  std::shared_ptr<muq::Utilities::OTF2TracerBase> tracer)
43  : SamplingAlgorithm(nullptr),
44  pt(pt),
45  comm(comm),
46  componentFactory(componentFactory),
47  phonebookClient(std::make_shared<PhonebookClient>(comm, phonebookRank)),
48  workerClient(comm, phonebookClient, rootRank)
49 {
50 
51  spdlog::debug("Rank: {}", comm->GetRank());
52 
// --- Controller role ---
53  if (comm->GetRank() == rootRank) {
54 
// Give the factory a self-communicator so controller-side model setup does not
// involve other ranks.
55  auto comm_self = std::make_shared<parcer::Communicator>(MPI_COMM_SELF);
56  componentFactory->SetComm(comm_self);
57 
58  auto indices = MultiIndexFactory::CreateFullTensor(componentFactory->FinestIndex()->GetVector());
59 
// Two ranks (root and phonebook) are reserved; only the rest are balanced.
60  assert(comm->GetSize() - 2 >= 0);
61  loadBalancing->setup(componentFactory, comm->GetSize() - 2);
62 
// Assignment starts after the two reserved ranks.
63  int rank = 2;
64 
65  // Assign collectors
66  spdlog::trace("Assigning collectors");
67  for (int i = 0; i < indices->Size(); i++) {
68  std::shared_ptr<MultiIndex> index = (*indices)[i];
69  std::vector<int> collectorRanks;
70  int numCollectors = loadBalancing->numCollectors(index);
71  for (int r = 0; r < numCollectors; r++) {
72  collectorRanks.push_back(rank);
73  rank++;
74  }
75  collectorClients.push_back(CollectorClient(comm, collectorRanks, index));
76  }
77 
78  // Assign workers
79  spdlog::trace("Assigning workers");
80  for (int i = 0; i < indices->Size(); i++) {
81 
82  std::shared_ptr<MultiIndex> index = (*indices)[i];
83  StaticLoadBalancer::WorkerAssignment assignment = loadBalancing->numWorkers(index);
84 
85  for (int group = 0; group < assignment.numGroups; group++) {
86  std::vector<int> groupRanks;
87  for (int r = 0; r < assignment.numWorkersPerGroup; r++) {
88  groupRanks.push_back(rank);
89  rank++;
90  }
91  workerClient.assignGroup(groupRanks, index);
92  }
93 
// The load balancer must never hand out more ranks than the communicator has.
94  assert (rank <= comm->GetSize());
95  }
96 
97 
// --- Phonebook role: run the phonebook server (blocks inside Run()) ---
98  } else if (comm->GetRank() == phonebookRank) {
99  PhonebookServer phonebook(comm, pt.get<bool>("MLMCMC.Scheduling"), tracer);
100  phonebook.Run();
// --- All remaining ranks ---
101  } else {
102  auto phonebookClient = std::make_shared<PhonebookClient>(comm, phonebookRank);
// NOTE(review): doxygen line 103 is missing from this listing — presumably it
// constructs and runs the server-side worker process using this
// phonebookClient; confirm against the repository.
104  }
105 
106 }
107 
108 std::shared_ptr<SampleCollection> StaticLoadBalancingMIMCMC::GetSamples() const
109 {
110  return nullptr;
111 };
112 
113 std::shared_ptr<SampleCollection> StaticLoadBalancingMIMCMC::GetQOIs() const
114 {
115  return nullptr;
116 };
117 
118 
// Mean quantity of interest, accumulated as a telescoping sum over all model
// levels (one CollectorClient per level).
// NOTE(review): this listing is missing the signature line (doxygen line 119);
// per the declaration it is:
//   Eigen::VectorXd StaticLoadBalancingMIMCMC::MeanQOI()
120 {
// Only the controller rank aggregates; every other rank returns a dummy value.
121  if (comm->GetRank() != rootRank) {
122  return Eigen::VectorXd::Zero(1);
123  }
124 
// Kick off the mean computation on every collector.
125  for (CollectorClient& client : collectorClients) {
126  client.ComputeMeans();
127  }
128 
// Control loop: forward incoming control messages to the collectors until no
// collector reports that it is still computing means.
129  while (true) {
130  MPI_Status status;
131  ControlFlag command = comm->Recv<ControlFlag>(MPI_ANY_SOURCE, ControlTag, &status);
132 
133  for (CollectorClient& client : collectorClients) {
134  if (client.Receive(command, status))
135  break;
136  }
137 
138  bool isComputingMeans = false;
139  for (CollectorClient& client : collectorClients) {
140  isComputingMeans = isComputingMeans || client.IsComputingMeans();
141  }
142  if (!isComputingMeans)
143  break;
144  }
145  spdlog::info("Computing means completed");
146 
147 
// The first collector's mean is used only to obtain a correctly sized vector,
// which is then zeroed; assumes at least one collector exists on this rank.
148  Eigen::VectorXd mean_box = collectorClients[0].GetQOIMean();
149  mean_box.setZero();
150  Eigen::VectorXd mean = mean_box;
151 
// Telescoping sum: add each level's contribution.
152  for (CollectorClient& client : collectorClients) {
153  mean_box = client.GetQOIMean();
154  mean += mean_box;
155  //std::cout << "Mean level:\t" << mean_box.transpose() << " adding up to:\t" << mean.transpose() << std::endl;
156  }
157  return mean;
158 }
159 
// Cleanup of the parallel method (per the declaration: void Finalize(),
// "Cleanup parallel method, wait for all ranks to finish").
// NOTE(review): this listing is missing the signature line (doxygen line 160) —
// confirm against the repository.
161 {
162  if (comm->GetRank() == rootRank) {
// Stop dynamic rescheduling before tearing the rank groups down.
163  phonebookClient->SchedulingStop();
164  std::cout << "Starting unassign sequence" << std::endl;
165  for (CollectorClient& client : collectorClients) {
166  client.Unassign();
167  }
// NOTE(review): doxygen line 168 is missing from this listing — presumably the
// worker groups are unassigned here as well (via workerClient); confirm.
169  std::cout << "Finished unassign sequence" << std::endl;
170 
// NOTE(review): doxygen line 171 is missing from this listing — confirm what
// it does before relying on this function's shutdown behavior.
172  std::cout << "Rank " << comm->GetRank() << " quit" << std::endl;
173  }
174 }
175 
176 void StaticLoadBalancingMIMCMC::RequestSamples(std::shared_ptr<MultiIndex> index, int numSamples)
177 {
178  if (comm->GetRank() != rootRank) {
179  return;
180  }
181  for (CollectorClient& client : collectorClients) {
182  client.GetModelIndex();
183  if (client.GetModelIndex() == index) {
184  client.CollectSamples(numSamples);
185  return;
186  }
187  }
188  std::cerr << "Requested samples from nonexisting collector!" << std::endl;
189 }
190 
// Request an additional number of samples to be computed on each level: the
// same request is forwarded to every collector client.
// NOTE(review): this listing is missing the signature line (doxygen line 191);
// per the declaration it is:
//   void StaticLoadBalancingMIMCMC::RequestSamplesAll(int numSamples)
192 {
// Only the controller rank issues sample requests.
193  if (comm->GetRank() != rootRank) {
194  return;
195  }
196  // TODO: Get indices from collectors, then request samples for each index
197  for (CollectorClient& client : collectorClients) {
198  client.CollectSamples(numSamples);
199  }
200 }
201 
202 
// Control loop driving the sampling phase on the root rank: receives control
// messages, dispatches them to the collector clients, and handles dynamic
// rescheduling requests until no collector is sampling anymore.
// NOTE(review): this listing is missing the signature line (doxygen line 203);
// presumably void StaticLoadBalancingMIMCMC::RunSamples() — confirm.
204 {
205  if (comm->GetRank() != rootRank) {
206  return;
207  }
208  while (true) {
209  MPI_Status status;
210  ControlFlag command = comm->Recv<ControlFlag>(MPI_ANY_SOURCE, ControlTag, &status);
211 
// First give every collector a chance to consume the message.
212  bool command_handled = false;
213  for (CollectorClient& client : collectorClients) {
214  if (client.Receive(command, status)) {
215  command_handled = true;
216  break;
217  }
218  }
219 
// Not a collector message: handle a dynamic rescheduling request. A worker
// group is moved from idle_index to busy_index (names suggest idle/busy
// levels — confirm semantics against the phonebook implementation).
220  if (!command_handled) {
221  if (command == ControlFlag::SCHEDULING_NEEDED) {
222  spdlog::debug("SCHEDULING_NEEDED entered!");
223  // TODO: Give the phonebook client a Receive analogous to CollectorClient's, instead of receiving manually!
224  auto idle_index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
225  int rescheduleRank = comm->Recv<int>(status.MPI_SOURCE, ControlTag);
226  auto busy_index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
227 
228  spdlog::debug("SCHEDULING_NEEDED Unassigning {}!", rescheduleRank);
229  std::vector<int> groupMembers = workerClient.UnassignGroup(idle_index, rescheduleRank);
230 
231  workerClient.assignGroup(groupMembers, busy_index);
232 
233  phonebookClient->SchedulingDone();
234  spdlog::debug("SCHEDULING_NEEDED left!");
235  command_handled = true;
236  }
237  }
238 
239 
// Unknown control flag: abort hard rather than risk a deadlocked control loop.
240  if (!command_handled) {
241  std::cerr << "Unexpected command!" << std::endl;
242  exit(43);
243  }
244 
// Terminate once every collector reports that it finished sampling.
245  bool isSampling = false;
246  for (CollectorClient& client : collectorClients) {
247  isSampling = isSampling || client.IsSampling();
248  }
249  if (!isSampling)
250  break;
251  }
252  spdlog::debug("Sampling completed");
253 }
254 
255 void StaticLoadBalancingMIMCMC::WriteToFile(std::string filename)
256 {
257  if (comm->GetRank() != rootRank) {
258  return;
259  }
260  for (CollectorClient& client : collectorClients) {
261  client.WriteToFile(filename);
262  }
263 }
264 
265 std::shared_ptr<SampleCollection> StaticLoadBalancingMIMCMC::RunImpl(std::vector<Eigen::VectorXd> const& x0)
266 {
267  for (CollectorClient& client : collectorClients) {
268  int numSamples = pt.get<int>("NumSamples" + multiindexToConfigString(client.GetModelIndex()));
269  client.CollectSamples(numSamples);
270  }
271 
272  RunSamples();
273  return nullptr;
274 }
275 
276 std::string StaticLoadBalancingMIMCMC::multiindexToConfigString(std::shared_ptr<MultiIndex> index)
277 {
278  std::stringstream strs;
279  for (int i = 0; i < index->GetLength(); i++) {
280  strs << "_" << index->GetValue(i);
281  }
282  return strs.str();
283 }
High-level communication wrapper for controlling SampleCollectors.
High-level wrapper for communicating with the phonebook process.
Definition: Phonebook.h:393
Phonebook implementation facilitating communication between different roles in a parallel MIMCMC type method.
Definition: Phonebook.h:38
WorkerAssignment numWorkers(std::shared_ptr< MultiIndex > modelIndex) override
Number of worker groups and number of workers per group for a given model index.
int numCollectors(std::shared_ptr< MultiIndex > modelIndex) override
Number of collector processes assigned to a model index.
void setup(std::shared_ptr< ParallelizableMIComponentFactory > componentFactory, uint availableRanks) override
virtual std::shared_ptr< SampleCollection > RunImpl(std::vector< Eigen::VectorXd > const &x0)
void Finalize()
Cleanup parallel method, wait for all ranks to finish.
std::shared_ptr< ParallelizableMIComponentFactory > componentFactory
StaticLoadBalancingMIMCMC(pt::ptree pt, std::shared_ptr< ParallelizableMIComponentFactory > componentFactory, std::shared_ptr< StaticLoadBalancer > loadBalancing=std::make_shared< RoundRobinStaticLoadBalancer >(), std::shared_ptr< parcer::Communicator > comm=std::make_shared< parcer::Communicator >(MPI_COMM_WORLD), std::shared_ptr< muq::Utilities::OTF2TracerBase > tracer=std::make_shared< OTF2TracerDummy >())
std::string multiindexToConfigString(std::shared_ptr< MultiIndex > index)
void RequestSamplesAll(int numSamples)
Request an additional number of samples to be computed on each level.
virtual std::shared_ptr< SampleCollection > GetSamples() const
Dummy implementation; required by interface, has no meaning in ML/MI setting.
Eigen::VectorXd MeanQOI()
Get mean quantity of interest estimate computed via telescoping sum.
void RequestSamples(std::shared_ptr< MultiIndex > index, int numSamples)
Request additional samples to be compute for a given model index.
virtual std::shared_ptr< SampleCollection > GetQOIs() const
Dummy implementation; required by interface, has no meaning in ML/MI setting.
void assignGroup(std::vector< int > subgroup, std::shared_ptr< MultiIndex > modelindex)
std::vector< int > UnassignGroup(std::shared_ptr< MultiIndex > modelIndex, int groupRootRank)
Implements the actual sampling / collecting logic for parallel MIMCMC.
ControlFlag
Flags used by parallel MCMC/MIMCMC type methods.
Definition: ParallelFlags.h:23
const int ControlTag
Tags separating MPI communication between control level processes and internal work group communication.
Definition: ParallelFlags.h:15