Source code for curepy.retrieval_methods.base
"""Base class for retrieval methods"""
from abc import ABC, abstractmethod
from curepy.container.retrieval_input import RetrievalInput
import numpy as np
from curepy.utilities.maths import lnlike
import warnings
from typing import Optional, List
[docs]
class BaseRetrieval(ABC):
"""Base retrieval object."""
@abstractmethod
def _run_retrieval(self, retrieval_inputs: RetrievalInput):
"""
Abstract run_retrieval method to be implemented by each retrieval subclass
:param retrieval_inputs: Object encapsulating all inputs needed for
the retrieval.
"""
pass
[docs]
def run_retrieval(self, retrieval_inputs: RetrievalInput, *args, **kwargs):
"""
Execute the retrieval algorithm.
:param retrieval_inputs: Object encapsulating all inputs needed for
the retrieval.
"""
result = self._run_retrieval(retrieval_inputs, *args, **kwargs)
return result
[docs]
def reshape_outputs(
self,
x: np.ndarray,
u_x: np.ndarray,
corr_x: Optional[np.ndarray],
) -> tuple:
"""
Reshape flat retrieval outputs back to the initial-guess shape.
:param x: Flat retrieved state vector.
:param u_x: Flat uncertainties of the retrieved state vector.
:param corr_x: Correlation matrix of the retrieved state vector,
or ``None``. Reshaping of correlation matrices is not yet
implemented; a warning is emitted when ``corr_x`` is not ``None``.
:returns: Tuple of ``(x, u_x, corr_x)`` reshaped to the
initial-guess shape.
"""
x_shape = self.retrieval_input.measurement_function_obj.initial_guess.shape
x = x.reshape(x_shape)
u_x = u_x.reshape(x_shape)
if corr_x is not None:
warnings.warn("Reshaping of correlation matrices is not yet implemented")
# corr_x = corr_x.reshape(x.shape + (np.prod(x_shape),))
return x, u_x, corr_x
[docs]
@staticmethod
def generate_theta_0(ig: np.ndarray) -> np.ndarray:
"""
Convert the initial guess into a flat 1-D state vector.
:param ig: Initial guess, which may be a scalar, 1-D array, or
2-D array (one row per measurement location).
:returns: Flat 1-D initial state vector.
"""
if hasattr(ig, "__len__"):
if hasattr(ig[0], "__len__"):
try:
theta_0 = np.concatenate(ig).flatten()
except:
theta_0 = np.concatenate([x.flatten() for x in ig])
else:
theta_0 = np.array(ig).flatten()
else:
theta_0 = np.array([ig])
return theta_0
def _check_retrieval_input(self) -> None:
"""
Validate and fill in default retrieval sub-objects.
Builds a default (no ancillary parameters) ancillary object if one
has not been provided, and builds a flat uniform prior over all
retrieval parameters if no prior has been set.
"""
# format and define retrieval input
if self.retrieval_input.ancillary_obj is None:
self.retrieval_input.build_ancillary()
if self.retrieval_input.prior_obj is None:
self.retrieval_input.build_prior(
prior_shape=["uniform"]
* len(self.retrieval_input.measurement_function_obj.initial_guess),
prior_params=[{"minimum": -np.inf, "maximum": np.inf}]
* len(self.retrieval_input.measurement_function_obj.initial_guess),
)
[docs]
def find_chisum(
self,
theta: np.ndarray,
repeat_dims: List[int] = [],
) -> float:
"""
Compute the chi-squared cost between the forward model and observations.
Evaluates the measurement function at ``theta``, computes the
residual with respect to the observations, and returns the weighted
sum of squared residuals using the inverse covariance matrix (or
diagonal uncertainties when no covariance is available).
:param theta: Current retrieval state vector.
:param repeat_dims: Indices of repeat dimensions along which to
accumulate the chi-squared sum. Only zero or one repeat
dimensions are currently supported.
:returns: Chi-squared cost value.
"""
modelled_data = (
self.retrieval_input.measurement_function_obj.measurement_function_x(
theta, self.retrieval_input.ancillary_obj.b
).flatten()
)
diff = modelled_data - self.retrieval_input.measurement_obj.y_flat
# Only normalize by u_y_flat if it's available
if self.retrieval_input.measurement_obj.u_y_flat is not None:
diff_norm = diff / self.retrieval_input.measurement_obj.u_y_flat
else:
diff_norm = diff
if np.isfinite(np.sum(diff)):
if self.retrieval_input.measurement_obj.cholesky is None:
chisq = np.sum(
(diff_norm) ** 2
) # this is equivalent to using an identity matrix for the inverse covariance, which is appropriate when only uncorrelated uncertainties are available
else:
if len(repeat_dims) == 0:
y = self.retrieval_input.measurement_obj.W @ diff_norm
chisq = y.T @ y
elif len(repeat_dims) == 1:
sum = 0
for i in range(diff.shape[repeat_dims[0]]):
diff_norm_i = np.take(diff_norm, i, repeat_dims[0])
y = self.retrieval_input.measurement_obj.W @ diff_norm_i
sum += y.T @ y
chisq = sum
else:
raise ValueError(
"Methods for multiple repeat dimensions are not yet implemented,"
)
else:
print("The difference between model and observations is infinite")
chisq = np.inf
if chisq < 0:
raise ValueError(
"The chi-squared cost is negative, which should not be possible. Check the inputs and the measurement function for errors."
)
return chisq
[docs]
def lnprob(self, theta: np.ndarray) -> float:
"""
Compute the log posterior probability for state vector ``theta``.
Evaluates the log prior and the log likelihood and returns their sum.
Returns ``-np.inf`` if the prior is not finite at ``theta``.
:param theta: Current retrieval state vector.
:returns: Log posterior probability.
"""
lp_prior = self.retrieval_input.prior_obj.lnprior(
theta,
)()
if not all(np.isfinite(lp_prior)):
return -np.inf
lp = lnlike(self.find_chisum(theta, repeat_dims=[]))
return np.sum(lp_prior) + lp
if __name__ == "__main__":
pass