Source code for curepy.retrieval_methods.base

"""Base class for retrieval methods"""

from abc import ABC, abstractmethod
from curepy.container.retrieval_input import RetrievalInput
import numpy as np
from curepy.utilities.maths import lnlike
import warnings
from typing import Optional, List


[docs] class BaseRetrieval(ABC): """Base retrieval object.""" @abstractmethod def _run_retrieval(self, retrieval_inputs: RetrievalInput): """ Abstract run_retrieval method to be implemented by each retrieval subclass :param retrieval_inputs: Object encapsulating all inputs needed for the retrieval. """ pass
[docs] def run_retrieval(self, retrieval_inputs: RetrievalInput, *args, **kwargs): """ Execute the retrieval algorithm. :param retrieval_inputs: Object encapsulating all inputs needed for the retrieval. """ result = self._run_retrieval(retrieval_inputs, *args, **kwargs) return result
[docs] def reshape_outputs( self, x: np.ndarray, u_x: np.ndarray, corr_x: Optional[np.ndarray], ) -> tuple: """ Reshape flat retrieval outputs back to the initial-guess shape. :param x: Flat retrieved state vector. :param u_x: Flat uncertainties of the retrieved state vector. :param corr_x: Correlation matrix of the retrieved state vector, or ``None``. Reshaping of correlation matrices is not yet implemented; a warning is emitted when ``corr_x`` is not ``None``. :returns: Tuple of ``(x, u_x, corr_x)`` reshaped to the initial-guess shape. """ x_shape = self.retrieval_input.measurement_function_obj.initial_guess.shape x = x.reshape(x_shape) u_x = u_x.reshape(x_shape) if corr_x is not None: warnings.warn("Reshaping of correlation matrices is not yet implemented") # corr_x = corr_x.reshape(x.shape + (np.prod(x_shape),)) return x, u_x, corr_x
[docs] @staticmethod def generate_theta_0(ig: np.ndarray) -> np.ndarray: """ Convert the initial guess into a flat 1-D state vector. :param ig: Initial guess, which may be a scalar, 1-D array, or 2-D array (one row per measurement location). :returns: Flat 1-D initial state vector. """ if hasattr(ig, "__len__"): if hasattr(ig[0], "__len__"): try: theta_0 = np.concatenate(ig).flatten() except: theta_0 = np.concatenate([x.flatten() for x in ig]) else: theta_0 = np.array(ig).flatten() else: theta_0 = np.array([ig]) return theta_0
def _check_retrieval_input(self) -> None: """ Validate and fill in default retrieval sub-objects. Builds a default (no ancillary parameters) ancillary object if one has not been provided, and builds a flat uniform prior over all retrieval parameters if no prior has been set. """ # format and define retrieval input if self.retrieval_input.ancillary_obj is None: self.retrieval_input.build_ancillary() if self.retrieval_input.prior_obj is None: self.retrieval_input.build_prior( prior_shape=["uniform"] * len(self.retrieval_input.measurement_function_obj.initial_guess), prior_params=[{"minimum": -np.inf, "maximum": np.inf}] * len(self.retrieval_input.measurement_function_obj.initial_guess), )
[docs] def find_chisum( self, theta: np.ndarray, repeat_dims: List[int] = [], ) -> float: """ Compute the chi-squared cost between the forward model and observations. Evaluates the measurement function at ``theta``, computes the residual with respect to the observations, and returns the weighted sum of squared residuals using the inverse covariance matrix (or diagonal uncertainties when no covariance is available). :param theta: Current retrieval state vector. :param repeat_dims: Indices of repeat dimensions along which to accumulate the chi-squared sum. Only zero or one repeat dimensions are currently supported. :returns: Chi-squared cost value. """ modelled_data = ( self.retrieval_input.measurement_function_obj.measurement_function_x( theta, self.retrieval_input.ancillary_obj.b ).flatten() ) diff = modelled_data - self.retrieval_input.measurement_obj.y_flat # Only normalize by u_y_flat if it's available if self.retrieval_input.measurement_obj.u_y_flat is not None: diff_norm = diff / self.retrieval_input.measurement_obj.u_y_flat else: diff_norm = diff if np.isfinite(np.sum(diff)): if self.retrieval_input.measurement_obj.cholesky is None: chisq = np.sum( (diff_norm) ** 2 ) # this is equivalent to using an identity matrix for the inverse covariance, which is appropriate when only uncorrelated uncertainties are available else: if len(repeat_dims) == 0: y = self.retrieval_input.measurement_obj.W @ diff_norm chisq = y.T @ y elif len(repeat_dims) == 1: sum = 0 for i in range(diff.shape[repeat_dims[0]]): diff_norm_i = np.take(diff_norm, i, repeat_dims[0]) y = self.retrieval_input.measurement_obj.W @ diff_norm_i sum += y.T @ y chisq = sum else: raise ValueError( "Methods for multiple repeat dimensions are not yet implemented," ) else: print("The difference between model and observations is infinite") chisq = np.inf if chisq < 0: raise ValueError( "The chi-squared cost is negative, which should not be possible. Check the inputs and the measurement function for errors." ) return chisq
[docs] def lnprob(self, theta: np.ndarray) -> float: """ Compute the log posterior probability for state vector ``theta``. Evaluates the log prior and the log likelihood and returns their sum. Returns ``-np.inf`` if the prior is not finite at ``theta``. :param theta: Current retrieval state vector. :returns: Log posterior probability. """ lp_prior = self.retrieval_input.prior_obj.lnprior( theta, )() if not all(np.isfinite(lp_prior)): return -np.inf lp = lnlike(self.find_chisum(theta, repeat_dims=[])) return np.sum(lp_prior) + lp
if __name__ == "__main__": pass