Source code for stelpar.estimate

# -*- coding: utf-8 -*-




import numpy as np
import pandas as pd
import time
from datetime import datetime
import os

from multiprocessing import Pool
from tqdm import tqdm
from contextlib import nullcontext
import tempfile
import functools

import emcee

from .config import (
    MAGMODELPATH,
    INTERPMAGMODELPATH,
    STDMODELPATH,
    INTERPSTDMODELPATH,
    PARSECMODELPATH
    )
from .photometry import MeasuredPhotometry, SyntheticPhotometry
from .simulation import Probability, MCMC
from .target import Target
from .metadata import InitialConditions, Parallax, Moves, PhotometryMetadata, MetaDataFrame
from .utils import app_mag, app_mag_error, mag_to_flux, load_isochrone, WaitingAnimation, sigma


pd.options.display.float_format = '{:.6g}'.format




__all__ = ['Estimate']




def cleanup_cache_on_exception(method):
    """Decorator to cleanup resources if an exception occurs."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            return method(self, *args, **kwargs)
        except Exception as e:
            self._cleanup()  # Call the cleanup method
            raise  # Optionally re-raise the exception
    return wrapper




[docs] class Estimate(object): """ Automates the data gathering and mcmc simulation to estimate the fit parameters. Parameters ---------- target : str or `Target` object The target whose parameters are estimated via MCMC simulation. isochrone : str, optional If 'mag', uses the isochrone that incorporates the effects of magnetic fields (better for young stars). If 'std', uses the standard, non-magnetic isochrone. The default is 'mag'. interp_method : str, optional If 'true', uses the standard interpolation method of DFInterpolator. If 'nearest' uses nearest-neighbor interpolation. If 'hybrid' uses nearest-neighbor interpolation for age and DFInterpolator for mass. The default is 'true'. include_params : list, optional Which parameters (stellar and/or free parameters) to include in the final output. Fewer parameters may decrease runtime. if `zero_extinction=True`, extinction will not be included even if it is listed here. If `None`, all parameters will be included. The default is `None`. use_synphot : bool, optional Use the built-in synphot methods to calculate extinction or calculate extinction with `numpy` arrays which is faster. The default is `False`. zero_extinction : bool, optional If `True`, set extinction to zero (Av=0). The default is `False`. walker_init_tol : int, optional How many attempts should be made to initialize the walker positions before the simulation starts? The deault is 1000. meas_phot_kwargs : dict, optional Keyword arguments to pass to `:class: stelpar.MeasuredPhotometry`. The default is `None`. pool : `multiprocessing.Pool()` object or similar, optional. This only allows passing a user-controlled pool. Stelpar will try to parallelize regardless. The default is `None`. """ def __init__( self, target, isochrone='mag', interp_method='true', include_params=None, use_synphot=False, zero_extinction=False, walker_init_tol=1000, meas_phot_kwargs=None, pool=None, ): ## setup target-specific metadata # check if target is `Target` object or just string if isinstance(target, Target): self.target = target.name self.coords = target.coords self._ic = target.initial_conditions.loc[self.target].copy() self._user_plx = target.plx self._moves = target.moves self._phot_meta = target.photometry_meta.copy() else: self.target = target self.coords = None self._ic = InitialConditions().initial_conditions.copy() self._user_plx = Parallax().value self._moves = Moves().moves self._phot_meta = PhotometryMetadata().photometry.copy() ## select the appropriate isochrone model self._isochrone = isochrone self._interp_method = interp_method # self._cachedir = CACHEDIR self._cachedir = tempfile.TemporaryDirectory() if self._isochrone.lower() == 'mag': gridcachefile = '.grid_cache_mag' if self._interp_method.lower() == 'true': gridpath = MAGMODELPATH if self._interp_method.lower() == 'nearest' or self._interp_method.lower() == 'hybrid': gridpath = INTERPMAGMODELPATH elif self._isochrone.lower() == 'std': gridcachefile = '.grid_cache_std' if self._interp_method.lower() == 'true': gridpath = STDMODELPATH if self._interp_method.lower() == 'nearest' or self._interp_method.lower() == 'hybrid': gridpath = INTERPSTDMODELPATH elif self._isochrone.lower() == 'parsec': gridcachefile = '.grid_cache_par' if self._interp_method.lower() == 'true': raise ValueError( "invalid interpolation method: 'true' interpolation is too slow on the PARSEC grid. " "Choose a different interpolation method or use a different isochrone grid." ) if self._interp_method.lower() == 'nearest' or self._interp_method.lower() == 'hybrid': gridpath = PARSECMODELPATH else: raise ValueError( "invalid isochrone: can only be 'mag', 'std', or 'parsec'" ) if self._interp_method.lower() == 'true': self._model_grid = load_isochrone(gridpath) if self._isochrone.lower() == 'parsec': isochrone_cols = self._model_grid.columns.values else: isochrone_cols = None self._agelist = None self._masslist = None elif self._interp_method.lower() == 'nearest' or self._interp_method.lower() == 'hybrid': with WaitingAnimation("loading isochrone model grid", delay=0.5): grid = load_isochrone(gridpath) print('') self._agelist = grid.index.get_level_values('age').drop_duplicates() if self._isochrone.lower() == 'parsec': isochrone_cols = grid.columns.values self._masslist = None else: isochrone_cols = None self._masslist = grid.index.get_level_values('mass').drop_duplicates() gridcache = os.path.join(self._cachedir.name, gridcachefile) grid.to_pickle(gridcache) del grid self._model_grid = gridcache ## if an error occurs in any of this, clear the cache try: ## check if synphot is going to be used for the extinction calculation self._use_synphot = use_synphot ## check if extinction is going to be set to zero self._zero_extinction = zero_extinction ## which parameters will be included in the final output self._default_params = ['age', 'mass', 'Av', 'f', 'Teff', 'logg', 'logL', 'radius', 'density'] if include_params is not None: if not set(include_params).issubset(set(self._default_params)): raise KeyError( "Invalid parameter(s) in `include_params`. " f"Possible parameters include {self._default_params}." ) self._params = include_params else: self._params = self._default_params if self._zero_extinction: self._params = [p for p in self._params if p!='Av'] ## check the walker positions initialization tolerance self._walker_init_tol = walker_init_tol ## handle any kwargs for MeasuredPhotometry if meas_phot_kwargs is None: self._meas_phot_kwargs = dict() else: self._meas_phot_kwargs = meas_phot_kwargs ## collect data, initialize classes, and setup functions self._mp = MeasuredPhotometry( self.target, self.coords, photometry_meta=self._phot_meta, user_plx=self._user_plx, isochrone_cols=isochrone_cols, **self._meas_phot_kwargs ) self.photometry, self._termination_message = self._mp.get_data() if self.photometry is False: self._sp, self._prob, self.log_prob_fn = False, False, False else: self._sp = SyntheticPhotometry( self.photometry, model_grid=self._model_grid, interp_method=self._interp_method, extinction_kwargs={'use_synphot':self._use_synphot}, interp_kwargs={'agelist':self._agelist, 'masslist':self._masslist} ) self._prob = Probability(self.photometry, self._sp.photometry_model, self._ic, zero_extinction=self._zero_extinction) self.log_prob_fn = self._prob.log_probability if pool is None: self._pool = Pool() else: self._pool = pool ## metadata parameters for output self._run_date = None self._sim_runtime = None self._posterior_extract_time = None ## EstimateResults object to store results self.results = EstimateResults( target=target, options={ 'isochrone' : self._isochrone, 'interp_method' : self._interp_method, 'use_synphot' : self._use_synphot, 'zero_extinction' : self._zero_extinction, 'walker_init_tol' : self._walker_init_tol, 'meas_phot_kwargs' : self._meas_phot_kwargs, } ) except: self._cleanup() raise
[docs] @cleanup_cache_on_exception def run(self, nwalkers, nsteps, progress=True, verbose=True, backend=None): """ Wrapper for `stelpar.MCMC.run` which runs MCMC simulation using `emcee`. Parameters ---------- nwalkers : int The number of independent walkers in the simulation chain. nsteps : int The number of iterations of the simulation. progress : bool, optional If `True`, provides a progress bar during the sumulation. The default is `True`. verbose : bool, optional If `True`, outputs the current status of the simulation. The defauls is `True`. backend : emcee.backends.HDFBackend, optional Backend to save progress of MCMC. See https://emcee.readthedocs.io/en/stable/tutorials/monitor/ for more information. The default is `None`. Returns ------- sampler : EnsembleSampler `emcee.EnsembleSampler` object containing all estimated values and metadata from the simulation. """ self._run_date = datetime.today().strftime('%Y%m%d') start = time.time() if self.photometry is False: print(self._termination_message) return False if verbose: print(f"\nrunning MCMC for {self.target:s} ({self._isochrone:s}):") walker_context = WaitingAnimation("initializing walker positions", delay=0.5) else: walker_context = nullcontext() time.sleep(1) mcmc = MCMC( nwalkers, nsteps, self.log_prob_fn, self._ic, self._moves, pool=self._pool, zero_extinction=self._zero_extinction, walker_init_tol=self._walker_init_tol, walker_init_context=walker_context, backend=backend ) mcmc.run(progress=progress) time.sleep(1) sampler = mcmc.sampler stop = time.time() delta = stop-start self._sim_runtime = time.strftime('%H:%M:%S', time.gmtime(delta)) options = self.results.options options.update(nwalkers=nwalkers, nsteps=nsteps) self.results.add_kwarg( options=options, sampler=sampler, stats={ 'mean_acceptance_frac' : float(f'{np.mean(sampler.acceptance_fraction):.3f}'), 'median_autocorr_time' : float(f'{np.median(sampler.get_autocorr_time(tol=0)):.3f}'), 'date' : self._run_date, 'sim_runtime' : self._sim_runtime } ) return sampler
[docs] @cleanup_cache_on_exception def posterior(self, sampler, thin=1, discard=0, max_prob=False, force_true_interp=False, verbose=True): """ Calculates full posterior distributions for the fit parameters and others, including radius, Teff, and density. Interpolates estimated magnitudes from age and mass obtained from fit. See https://emcee.readthedocs.io/en/stable/tutorials/line/ for more general information. Parameters ---------- sampler : EnsembleSampler `emcee.EnsembleSampler` object containing all estimated values and metadata from the simulation. thin : int, optional Use every `thin` values of the posterior. The defualt is 1. discard : int, optional Remove (burnin) the first `discard` elements from the posterior. The defult is 0. max_prob : bool, optional Whether to calculate maximum-probability parameters. This can be computationally expensive. The default is `False`. force_true_interp : bool, optional If `True`, the non-fit chains are interpolated using the 'true' interpolation method. If `False` (default), uses the same interpolation method as the MCMC simulation. verbose : bool, optional If `True`, uses print statements to indicate the current status of the simulation. The defauls is `True`. Returns ------- posterior : stelpar.metadata.MetaDataFrame The estimated fit parameters and other stellar parameters, including uncertainties. photometry : stelpar.metadata.MetaDataFrame The measured and estimated magnitudes and other photometric data. posterior_chains : stelpar.metadata.MetaDataFrame The flattened lists of estimated or interpolated values of each parameter (including non-fit parameters) at every step of the simulation (i.e., the posterior distributions). """ start = time.time() if sampler is False: return False, False, False if verbose: print(f"\nextracting posterior for {self.target:s} ({self._isochrone:s}):") samples = sampler.get_chain try: flat_samples = samples(discard=discard, thin=thin, flat=True) except ValueError: flat_samples = samples(flat=True) if max_prob: if verbose: print("\ncalculating max log probability:") log_prob, max_prob_index = self._max_log_probability(flat_samples) if self._zero_extinction: posterior_chains = pd.DataFrame(flat_samples, columns=['age', 'mass', 'f']) else: posterior_chains = pd.DataFrame(flat_samples, columns=['age', 'mass', 'Av', 'f']) if verbose: print("\ngetting other parameter chains:") if force_true_interp: if self._interp_method == 'true': sp = self._sp else: if self._isochrone.lower() == 'mag': grid = load_isochrone(MAGMODELPATH) elif self._isochrone.lower() == 'std': grid = load_isochrone(STDMODELPATH) sp = SyntheticPhotometry( self.photometry, model_grid=grid, interp_method='true' ) else: sp = self._sp ## get the posterior chains of the non-fit parameters other_params = [p for p in self._params if p not in ['age', 'mass', 'Av', 'f', 'density']] if len(other_params) > 0: # try to use pool to parallelize this if possible if self._pool is None: posterior_chains[other_params] = pd.concat( [sp.interpolate_isochrone((posterior_chains['age'][i], posterior_chains['mass'][i])).loc[(posterior_chains['age'][i], posterior_chains['mass'][i]), other_params] for i in tqdm(range(len(flat_samples)))], ignore_index=True) else: map_func = self._pool.imap time.sleep(1) posterior_chains[other_params] = pd.concat( list( res.loc[:, other_params] for res in tqdm( map_func( sp.interpolate_isochrone, ((posterior_chains['age'][i], posterior_chains['mass'][i]) for i in range(len(posterior_chains))) ), total=len(posterior_chains) ) ), ignore_index=True ) time.sleep(1) if 'density' in self._params: posterior_chains['density'] = posterior_chains['mass'] / (posterior_chains['radius']**3) posterior = pd.DataFrame(index=self._params) # calculate the median value (50th percentile), and upper and lower confidence # (84th and 16th percentiles) for each parameter for p in self._params: mc = np.nanpercentile(posterior_chains[p], [16, 50, 84]) q = np.diff(mc) posterior.loc[p, 'median'] = mc[1] if max_prob: posterior.loc[p, 'max_probability'] = posterior_chains.loc[max_prob_index, p] posterior.loc[p, 'uncertainty'] = np.mean([q[0], q[1]]) posterior.loc[p, '+'] = q[1] posterior.loc[p, '-'] = q[0] posterior.index.names = ['parameter'] if self._zero_extinction: # value of Av here doesn't matter as long as `zero_extinction=True` median_photometry_model, teff_lp = self._sp.photometry_model(posterior.loc['age', 'median'], posterior.loc['mass', 'median'], 0, zero_extinction=self._zero_extinction) if max_prob: max_prob_photometry_model, teff_lp = self._sp.photometry_model(posterior.loc['age', 'max_probability'], posterior.loc['mass', 'max_probability'], 0, zero_extinction=self._zero_extinction) else: median_photometry_model, teff_lp = self._sp.photometry_model(posterior.loc['age', 'median'], posterior.loc['mass', 'median'], posterior.loc['Av', 'median']) if max_prob: max_prob_photometry_model, teff_lp = self._sp.photometry_model(posterior.loc['age', 'max_probability'], posterior.loc['mass', 'max_probability'], posterior.loc['Av', 'max_probability']) photometry = self.photometry med_f_abs = posterior.loc['f', 'median'] if max_prob: max_f_abs = posterior.loc['f', 'max_probability'] if median_photometry_model is not False: photometry = photometry.assign( MEDIAN_ABSOLUTE_MAGNITUDE=median_photometry_model['CORRECTED_MAGNITUDE'], MEDIAN_ABSOLUTE_MAGNITUDE_ERROR=lambda x: x['ABSOLUTE_MAGNITUDE_ERROR'].apply(sigma, args=([med_f_abs])), median_apparent_magnitude=lambda x: x['MEDIAN_ABSOLUTE_MAGNITUDE'].apply(app_mag, args=([x['parallax'].values[0]])), median_apparent_magnitude_error=lambda x: x['MEDIAN_ABSOLUTE_MAGNITUDE_ERROR'].apply(app_mag_error, args=([x['parallax'].values[0], x['parallax_error'].values[0]])) ) for band in photometry.index: photometry.loc[(band, ['median_flux', 'median_flux_error'])] = mag_to_flux(*photometry.loc[(band, ['median_apparent_magnitude', 'zeropoint_flux', 'median_apparent_magnitude_error'])]) photometry['median_percent_error'] = 100 * np.abs((photometry['flux'] - photometry['median_flux']) / photometry['flux']) if max_prob and max_prob_photometry_model is not False: photometry = photometry.assign( MAX_PROBABILITY_ABSOLUTE_MAGNITUDE=median_photometry_model['CORRECTED_MAGNITUDE'], MAX_PROBABILITY_ABSOLUTE_MAGNITUDE_ERROR=lambda x: x['ABSOLUTE_MAGNITUDE_ERROR'].apply(sigma, args=([max_f_abs])), max_probability_apparent_magnitude=lambda x: x['MAX_PROBABILITY_ABSOLUTE_MAGNITUDE'].apply(app_mag, args=([x['parallax'].values[0]])), max_probability_apparent_magnitude_error=lambda x: x['MAX_PROBABILITY_ABSOLUTE_MAGNITUDE_ERROR'].apply(app_mag_error, args=([x['parallax'].values[0], x['parallax_error'].values[0]])) ) for band in photometry.index: photometry.loc[(band, ['max_probability_flux', 'max_probability_flux_error'])] = mag_to_flux(*photometry.loc[(band, ['max_probability_apparent_magnitude', 'zeropoint_flux', 'max_probability_apparent_magnitude_error'])]) photometry['max_probability_percent_error'] = 100 * np.abs((photometry['flux'] - photometry['max_probability_flux']) / photometry['flux']) # percent errors should be the same between apparent and ABSOLUTE photometry.index.names = ['band'] ## apply metadata stop = time.time() delta = stop-start self._posterior_extract_time = time.strftime('%H:%M:%S', time.gmtime(delta)) metadata = { 'target' : self.target, 'coordinates' : self.coords, 'isochrone' : self._isochrone, 'interp_method' : self._interp_method, 'use_synphot' : self._use_synphot, 'zero_extinction' : self._zero_extinction, 'force_posterior_true_interp' : force_true_interp, 'nwalkers' : sampler.nwalkers, 'nsteps' : len(samples()), 'discard' : discard, 'thin' : thin, 'mean_acceptance_frac' : float(f'{np.mean(sampler.acceptance_fraction):.3f}'), 'median_autocorr_time' : float(f'{np.median(sampler.get_autocorr_time(tol=0)):.3f}'), 'date' : self._run_date, 'sim_runtime' : self._sim_runtime, 'posterior_extract_time' : self._posterior_extract_time } posterior = MetaDataFrame(posterior.copy(), meta_base_type='posterior', metadata=metadata) photometry = MetaDataFrame(photometry.copy(), meta_base_type='photometry', metadata=metadata) posterior_chains = MetaDataFrame(posterior_chains.copy(), meta_base_type='chains', metadata=metadata) options = self.results.options options.update(discard=discard, thin=thin, force_posterior_true_interp=force_true_interp) stats = self.results.stats stats.update(posterior_extract_time=self._posterior_extract_time) self.results.add_kwarg( posterior=posterior, photometry=photometry, chains=posterior_chains, options=options, stats=stats ) return posterior, photometry, posterior_chains
def _max_log_probability(self, coords, progress=True): """ Returns the maximum of the caluclated log probabilities and its index. Simplified version of :func: `emcee.EnsembleSampler.compute_log_prob` for this use case (no need for blobs). Parameters ---------- coords : numpy.ndarray The position matrix in parameter space for each fit parameter. progress : bool, optional If `True`, provides a progress bar during the calculation. The default is True. Returns ------- log_prob : array The list of calculated log-probability for each coordinate. max_log_prob_index : int The index where `log_prob` is maximized. """ p = coords if progress and self._pool is not None: map_func = self._pool.imap # imap gives tqdm an iterable elif not progress and self._pool is not None: map_func = self._pool.map else: map_func = map if progress: time.sleep(1) results = list(res for res in tqdm(map_func(self.log_prob_fn, [r for r in p]), total=len(p))) time.sleep(1) else: results = list(map_func(self.log_prob_fn, (p[i] for i in range(len(p))))) log_prob = np.array([float(l) for l in results]) if np.any(np.isnan(log_prob)): raise ValueError("Probability function returned NaN") max_log_prob_index = np.argmax(log_prob) return log_prob, max_log_prob_index def _get_log_likelihoods(self, coords, progress=True): """ Returns a distribution of log-likelihood values from a given simulation. Calculates the likelihoods in an analogous way to `self._max_log_probability`. Parameters ---------- coords : numpy.ndarray The position matrix in parameter space for each fit parameter. progress : bool, optional If `True`, provides a progress bar during the calculation. The default is True. Returns ------- log_likelihood : array The list of calculated log-likelihood for each coordinate. """ p = coords if progress and self._pool is not None: map_func = self._pool.imap # imap gives tqdm an iterable elif not progress and self._pool is not None: map_func = self._pool.map else: map_func = map if progress: time.sleep(1) results = list(res for res in tqdm(map_func(self._prob.log_likelihood, [r for r in p]), total=len(p))) time.sleep(1) else: results = list(map_func(self._prob.log_likelihood, (p[i] for i in range(len(p))))) log_likelihood = np.array([float(l) for l in results]) if np.any(np.isnan(log_likelihood)): raise ValueError("Likelihood function returned NaN") return log_likelihood def __del__(self): self._cleanup() def _cleanup(self): self._cachedir.cleanup()
class EstimateResults(object): def __init__( self, target=None, sampler=None, posterior=None, photometry=None, chains=None, options=None, stats=None ): self._input = dict() self._output = dict() self._input['target'] = target self._input['options'] = options self._output['sampler'] = sampler self._output['posterior'] = posterior self._output['photometry'] = photometry self._output['chains'] = chains self._output['stats'] = stats def __repr__(self): target = self.target if isinstance(target, str): target_repr = f"{target!r}" elif isinstance(target, Target): target_repr = f"<{target.__class__.__module__}.{target.__repr__()}>" elif target is None: target_repr = repr(target) sampler = self.sampler if isinstance(sampler, emcee.EnsembleSampler): sampler_repr = f"<{sampler.__class__.__module__}.{sampler.__class__.__name__}>" elif sampler is None: sampler_repr = f"{sampler!r}" posterior = self.posterior if isinstance(posterior, pd.DataFrame): posterior_repr = (f"<{posterior.__class__.__module__}.{posterior.__class__.__name__} " f"[{posterior.shape[0]} rows x {posterior.shape[1]} columns]>" ) elif posterior is None: posterior_repr = f"{posterior!r}" photometry = self.photometry if isinstance(photometry, pd.DataFrame): photometry_repr = (f"<{photometry.__class__.__module__}.{photometry.__class__.__name__} " f"[{photometry.shape[0]} rows x {photometry.shape[1]} columns]>" ) elif photometry is None: photometry_repr = f"{photometry!r}" chains = self.chains if isinstance(chains, pd.DataFrame): chains_repr = (f"<{chains.__class__.__module__}.{chains.__class__.__name__} " f"[{chains.shape[0]} rows x {chains.shape[1]} columns]>" ) elif chains is None: chains_repr = f"{chains!r}" return ( f"{self.__class__.__name__}" "(" f"target={target_repr}, " f"sampler={sampler_repr}, " f"posterior={posterior_repr}, " f"photometry={photometry_repr}, " f"chains={chains_repr}" ")" ) def __str__(self): units = { 'age' : 'Myr', 'mass' : 'M_Sun', 'Av' : 'mag', 'f' : 'mag', 'Teff' : 'K', 'radius' : 'R_Sun', 'logg' : 'log(cm/s^2)', 'logL' : 'log(L_Sun)', 'density' : 'M_Sun/R_Sun^3' } target = self.target if isinstance(target, str): target_info = f"{target!r}" prior_info = '' bounds_info = '' elif isinstance(target, Target): if target.coords is None: target_info = f"{target.name!r}" else: target_info = f"{target.name!r} at [{target.coords.to_string(style='hmsdms')}]" prior_info = '\n - priors:' priors = target.initial_conditions.prior.loc[target.name] useful_priors = [(param, *priors.loc[param], units[param]) for param in priors.index if np.nan not in priors.loc[param]] for tup in useful_priors: prior_info = prior_info + f"\n - {tup[0]}: {tup[1]} +/- {tup[2]} {tup[3]}" bounds_info = '\n - bounds:' bounds = target.initial_conditions.loc[target.name, 'bounds'] useful_bounds = [(param, *bounds.loc[param], units[param]) for param in bounds.index] for tup in useful_bounds: bounds_info = bounds_info + f"\n - {tup[0]}: ({tup[1]}, {tup[2]}) {tup[3]}" elif target is None: return "\nNo results" options = self.options if options is None: options_info = '' else: options_info = '' for key in options: if type(options[key]) is not dict: options_info = options_info + f"\n - {key}: {options[key]!r}" else: options_info = options_info + f"\n - {key}:" for subkey in options[key]: options_info = options_info + f"\n - {subkey}: {options[key][subkey]!r}" stats = self.stats if stats is None: stats_info = '' else: stats_info = '\n - stats:' for key in stats: stats_info = stats_info + f"\n - {key}: {stats[key]!r}" photometry = self.photometry if photometry is None or photometry is False: bands_info = '' else: bands_info = f'\n - bands ({len(photometry.index)}):' for band in photometry.index: bands_info = bands_info + f"\n - {band!r}" posterior = self.posterior if posterior is None or posterior is False: params_info = '' else: params_info = '\n - stellar parameters:' for param in posterior.index: params_info = params_info + ( f"\n - {param}: {posterior.loc[param, 'median']:.3g} " f"+/- {posterior.loc[param, 'uncertainty']:.3g} {units[param]}" ) return ( f"\n{self.__class__.__name__} for {target_info}" "\n\n" "** Setup **" f"\n{options_info}" f"\n{prior_info}" f"\n{bounds_info}" "\n\n" "** Results **" f"\n{stats_info}" f"\n{bands_info}" f"\n{params_info}" "\n\n" "*****************************************************" ) @property def target(self): return self._input['target'] @property def options(self): return self._input['options'] @property def sampler(self): return self._output['sampler'] @property def posterior(self): return self._output['posterior'] @property def photometry(self): return self._output['photometry'] @property def chains(self): return self._output['chains'] # alias of self.chains @property def posterior_chains(self): return self.chains @property def stats(self): return self._output['stats'] def add_kwarg(self, **kwargs): for kw in kwargs: if kw in self._input.keys(): self._input[kw] = kwargs[kw] elif kw in self._output.keys(): self._output[kw] = kwargs[kw] else: raise KeyError( f"{kw!r} not a valid keyword argument" )