MUQ  0.4.3
ParallelTempering.cpp
Go to the documentation of this file.
2 
3 #include <chrono>
4 #include <stdexcept>
5 #include <iomanip>
6 #include <ios>
7 
10 
13 
14 namespace pt = boost::property_tree;
15 using namespace muq::Utilities;
16 using namespace muq::SamplingAlgorithms;
17 
18 
19 ParallelTempering::ParallelTempering(boost::property_tree::ptree opts,
20  std::shared_ptr<InferenceProblem> const& problem) : ParallelTempering(opts, ExtractTemps(opts), ExtractKernels(opts, problem))
21 {}
22 
23 ParallelTempering::ParallelTempering(boost::property_tree::ptree opts,
24  Eigen::VectorXd inverseTemps,
25  std::vector<std::shared_ptr<TransitionKernel>> kernels) : ParallelTempering(opts, inverseTemps, StackObjects(kernels))
26 {}
27 
28 
29 
30 ParallelTempering::ParallelTempering(boost::property_tree::ptree opts,
31  Eigen::VectorXd inverseTemps,
32  std::vector<std::vector<std::shared_ptr<TransitionKernel>>> kernelsIn) : numTemps(inverseTemps.size()),
33  scheduler(std::make_shared<ThinScheduler>(opts)),
34  kernels(kernelsIn),
35  numSamps(opts.get<double>("NumSamples")),
36  burnIn(opts.get("BurnIn",0)),
37  printLevel(opts.get("PrintLevel",3)),
38  swapIncr(opts.get("Swap Increment", 2)),
39  seoSwaps(opts.get("Swap Type","DEO")=="SEO"),
40  cumulativeSwapProb(Eigen::VectorXd::Zero(numTemps-1)),
41  successfulSwaps(Eigen::VectorXd::Zero(numTemps-1)),
42  attemptedSwaps(Eigen::VectorXd::Zero(numTemps-1)),
43  nextAdaptInd(opts.get("Adapt Start", 100))
44 {
45  if(std::abs(inverseTemps(0))>std::numeric_limits<double>::epsilon()){
46  std::stringstream msg;
47  msg << "In ParallelTempering constructor. First inverse temperature in schedule must be 0.0.";
48  throw std::invalid_argument(msg.str());
49  }
50 
51  if(std::abs(inverseTemps(inverseTemps.size()-1)-1.0)>std::numeric_limits<double>::epsilon()){
52  std::stringstream msg;
53  msg << "In ParallelTempering constructor. Last inverse temperature in schedule must be 1.0.";
54  throw std::invalid_argument(msg.str());
55  }
56 
57 
58  if(inverseTemps.minCoeff()<-std::numeric_limits<double>::epsilon()){
59  std::stringstream msg;
60  msg << "In ParallelTempering constructor. Inverse temperatures must be in [0,1], but found a minimum temperature of " << inverseTemps.minCoeff();
61  throw std::invalid_argument(msg.str());
62  }
63 
64  if(inverseTemps.maxCoeff()>1+std::numeric_limits<double>::epsilon()){
65  std::stringstream msg;
66  msg << "In ParallelTempering constructor. Inverse temperatures must be in [0,1], but found a maximum temperature of " << inverseTemps.maxCoeff();
67  throw std::invalid_argument(msg.str());
68  }
69 
70  problems.resize(kernels.size());
71  for(unsigned int i=0; i<kernels.size(); ++i){
72  problems.at(i) = std::dynamic_pointer_cast<InferenceProblem>( kernels.at(i).at(0)->Problem() );
73 
74  if(problems.at(i)==nullptr){
75  std::stringstream msg;
76  msg << "In ParallelTempering constructor. Could not cast sampling problem for dimension " << i << " into an InfereceProblem.";
77  throw std::invalid_argument(msg.str());
78  }
79  }
80 
81  // Check to make sure the problems are not pointing at the same thing. Otherwise we won't be able to set the temperatures
82  for(unsigned int i=0; i<kernels.size()-1; ++i){
83  for(unsigned j=i+1; j<kernels.size(); ++j){
84  if(problems.at(i)==problems.at(j)){
85  std::stringstream msg;
86  msg << "In ParallelTempering constructor. Found pointers to the same sampling problem, which prevents setting the temperature at different levels.";
87  throw std::invalid_argument(msg.str());
88  }
89  }
90  }
91 
92  // Set the temperatures
93  chains.resize(kernels.size());
94  sampNums.resize(kernels.size(), 0);
95  for(unsigned int i=0; i<kernels.size(); ++i){
96  problems.at(i)->SetInverseTemp(inverseTemps(i));
97  chains.at(i) = std::make_shared<MarkovChain>();
98  }
99 }
100 
101 
102 void ParallelTempering::SetState(std::vector<std::shared_ptr<SamplingState>> const& x0)
103 {
104  if(x0.size() != numTemps){
105  std::stringstream msg;
106  msg << " In ParallelTempering::SetState, the size of the argument x0 is " << x0.size() << ", but the temperature schedule has " << numTemps << " levels.";
107  throw std::invalid_argument(msg.str());
108  }
109 
110  prevStates = x0;
111 }
112 
113 
114 void ParallelTempering::SetState(std::vector<Eigen::VectorXd> const& x0)
115 {
116  std::vector<std::shared_ptr<SamplingState>> states(numTemps);
117  for(unsigned int i=0; i<numTemps; ++i)
118  states.at(i) = std::make_shared<SamplingState>(x0);
119 
120  SetState(states);
121 }
122 
123 void ParallelTempering::SetState(std::vector<std::vector<Eigen::VectorXd>> const& x0)
124 {
125  if(x0.size() != numTemps){
126  std::stringstream msg;
127  msg << " In ParallelTempering::SetState, the size of the argument x0 is " << x0.size() << ", but the temperature schedule has " << numTemps << " levels.";
128  throw std::invalid_argument(msg.str());
129  }
130 
131  std::vector<std::shared_ptr<SamplingState>> states(numTemps);
132  for(unsigned int i=0; i<numTemps; ++i)
133  states.at(i) = std::make_shared<SamplingState>(x0.at(i));
134 
135  SetState(states);
136 }
137 
138 
139 double ParallelTempering::GetInverseTemp(unsigned int chainInd) const
140 {
141  return problems.at(chainInd)->GetInverseTemp();
142 }
143 
144 std::vector<std::shared_ptr<TransitionKernel>> const& ParallelTempering::Kernels(unsigned int chainInd) const
145 {
146  return kernels.at(chainInd);
147 }
148 
149 
150 std::shared_ptr<MarkovChain> ParallelTempering::Run(){return Run(std::vector<std::vector<Eigen::VectorXd>>());}
151 
152 std::shared_ptr<MarkovChain> ParallelTempering::Run(std::vector<Eigen::VectorXd> const& x0){
153  return Run(StackObjects(x0));
154 }
155 
156 std::shared_ptr<MarkovChain> ParallelTempering::Run(std::vector<std::vector<Eigen::VectorXd>> const& x0) {
157 
158  if( !x0.empty() ) { SetState(x0); }
159 
160  // What is the next iteration that we want to print at
161  const unsigned int printIncr = std::floor(numSamps / double(10));
162  unsigned int nextPrintInd = printIncr;
163  unsigned int nextSwapInd = swapIncr;
164 
165  // Run until we've received the desired number of samples
166  if(printLevel>0)
167  std::cout << "Starting parallel tempering sampler..." << std::endl;
168 
169  while(sampNums.at(numTemps-1) < numSamps)
170  {
171  // Should we print
172  if(sampNums.at(numTemps-1) > nextPrintInd){
173  if(printLevel>0){
174  PrintStatus(" ", sampNums.at(numTemps-1));
175  }
176  nextPrintInd += printIncr;
177  }
178 
179  // Swap every swapIncr steps
180  if(sampNums.at(numTemps-1)>nextSwapInd){
181  SwapStates();
182  nextSwapInd += swapIncr;
183  }
184 
185  // Exploratory steps (independent MCMC kernels for each MCMC chain)
186  Sample();
187 
188  // Adapt the temperatures
189  if(sampNums.at(numTemps-1) > nextAdaptInd){
191  adaptIncr *= 2;
193  }
194  }
195 
196 
197  if(printLevel>0){
198  PrintStatus(" ", numSamps+1);
199  std::cout << "Completed in " << totalTime << " seconds." << std::endl;
200  }
201 
202  return chains.at(numTemps-1);
203 }
204 
205 
207 
208  // We need at least one swap before we can adapt
209  if(attemptedSwaps.minCoeff()==0)
210  return;
211 
212  // Current inverse temperatures
213  Eigen::VectorXd currBetas = CollectInverseTemps();//(numTemps);
214  for(unsigned int i=0; i<numTemps; ++i)
215  currBetas(i) = problems.at(i)->GetInverseTemp();
216 
217  // First, compute a cumulative sum of the average acceptance probabilities
218  Eigen::VectorXd cumProbs(numTemps);
219  cumProbs(0) = 0.0;//cumulativeSwapProb(0) / attemptedSwaps(0);
220  for(unsigned int i=1; i<numTemps; ++i)
221  cumProbs(i) = cumProbs(i-1) + 1.0- (cumulativeSwapProb(i-1) / attemptedSwaps(i-1));
222 
223  if((cumProbs.maxCoeff()-cumProbs.minCoeff())<1e-8)
224  return;
225 
226  for(unsigned int i=1; i<numTemps-1; ++i){
227  double desiredVal = cumProbs(numTemps-1) * double(i)/(numTemps-1);
228 
229  // Use linear interpolation to get the temperature that would achieve this value
230  int j;
231  for(j=0; j<numTemps; ++j){
232  if(cumProbs(j) >= desiredVal)
233  break;
234  }
235  if((j==0)||(j==numTemps)){
236  std::cout << "Cumulative probs: " << cumProbs.transpose() << std::endl;
237  std::cout << "desiredVal: " << desiredVal << std::endl;
238  }
239  assert(j!=numTemps);
240  assert(j>0);
241 
242  double w = (desiredVal - cumProbs(j-1)) / (cumProbs(j) - cumProbs(j-1));
243  double newTemp = w*currBetas(j) + (1.0-w)*currBetas(j-1);
244 
245  // Update the temperature of the problem
246  problems.at(i)->SetInverseTemp(newTemp);
247  }
248 
249  // Reset the recorded swap probabilities
250  successfulSwaps.setZero();
251  cumulativeSwapProb.setZero();
252  attemptedSwaps.setZero();
253 }
254 
256 {
257  Eigen::VectorXd output(numTemps);
258  for(unsigned int i=0; i<numTemps; ++i)
259  output(i) = problems.at(i)->GetInverseTemp();
260  return output;
261 }
262 
263 void ParallelTempering::PrintStatus(std::string prefix, unsigned int currInd) const
264 {
265  std::cout << prefix << int(std::floor(double((currInd - 1) * 100) / double(numSamps))) << "% Complete" << std::endl;
266 
267  if(printLevel>1){
268  std::streamsize ss = std::cout.precision();
269  std::cout.precision(2);
270  std::cout << prefix << " Avg. Swap Probs: " << (cumulativeSwapProb.array() / attemptedSwaps.array()).matrix().transpose() << std::endl;
271  std::cout << prefix << " Inverse Temps: " << CollectInverseTemps().transpose() << std::endl;
272  std::cout.precision(ss);
273  }
274 
275  if(printLevel==2){
276  std::cout << prefix << " Kernel 0:\n";
277  for(int blockInd=0; blockInd<kernels.at(0).size(); ++blockInd){
278  std::cout << prefix << " Block " << blockInd << ":\n";
279  kernels.at(0).at(blockInd)->PrintStatus(prefix + " ");
280  }
281  }else if(printLevel>2){
282  for(int chainInd=0; chainInd<numTemps; ++chainInd){
283  std::cout << prefix << " Kernel " << chainInd << ":\n";
284  for(int blockInd=0; blockInd<kernels.at(chainInd).size(); ++blockInd){
285  std::cout << prefix << " Block " << blockInd << ":\n";
286  kernels.at(chainInd).at(blockInd)->PrintStatus(prefix + " ");
287  }
288  }
289  }
290 }
291 
292 void ParallelTempering::CheckForMeta(std::shared_ptr<SamplingState> const& state)
293 {
294  if(!state->HasMeta("InverseTemp")){
295  std::stringstream msg;
296  msg << "Error in ParallelTempering::SwapStates. Tried swapping states with a state that does not have temperature metadata. The state must have the \"InverseTemp\" metadata, which is typically set in InferenceProblem::LogDensity.";
297  throw std::runtime_error(msg.str());
298  }
299 
300  if(!state->HasMeta("LogLikelihood")){
301  std::stringstream msg;
302  msg << "Error in ParallelTempering::SwapStates. Tried swapping states with a state that does not have likelihood metadata. The state must have the \"LogLikelihood\" metadata, which is typically set in InferenceProblem::LogDensity.";
303  throw std::runtime_error(msg.str());
304  }
305 
306  if(!state->HasMeta("LogPrior")){
307  std::stringstream msg;
308  msg << "Error in ParallelTempering::SwapStates. Tried swapping states with a state that does not have prior metadata. The state must have the \"LogPrior\" metadata, which is typically set in InferenceProblem::LogDensity.";
309  throw std::runtime_error(msg.str());
310  }
311 }
312 
314 
315  // Figure out if this is an even swap or an odd swap
316  unsigned int startInd;
317  if(seoSwaps){
318  startInd = RandomGenerator::GetUniformInt(0,1);
319  }else{
320  startInd = evenSwap ? 0 : 1;
321  }
322 
323  double beta1, beta2, logLikely1, logLikely2, alpha;
324 
325  for(unsigned int i=startInd; i<numTemps-1; i+=2){
326  CheckForMeta(prevStates.at(i));
327  CheckForMeta(prevStates.at(i+1));
328 
329  beta1 = AnyCast( prevStates.at(i)->meta["InverseTemp"] );
330  beta2 = AnyCast( prevStates.at(i+1)->meta["InverseTemp"] );
331 
332  logLikely1 = AnyCast( prevStates.at(i)->meta["LogLikelihood"] );
333  logLikely2 = AnyCast( prevStates.at(i+1)->meta["LogLikelihood"] );
334 
335  alpha = std::exp( (beta1 - beta2)*(logLikely2 - logLikely1));
336 
337  attemptedSwaps(i)++;
338  cumulativeSwapProb(i) += std::min(alpha, 1.0);
339 
340  if(RandomGenerator::GetUniform() < alpha){
341  std::swap(prevStates[i], prevStates[i+1]);
342  prevStates.at(i)->meta["InverseTemp"] = problems.at(i)->GetInverseTemp();
343  prevStates.at(i+1)->meta["InverseTemp"] = problems.at(i+1)->GetInverseTemp();
344  successfulSwaps(i)++;
345  }
346  }
347 
348 
349  // alternate between true and false. If evenSwap is true, this will set it false. If evenSwap is false, this will set it to true.
350  evenSwap = evenSwap ^ true;
351 
352 }
353 
355 
356  assert(kernels.size()==numTemps);
357 
358  bool initError = false;
359  if(prevStates.size()==0){
360  initError = true;
361  }else if(prevStates.at(0)==nullptr){
362  initError = true;
363  }
364 
365  if(initError){
366  std::stringstream msg;
367  msg << "\nERROR in ParallelTempering::Sample. Trying to sample chain but previous (or initial) state has not been set.\n";
368  throw std::runtime_error(msg.str());
369  }
370 
371  auto startTime = std::chrono::high_resolution_clock::now();
372 
373  std::vector<std::vector<std::shared_ptr<SamplingState>>> newStates(numTemps);
374  newStates.resize(numTemps);
375 
376  // Loop over all the different chains
377  for(unsigned int chainInd=0; chainInd<numTemps; ++chainInd){
378  newStates.at(chainInd).resize(kernels.at(chainInd).size());
379 
380  // Loop through each parameter block
381  for(int kernInd=0; kernInd<kernels.at(chainInd).size(); ++kernInd){
382 
383  // Set some metadata that might be needed by the expensive sampling problem
384  prevStates.at(chainInd)->meta["iteration"] = sampNums.at(chainInd);
385  prevStates.at(chainInd)->meta["IsProposal"] = false;
386 
387  // kernel prestep
388  kernels.at(chainInd).at(kernInd)->PreStep(sampNums.at(chainInd), prevStates.at(chainInd));
389 
390  // use the kernel to get the next state(s)
391  newStates.at(chainInd) = kernels.at(chainInd).at(kernInd)->Step(sampNums.at(chainInd), prevStates.at(chainInd));
392  // save when these samples where created
393  double now = std::chrono::duration<double>(std::chrono::high_resolution_clock::now()-startTime).count();
394  for(auto& state : newStates.at(chainInd))
395  state->meta["time"] = now;
396 
397  // kernel post-processing
398  kernels.at(chainInd).at(kernInd)->PostStep(sampNums.at(chainInd), newStates.at(chainInd));
399  }
400  }
401 
402  // add the new states to the SampleCollection (this also increments sampNum and updates prevStates)
403  SaveSamples(newStates);
404 
405  auto endTime = std::chrono::high_resolution_clock::now();
406  totalTime += std::chrono::duration<double>(endTime - startTime).count();
407 }
408 
409 
410 
411 void ParallelTempering::SaveSamples(std::vector<std::vector<std::shared_ptr<SamplingState>>> const& newStates)
412 {
413  assert(newStates.size() == numTemps);
414 
415  for(unsigned int chainInd=0; chainInd<numTemps; ++chainInd){
416  for(unsigned int stateInd=0; stateInd<newStates.at(chainInd).size(); ++stateInd){
417 
418  // Should we save this sample?
419  if( ShouldSave(chainInd, sampNums.at(chainInd)) ) {
420  chains.at(chainInd)->Add(newStates.at(chainInd).at(stateInd));
421 
422  // Save the QOI if the state has one
423  if(newStates.at(chainInd).at(stateInd)->HasMeta("QOI")) {
424  std::shared_ptr<SamplingState> qoi = AnyCast(newStates.at(chainInd).at(stateInd)->meta["QOI"]);
425  QOIs.at(chainInd)->Add(qoi);
426  }
427  }
428 
429  ++sampNums.at(chainInd);
430  // Increment the number of samples and break if we're the posterior chain and we hit the max. number
431  if(chainInd==numTemps-1){
432  if( sampNums.at(numTemps-1)>=numSamps ) { return; }
433  }
434  }
435  }
436 
437  for(unsigned int chainInd=0; chainInd<numTemps; ++chainInd){
438  prevStates.at(chainInd) = newStates.at(chainInd).back();
439  }
440 }
441 
442 
443 std::vector<std::vector<std::shared_ptr<TransitionKernel>>> ParallelTempering::StackKernels(std::vector<std::shared_ptr<TransitionKernel>> const& kerns)
444 {
445  std::vector<std::vector<std::shared_ptr<TransitionKernel>>> newKernels(kerns.size());
446  for(unsigned int i=0; i<kerns.size(); ++i){
447  newKernels.resize(1);
448  newKernels.at(i).at(0) = kerns.at(i);
449  }
450  return newKernels;
451 }
452 
453 Eigen::VectorXd ParallelTempering::ExtractTemps(boost::property_tree::ptree opts)
454 {
455  std::string allTemps = opts.get<std::string>("Inverse Temperatures");
456  std::vector<std::string> tempStrings = StringUtilities::Split(allTemps, ',');
457 
458  Eigen::VectorXd inverseTemps(tempStrings.size());
459  for(unsigned int i=0; i<tempStrings.size(); ++i)
460  inverseTemps(i) = std::stod(tempStrings.at(i));
461 
462  return inverseTemps;
463 }
464 
465 std::vector<std::vector<std::shared_ptr<TransitionKernel>>> ParallelTempering::ExtractKernels(boost::property_tree::ptree opts,
466  std::shared_ptr<InferenceProblem> const& problem)
467 {
468  std::string allKernelString = opts.get<std::string>("Kernel Lists");
469  std::vector<std::string> chainStrings = StringUtilities::Split(allKernelString, ';');
470 
471  std::vector<std::vector<std::shared_ptr<TransitionKernel>>> kernels(chainStrings.size());
472 
473  for(unsigned int chainInd=0; chainInd<chainStrings.size(); ++chainInd){
474  std::vector<std::string> kernelNames = StringUtilities::Split(chainStrings.at(chainInd), ',');
475 
476  unsigned int numBlocks = kernelNames.size();
477  assert(numBlocks>0);
478  kernels.at(chainInd).resize(numBlocks);
479 
480  // Add the block id to a child tree and construct a kernel for each block
481  for(int i=0; i<numBlocks; ++i) {
482  boost::property_tree::ptree subTree = opts.get_child(kernelNames.at(i));
483  subTree.put("BlockIndex",i);
484 
485  auto prob = problem->Clone();
486  prob->AddOptions(subTree);
487  kernels.at(chainInd).at(i) = TransitionKernel::Construct(subTree, prob);
488  }
489  }
490 
491  return kernels;
492 }
493 
494 bool ParallelTempering::ShouldSave(unsigned int chainInd, unsigned int const sampNum) const
495 {
496  return ((sampNum>=burnIn) && (scheduler->ShouldSave(sampNum)));
497 }
Defines an MCMC sampler with multiple chains running on problems with different temperatues.
std::vector< std::vector< std::shared_ptr< TransitionKernel > > > kernels
const unsigned int numTemps
Number of temperatures in the temperature schedule.
std::vector< std::shared_ptr< MarkovChain > > chains
void SaveSamples(std::vector< std::vector< std::shared_ptr< SamplingState >>> const &newStates)
double GetInverseTemp(unsigned int chainInd) const
void SetState(std::vector< std::shared_ptr< SamplingState >> const &x0)
Set the state of the MCMC chain.
static std::vector< std::vector< std::shared_ptr< TransitionKernel > > > StackKernels(std::vector< std::shared_ptr< TransitionKernel >> const &kerns)
std::shared_ptr< SaveSchedulerBase > scheduler
static Eigen::VectorXd ExtractTemps(boost::property_tree::ptree opts)
void AdaptTemperatures()
Adapts the temperatures according to the procedure outlined in Section 5 of .
std::vector< std::shared_ptr< SamplingState > > prevStates
void PrintStatus(unsigned int currInd) const
std::vector< std::shared_ptr< MarkovChain > > QOIs
static std::vector< std::vector< std::shared_ptr< TransitionKernel > > > ExtractKernels(boost::property_tree::ptree opts, std::shared_ptr< InferenceProblem > const &prob)
std::vector< std::shared_ptr< TransitionKernel > > const & Kernels(unsigned int chainInd) const
std::shared_ptr< MarkovChain > Run()
ParallelTempering(boost::property_tree::ptree opts, std::shared_ptr< InferenceProblem > const &problem)
static void CheckForMeta(std::shared_ptr< SamplingState > const &state)
Checks a sampling state to make sure it has the metadata necessary to swap states.
bool ShouldSave(unsigned int chainInd, unsigned int sampNum) const
Returns true if a sample of a particular chain should be saved.
static std::vector< std::vector< T > > StackObjects(std::vector< T > const &kerns)
std::vector< std::shared_ptr< InferenceProblem > > problems
static std::shared_ptr< TransitionKernel > Construct(boost::property_tree::ptree const &pt, std::shared_ptr< AbstractSamplingProblem > problem)
Static constructor for the transition kernel.
Class for easily casting boost::any's in assignment operations.
Definition: AnyHelpers.h:34
std::vector< std::string > Split(std::string str, char delim=',')
Split a string into parts based on a particular character delimiter. Also Strips whitespace from part...
auto get(const nlohmann::detail::iteration_proxy_value< IteratorType > &i) -> decltype(i.key())
Definition: json.h:3956