# -*- coding: utf-8 -*-
from scipy.optimize import curve_fit
from scipy.stats import beta, linregress
import numpy as np
from beinf import beinf
class taqm():
'''
Contains the methods needed for performing trend-adjusted quantile mapping (TAQM).
It relies on the methods from the :class:`beinf` class.
Methods Summary:
----------------
``calibrate(x_params,y_params,x_t_params,X,Y,X_t,trust_sharp_fcst=False)``
calibrated forecast BEINF parameters and calibrated forecast ensemble
``fit_params(X,Y,X_t)``
BEINF parameters for the TAMH, TAOH, and the raw forecast
``lin(a1,b1,T)``
linear equation values
``piecewise_lin(a1,b1,a2,b2,t_b,T)``
piece-wise linear equation values
``trend_adjust_1p(data_all,tau_t,t)``
trend-adjusted values using a single period
``trend_adjust_2p(data_all,tau_t,t,t_b=1999)``
trend-adjusted values using two periods
``unpack_params(array)``
the individual parameters stored in array
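Example:

A minimal end-to-end sketch of the intended workflow; the module path,
array shapes, and random values below are illustrative assumptions, not
data or results from the package:

>>> import numpy as np
>>> from taqm import taqm   # assumed module path
>>> tq = taqm()
>>> tau = np.arange(1993, 2011)            # hindcast years (hypothetical)
>>> hc = np.random.rand(len(tau), 10)      # hypothetical hindcast ensemble (N x M)
>>> obs = np.random.rand(len(tau))         # hypothetical observed SIC series (N)
>>> X = tq.trend_adjust_1p(hc, tau, 2011)  # TAMH
>>> Y = tq.trend_adjust_1p(obs, tau, 2011) # TAOH
>>> X_t = np.random.rand(10)               # hypothetical raw forecast ensemble (M)
>>> x_p, y_p, xt_p = tq.fit_params(X, Y, X_t)
>>> xt_cal_p, X_t_cal_beta = tq.calibrate(x_p, y_p, xt_p, X, Y, X_t)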
'''
def lin(self,a1,b1,T):
r"""Evaluates the linear equation
.. math::
z = a_1 T + b_1
:label: pw1
at :math:`T`.
Args:
a1 (float):
The slope in :eq:`pw1`.
b1 (float):
The z-intercept in
:eq:`pw1`.
T (float or ndarray):
The point(s) at which :eq:`pw1` is evaluated.
Returns: z (float or ndarray):
The value of :eq:`pw1` at each point T. Values
less than 0 and greater than 1
are clipped to 0 and 1, respectively.
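Example:

A minimal sketch with made-up slope and intercept values:

>>> import numpy as np
>>> # evaluates to roughly [0.5, 0.55, 1.0]; the last value is clipped to 1
>>> taqm().lin(0.01, -19.5, np.array([2000., 2005., 2060.]))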
"""
#Linear equation
z = a1*T + b1
#don't allow SIC values less than zero or greater than one
z = np.clip(z, 0.0, 1.0)
return z
def piecewise_lin(self,a1,b1,a2,b2,t_b,T):
r"""Evaluates the piece-wise linear equation
.. math::
z = \begin{cases}
a_1 T + b_1, & T<t_b \\
a_2 T + b_2, & T \geq t_b
\end{cases}
:label: pw2
at :math:`T`.
Args:
a1, a2 (floats):
The slopes in :eq:`pw2`.
b1, b2 (floats):
The z-intercepts in
:eq:`pw2`.
t_b (float):
The breakpoint for :math:`z` in :eq:`pw2`.
T (float or ndarray):
The point(s) at which :eq:`pw2` is evaluated.
Returns: z (float or ndarray):
The value of :eq:`pw2` at each point T. Values
less than 0 and greater than 1
are clipped to 0 and 1, respectively.
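Example:

A minimal sketch with made-up coefficients; ``b2`` is chosen here as
``(a1 - a2)*t_b + b1`` so the two segments meet at the breakpoint:

>>> import numpy as np
>>> # roughly [0.9, 0.78]: 1995 falls on the first segment, 2005 on the second
>>> taqm().piecewise_lin(0.0, 0.9, -0.02, 40.88, 1999, np.array([1995., 2005.]))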
"""
if isinstance(T,np.ndarray):
#If T is an array, z will also be an array
z = np.zeros(T.shape)
z[T<t_b] = a1*T[T<t_b] + b1
z[T>=t_b] = a2*T[T>=t_b] + b2
else:
#If T is a single year (int or float), z is a scalar
if T<t_b:
z = a1*T + b1
else:
z = a2*T + b2
#don't allow SIC values less than zero or greater than one
z = np.clip(z, 0.0, 1.0)
return z
def trend_adjust_1p(self,data_all,tau_t,t):
"""Linearly detrend data_all and re-center it about its
linear least squares fit evaluated at
:math:`T=t`. One may prefer this trend adjustment over
:func:`~taqm.trend_adjust_2p` when the hindcast record only spans the
more recent period.
Args:
data_all (ndarray):
A time series of size N, or an ensemble time series of size NxM,
where M is the number of ensemble members.
tau_t (ndarray):
All hindcast years excluding the forecast year.
t (float):
The forecast year.
Returns: data_ta (ndarray):
Trend-adjusted values with same shape as data_all.
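Example:

A minimal sketch using a synthetic, linearly declining SIC time series
(years and values are illustrative assumptions):

>>> import numpy as np
>>> years = np.arange(1993, 2011)
>>> obs = np.linspace(0.9, 0.5, len(years))   # hypothetical observed SIC
>>> obs_ta = taqm().trend_adjust_1p(obs, years, 2011)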
"""
if np.ndim(data_all)>1:
# If data_all has shape N by M, take the mean across the M ensemble members (ensemble mean)
# for subsequent calculation of the linear least squares solution
data = np.mean(data_all,axis=1)
else:
data = np.copy(data_all)
if np.all(data_all==0.0) or np.all(data_all==1.0):
# If all values in data_all are either 0 or 1,
# no trend adjustment is needed
data_ta = np.copy(data_all)
else:
#compute least squares parameters
m, b = linregress(tau_t,data)[:2]
#non-stationary mean for year t
z_tilde_nsm = self.lin(m,b,t)
if z_tilde_nsm<0.0:
z_tilde_nsm = 0.0
if z_tilde_nsm>1.0:
z_tilde_nsm = 1.0
#detrend data and re-center
if np.ndim(data_all)>1:
data_d = data_all - self.lin(m,b,tau_t)[:,np.newaxis]
data_ta = data_d + z_tilde_nsm
else:
data_d = data_all - self.lin(m,b,tau_t)
data_ta = data_d + z_tilde_nsm
#change values below zero (above one) to zero (one)
data_ta[data_ta<0.0] = 0.0
data_ta[data_ta>1.0] = 1.0
return data_ta
def trend_adjust_2p(self,data_all,tau_t,t,t_b=1999):
"""Piece-wise linearly detrend data_all and re-center it about its
non-linear least squares fit to Eq. :eq:`pw2` evaluated at
:math:`T=t`. This method carries
out the trend-adjustment technique described in section 5a
of Dirkson et al., 2018. The non-linear least squares fit constrains
Eq. :eq:`pw2` to be continuous at :math:`T=t_b`.
Args:
data_all (ndarray):
A time series of size N, or an ensemble time series of size NxM,
where M is the number of ensemble members.
tau_t (ndarray):
All hindcast years excluding the forecast year `t`.
t (float):
The forecast year.
t_b (float):
The breakpoint year in Eq. :eq:`pw2`.
Returns: data_ta (ndarray):
Trend-adjusted values with same shape as data_all.
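Example:

A minimal sketch using a synthetic SIC series whose decline steepens
after the breakpoint year (all values are illustrative assumptions):

>>> import numpy as np
>>> years = np.arange(1981, 2011)
>>> obs = np.where(years <= 1999, 0.9 - 0.002*(years - 1981), 0.864 - 0.01*(years - 1999))
>>> obs_ta = taqm().trend_adjust_2p(obs, years, 2011, t_b=1999)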
"""
if np.ndim(data_all)>1:
# If data_all has shape N by M, take the mean across the M ensemble members (ensemble mean)
# for subsequent calculation of the linear least squares solution
data = np.mean(data_all,axis=1)
else:
data = np.copy(data_all)
if np.all(data_all==0.0) or np.all(data_all==1.0):
# If all values in data_all are either 0 or 1,
# no trend adjustment is needed
data_ta = np.copy(data_all)
else:
# else, compute non-linear least squares
# parameters of the piece-wise equation
# and recenter data_all about this solution evaluated
# at time=year
def piecewise_regress(T,z):
# function for computing least squares parameters
def f_min(T,a1,b1,a2):
# function for the piecewise linear equation
# with continuity at t_b
z_tilde = np.zeros(len(T))
z_tilde[T<=t_b] = a1*T[T<=t_b] + b1
z_tilde[T>t_b] = a2*T[T>t_b] + (a1-a2)*t_b + b1
return z_tilde
popt, pcov = curve_fit(f_min,T,z,p0=[1,1,1])
a1,b1,a2 = popt
b2 = (a1-a2)*t_b + b1
return a1,b1,a2,b2
#compute least squares parameters
a1, b1, a2, b2 = piecewise_regress(tau_t,data)
#non-stationary mean for year t
z_tilde_nsm = self.piecewise_lin(a1,b1,a2,b2,t_b,t)
if z_tilde_nsm<0.0:
z_tilde_nsm = 0.0
if z_tilde_nsm>1.0:
z_tilde_nsm = 1.0
#detrend data and re-center
if np.ndim(data_all)>1:
data_d = data_all - self.piecewise_lin(a1,b1,a2,b2,t_b,tau_t)[:,np.newaxis]
data_ta = data_d + z_tilde_nsm
else:
data_d = data_all - self.piecewise_lin(a1,b1,a2,b2,t_b,tau_t)
data_ta = data_d + z_tilde_nsm
#change values below zero (above one) to zero (one)
data_ta[data_ta<0.0] = 0.0
data_ta[data_ta>1.0] = 1.0
return data_ta
def fit_params(self,X,Y,X_t):
'''
Fits X (the TAMH ensemble time series), Y (the TAOH time series), and X_t (the raw forecast ensemble) to the BEINF
distribution. This method carries out the fitting procedure described in
section 5b of Dirkson et al., 2018.
Args:
X (ndarray):
The TAMH ensemble time series of size NxM.
Y (ndarray):
The TAOH time series of size N.
X_t (ndarray):
The raw forecast ensemble of size M.
Returns: x_params (ndarray), y_params (ndarray), x_t_params (ndarray):
The shape parameters :math:`a`,
:math:`b`, :math:`p`, :math:`q`
for the BEINF distribution fitted to each
X, Y, and X_t (see :meth:`beinf.fit`).
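Example:

A minimal sketch with synthetic trend-adjusted inputs (shapes and values
are illustrative assumptions):

>>> import numpy as np
>>> X = np.random.rand(20, 10)   # hypothetical TAMH ensemble (N x M)
>>> Y = np.random.rand(20)       # hypothetical TAOH series (N)
>>> X_t = np.random.rand(10)     # hypothetical raw forecast ensemble (M)
>>> x_params, y_params, x_t_params = taqm().fit_params(X, Y, X_t)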
'''
#Fit TAMH to the beinf distribution
a_x, b_x, p_x, q_x = beinf.fit(X.flatten())
x_params = np.array([a_x, b_x, p_x, q_x])
#fit TAOH to the beinf distribution
a_y, b_y, p_y, q_y = beinf.fit(Y)
y_params = np.array([a_y, b_y, p_y, q_y])
#fit forecast to the beinf distribution
a_x_t, b_x_t, p_x_t, q_x_t = beinf.fit(X_t)
#store parameters in an array
x_t_params = np.array([a_x_t, b_x_t, p_x_t, q_x_t])
return x_params, y_params, x_t_params
def calibrate(self,x_params,y_params,x_t_params,X,Y,X_t,trust_sharp_fcst=False):
r'''
Calibrates the raw forecast BEINF parameters :math:`a_{x_t}`,
:math:`b_{x_t}`, :math:`p_{x_t}` and :math:`q_{x_t}`. This method
carries out the calibration step described in section 5c of Dirkson et al., 2018.
Args:
x_params (ndarray):
An array containing the four parameters of the BEINF distribution
for the TAMH ensemble time series.
y_params (ndarray):
An array containing the four parameters of the BEINF distribution
for the TAOH time series.
x_t_params (ndarray):
An array containing the four parameters of the BEINF distribution
for the raw forecast ensemble.
X (ndarray):
The TAMH ensemble time series of size NxM.
Y (ndarray):
The TAOH time series of size N.
X_t (ndarray):
The raw forecast ensemble of size M.
trust_sharp_fcst (boolean, optional):
`True` to revert to the raw forecast when
:math:`p_{x_t}=1`. `False` (default)
to revert to the TAOH distribution when :math:`p_{x_t}=1`.
Returns: x_t_cal_params (ndarray), X_t_cal_beta (ndarray):
x_t_cal_params contains the four BEINF
distribution parameters for the calibrated forecast: :math:`a_{\hat{x}_t}`, :math:`b_{\hat{x}_t}`,
:math:`p_{\hat{x}_t}` and :math:`q_{\hat{x}_t}`. When :math:`a_{\hat{x}_t}` and :math:`b_{\hat{x}_t}`
could not be fit, they are returned as :code:`a=np.inf` and :code:`b=np.inf`.
X_t_cal_beta contains the calibrated forecast ensemble (np.inf replaces 0's and 1's in the
ensemble). This array contains all :code:`np.inf` values when any of :math:`p_y=1`,
:math:`p_x=1`, or :math:`p_{x_t}=1`, when :math:`p_{\hat{x}_t}=1`, or when all parameters in x_t_cal_params are defined (none are equal to :code:`np.inf`).
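Example:

A minimal sketch, assuming ``x_params``, ``y_params``, and ``x_t_params``
were returned by :meth:`fit_params` for the arrays ``X``, ``Y``, and ``X_t``:

>>> tq = taqm()
>>> x_t_cal_params, X_t_cal_beta = tq.calibrate(x_params, y_params, x_t_params, X, Y, X_t)
>>> a_cal, b_cal, p_cal, q_cal = tq.unpack_params(x_t_cal_params)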
'''
a_x, b_x, p_x, q_x = x_params[0], x_params[1], x_params[2], x_params[3]
a_y, b_y, p_y, q_y = y_params[0], y_params[1], y_params[2], y_params[3]
a_x_t, b_x_t, p_x_t, q_x_t = x_t_params[0], x_t_params[1], x_t_params[2], x_t_params[3]
#function to avoid returning nan when dividing by zero
def safediv(numerator,denominator):
if denominator==0.0:
return 0.0
else:
return numerator/denominator
#calibrate forecast parameters
if p_y==1.0 or p_x==1.0 or p_x_t==1.0:
# if any of TAMH, TAOH, or the forecast are entirely
# comprised of zeros and ones, calibration cannot be done.
# choices are:
if p_x_t==1.0 and trust_sharp_fcst:
# trust the raw forecast values when they are perfectly sharp
a_x_t_cal, b_x_t_cal, p_x_t_cal, q_x_t_cal = np.copy(a_x_t), np.copy(b_x_t), np.copy(p_x_t), np.copy(q_x_t)
else:
# trust TAOH
a_x_t_cal, b_x_t_cal, p_x_t_cal, q_x_t_cal = np.copy(a_y), np.copy(b_y), np.copy(p_y), np.copy(q_y)
#in this case, there are no "beta" calibrated forecast values
X_t_cal_beta = np.inf*np.ones(len(X_t))
else:
#calibrate the bernoulli portion of the beinf forecast distribution
p_x_t_cal = max(min(p_x_t + p_y - p_x,1.),0.)
num = max(min(p_x_t*q_x_t + p_y*q_y - p_x*q_x,1.),0.)
den = np.copy(p_x_t_cal)
q_x_t_cal = max(min(safediv(num,den),1.),0.)
if p_x_t_cal==1.0:
# if the calibration of p and q result in p=1 then there
# should be no "beta" calibrated forecast values
a_x_t_cal,b_x_t_cal = np.inf, np.inf
X_t_cal_beta = np.inf*np.ones(len(X_t))
else:
# Get the values from the forecast ensemble, TAMH, and
# TAOH that are between zero and one
X_t_beta = np.copy(X_t[np.logical_and(X_t!=0.0,X_t!=1.0)])
X_beta = X[np.logical_and(X!=0.0,X!=1.0)].flatten()
Y_beta = Y[np.logical_and(Y!=0.0,Y!=1.0)]
# calibrate forecast SIC values
# between zero and one using quantile mapping
if a_x!=np.inf and a_y!=np.inf and a_x_t!=np.inf:
rv_x_beta = beta(a_x, b_x, loc=0.0, scale=1.0)
rv_y_beta = beta(a_y, b_y, loc=0.0, scale=1.0)
# if none of cases 2-4 were encountered
# for the raw forecast values
X_t_cal_beta = rv_y_beta.ppf(rv_x_beta.cdf(X_t_beta))
else:
#revert to empirical-based quantile mapping
X_t_cal_beta = np.percentile(Y_beta,beinf.ecdf(X_t_beta,X_beta)*100.,interpolation='linear')
cases = beinf.check_cases(X_t_cal_beta)
#Solving of Eq. 8 (quantile mapping)
if cases==False:
a_x_t_cal, b_x_t_cal = beinf.fit_beta(X_t_cal_beta)
X_t_cal_beta = np.inf*np.ones(len(X_t))
else:
a_x_t_cal, b_x_t_cal = np.inf, np.inf
if len(X_t)!=len(X_t_cal_beta):
X_t_cal_beta = np.append(X_t_cal_beta,np.inf*np.ones(len(X_t)-len(X_t_cal_beta)))
x_t_cal_params = np.array([a_x_t_cal, b_x_t_cal, p_x_t_cal, q_x_t_cal])
#Return BEINF distribution parameters
return x_t_cal_params, X_t_cal_beta
def unpack_params(self,array):
'''
Unpacks the individual parameters a, b, p, q from array.
Args:
array (ndarray):
Array containing the four parameters a,b,p,q for
a BEINF distribution.
Returns: a,b,p,q (floats):
The individual parameters for the BEINF distribution.
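Example:

A one-line sketch, assuming ``params`` is any length-4 parameter array
returned by :meth:`fit_params` or :meth:`calibrate`:

>>> a, b, p, q = taqm().unpack_params(params)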
'''
a, b, p, q = array
return a,b,p,q