// Load balancing setup (excerpt): enumerate every model index up to the finest index.
spdlog::info("Balancing load across {} ranks", availableRanks);
auto indices = MultiIndexFactory::CreateFullTensor(componentFactory->FinestIndex()->GetVector());
// StaticLoadBalancingMIMCMC constructor (excerpt): trailing parameters and member initializers.
                          std::shared_ptr<ParallelizableMIComponentFactory> componentFactory,
                          std::shared_ptr<StaticLoadBalancer> loadBalancing,
                          std::shared_ptr<parcer::Communicator> comm,
                          std::shared_ptr<muq::Utilities::OTF2TracerBase> tracer)
      // ...
      componentFactory(componentFactory),
      phonebookClient(std::make_shared<PhonebookClient>(comm, phonebookRank)),
      workerClient(comm, phonebookClient, rootRank)
// Constructor body (excerpt): the root rank assigns collector and worker ranks per model index.
spdlog::debug("Rank: {}", comm->GetRank());
// ...
auto comm_self = std::make_shared<parcer::Communicator>(MPI_COMM_SELF);
// ...
auto indices = MultiIndexFactory::CreateFullTensor(componentFactory->FinestIndex()->GetVector());
// Two ranks are reserved for control duties (root and phonebook).
assert(comm->GetSize() - 2 >= 0);
// ...
spdlog::trace("Assigning collectors");
for (int i = 0; i < indices->Size(); i++) {
  std::shared_ptr<MultiIndex> index = (*indices)[i];
  std::vector<int> collectorRanks;
  int numCollectors = loadBalancing->numCollectors(index);
  for (int r = 0; r < numCollectors; r++) {
    collectorRanks.push_back(rank);
    // ...
  }
  // ...
}
spdlog::trace("Assigning workers");
for (int i = 0; i < indices->Size(); i++) {
  std::shared_ptr<MultiIndex> index = (*indices)[i];
  WorkerAssignment assignment = loadBalancing->numWorkers(index);
  for (int group = 0; group < assignment.numGroups; group++) {
    std::vector<int> groupRanks;
    // ...
    groupRanks.push_back(rank);
    // ...
  }
  assert(rank <= comm->GetSize());
}
// MeanQOI() (excerpt): non-root ranks return a dummy value; the root combines the collector means.
return Eigen::VectorXd::Zero(1);
// ...
client.ComputeMeans();
// ...
if (client.Receive(command, status))
  // ...
bool isComputingMeans = false;
isComputingMeans = isComputingMeans || client.IsComputingMeans();
if (!isComputingMeans)
  // ...
spdlog::info("Computing means completed");
// ...
Eigen::VectorXd mean = mean_box;
// ...
mean_box = client.GetQOIMean();
// Finalize() (excerpt): unassign collectors and worker groups, then shut down.
std::cout << "Starting unassign sequence" << std::endl;
// ...
std::cout << "Finished unassign sequence" << std::endl;
// ...
std::cout << "Rank " << comm->GetRank() << " quit" << std::endl;
// RequestSamples(index, numSamples) (excerpt): find the collector responsible for the given model index.
if (client.GetModelIndex() == index) {
  client.CollectSamples(numSamples);
  // ...
}
// ...
std::cerr << "Requested samples from nonexisting collector!" << std::endl;
// ...
// RequestSamplesAll(numSamples) (excerpt): request the same number of samples from every collector.
client.CollectSamples(numSamples);
// RunSamples() (excerpt): the root rank handles control messages until all collectors have finished sampling.
bool command_handled = false;
// ...
if (client.Receive(command, status)) {
  command_handled = true;
  // ...
}
// ...
if (!command_handled) {
  // ...
  spdlog::debug("SCHEDULING_NEEDED entered!");
  // ...
  auto idle_index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
  int rescheduleRank = comm->Recv<int>(status.MPI_SOURCE, ControlTag);
  auto busy_index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
  // ...
  spdlog::debug("SCHEDULING_NEEDED Unassigning {}!", rescheduleRank);
  // ...
  spdlog::debug("SCHEDULING_NEEDED left!");
  command_handled = true;
}
// ...
if (!command_handled) {
  std::cerr << "Unexpected command!" << std::endl;
  // ...
}
// ...
bool isSampling = false;
isSampling = isSampling || client.IsSampling();
// ...
spdlog::debug("Sampling completed");
// WriteToFile(filename) (excerpt): each collector client writes its samples to file.
client.WriteToFile(filename);
// ...
client.CollectSamples(numSamples);
// multiindexToConfigString(index) (excerpt): build a suffix such as "_1_0" from the index entries.
std::stringstream strs;
for (int i = 0; i < index->GetLength(); i++) {
  strs << "_" << index->GetValue(i);
}
High-level communication wrapper for controlling SampleCollectors.
High-level wrapper for communicating with the phonebook process.
Phonebook implementation facilitating communication between different roles in a parallel MIMCMC type method.
WorkerAssignment numWorkers(std::shared_ptr< MultiIndex > modelIndex) override
Number of worker groups and number of workers per group for a given model index.
int numCollectors(std::shared_ptr< MultiIndex > modelIndex) override
Number of collector processes assigned to a model index.
void setup(std::shared_ptr< ParallelizableMIComponentFactory > componentFactory, uint availableRanks) override
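The three members above form the static load-balancing interface: setup() is called once with the number of ranks available for work, and numCollectors()/numWorkers() are then queried per model index. The following is only a minimal sketch of a custom balancer; the class name and splitting rule are illustrative, and while the numGroups field of WorkerAssignment is visible in the constructor excerpt above, the numWorkersPerGroup field is an assumption.

class EvenSplitLoadBalancer : public StaticLoadBalancer {
public:
  void setup(std::shared_ptr<ParallelizableMIComponentFactory> componentFactory, uint availableRanks) override {
    // Split the available ranks evenly across all model indices.
    auto indices = MultiIndexFactory::CreateFullTensor(componentFactory->FinestIndex()->GetVector());
    ranksPerIndex = availableRanks / indices->Size();
    if (ranksPerIndex < 2) ranksPerIndex = 2; // keep at least one collector and one worker group per index
  }
  int numCollectors(std::shared_ptr<MultiIndex> modelIndex) override {
    return 1; // one collector process per model index
  }
  WorkerAssignment numWorkers(std::shared_ptr<MultiIndex> modelIndex) override {
    WorkerAssignment assignment;
    assignment.numGroups = ranksPerIndex - 1; // remaining ranks become single-worker groups
    assignment.numWorkersPerGroup = 1;        // assumed field name
    return assignment;
  }
private:
  uint ranksPerIndex = 2;
};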
void RunSamples()
Run the parallel method.
virtual std::shared_ptr< SampleCollection > RunImpl(std::vector< Eigen::VectorXd > const &x0)
WorkerClient workerClient
void Finalize()
Clean up the parallel method and wait for all ranks to finish.
std::shared_ptr< ParallelizableMIComponentFactory > componentFactory
StaticLoadBalancingMIMCMC(pt::ptree pt, std::shared_ptr< ParallelizableMIComponentFactory > componentFactory, std::shared_ptr< StaticLoadBalancer > loadBalancing=std::make_shared< RoundRobinStaticLoadBalancer >(), std::shared_ptr< parcer::Communicator > comm=std::make_shared< parcer::Communicator >(MPI_COMM_WORLD), std::shared_ptr< muq::Utilities::OTF2TracerBase > tracer=std::make_shared< OTF2TracerDummy >())
std::shared_ptr< PhonebookClient > phonebookClient
std::string multiindexToConfigString(std::shared_ptr< MultiIndex > index)
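As the excerpt near the end of the listing shows, this helper simply concatenates the index entries with leading underscores. A hypothetical call from inside the class (the MultiIndex length constructor and SetValue accessor used here are assumptions):

auto index = std::make_shared<MultiIndex>(2);          // two-dimensional index, initially (0,0)
index->SetValue(0, 1);                                 // now (1,0)
std::string suffix = multiindexToConfigString(index);  // yields "_1_0"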
std::shared_ptr< parcer::Communicator > comm
void RequestSamplesAll(int numSamples)
Request an additional number of samples to be computed on each level.
virtual std::shared_ptr< SampleCollection > GetSamples() const
Dummy implementation; required by the interface but has no meaning in the ML/MI setting.
void WriteToFile(std::string filename)
std::vector< CollectorClient > collectorClients
Eigen::VectorXd MeanQOI()
Get the mean quantity-of-interest estimate computed via the telescoping sum.
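The telescoping sum referred to here is the standard multilevel/multi-index identity: each collector contributes the mean of one correction term, and summing over all model indices recovers the finest-level expectation. For a one-dimensional (multilevel) hierarchy, for example,

\[ \mathbb{E}[Q_L] \;=\; \mathbb{E}[Q_0] \;+\; \sum_{\ell=1}^{L} \mathbb{E}\!\left[Q_\ell - Q_{\ell-1}\right], \]

which matches the MeanQOI() excerpt above, where the per-index means returned by GetQOIMean() are combined into a single vector.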
void RequestSamples(std::shared_ptr< MultiIndex > index, int numSamples)
Request additional samples to be computed for a given model index.
virtual std::shared_ptr< SampleCollection > GetQOIs() const
Dummy implementation; required by the interface but has no meaning in the ML/MI setting.
void assignGroup(std::vector< int > subgroup, std::shared_ptr< MultiIndex > modelindex)
std::vector< int > UnassignGroup(std::shared_ptr< MultiIndex > modelIndex, int groupRootRank)
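These two WorkerClient members fit the SCHEDULING_NEEDED branch in the RunSamples() excerpt; a schematic of how they could be combined there (variable names taken from that excerpt):

// Pull the group currently working on busy_index and hand it the idle index instead.
std::vector<int> groupMembers = workerClient.UnassignGroup(busy_index, rescheduleRank);
workerClient.assignGroup(groupMembers, idle_index);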
Implements the actual sampling / collecting logic for parallel MIMCMC.
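To show how the members listed here fit together, a rough driver sketch follows; it assumes an application-specific ParallelizableMIComponentFactory named componentFactory has already been built, and the option tree contents, sample count, and file name are illustrative rather than verbatim library usage.

boost::property_tree::ptree config;                           // sampler options; keys are application-specific
StaticLoadBalancingMIMCMC sampler(config, componentFactory);  // RoundRobin balancer and MPI_COMM_WORLD by default

sampler.RequestSamplesAll(1000);            // illustrative sample count per model index
sampler.RunSamples();                       // root rank drives the control loop until sampling finishes
Eigen::VectorXd mean = sampler.MeanQOI();   // telescoping-sum estimate (meaningful on the root rank only)
sampler.WriteToFile("samples.h5");          // illustrative file name
sampler.Finalize();                         // unassign collectors/workers and wait for all ranks

The same code can run on every MPI rank; non-root ranks return early from calls such as MeanQOI(), as the excerpt above shows.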
ControlFlag
Flags used by parallel MCMC/MIMCMC type methods.
const int ControlTag
Tags separating MPI communication between control-level processes and internal work group communication.
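ControlFlag and ControlTag appear together throughout the excerpts above: a command arrives on the control tag, and any payload is then received from the same source. A schematic of that pattern; the Recv overload that also returns the MPI_Status is assumed, and the enumerator name is inferred from the log messages in the RunSamples() excerpt.

MPI_Status status;
ControlFlag command = comm->Recv<ControlFlag>(MPI_ANY_SOURCE, ControlTag, &status); // assumed overload
if (command == ControlFlag::SCHEDULING_NEEDED) {
  auto idle_index = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(status.MPI_SOURCE, ControlTag));
  int rescheduleRank = comm->Recv<int>(status.MPI_SOURCE, ControlTag);
  // ... reschedule the worker group as in the RunSamples() excerpt
}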