Source code for npDoseResponse.utils

# -*- coding: utf-8 -*-

# Author: Yikun Zhang
# Last Editing: May 19, 2024

# Description: This script contains the utility functions for the main functions 
# in our package.

import numpy as np
from .rbf import KernelRetrieval

#=======================================================================================#


[docs] def RoTBWLocalPoly(Y, X, kernT="epanechnikov", kernS="epanechnikov", C_h=7, C_b=3): ''' Compute the rule-of-thumb bandwidth selector in Eq.(A1) of Yang and Tschernig (1999). Parameters ---------- Y: (n,)-array The outcomes of n observations. X: (n,d+1)-array The first column of X is the treatment/exposure variable, while the other d columns are confounding variables of n observations. kernT, kernS: str The names of kernel functions for the treatment/exposure variable and confounding variables. (Default: "epanechnikov".) C_h,C_b: float The scaling factors for the rule-of-thumb bandwidth parameters. (Default: C_h=7, C_b=3.) Return ---------- h: float The rule-of-thumb bandwidth parameter for the treatment/exposure variable. b: (d,)-array The rule-of-thumb bandwidth vector for the confounding variables. ''' n = X.shape[0] ## Number of data points d = X.shape[1] - 1 kernT, sigmaK_sq, K_sq = KernelRetrieval(kernT) # Apply the rule-of-thumb bandwidth selector in Eq.(A1) of Yang and Tschernig (1999) p_coeff = np.polyfit(X[:,0], Y, 4) sec_deriv = 12*p_coeff[0]*X[:,0] + 6*p_coeff[1]*X[:,0] + 2*X[:,0] C_fun = np.mean(sec_deriv**2) T = X[:,0].reshape(-1,1) lhs = np.concatenate([np.ones((n,1)), X, T**2, T**3, T**4], axis=1) rcond = np.finfo(lhs.dtype).eps * max(*lhs.shape) beta = np.linalg.lstsq(lhs, Y, rcond=rcond)[0] # Compute the residual sum of squares resid = np.sum((Y - np.dot(lhs, beta))**2) * (np.max(X[:,0]) - np.min(X[:,0]))/ (n-5) sigmaK_sq = sigmaK_sq**2 # ROT h = ((K_sq*resid)/(4*n*sigmaK_sq*C_fun))**(1/5) * (n**(d/(5*(d+5)))) * C_h kernS, sigmaK_sq, K_sq = KernelRetrieval(kernS) sec_deriv = np.zeros((n, d)) for i in range(1, d+1): # Fit a fourth-order polynomial to each confounding variable p_coeff = np.polyfit(X[:,i], Y, 4) sec_deriv[:,i-1] = 12*p_coeff[0]*X[:,i] + 6*p_coeff[1]*X[:,i] + 2*X[:,i] C_fun = np.sum(np.diag(np.dot(sec_deriv.T, sec_deriv))/n) lhs = np.concatenate([np.ones((n,1)), X, T**2, T**3, T**4], axis=1) # lhs = np.concatenate([np.ones((n,1)), X[:,1:]], axis=1) rcond = np.finfo(lhs.dtype).eps * max(*lhs.shape) beta = np.linalg.lstsq(lhs, Y, rcond=rcond)[0] resid = np.sum((Y - np.dot(lhs, beta))**2) * (np.max(X[:,1:], axis=0) - np.min(X[:,1:], axis=0)) / (n-5) sigmaK_sq = sigmaK_sq**2 K_sq = K_sq**d b = ((K_sq*d*resid)/(4*n*sigmaK_sq*C_fun))**(1/(d+5)) * C_b return h, b
[docs] def HatMatrix(X, degree=2, deriv_ord=1, h=None, b=None, print_bw=True, kernT="epanechnikov", kernS="epanechnikov"): ''' Compute the hat matrix of the local polynomial regression when it is viewed as a linear smoother. Parameters ---------- X: (n,d+1)-array The first column of X is the treatment/exposure variable, while the other d columns are confounding variables of n observations. degree: int Degree of local polynomials. (Default: degree=2.) deriv_ord: int The order of the estimated derivative the conditional mean outcome function. (Default: deriv_ord=1. Then, it estimates the partial derivative of the conditional mean outcome function with respect to the treatment variable.) h,b: float The bandwidth parameters for the treatment/exposure variable and confounding variables. print_bw: boolean The indicator of whether the current bandwidth parameters should be printed to the console. (Default: print_bw=True.) kernT, kernS: str The names of kernel functions for the treatment/exposure variable and confounding variables. (Default: "epanechnikov".) Return ---------- hat_mat: (n,n)-array The hat matrix. ''' n = X.shape[0] ## Number of data points d = X.shape[1] - 1 x_eval = X.copy() if print_bw: print("The current bandwidth for treament variable in the local polynomial regression is "+ str(h) + ".\n") print("The current bandwidth for confounding variables in the local polynomial regression is "+ str(b) + ".\n") kernT, sigmaK_sq, K_sq = KernelRetrieval(kernT) kernS, sigmaK_sq, K_sq = KernelRetrieval(kernS) hat_mat = np.zeros((n, n)) for i in range(x_eval.shape[0]): weights = kernT((X[:,0] - x_eval[i,0])/h) * np.prod(kernS((X[:,1:] - x_eval[i,1:])/b), axis=1) # Filter out the data points with zero weights to speed up regressions with kernels of compact support inds = np.where(np.abs(weights) > 1e-26)[0] X_dat = np.zeros((n, degree+1+d)) # X_dat[:,:(degree+1)] = (((X[:,0] - x_eval[i,0])/h).reshape(-1,1))**(np.arange(degree+1)) for p in range(degree + 1): X_dat[:,p] = ((X[:,0] - x_eval[i,0])/h)**p X_dat[:,(degree+1):] = (X[:,1:] - x_eval[i,1:])/b X_dat = X_dat[inds,:] W = np.diag(weights[inds]) design_mat = np.dot(np.dot(X_dat.T, W), X_dat) try: hat_mat_samp = np.dot(np.linalg.inv(design_mat), np.dot(X_dat.T, W)) except: # Add some small quantities to the diagonal matrix to prevent the singularity of the matrix design_mat = design_mat + 1e-16*np.eye(design_mat.shape[0]) hat_mat_samp = np.dot(np.linalg.inv(design_mat), np.dot(X_dat.T, W)) hat_mat[i,inds] = hat_mat_samp[deriv_ord,:] return hat_mat