Source code for debiasing.debias_prog

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Author: Yikun Zhang
# Last Editing: September 3, 2023

# Description: This script contains the key functions for our debiasing method.

import numpy as np
import scipy.stats
from sklearn.linear_model import Lasso
from sklearn.model_selection import KFold
import statsmodels.api as sm
import cvxpy as cp

#=======================================================================================#

[docs] def ScaledLasso(X, Y, lam0=None, return_lamb=False): ''' Scaled sparse linear regression (Translated from R package 'scalreg'). Parameters ---------- X: (n,d)-array The input design matrix. Y: (n,)-array The input outcome (or response) vector. lam0: str or float The regularization parameter, which can be "univ", "quantile" or other specified numerical value. (Default: lam0=None so that if d<10^6, lam0="quantile"; otherwise, lam0="univ".) return_lamb: logical Whether the final value of the regularization parameter is returned or not. (Default: return_lamb=False.) Return ---------- beta_est: (d,)-array The estimated regression coefficient by scaled Lasso. sigma_new: float The estimated noise level (i.e., the standard deviation of the noise variable). lam: float (optional) The final value of the regularization parameter. ''' n = X.shape[0] d = X.shape[1] if lam0 is None: if d > 1e6: lam0 = 'univ' else: lam0 = 'quantile' if lam0 == 'univ': lam0 = np.sqrt(2*np.log(d)/n) if lam0 == 'quantile': L = 0.1 Lold = 0 while abs(L - Lold) > 1e-4: k = L**4 + 2*(L**2) Lold = L L = -scipy.stats.norm.ppf(min([k/d, 0.99])) L = (L + Lold)/2 if d == 1: L = 0.5 lam0 = np.sqrt(2/n) * L sigma_int = 0.1 sigma_new = 5 flag = 0 while (abs(sigma_int - sigma_new) > 1e-4) and (flag <= 500): flag += 1 sigma_int = sigma_new lam = lam0 * sigma_int lasso_fit = Lasso(alpha=lam, fit_intercept=False, tol=1e-9, max_iter=int(1e7)).fit(X, Y) sigma_new = np.sqrt(np.mean((Y - lasso_fit.predict(X))**2)) beta_est = lasso_fit.coef_ if return_lamb: return beta_est, sigma_new, lam else: return beta_est, sigma_new
[docs] def LassoRefit(X, Y, x, method='scaled_lasso', return_beta=False): ''' Lasso refitting method. Parameters ---------- X: (n,d)-array The input design matrix. Y: (n,)-array The input outcome (or response) vector. x: (d,)-array The current query point. method: string The actual method for fitting the Lasso regression. It can be either 'sqrt_lasso' or the default value 'scaled_lasso'. return_beta: logical Whether the final value of the estimated regression coefficient is returned or not. (Default: return_beta=False.) Return ---------- m_refit: float The estimated regression function by Lasso refitting method. beta_pilot: (d,)-array (optional) The estimated regression coefficient by scaled Lasso. asym_sd: float The estimated (asymptotic) standard deviation of the Lasso refitting estimator. sigma_hat: float The estimated noise level (i.e., the standard deviation of the noise variable). df: int The degree of freedom of the t-distribution for the Lasso refitting estimator. ''' if method == 'sqrt_lasso': n = X.shape[0] d = X.shape[1] sqrt_lasso = sm.OLS(Y,X).fit_regularized(method='sqrt_lasso', alpha=1.1*np.sqrt(n)*scipy.stats.norm.ppf(1-0.05/(2*d)), L1_wt=1.0, refit=False) beta_pilot = sqrt_lasso.params else: beta_pilot, sigma_pilot = ScaledLasso(X=X, Y=Y, lam0='quantile') flag = 1 thres = 1e-7 while flag == 1: X_sel = X[:,abs(beta_pilot) > thres] inv_Sigma = np.linalg.inv(np.dot(X_sel.T, X_sel)) beta_new = np.dot(inv_Sigma, np.dot(X_sel.T, Y)) x_new = x[abs(beta_pilot) > thres] m_refit = np.dot(beta_new, x_new) asym_sd = np.sqrt(np.dot(x_new, np.dot(inv_Sigma, x_new))) df = X_sel.shape[0] - X_sel.shape[1] if df > 0: flag = 0 sigma_hat = np.sqrt(np.sum((Y - np.dot(X_sel, beta_new))**2)/df) else: thres = 10*thres print('The degree of freedom is negative for Lasso refitting. '+ 'We will raise the threshold for nonzero regression coefficients') if return_beta: return m_refit, beta_pilot, asym_sd, sigma_hat, df else: return m_refit, asym_sd, sigma_hat, df
[docs] def DebiasProg(X, x, Pi, gamma_n=0.1): ''' The proposed Debiasing (primal) program. Parameters ---------- X: (n,d)-array The input design matrix. x: (d,)-array The current query point. Pi: (n,n)-array A diagonal matrix with (estimated) propensity scores as its diagonal entries. gamma_n: float The regularization parameter "\gamma/n". (Default: gamma_n=0.1.) Return ---------- w: (n,)-array The estimated weights by our debiasing program. ''' n = X.shape[0] w = cp.Variable(n) debias_obj = cp.Minimize(cp.quad_form(w, Pi)) constraints = [x - (1/np.sqrt(n))*(w @ Pi @ X) <= gamma_n, x - (1/np.sqrt(n))*(w @ Pi @ X) >= -gamma_n] debias_prog = cp.Problem(debias_obj, constraints) try: debias_prog.solve(solver=cp.MOSEK) except cp.SolverError: debias_prog.solve(solver=cp.CVXOPT, max_iters=30000) if debias_prog.value == np.inf: print('The primal debiasing program is infeasible!') return np.nan*np.ones((n,)) else: return w.value
[docs] def SoftThres(theta, lamb): ''' Soft-thresholding function. Parameters ---------- theta: (d,)-array The input vector for soft-thresholding. lamb: float The thresholding parameter. Return ---------- theta: (n,)-array The output vector after soft-thresholding. ''' try: return np.sign(theta)*max([abs(theta) - lamb, 0]) except ValueError: res = np.zeros((theta.shape[0], 2)) res[:,0] = np.abs(theta) - lamb return np.sign(theta)*np.max(res, axis=1)
[docs] def DualObj(X, x, Pi, ll_cur, gamma_n=0.05): ''' Compute the objective function of the dual form of our debiasing program. Parameters ---------- X: (n,d)-array The input design matrix. x: (d,)-array The current query point. Pi: (n,n)-array A diagonal matrix with (estimated) propensity scores as its diagonal entries. ll_cur: (d,)-array The current value of the dual solution vector. gamma_n: float The regularization parameter "\gamma/n". (Default: gamma_n=0.05.) Return ---------- dual_obj: float The value of the objective function of our dual debiasing program. ''' n = X.shape[0] A = np.dot(X.T, np.dot(Pi, X)) dual_obj = np.dot(np.dot(ll_cur.reshape(1, -1), A), ll_cur)/(4*n) + np.dot(x, ll_cur) \ + gamma_n*np.sum(np.abs(ll_cur)) return dual_obj
[docs] def DualCD(X, x, Pi=None, gamma_n=0.05, ll_init=None, eps=1e-9, max_iter=5000): ''' Coordinate descent algorithm for solving the dual form of our debiasing program. Parameters ---------- X: (n,d)-array The input design matrix. x: (d,)-array The current query point. Pi: (n,n)-array A diagonal matrix with (estimated) propensity scores as its diagonal entries. gamma_n: float The regularization parameter "\gamma/n". (Default: gamma_n=0.05.) ll_init: (d,)-array The initial value of the dual solution vector. (Default: ll_init=None. Then, the vector with all-one entries is used.) eps: float. The precision parameter for stopping the iteration. (Default: eps=1e-9.) max_iter: int Maximum number of coordinate descent iterations. (Default: max_iter=5000.) Return ---------- ll_new: (d,)-array The solution vector to our dual debiasing program. ''' n = X.shape[0] d = X.shape[1] if Pi is None: Pi = np.eye(n) A = np.dot(X.T, np.dot(Pi, X)) if ll_init is None: ll_new = np.ones((d,)) else: ll_new = ll_init ll_old = 100*np.ones_like(ll_new) cnt = 0 flag = 0 while (np.linalg.norm(ll_old - ll_new) > eps) and ((cnt <= max_iter) or (flag == 0)): ll_old = np.copy(ll_new) cnt += 1 # Coordinate descent for j in range(d): ll_cur = ll_new.copy() mask = np.ones(ll_cur.shape, dtype=bool) mask[j] = 0 A_kj = A[mask, j] ll_cur = ll_cur[mask] up_val = SoftThres(-np.dot(A_kj, ll_cur)/(2*n) - x[j], lamb=gamma_n)/(A[j,j]/(2*n)) if np.isnan(up_val): ll_new[j] = 0 else: ll_new[j] = up_val if (cnt > max_iter) and (flag == 0): print('The coordinate descent algorithm has reached its maximum number of iterations: '\ +str(max_iter)+'! Reiterate one more times without small perturbations to the scaled design matrix...') A = A + 1e-9*np.eye(d) cnt = 0 flag = 1 return ll_new
[docs] def DebiasProgCV(X, x, prop_score, gamma_lst=None, cv_fold=5, cv_rule='1se'): ''' Our proposed Debiasing (primal) program. Parameters ---------- X: (n,d)-array The input design matrix. x: (d,)-array The current query point. prop_score: (n,)-array The (estimated) propensity scores evaluted on the covariates. gamma_lst: list or (m,)-array The list of candidate values for the tuning parameter "\gamma/n". (Default: gamma_lst=None. Then, gamma_lst contains 41 equally spacing value between 0.001 and max(abs(x)).) cv_fold: int The number of folds for cross-validation on the dual program. (Default: cv_fold=5.) cv_rule: string The criteria/rules for selecting the final value of the regularization parameter "\gamma/n" in the dual program. (Default: cv_rule='1se'. The candidate choices include '1se', 'minfeas', and 'mincv'.) Return ---------- w_obs: (n,)-array The final estimated weights by our debiasing program. ll_obs: (n,)-array The final value of the solution to our debiasing dual program. gamma_n_opt: float The final value of the tuning parameter "\gamma/n" selected by cross-validation. ''' if gamma_lst is None: gamma_lst = np.linspace(0.001, np.max(abs(x)), 41) kf = KFold(n_splits=cv_fold, shuffle=True, random_state=0) f_ind = 0 dual_loss = np.zeros((cv_fold, len(gamma_lst))) for train_ind, test_ind in kf.split(X): X_train = X[train_ind,:] X_test = X[test_ind,:] prop_score_train = prop_score[train_ind] prop_score_test = prop_score[test_ind] for j in range(len(gamma_lst)): w_train = DebiasProg(X=X_train.copy(), x=x, Pi=np.diag(prop_score_train), gamma_n=gamma_lst[j]) if any(np.isnan(w_train)): print(r'The primal debiasing program for this fold of the data is '\ 'not feasible when \gamma/n='+str(round(gamma_lst[j], 4))+'!') dual_loss[f_ind, j] = np.nan else: ll_train = DualCD(X=X_train, x=x, Pi=np.diag(prop_score_train), gamma_n=gamma_lst[j], ll_init=None, eps=1e-8, max_iter=5000) if sum(abs(w_train + np.dot(X_train, ll_train)/(2*np.sqrt(X_train.shape[0])))>1e-3) > 0: print(r'The strong duality between primal and dual programs does not satisfy '\ 'when \gamma/n='+str(round(gamma_lst[j], 4))+'!') dual_loss[f_ind, j] = np.nan else: dual_loss[f_ind, j] = DualObj(X_test, x=x, Pi=np.diag(prop_score_test), ll_cur=ll_train, gamma_n=gamma_lst[j]) f_ind = f_ind + 1 mean_dual_loss = np.mean(dual_loss, axis=0) std_dual_loss = np.std(dual_loss, axis=0, ddof=1)/np.sqrt(cv_fold) # Different rules for choosing the tuning parameter if cv_rule == 'mincv': gamma_n_opt = gamma_lst[np.nanargmin(mean_dual_loss)] if cv_rule == '1se': One_SE = (mean_dual_loss > np.nanmin(mean_dual_loss) + std_dual_loss[np.nanargmin(mean_dual_loss)]) & \ (gamma_lst < gamma_lst[np.nanargmin(mean_dual_loss)]) if sum(One_SE) == 0: One_SE = np.full((len(gamma_lst),), True) gamma_lst = gamma_lst[One_SE] gamma_n_opt = gamma_lst[np.nanargmin(mean_dual_loss[One_SE])] if cv_rule == 'minfeas': gamma_n_opt = np.min(gamma_lst[~np.isnan(mean_dual_loss)]) # Solve the primal and dual on the original dataset w_obs = DebiasProg(X=X.copy(), x=x, Pi=np.diag(prop_score), gamma_n=gamma_n_opt) ll_obs = DualCD(X=X, x=x, Pi=np.diag(prop_score), gamma_n=gamma_n_opt, ll_init=None, eps=1e-9) return w_obs, ll_obs, gamma_n_opt
[docs] def DualADMM(X, x, Pi=None, gamma_n=0.05, rho=1, ll_init=None, eps=1e-9): ''' Alternating direction method of multipliers (ADMM) algorithm for solving the dual form of our debiasing program. Parameters ---------- X: (n,d)-array The input design matrix. x: (d,)-array The current query point. Pi: (n,n)-array A diagonal matrix with (estimated) propensity scores as its diagonal entries. gamma_n: float The regularization parameter "\gamma/n". (Default: gamma_n=0.05.) rho: float The dual parameter in ADMM. ll_init: (d,)-array The initial value of the dual solution vector. (Default: ll_init=None. Then, the vector with all-one entries is used.) eps: float. The precision parameter for stopping the iteration. (Default: eps=1e-9.) max_iter: int Maximum number of coordinate descent iterations. (Default: max_iter=5000.) Return ---------- ll_new: (d,)-array The solution vector to our dual debiasing program. ''' n = X.shape[0] d = X.shape[1] if Pi is None: Pi = np.eye(n) A = np.dot(X.T, np.dot(Pi, X)) if ll_init is None: ll_new = np.ones((d,)) else: ll_new = ll_init ll_old = 100*np.ones_like(ll_new) nu_aug = np.ones_like(ll_new) mu_aug = np.zeros_like(ll_new) while np.linalg.norm(ll_old - ll_new) > eps: ll_old = np.copy(ll_new) ll_new = np.dot(np.linalg.inv(A/(2*n) + rho*np.eye(d)), rho*nu_aug - mu_aug - x) nu_aug = SoftThres(ll_new + mu_aug/rho, lamb=gamma_n/rho) mu_aug = mu_aug + rho*(ll_new - nu_aug) return ll_new