Source code for npDoseResponse.utils

# -*- coding: utf-8 -*-

# Author: Yikun Zhang
# Last Editing: January 15, 2025

# Description: This script contains the utility functions for the main functions 
# in our package.

import numpy as np
from .rbf import KernelRetrieval
import scipy.integrate as integrate
from sklearn.preprocessing import PolynomialFeatures

#=======================================================================================#


[docs] def RoTBWLocalPoly(Y, X, kernT="epanechnikov", kernS="epanechnikov", C_h=10, C_b=15): ''' Compute the rule-of-thumb bandwidth selector in Eq.(A1) of Yang and Tschernig (1999). Parameters ---------- Y: (n,)-array The outcomes of n observations. X: (n,d+1)-array The first column of X is the treatment/exposure variable, while the other d columns are confounding variables of n observations. kernT, kernS: str The names of kernel functions for the treatment/exposure variable and confounding variables. (Default: "epanechnikov".) C_h,C_b: float The scaling factors for the rule-of-thumb bandwidth parameters. (Default: C_h=7, C_b=3.) Return ---------- h: float The rule-of-thumb bandwidth parameter for the treatment/exposure variable. b: (d,)-array The rule-of-thumb bandwidth vector for the confounding variables. ''' n = X.shape[0] ## Number of data points d = X.shape[1] - 1 kernT, sigmaK_sq, K_sq = KernelRetrieval(kernT) # Apply the rule-of-thumb bandwidth selector in Eq.(A1) of Yang and Tschernig (1999) p_coeff = np.polyfit(X[:,0], Y, 4) sec_deriv = 12*p_coeff[0]*X[:,0] + 6*p_coeff[1]*X[:,0] + 2*X[:,0] C_fun = np.mean(sec_deriv**2) T = X[:,0].reshape(-1,1) lhs = np.concatenate([np.ones((n,1)), X, T**2, T**3, T**4], axis=1) rcond = np.finfo(lhs.dtype).eps * max(*lhs.shape) beta = np.linalg.lstsq(lhs, Y, rcond=rcond)[0] # Compute the residual sum of squares resid = np.sum((Y - np.dot(lhs, beta))**2) * (np.max(X[:,0]) - np.min(X[:,0]))/ (n-5) sigmaK_sq = sigmaK_sq**2 # ROT # h = ((K_sq*resid)/(4*n*sigmaK_sq*C_fun))**(1/5) * (n**(d/(5*(d+5)))) * C_h h = ((K_sq*resid)/(4*n*sigmaK_sq*C_fun))**(1/5) * C_h kernS, sigmaK_sq, K_sq = KernelRetrieval(kernS) sec_deriv = np.zeros((n, d)) for i in range(1, d+1): # Fit a fourth-order polynomial to each confounding variable p_coeff = np.polyfit(X[:,i], Y, 4) sec_deriv[:,i-1] = 12*p_coeff[0]*X[:,i] + 6*p_coeff[1]*X[:,i] + 2*X[:,i] C_fun = np.sum(np.diag(np.dot(sec_deriv.T, sec_deriv))/n) lhs = np.concatenate([np.ones((n,1)), X, T**2, T**3, T**4], axis=1) # lhs = np.concatenate([np.ones((n,1)), X[:,1:]], axis=1) rcond = np.finfo(lhs.dtype).eps * max(*lhs.shape) beta = np.linalg.lstsq(lhs, Y, rcond=rcond)[0] resid = np.sum((Y - np.dot(lhs, beta))**2) * (np.max(X[:,1:], axis=0) - np.min(X[:,1:], axis=0)) / (n-5) sigmaK_sq = sigmaK_sq**2 K_sq = K_sq**d # b = ((K_sq*d*resid)/(4*n*sigmaK_sq*C_fun))**(1/(d+5)) * C_b b = ((K_sq*d*resid)/(4*sigmaK_sq*C_fun))**(1/(d+5)) * n**(-1/(d+1)) * C_b return h, b
[docs] def HatMatrix(X, degree=2, deriv_ord=1, h=None, b=None, print_bw=True, kernT="epanechnikov", kernS="epanechnikov"): ''' Compute the hat matrix of the local polynomial regression when it is viewed as a linear smoother. Parameters ---------- X: (n,d+1)-array The first column of X is the treatment/exposure variable, while the other d columns are confounding variables of n observations. degree: int Degree of local polynomials. (Default: degree=2.) deriv_ord: int The order of the estimated derivative the conditional mean outcome function. (Default: deriv_ord=1. Then, it estimates the partial derivative of the conditional mean outcome function with respect to the treatment variable.) h,b: float The bandwidth parameters for the treatment/exposure variable and confounding variables. print_bw: boolean The indicator of whether the current bandwidth parameters should be printed to the console. (Default: print_bw=True.) kernT, kernS: str The names of kernel functions for the treatment/exposure variable and confounding variables. (Default: "epanechnikov".) Return ---------- hat_mat: (n,n)-array The hat matrix. ''' n = X.shape[0] ## Number of data points d = X.shape[1] - 1 x_eval = X.copy() if print_bw: print("The current bandwidth for treament variable in the local polynomial regression is "+ str(h) + ".\n") print("The current bandwidth for confounding variables in the local polynomial regression is "+ str(b) + ".\n") kernT, sigmaK_sq, K_sq = KernelRetrieval(kernT) kernS, sigmaK_sq, K_sq = KernelRetrieval(kernS) hat_mat = np.zeros((n, n)) for i in range(x_eval.shape[0]): weights = kernT((X[:,0] - x_eval[i,0])/h) * np.prod(kernS((X[:,1:] - x_eval[i,1:])/b), axis=1) # Filter out the data points with zero weights to speed up regressions with kernels of compact support inds = np.where(np.abs(weights) > 1e-26)[0] X_dat = np.zeros((n, degree+1+d)) # X_dat[:,:(degree+1)] = (((X[:,0] - x_eval[i,0])/h).reshape(-1,1))**(np.arange(degree+1)) for p in range(degree + 1): X_dat[:,p] = ((X[:,0] - x_eval[i,0])/h)**p X_dat[:,(degree+1):] = (X[:,1:] - x_eval[i,1:])/b X_dat = X_dat[inds,:] W = np.diag(weights[inds]) design_mat = np.dot(np.dot(X_dat.T, W), X_dat) try: hat_mat_samp = np.dot(np.linalg.inv(design_mat), np.dot(X_dat.T, W)) except: # Add some small quantities to the diagonal matrix to prevent the singularity of the matrix design_mat = design_mat + 1e-16*np.eye(design_mat.shape[0]) hat_mat_samp = np.dot(np.linalg.inv(design_mat), np.dot(X_dat.T, W)) hat_mat[i,inds] = hat_mat_samp[deriv_ord,:] return hat_mat
[docs] def BndKern(x_qry, kern, deriv_ord=0, alpha=1, bnd='left'): ''' Generalized jackknife boundary kernel. Parameters ---------- x_qry: (m,)-array The coordinates of m query points in the 1-dimensional Euclidean space. kern: python function The kernel function. deriv_ord: int The order of the derivative estimator. (Default: deriv_ord=0, which is for nonparametric density or curve estimation.) alpha: float The truncated proportion of the kernel support (0 <= alpha <= 1). (Default: alpha=1, which recovers the original kernel function for the interior points.) bnd: str Indicator of whether the input point is within the left or right boundary of the support. (Default: bnd='left'.) Return ---------- res: (m,)-array The boundary kernel function evaluated at m query points. ''' kappa_lst = np.zeros((2*deriv_ord+3,)) if bnd == 'left': sign = -1 else: sign = 1 for i in range(deriv_ord, 3*deriv_ord + 3): kappa_lst[i-deriv_ord] = integrate.quad(lambda x: ((sign*x)**i)*kern(x), -np.inf, alpha)[0] mat = np.zeros((deriv_ord + 2, deriv_ord + 2)) for i in range(deriv_ord + 2): mat[i,:] = kappa_lst[i:(i+deriv_ord+2)] B = np.zeros((deriv_ord + 2, )) B[deriv_ord] = 1 coef = np.linalg.solve(mat, B) res = 0 for j in range(deriv_ord+2): res += coef[j]*(x_qry**j)*kern(x_qry) return res
[docs] def KDE1D(x, data, kern='epanechnikov', h=None): ''' One-dimensional kernel density estimation with generalized jackknife boundary corrections (Jones 1993). Parameters ---------- x: (m,)-array The coordinates of m query points in the 1-dim Euclidean space. data: (n,)-array The coordinates of n random sample points in the d-dimensional Euclidean space. kern: str The name of the kernel function. (Default: "epanechnikov".) h: float The bandwidth parameter. (Default: h=None. Then the Silverman's rule of thumb is applied; see Chen et al.(2016) for details.) Return ---------- f_hat: (m,)-array The corresponding kernel density estimates at m query points. ''' n = data.shape[0] ## Number of data points d = 1 ## Dimension of the data kern, sigmaK_sq, K_sq = KernelRetrieval(kern) if h is None: # Apply Silverman's rule of thumb to select the bandwidth parameter h = (4/(d+2))**(1/(d+4))*(n**(-1/(d+4)))*np.std(data) print("The current bandwidth for KDE is "+ str(h) + ".\n") f_hat = np.zeros((x.shape[0], )) for i in range(x.shape[0]): if x[i] < np.min(data) or x[i] > np.max(data): f_hat[i] = 0 elif x[i] < np.min(data) + h: alpha = (x[i] - np.min(data))/h f_hat[i] = np.mean(BndKern((data - x[i])/h, kern=kern, deriv_ord=0, alpha=alpha, bnd='left') / h) elif x[i] > np.max(data) - h: alpha = (np.max(data) - x[i])/h f_hat[i] = np.mean(BndKern((data - x[i])/h, kern=kern, deriv_ord=0, alpha=alpha, bnd='right') / h) else: f_hat[i] = np.mean(kern((data - x[i])/h) / h) return f_hat
[docs] def KDE(x, data, kern='gaussian', h=None): ''' The d-dimensional Euclidean kernel density estimator. Parameters ---------- x: (m,d)-array The coordinates of m query points in the d-dim Euclidean space. data: (n,d)-array The coordinates of n random sample points in the d-dimensional Euclidean space. kern: str The name of the kernel function. (Default: "gaussian".) h: float The bandwidth parameter. (Default: h=None. Then the Silverman's rule of thumb is applied. See Chen et al.(2016) for details.) Return ---------- f_hat: (m,)-array The corresponding kernel density estimates at m query points. ''' n = data.shape[0] ## Number of data points if len(data.shape) == 1: d = 1 data = data.reshape(-1,1) x = x.reshape(-1,1) else: d = data.shape[1] ## Dimension of the data kern, sigmaK_sq, K_sq = KernelRetrieval(kern) if h is None: # Apply Silverman's rule of thumb to select the bandwidth parameter h = (4/(d+2))**(1/(d+4))*(n**(-1/(d+4)))*np.std(data, axis=0) print("The current bandwidth is "+ str(h) + ".\n") f_hat = np.zeros((x.shape[0], )) for i in range(x.shape[0]): f_hat[i] = np.mean(np.exp(np.sum(-((x[i,:] - data)/h)**2, axis=1)/2))/ ((2*np.pi)**(d/2)*np.prod(h)) return f_hat
[docs] def CondDenEstKDE(Y, X, reg_mod, y_eval=None, x_eval=None, kern='epanechnikov', b=None): ''' Conditional density estimation by applying the kernel density estimator (KDE) on the regression residuals. Parameters ---------- Y: (n,)-array The outcome variables of n observations. X: (n,d)-array The d-dimensional covariates of n observations. reg_mod: scikit-learn model or any python model that can use ".fit()" and ".predict()" The conditional mean outcome (or regression) model of Y given X. y_eval: (m,)-array The outcome variables at which we evaluate the estimated conditional densities. x_eval: (m,d)-array The covariates at which we evaluate the estimated conditional densities. kern: str The name of the kernel function. (Default: kern="epanechnikov".) b: float The bandwidth parameter for KDE. (Default: b=None.) Return ---------- cond_est: (m,)-array The estimated conditional densities at the m query points. ''' if y_eval is None: y_eval = Y if x_eval is None: x_eval = X reg_hat = reg_mod.fit(X, Y) y_pred = reg_hat.predict(x_eval) cond_est = KDE1D(x=y_eval-y_pred, data=y_eval-y_pred, kern=kern, h=b) return cond_est
[docs] def CondDenEst(Y, X, reg_mod, y_eval=None, x_eval=None, kern='gaussian', b=None, poly_ext=False): ''' Conditional density estimation via nonparametric regression on the kernel-smoothed outcome variables. Parameters ---------- Y: (n,)-array The outcome variables of n observations. X: (n,d)-array The d-dimensional covariates of n observations. reg_mod: scikit-learn model or any python model that can use ".fit()" and ".predict()" The conditional mean outcome (or regression) model of Y given X. y_eval: (m,)-array The outcome variables on which we evaluate the estimated conditional densities. x_eval: (m,d)-array The covariates on which we evaluate the estimated conditional densities. kern: str The name of the kernel function. (Default: kern="gaussian".) b: float The bandwidth parameter for KDE. (Default: b=None.) poly_ext: boolean The indicator of whether polynomial features are generated from the current covariates. (Default: poly_ext=False.) Return ---------- cond_est: (m,)-array The estimated conditional densities at the m query points. ''' n = X.shape[0] if b is None: # Apply Silverman's rule of thumb to select the bandwidth parameter b = (4/3)**(1/5)*(n**(-1/5))*np.std(Y) print("The current bandwidth for the conditional density estimator is "+ str(b) + ".\n") if y_eval is None: y_eval = Y if x_eval is None: x_eval = X kern, sigmaK_sq, K_sq = KernelRetrieval(kern) cond_est = np.zeros((y_eval.shape[0],)) if poly_ext: X_inter = PolynomialFeatures(degree=3, interaction_only=False, include_bias=True).fit_transform(X) x_eval_inter = PolynomialFeatures(degree=3, interaction_only=False, include_bias=True).fit_transform(x_eval) else: X_inter = X x_eval_inter = x_eval # Estimate the conditional density by fitting the kernelized outcomes for i in range(y_eval.shape[0]): Y_kern = kern((Y - y_eval[i])/b)/b reg_hat = reg_mod.fit(X_inter, Y_kern) y_pred = reg_hat.predict(x_eval_inter[i,:].reshape(1,-1)) cond_est[i] = y_pred[0] return cond_est