"""
The mcmc submodule.
===================
Contains the MCMC which implements the Marcov Chain Monte Carlo method to
sample from a posterior distribution of a retrieval problem.
References
==========
[1] Andrew Gelman et al., Bayesian Data Analysis, 3rd Edition
"""
import logging
import numpy as np
logger = logging.getLogger(__name__)
def r_factor(stats):
"""
This computes the R-factor as defined in 'Bayesian Data Analysis'
(Getlman et al.), Chapter 11. If the simulations have converged,
the result of `r_factor` should be close to one.
Args:
stats: A list of arrays of statistics (scalar summaries) computed from
serveral MCMC runs.
"""
n = stats[0].size
m = len(stats)
vars = np.array([np.var(s) for s in stats])
means = np.array([np.mean(s) for s in stats])
mean = np.mean(means)
b = n * np.var(means)
w = np.mean(vars)
var_p = (n - 1) / n * w + b / n
return np.sqrt(var_p / w)
def variogram(stats, t):
"""
Helper function that computes the variogram for a given lag t. The
variogram is the mean of the mean squared sum of deviations of lag t of
each sequence.
Args:
stats: A list of sequences
"""
m = len(stats)
n = stats[0].size
return sum([np.sum((s[t+1:] - s[:n-t-1])**2) for s in stats]) / (m * (n - t))
def split(stats):
"""
Splits a list of sequences in halves.
Sequences generated from MCMC runs should be split in half in order to be
able to properly diagnose mixing.
Args:
stats: A list of sequences
"""
n = stats[0].size
return [s[i * (n // 2) : (i + 1) * (n // 2)]
for i in range(2) for s in stats]
def autocorrelation(stats):
"""
Estimates the autocorrelation of a list of sequences from a MCMC run.
This uses formula (11.7) in [1] to approximate the autocorrelation function
for lags [0, n // 2].
"""
n = stats[0].size
vars = np.array([np.var(s) for s in stats])
means = np.array([np.mean(s) for s in stats])
b = n * np.var(means)
w = np.mean(vars)
var_p = (n - 1) / n * w + b / n
rho = np.zeros(n // 2)
for t in range(n // 2):
vt = variogram(stats, t)
rho[t] = 1.0 - 0.5 * vt / var_p
return rho
def effective_sample_size(stats):
"""
This estimates the effective sample size of independent samples from the
posterior distribution using formula (11.8) in [1].
"""
n = stats[0].size
m = len(stats)
vars = np.array([np.var(s) for s in stats])
means = np.array([np.mean(s) for s in stats])
b = n * np.var(means)
w = np.mean(vars)
var_p = (n - 1) / n * w + b / n
rho = np.zeros(n // 2)
for t in range(n // 2):
vt = variogram(stats, t)
rho[t] = 1.0 - 0.5 * vt / var_p
if t > 2 and (rho[t-1] + rho[t-2]) < 0.0:
break
return m * n / (1.0 + 2.0 * sum(rho[:t-2]))
[docs]
class MCMC:
"""
The MCMC class represents an ongoing MCMC simulation. An MCMC object can be
used to run a given number of MC steps, test the results for convergence
and perform further calculations if necessary.
"""
@staticmethod
def _check_input(vars, py, stats):
"""
Helper method that checks arguments provided to `__init__`.
"""
if not type(vars) == list and len(vars) > 0:
raise Exception("Argument vars must be of type list and have length"
+ "> 0.")
for v in vars:
if not type(v) == list and len(v) == 3:
raise Exception("Elements of argument vars must be tuples of "
+ " mutable variables, jump functions and prior"
+ " densities.")
if not callable(v[1]):
raise Exception("Non-callable object given as prior density.")
if not callable(v[2]):
raise Exception("Non-callable object given as jump function.")
if not callable(py):
raise Exception("Non-callable object given for conditional"
+ "probability p(y | x).")
for s in stats:
if not callable(s):
raise Exception("Non-callable object given as statistic.")
[docs]
def __init__(self, vars, y, ly, stats = []):
"""
To construct an MCMC object, the user must provide a list of variables,
prior distributions and likelihood functions, the measurement vector,
a measurement likelihood and optionally a set of stats to evaluate at
each step.
Args:
vars: A list of triples (v,l,j) containing a triple of a variable
v, a prior likelihood function l so that `l(v)` yields a value
proportional to the logarithm of the prior probability of value
of v, and finally a jump function j, so that
`v_new = j(ws, v_old)` yields a new value for the variable v
and manipulates the :class:`~typhon.arts.workspace.Workspace`
object ws so that a subsequent call to the yCalc WSM will compute
the simulated measurement corresponding to the new value `v_new`
of the variable v.
y: The measured vector of brightness temperatures which must be
consistent with the ARTS WSV y
ly: The measurement likelihood such that `ly(y, yf)` gives
the log of the probability that deviations between `y` and
`yf` are due to measurement errors.
stats: This is a list of statstics such that for each element
s `s(ws)` is a scalar value computed on a given workspace.
"""
MCMC._check_input(vars, ly, stats)
self.y = y
self.vars = vars
self.ly = ly
self.stats = stats
[docs]
def eval_l(self, ws):
"""
Evaluate the likelihood of the current state. This method
simply computes and sums up the measurement likelihood and
the prior likelihoods.
Args:
ws: A :class:`~typhon.arts.workspace.Workspace` object consistent
with the current state of the MCMC run.
"""
lxs = np.zeros(len(self.vars))
ly = self.ly(self.y, ws.y.value)
for i, (x, l, _) in enumerate(self.vars):
lxs[i] = l(x)
return ly, lxs
[docs]
def step(self, ws, ly_old, lxs_old):
"""
The performs a Gibbs step for a given variable. This will generate
a candidate for the given variable using the corresponding jump
function, call `yCalc()` on the
:class:`~typhon.arts.workspace.Workspace` object `ws`
Args:
ws: A :class:`~typhon.arts.workspace.Workspace` object consistent
with the current state of the MCMC run.
ly_old: The measurement likelihood before the execution of
new step
lxs_old: The prior likelihoods for each of the variables
that are being retrieved.
"""
accepted = np.zeros((1, len(self.vars)), dtype=bool)
lxs = np.zeros(lxs_old.shape)
for i, ((x,l,j), lx_old) in enumerate(zip(self.vars, lxs_old)):
# Generate new step
x_new = j(ws, x)
ws.yCalc()
lx_new = l(x_new)
ly_new = self.ly(self.y, ws.y.value)
# Check Acceptance
r = np.exp(lx_new + ly_new - lx_old - ly_old)
if r > 1.0 or np.random.random() < r:
x[:] = x_new
accepted[0, i] = True
ly_old = ly_new
lxs[i] = lx_new
else:
j(ws, x, revert = True)
accepted[0, i] = False
lxs[i] = lxs_old[i]
return accepted, ly_old, lxs
[docs]
def print_log(self, step, acceptance):
"""
Prints log output to stdout.
Args:
step: The number of the current step
acceptance: The array of bools tracking the acceptances for
each simulation step.
"""
if step > 0:
ar = sum(acceptance / step)
else:
ar = 0.0
logger.info("MCMC Step " + str(step) + ": " + "ar = " + str(ar))
[docs]
def warm_up(self, ws, x0s, n_steps):
"""
Run a simulation of `n_steps` on a given workspace `ws`
starting from start values `x0s`.
Args:
ws: A :class:`~typhon.arts.workspace.Workspace` object setup so that
only a call to the `yCalc` WSM is necessary to perform a
simulation
x0s: A list of start values which is used to initialized the
workspace by calling `j(x0)` for each `x0` in `x0s` and `j` is
the jump function of the corresponding variable.
"""
ls = np.zeros(n_steps + 1)
stats = np.zeros((n_steps + 1, len(self.stats)))
hist = [np.zeros((n_steps + 1,) + x.shape) for x,l,j in self.vars]
acceptance = np.zeros((n_steps, len(self.vars)), dtype=bool)
lxs = np.zeros(len(self.vars))
ly = 0.0
# Set initial state.
for i,((x, l, j), x0) in enumerate(zip(self.vars, x0s)):
x[:] = j(ws, x0)
hist[i][0,:] = x[:]
# Evaluate likelihood
ws.yCalc()
ly, lxs = self.eval_l(ws)
ls[0] = ly + sum(lxs)
# Evaluate statistics
for i,s in enumerate(self.stats):
stats[0, i] = s(ws)
for i1 in range(n_steps):
acceptance[i1, :], ly, lxs = self.step(ws, ly, lxs)
for i2, h in enumerate(hist):
hist[i2][i1+1, :] = self.vars[i2][0][:]
for i2,s in enumerate(self.stats):
stats[i1 + 1, i2] = s(ws)
if (i1 % 10) == 0:
self.print_log(i1, acceptance)
self.ly_old = ly
self.lxs_old = lxs
self.stats_old = stats[-1,:]
self.hist_old = [h[-1,:] for h in hist]
return hist, stats, ls, acceptance
[docs]
def run(self, ws, n_steps):
"""
Run a simulation of `n_steps` on a given workspace `ws`
starting from start values `x0s`.
Args:
ws: A :class:`~typhon.arts.workspace.Workspace` object setup so that
only a call to the `yCalc` WSM is necessary to perform a simulation
x0s: A list of start values which is used to initialized the
workspace by calling `j(x0)` for each `x0` in `x0s` and `j` is
the jump function of the corresponding variable.
"""
ls = np.zeros(n_steps)
stats = np.zeros((n_steps, len(self.stats)))
hist = [np.zeros((n_steps,) + x.shape) for x,l,j in self.vars]
acceptance = np.zeros((n_steps, len(self.vars)), dtype=bool)
ly = self.ly_old
lxs = self.lxs_old
for i1 in range(n_steps):
acceptance[i1, :], ly, lxs = self.step(ws, ly, lxs)
for i2, h in enumerate(hist):
hist[i2][i1, :] = self.vars[i2][0][:]
for i2,s in enumerate(self.stats):
stats[i1, i2] = s(ws)
if (i1 % 10) == 0:
self.print_log(i1, acceptance)
self.ly_old = ly
self.lxs_old = lxs
self.stats_old = stats[-1,:]
self.hist_old = [h[-1,:] for h in hist]
return hist, stats, ls, acceptance