6 #include "spdlog/spdlog.h"
7 #include "spdlog/fmt/ostr.h"
20 std::vector<int> subgroup,
21 std::shared_ptr<MultiIndex> modelindex)
22 : comm(comm), subgroup(subgroup), boxHighestIndex(modelindex)
35 boxIndices = MultiIndexFactory::CreateFullTensor(boxSize->GetVector());
46 spdlog::debug(
"Kick off collection of {} samples", numSamples);
48 int samplesAssigned = 0;
50 for (
int i = 0; i <
subgroup.size(); i++) {
53 int assigning = (numSamples - samplesAssigned) / (
subgroup.size() - i);
55 samplesAssigned += assigning;
57 spdlog::debug(
"Collection kick off done");
79 if (status.MPI_SOURCE !=
subgroup[0])
86 for (uint i = 0; i <
boxIndices->Size(); i++) {
87 std::shared_ptr<MultiIndex> boxIndex = (*boxIndices)[i];
89 Eigen::VectorXd chainSampleMean =
comm->Recv<Eigen::VectorXd>(status.MPI_SOURCE,
ControlTag);
90 Eigen::VectorXd chainQOIMean =
comm->Recv<Eigen::VectorXd>(status.MPI_SOURCE,
ControlTag);
92 std::shared_ptr<MultiIndex> index = std::make_shared<MultiIndex>(*
boxLowestIndex + *boxIndex);
93 auto indexDiffFromTop = std::make_shared<MultiIndex>(*
boxHighestIndex - *index);
97 if (indexDiffFromTop->Sum() % 2 == 0) {
103 if (indexDiffFromTop->Sum() % 2 == 0) {
112 std::cerr <<
"Unexpected command!" << std::endl;
140 std::shared_ptr<PhonebookClient> phonebookClient,
141 int RootRank) : comm(comm), phonebookClient(phonebookClient)
148 for (
int dest : subgroup) {
158 spdlog::trace(
"UnRegister {}", groupRootRank);
160 spdlog::trace(
"Sending unassign to {}", groupRootRank);
162 std::vector<int> groupMembers =
comm->Recv<std::vector<int>>(groupRootRank,
ControlTag);
168 std::shared_ptr<MultiIndex> largest =
nullptr;
171 spdlog::trace(
"Unassigning model {}", *largest);
173 for (
int rank : ranks) {
176 }
while (largest->Max() != 0);
180 for (
int dest = 1; dest <
comm->GetSize(); dest++)
186 std::shared_ptr<parcer::Communicator> comm,
187 std::shared_ptr<PhonebookClient> phonebookClient,
189 std::shared_ptr<ParallelizableMIComponentFactory> componentFactory,
190 std::shared_ptr<muq::Utilities::OTF2TracerBase> tracer)
197 std::vector<int> subgroup_proc = comm->Recv<std::vector<int>>(0,
ControlTag);
198 auto samplingProblemIndex = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(0,
ControlTag));
200 spdlog::trace(
"Setting up MPI subcommunicator for group");
201 MPI_Group world_group;
202 MPI_Comm_group (MPI_COMM_WORLD, &world_group);
204 MPI_Group_incl (world_group, subgroup_proc.size(), &subgroup_proc[0], &subgroup);
206 MPI_Comm subcomm_raw;
207 MPI_Comm_create_group(MPI_COMM_WORLD, subgroup,
ControlTag, &subcomm_raw);
208 auto subcomm = std::make_shared<parcer::Communicator>(subcomm_raw);
210 componentFactory->SetComm(subcomm);
211 spdlog::trace(
"Setting up ParallelMIComponentFactory");
212 auto parallelComponentFactory = std::make_shared<ParallelMIComponentFactory>(subcomm, comm, componentFactory);
214 if (subcomm->GetRank() == 0) {
215 std::cout <<
"Subgroup root is global " << comm->GetRank() << std::endl;
216 auto finestProblem = parallelComponentFactory->SamplingProblem(parallelComponentFactory->FinestIndex());
218 spdlog::trace(
"Setting up ParallelMIMCMCBox");
219 auto box = std::make_shared<ParallelMIMCMCBox>(pt, parallelComponentFactory, samplingProblemIndex, comm, phonebookClient, tracer);
221 spdlog::debug(
"Rank {} begins sampling", comm->GetRank());
224 for (
int i = 0; i < 2 + subsampling; i++)
228 phonebookClient->SetWorkerReady(samplingProblemIndex, comm->GetRank());
229 spdlog::trace(
"Awaiting instructions");
239 comm->Send<std::vector<int>>(subgroup_proc, status.MPI_SOURCE,
ControlTag);
242 spdlog::trace(
"Send sample from {} to rank {}", comm->GetRank(), status.MPI_SOURCE);
244 auto sampleCollection = box->FinestChain()->GetSamples();
245 auto latestSample = sampleCollection->at(sampleCollection->size()-1);
247 comm->Send<Eigen::VectorXd>(latestSample->state[0], status.MPI_SOURCE,
ControlTag);
248 comm->Send<
double>(AnyCast(latestSample->meta[
"LogTarget"]), status.MPI_SOURCE,
ControlTag);
249 if (latestSample->HasMeta(
"QOI")) {
250 std::shared_ptr<SamplingState> qoi = AnyCast(latestSample->meta[
"QOI"]);
251 comm->Send<Eigen::VectorXd>(qoi->state[0], status.MPI_SOURCE,
ControlTag);
253 spdlog::error(
"No QOI!");
258 spdlog::trace(
"Sampling");
260 for (
int i = 0; i < 1 + subsampling; i++)
263 phonebookClient->SetWorkerReady(samplingProblemIndex, comm->GetRank());
265 spdlog::trace(
"Send box from {} to rank {}", comm->GetRank(), status.MPI_SOURCE);
267 for (
int i = 0; i < box->NumChains(); i++) {
268 auto sampleCollection = box->GetChain(i)->GetSamples();
269 auto latestSample = sampleCollection->back();
271 comm->Send<Eigen::VectorXd>(latestSample->state[0], status.MPI_SOURCE,
ControlTag);
272 comm->Send<
double>(AnyCast(latestSample->meta[
"LogTarget"]), status.MPI_SOURCE,
ControlTag);
273 if (latestSample->HasMeta(
"QOI")) {
274 std::shared_ptr<SamplingState> qoi = AnyCast(latestSample->meta[
"QOI"]);
275 comm->Send<Eigen::VectorXd>(qoi->state[0], status.MPI_SOURCE,
ControlTag);
277 spdlog::error(
"No QOI!");
282 assert(box->GetQOIDiff()->size() > 0);
283 auto latestDiffSample = box->GetQOIDiff()->back();
284 comm->Send<Eigen::VectorXd>(latestDiffSample->state[0], status.MPI_SOURCE,
ControlTag);
287 for (
int i = 0; i < 1 + subsampling; i++)
290 phonebookClient->SetWorkerReady(samplingProblemIndex, comm->GetRank());
292 spdlog::error(
"Controller received unexpected command!");
301 parallelComponentFactory->finalize();
302 spdlog::trace(
"Rank {} finalized", comm->GetRank());
306 std::vector<int> subgroup_proc = comm->Recv<std::vector<int>>(RootRank,
ControlTag);
307 auto boxHighestIndex = std::make_shared<MultiIndex>(comm->Recv<MultiIndex>(RootRank,
ControlTag));
310 MPI_Group world_group;
311 MPI_Comm_group (MPI_COMM_WORLD, &world_group);
313 MPI_Group_incl (world_group, subgroup_proc.size(), &subgroup_proc[0], &subgroup);
315 MPI_Comm subcomm_raw;
316 MPI_Comm_create_group(MPI_COMM_WORLD, subgroup,
ControlTag, &subcomm_raw);
317 auto subcomm = std::make_shared<parcer::Communicator>(subcomm_raw);
321 auto boxLowestIndex = MultiIndex::Copy(boxHighestIndex);
323 std::shared_ptr<MultiIndex> boxSize = std::make_shared<MultiIndex>(*boxHighestIndex - *boxLowestIndex);
324 std::shared_ptr<MultiIndexSet> boxIndices = MultiIndexFactory::CreateFullTensor(boxSize->GetVector());
326 std::vector<std::shared_ptr<DistributedCollection>> sampleCollections(boxIndices->Size());
327 std::vector<std::shared_ptr<DistributedCollection>> qoiCollections(boxIndices->Size());
328 std::shared_ptr<DistributedCollection> qoiDiffCollection = std::make_shared<DistributedCollection>(std::make_shared<MarkovChain>(), subcomm);
329 for (uint i = 0; i < boxIndices->Size(); i++) {
330 auto sampleCollection = std::make_shared<MarkovChain>();
331 sampleCollections[i] = std::make_shared<DistributedCollection>(sampleCollection, subcomm);
332 auto qoiCollection = std::make_shared<MarkovChain>();
333 qoiCollections[i] = std::make_shared<DistributedCollection>(qoiCollection, subcomm);
349 int numSamples = comm->Recv<
int>(0,
ControlTag);
350 spdlog::debug(
"Collecting {} samples for box {}", numSamples, *boxHighestIndex);
352 for (
int i = 0; i < numSamples; i++) {
353 spdlog::trace(
"Requesting sample box for model {}", *boxHighestIndex);
354 int remoteRank = phonebookClient->Query(boxHighestIndex, boxHighestIndex,
false);
356 for (uint j = 0; j < boxIndices->Size(); j++) {
357 auto new_state = std::make_shared<SamplingState>(comm->Recv<Eigen::VectorXd>(remoteRank,
ControlTag));
358 new_state->meta[
"LogTarget"] = comm->Recv<
double>(remoteRank,
ControlTag);
359 sampleCollections[j]->Add(new_state);
360 qoiCollections[j]->Add(std::make_shared<SamplingState>(comm->Recv<Eigen::VectorXd>(remoteRank,
ControlTag)));
362 qoiDiffCollection->Add(std::make_shared<SamplingState>(comm->Recv<Eigen::VectorXd>(remoteRank,
ControlTag)));
363 if ((i+1) % std::max(1,numSamples / 10) == 0)
364 spdlog::debug(
"Collected {} out of {} samples for model {}", i+1, numSamples, *boxHighestIndex);
366 if (subcomm->GetRank() == 0)
370 std::list<Eigen::VectorXd> sampleMeans;
371 std::list<Eigen::VectorXd> qoiMeans;
372 for (uint i = 0; i < boxIndices->Size(); i++) {
373 sampleMeans.push_back(sampleCollections[i]->GlobalMean());
374 qoiMeans.push_back(qoiCollections[i]->GlobalMean());
376 if (subcomm->GetRank() == 0) {
378 auto qoiMean = qoiMeans.begin();
379 for (
auto sampleMean = sampleMeans.begin(); sampleMean != sampleMeans.end(); sampleMean++) {
380 comm->Send(*sampleMean, RootRank,
ControlTag);
386 std::string filename = comm->Recv<std::string>(status.MPI_SOURCE,
ControlTag);
387 for (uint i = 0; i < boxIndices->Size(); i++) {
388 std::shared_ptr<MultiIndex> boxIndex = (*boxIndices)[i];
389 sampleCollections[i]->WriteToFile(filename,
"/Collector_model" + boxHighestIndex->ToString() +
"_subchain_" + boxIndex->ToString() +
"_samples_rank_" +
std::to_string(subcomm->GetRank()));
390 qoiCollections[i]->WriteToFile(filename,
"/Collector_model" + boxHighestIndex->ToString() +
"_subchain_" + boxIndex->ToString() +
"_qois_rank_" +
std::to_string(subcomm->GetRank()));
392 qoiDiffCollection->WriteToFile(filename,
"/Collector_model" + boxHighestIndex->ToString() +
"_qoi_diff_rank_" +
std::to_string(subcomm->GetRank()));
393 comm->Send(
true, status.MPI_SOURCE,
ControlTag);
395 std::cerr <<
"Unexpected command!" << std::endl;
403 spdlog::trace(
"Rank {} quit", comm->GetRank());
406 std::cerr <<
"Unexpected command!" << std::endl;
415 std::stringstream strs;
416 for (
int i = 0; i < index->GetLength(); i++) {
417 strs <<
"_" << index->GetValue(i);
Eigen::VectorXd GetQOIMean()
std::shared_ptr< MultiIndex > GetModelIndex() const
bool Receive(ControlFlag command, const MPI_Status &status)
void WriteToFile(std::string filename)
std::shared_ptr< MultiIndexSet > boxIndices
CollectorClient(std::shared_ptr< parcer::Communicator > comm, std::vector< int > subgroup, std::shared_ptr< MultiIndex > modelindex)
std::shared_ptr< MultiIndex > boxLowestIndex
void CollectSamples(int numSamples)
std::shared_ptr< parcer::Communicator > comm
std::vector< int > subgroup
Eigen::VectorXd boxQOIMean
std::shared_ptr< MultiIndex > boxHighestIndex
std::shared_ptr< parcer::Communicator > comm
void assignGroup(std::vector< int > subgroup, std::shared_ptr< MultiIndex > modelindex)
std::shared_ptr< PhonebookClient > phonebookClient
std::vector< int > UnassignGroup(std::shared_ptr< MultiIndex > modelIndex, int groupRootRank)
WorkerClient(std::shared_ptr< parcer::Communicator > comm, std::shared_ptr< PhonebookClient > phonebookClient, int RootRank)
WorkerServer(boost::property_tree::ptree const &pt, std::shared_ptr< parcer::Communicator > comm, std::shared_ptr< PhonebookClient > phonebookClient, int RootRank, std::shared_ptr< ParallelizableMIComponentFactory > componentFactory, std::shared_ptr< muq::Utilities::OTF2TracerBase > tracer)
std::string multiindexToConfigString(std::shared_ptr< MultiIndex > index)
ControlFlag
Flags used by parallel MCMC/MIMCMC type methods.
const int ControlTag
Tags separating MPI communication between control level processes and internal work group communicati...
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values