diff --git a/permute/multitest_core.py b/permute/multitest_core.py
new file mode 100644
index 0000000..0c28270
--- /dev/null
+++ b/permute/multitest_core.py
@@ -0,0 +1,596 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Mar  8 10:38:11 2022
+
+@author: Clayton
+"""
+
+
+import numpy as np
+from scipy.stats import ttest_ind, ttest_1samp
+
+from .utils import get_prng, permute
+
+
+""" TODO for multi-testing core package
+
+two_sample_conf_int once it is finalized
+
+"""
+
+def multitest_corr(x, y, alternative='greater', reps=10**4, seed=None, plus1=True):
+    r"""
+    Simulate permutation p-values for multiple Pearson correlation coefficients
+
+    Parameters
+    ----------
+    x : array-like with shape (observations, tests)
+    y : array-like with shape (observations, tests)
+    alternative : {'greater', 'less', 'two-sided'}
+        The alternative hypothesis to test
+    reps : int
+        number of repetitions
+    seed : RandomState instance or {None, int, RandomState instance}
+        If None, the pseudorandom number generator is the RandomState
+        instance used by `np.random`;
+        If int, seed is the seed used by the random number generator;
+        If RandomState instance, seed is the pseudorandom number generator
+    plus1 : bool
+        flag for whether to add 1 to the numerator and denominator of the
+        p-value based on the empirical permutation distribution.
+        Default is True.
+
+    Returns
+    -------
+    tuple
+        Returns test statistic, p-value, simulated distribution
+    """
+    # make sure the two samples have the same shape
+    if x.shape != y.shape:
+        raise ValueError('x and y must have the same shape')
+    # get number of hypotheses being tested
+    num_tests = x.shape[1]
+    prng = get_prng(seed)
+    # np.corrcoef returns the full (2*num_tests, 2*num_tests) correlation
+    # matrix; build a mask that keeps only the entries pairing column j of x
+    # with column j of y
+    corr_mat_mask = np.zeros((2*num_tests, 2*num_tests), dtype=bool)
+    corr_mat_mask[num_tests + np.arange(num_tests), np.arange(num_tests)] = True
+    # calculate correlations and grab the pairs we are interested in
+    tst = np.corrcoef(x, y, rowvar=False)[corr_mat_mask]
+    # permute to get the null distribution
+    sims = [np.corrcoef(permute(x, prng), y, rowvar=False)[corr_mat_mask]
+            for i in range(reps)]
+    # get percentiles of the statistic
+    left_pv = (np.sum(sims <= tst, axis=0) + plus1) / (reps + plus1)
+    right_pv = (np.sum(sims >= tst, axis=0) + plus1) / (reps + plus1)
+    # assign p-value based on the alternative hypothesis
+    if alternative == 'greater':
+        pvalue = right_pv
+    elif alternative == 'less':
+        pvalue = left_pv
+    elif alternative == 'two-sided':
+        pvalue = np.min([np.ones(num_tests), 2 * np.min([left_pv, right_pv], axis=0)], axis=0)
+    return tst, pvalue, sims
+
+def multitest_spearman_corr(x, y, alternative='greater', reps=10**4, seed=None, plus1=True):
+    r"""
+    Simulate permutation p-values for multiple Spearman correlation coefficients
+
+    Parameters
+    ----------
+    x : array-like with shape (observations, tests)
+    y : array-like with shape (observations, tests)
+    alternative : {'greater', 'less', 'two-sided'}
+        The alternative hypothesis to test
+    reps : int
+        number of repetitions
+    seed : RandomState instance or {None, int, RandomState instance}
+        If None, the pseudorandom number generator is the RandomState
+        instance used by `np.random`;
+        If int, seed is the seed used by the random number generator;
+        If RandomState instance, seed is the pseudorandom number generator
+    plus1 : bool
+        flag for whether to add 1 to the numerator and denominator of the
+        p-value based on the empirical permutation distribution.
+        Default is True.
+
+    Returns
+    -------
+    tuple
+        Returns test statistic, p-value, simulated distribution
+    """
+    # convert each column to ranks (double argsort; ties are not averaged)
+    xnew = np.argsort(np.argsort(x, axis=0), axis=0) + 1
+    ynew = np.argsort(np.argsort(y, axis=0), axis=0) + 1
+    return multitest_corr(xnew, ynew, alternative=alternative, reps=reps,
+                          seed=seed, plus1=plus1)
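+
+# Illustrative usage sketch (assumed data; not part of the module API):
+#
+#     import numpy as np
+#     from permute.multitest_core import multitest_corr
+#
+#     prng = np.random.RandomState(42)
+#     x = prng.normal(size=(50, 3))        # 50 observations, 3 tests
+#     y = x + prng.normal(size=(50, 3))    # noisy copies of x
+#     tst, pvalue, sims = multitest_corr(x, y, reps=10**3, seed=prng)
+#     # tst and pvalue have shape (3,): one correlation and one p-value per test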
+
+
+def multitest_two_sample_core(potential_outcomes_all, nx, tst_stat, alternative='greater',
+                              reps=10**5, keep_dist=False, seed=None, plus1=True):
+    r"""
+    Main workhorse function for multitest_two_sample and multitest_two_sample_shift
+
+    Parameters
+    ----------
+    potential_outcomes_all : array-like
+        3D array [observations, tests, conditions] of multiple potential
+        outcomes under treatment [:,:,0] and control [:,:,1].
+        To be passed in from multitest_potential_outcomes
+    nx : int
+        Size of the treatment group x
+    tst_stat : function
+        The test statistic
+    alternative : {'greater', 'less', 'two-sided'}
+        The alternative hypothesis to test
+    reps : int
+        number of repetitions
+    keep_dist : bool
+        flag for whether to store and return the array of values
+        of the test statistic. Default is False.
+    seed : RandomState instance or {None, int, RandomState instance}
+        If None, the pseudorandom number generator is the RandomState
+        instance used by `np.random`;
+        If int, seed is the seed used by the random number generator;
+        If RandomState instance, seed is the pseudorandom number generator
+    plus1 : bool
+        flag for whether to add 1 to the numerator and denominator of the
+        p-value based on the empirical permutation distribution.
+        Default is True.
+
+    Returns
+    -------
+    array
+        the estimated p-value for each test
+    array
+        The distribution of test statistics.
+        These values are only returned if `keep_dist` == True
+    """
+    prng = get_prng(seed)
+    # get number of hypotheses being tested
+    num_tests = potential_outcomes_all.shape[1]
+    # create indexing vector
+    rr = list(range(potential_outcomes_all.shape[0]))
+    # get observed statistic
+    tst = tst_stat(potential_outcomes_all[:nx, :, 0],
+                   potential_outcomes_all[nx:, :, 1])
+    # create dictionary to store functions for calculating p-values
+    thePvalue = {
+        'greater': lambda pUp, pDn: pUp + plus1/(reps+plus1),
+        'less': lambda pUp, pDn: pDn + plus1/(reps+plus1),
+        'two-sided': lambda pUp, pDn: 2 * np.min(np.stack([0.5*np.ones(num_tests),
+                                                           pUp + plus1/(reps+plus1),
+                                                           pDn + plus1/(reps+plus1)], 1), 1)
+    }
+    # account for keep_dist (keep distribution)
+    if keep_dist:
+        dist = np.empty((reps, num_tests))
+        for i in range(reps):
+            # permute indexing vector
+            prng.shuffle(rr)
+            # grab shuffled values
+            pp = np.take(potential_outcomes_all, rr, axis=0)
+            # calculate statistic
+            dist[i, :] = tst_stat(pp[:nx, :, 0], pp[nx:, :, 1])
+        # calculate percentile
+        pUp = np.sum(dist >= tst, axis=0)/(reps+plus1)
+        pDn = np.sum(dist <= tst, axis=0)/(reps+plus1)
+        return thePvalue[alternative](pUp, pDn), dist
+    else:
+        # preallocate vectors to store the number of times the permuted
+        # statistic is at least as large (or as small) as the observed
+        # statistic.  Keeps the memory requirement low
+        hitsUp = np.zeros(num_tests)
+        hitsDn = np.zeros(num_tests)
+        for i in range(reps):
+            # permute indexing vector
+            prng.shuffle(rr)
+            # grab shuffled values
+            pp = np.take(potential_outcomes_all, rr, axis=0)
+            # compute the permuted statistic once per rep and compare to observed
+            curr_tst = tst_stat(pp[:nx, :, 0], pp[nx:, :, 1])
+            hitsUp += curr_tst >= tst
+            hitsDn += curr_tst <= tst
+        # calculate percentile
+        pUp = hitsUp/(reps+plus1)
+        pDn = hitsDn/(reps+plus1)
+        return thePvalue[alternative](pUp, pDn)
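+
+# Permutation-scheme sketch (illustrative): shuffling the row index and
+# splitting at nx is one random re-assignment of the pooled units, e.g.
+#
+#     rr = list(range(6))
+#     prng.shuffle(rr)              # e.g. [3, 0, 5, 1, 4, 2]
+#     # rows rr[:nx] are read from the treatment slice [:, :, 0] and
+#     # rows rr[nx:] from the control slice [:, :, 1]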
+
+
+def multitest_one_sample(x, y=None, reps=10**5, stat='mean', alternative="greater",
+                         keep_dist=False, seed=None, plus1=True):
+    r"""
+    One-sided or two-sided, one-sample permutation test for the mean,
+    with p-value estimated by simulated random sampling with
+    reps replications.
+
+    Alternatively, a permutation test for equality of means of two paired
+    samples.
+
+    Tests the hypothesis that x is distributed symmetrically about 0
+    (or x and y have the same center) against the alternative that x comes from
+    a population with mean
+
+    (a) greater than 0 (greater than that of the population from which y comes),
+        if side = 'greater'
+    (b) less than 0 (less than that of the population from which y comes),
+        if side = 'less'
+    (c) different from 0 (different from that of the population from which y comes),
+        if side = 'two-sided'
+
+    If ``keep_dist``, return the distribution of values of the test statistic;
+    otherwise, return only the p-value and the test statistic.
+
+    Parameters
+    ----------
+    x : array-like
+        Sample 1 with shape (observations, tests)
+    y : array-like
+        Sample 2 with shape (observations, tests). Must preserve the order of pairs with x.
+        If None, x is taken to be the one sample.
+    reps : int
+        number of repetitions
+    stat : {'mean', 't'}
+        The test statistic. The statistic is computed based on either z = x or
+        z = x - y, if y is specified.
+
+        (a) If stat == 'mean', the test statistic is mean(z).
+        (b) If stat == 't', the test statistic is the t-statistic--
+            but the p-value is still estimated by the randomization,
+            approximating the permutation distribution.
+        (c) If stat is a function (a callable object), the test statistic is
+            that function. The function should take a permutation of the
+            data and compute the test function from it. For instance, if the
+            test statistic is the maximum absolute value, $\max_i |z_i|$,
+            the test statistic could be written:
+
+            f = lambda u: np.max(abs(u))
+    alternative : {'greater', 'less', 'two-sided'}
+        The alternative hypothesis to test
+    keep_dist : bool
+        flag for whether to store and return the array of values
+        of the test statistic
+    seed : RandomState instance or {None, int, RandomState instance}
+        If None, the pseudorandom number generator is the RandomState
+        instance used by `np.random`;
+        If int, seed is the seed used by the random number generator;
+        If RandomState instance, seed is the pseudorandom number generator
+    plus1 : bool
+        flag for whether to add 1 to the numerator and denominator of the
+        p-value based on the empirical permutation distribution.
+        Default is True.
+
+    Returns
+    -------
+    array
+        the estimated p-value for each test
+    array
+        the test statistic for each test
+    array
+        The distribution of test statistics.
+        These values are only returned if `keep_dist` == True
+    """
+    prng = get_prng(seed)
+    # this is a one-sample test: if both x and y are given, reduce the pair
+    # to a single sample of differences (after checking the shapes match)
+    if y is None:
+        z = x
+    elif x.shape != y.shape:
+        raise ValueError('x and y must have the same shape')
+    else:
+        z = np.array(x) - np.array(y)
+    # get number of hypotheses being tested
+    num_tests = z.shape[1]
+    # create dictionary to store functions for calculating p-values
+    thePvalue = {
+        'greater': lambda pUp, pDn: pUp + plus1/(reps+plus1),
+        'less': lambda pUp, pDn: pDn + plus1/(reps+plus1),
+        'two-sided': lambda pUp, pDn: 2 * np.min(np.stack([0.5*np.ones(num_tests),
+                                                           pUp + plus1/(reps+plus1),
+                                                           pDn + plus1/(reps+plus1)], 1), axis=1)
+    }
+    # create dictionary to store common statistics, ensuring the correct axis
+    stats = {
+        'mean': lambda u: np.mean(u, axis=0),
+        't': lambda u: ttest_1samp(u, 0, axis=0)[0]
+    }
+    # if we were given a custom statistic function, use that
+    if callable(stat):
+        tst_fun = stat
+    else:
+        tst_fun = stats[stat]
+    # calculate observed statistic
+    tst = tst_fun(z)
+    # account for keep_dist (keep distribution)
+    if keep_dist:
+        # preallocate space to build the null distribution
+        # (2D since each test has its own distribution)
+        dist = np.empty((reps, num_tests))
+        for i in range(reps):
+            # calculate statistic of current permutation (random sign flips)
+            dist[i, :] = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape)))
+        # calculate percentile for each test
+        pUp = np.sum(dist >= tst, axis=0)/(reps+plus1)
+        pDn = np.sum(dist <= tst, axis=0)/(reps+plus1)
+        return thePvalue[alternative](pUp, pDn), tst, dist
+    else:
+        # preallocate vectors to store the number of times the permuted
+        # statistic is at least as large (or as small) as the observed
+        # statistic.  Keeps the memory requirement low.
+        hitsUp = np.zeros(num_tests)
+        hitsDn = np.zeros(num_tests)
+        for i in range(reps):
+            # calculate statistic for current permutation (random sign flips)
+            curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape)))
+            # update counters accordingly
+            hitsUp += curr_tst >= tst
+            hitsDn += curr_tst <= tst
+        # calculate percentiles
+        pUp = hitsUp/(reps+plus1)
+        pDn = hitsDn/(reps+plus1)
+        return thePvalue[alternative](pUp, pDn), tst
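+
+# Illustrative usage sketch (assumed data):
+#
+#     import numpy as np
+#     from permute.multitest_core import multitest_one_sample
+#
+#     prng = np.random.RandomState(0)
+#     x = prng.normal(1, size=(30, 4))     # 30 observations, 4 tests
+#     p, tst = multitest_one_sample(x, reps=10**3, seed=42)
+#     # p and tst have shape (4,): one entry per test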
+
+
+def multitest_two_sample(x, y, reps=10**5, stat='mean', alternative="greater",
+                         keep_dist=False, seed=None, plus1=True):
+    r"""
+    One-sided or two-sided, two-sample permutation multi-test for equality of
+    two means, with p-value estimated by simulated random sampling with
+    reps replications.
+
+    Tests the hypothesis that x and y are a random partition of x,y
+    against the alternative that x comes from a population with mean
+
+    (a) greater than that of the population from which y comes,
+        if side = 'greater'
+    (b) less than that of the population from which y comes,
+        if side = 'less'
+    (c) different from that of the population from which y comes,
+        if side = 'two-sided'
+
+    If ``keep_dist``, return the distribution of values of the test statistic;
+    otherwise, return only the p-value and the test statistic.
+
+    Parameters
+    ----------
+    x : array-like
+        Sample 1 with shape (observations, tests)
+    y : array-like
+        Sample 2 with shape (observations, tests)
+    reps : int
+        number of repetitions
+    stat : {'mean', 't'}
+        The test statistic.
+
+        (a) If stat == 'mean', the test statistic is (mean(x) - mean(y))
+            (equivalently, sum(x), since those are monotonically related)
+        (b) If stat == 't', the test statistic is the two-sample t-statistic--
+            but the p-value is still estimated by the randomization,
+            approximating the permutation distribution.
+            The t-statistic is computed using scipy.stats.ttest_ind
+        (c) If stat is a function (a callable object), the test statistic is
+            that function. The function should take two arguments:
+            given a permutation of the pooled data, the first argument is the
+            "new" x and the second argument is the "new" y.
+            For instance, if the test statistic is the Kolmogorov-Smirnov distance
+            between the empirical distributions of the two samples,
+            $\max_t |F_x(t) - F_y(t)|$, the test statistic could be written:
+
+            f = lambda u, v: np.max( \
+                [abs(sum(u<=val)/len(u)-sum(v<=val)/len(v)) for val in np.concatenate([u, v])]\
+                )
+
+    alternative : {'greater', 'less', 'two-sided'}
+        The alternative hypothesis to test
+    keep_dist : bool
+        flag for whether to store and return the array of values
+        of the test statistic
+    seed : RandomState instance or {None, int, RandomState instance}
+        If None, the pseudorandom number generator is the RandomState
+        instance used by `np.random`;
+        If int, seed is the seed used by the random number generator;
+        If RandomState instance, seed is the pseudorandom number generator
+    plus1 : bool
+        flag for whether to add 1 to the numerator and denominator of the
+        p-value based on the empirical permutation distribution.
+        Default is True.
+
+    Returns
+    -------
+    array
+        the estimated p-value for each test
+    array
+        the test statistic for each test
+    array
+        The distribution of test statistics.
+        These values are only returned if `keep_dist` == True
+    """
+    # ensure x and y have the same shape
+    if x.shape != y.shape:
+        raise ValueError('x and y must have the same shape')
+    # Set up potential outcomes; under the null, all units are exchangeable
+    pot_out_all = np.stack(
+        [np.concatenate([x, y]), np.concatenate([x, y])], 2)
+
+    # If stat is callable, use it as the test function. Otherwise, look in the
+    # dictionary
+    stats = {
+        'mean': lambda u, v: np.mean(u, axis=0) - np.mean(v, axis=0),
+        't': lambda u, v: ttest_ind(u, v, axis=0, equal_var=True)[0]
+    }
+
+    if callable(stat):
+        tst_fun = stat
+    else:
+        tst_fun = stats[stat]
+
+    # get number of observations in the treatment group
+    nx = x.shape[0]
+    # calculate observed statistic for all tests
+    observed_tst = tst_fun(pot_out_all[:nx, :, 0], pot_out_all[nx:, :, 1])
+    # call main worker function
+    res = multitest_two_sample_core(pot_out_all, nx, tst_fun, alternative=alternative,
+                                    reps=reps, keep_dist=keep_dist, seed=seed, plus1=plus1)
+    # accommodate user request for returning the distribution
+    if keep_dist:
+        return res[0], observed_tst, res[1]
+    else:
+        return res, observed_tst
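+
+# Illustrative usage sketch (assumed data):
+#
+#     import numpy as np
+#     from permute.multitest_core import multitest_two_sample
+#
+#     prng = np.random.RandomState(0)
+#     x = prng.normal(1, size=(50, 2))
+#     y = prng.normal(4, size=(50, 2))
+#     p, tst = multitest_two_sample(x, y, reps=10**3, alternative='less', seed=42)
+#     # p and tst have shape (2,); pass keep_dist=True for the null distributions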
+
+
+def multitest_potential_outcomes(x, y, f, finverse):
+    """
+    Given observations $x$ under treatment and $y$ under control conditions,
+    returns the potential outcomes for units under their unobserved condition
+    under the hypothesis that $x_i = f(y_i)$ for all units.
+
+    Parameters
+    ----------
+    x : array-like
+        Outcomes under treatment
+    y : array-like
+        Outcomes under control
+    f : function
+        An invertible function
+    finverse : function
+        The inverse function to f.
+
+    Returns
+    -------
+    potential_outcomes : 3D array
+        The [:, :, 0] slice contains all potential outcomes under the
+        treatment, the [:, :, 1] slice contains all potential outcomes
+        under the control.
+    """
+
+    tester = np.array(range(5)) + 1
+    assert np.allclose(finverse(f(tester)),
+                       tester), "f and finverse aren't inverses"
+    assert np.allclose(f(finverse(tester)),
+                       tester), "f and finverse aren't inverses"
+
+    pot_treat = np.concatenate([x, f(y)])
+    pot_ctrl = np.concatenate([finverse(x), y])
+
+    return np.stack([pot_treat, pot_ctrl], 2)
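+
+# Illustrative sketch for an additive-shift null x_i = y_i + 3 (assumed):
+#
+#     f = lambda u: u + 3
+#     finverse = lambda u: u - 3
+#     pot_out_all = multitest_potential_outcomes(x, y, f, finverse)
+#     # pot_out_all[:, :, 0] stacks x with f(y);
+#     # pot_out_all[:, :, 1] stacks finverse(x) with y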
+
+
+# def multitest_two_sample_shift(x, y, reps=10**5, stat='mean', alternative="greater",
+#                                keep_dist=False, seed=None, shift=None, plus1=True):
+#     r"""
+#     One-sided or two-sided, two-sample permutation multi-test for equality of
+#     two means, with p-value estimated by simulated random sampling with
+#     reps replications.
+#
+#     Tests the hypothesis that x and y are a random partition of x,y
+#     against the alternative that x comes from a population with mean
+#
+#     (a) greater than that of the population from which y comes,
+#         if side = 'greater'
+#     (b) less than that of the population from which y comes,
+#         if side = 'less'
+#     (c) different from that of the population from which y comes,
+#         if side = 'two-sided'
+#
+#     If ``keep_dist``, return the distribution of values of the test statistic;
+#     otherwise, return only the p-value and the test statistic.
+#
+#     Parameters
+#     ----------
+#     x : array-like
+#         Sample 1
+#     y : array-like
+#         Sample 2
+#     reps : int
+#         number of repetitions
+#     stat : {'mean', 't'}
+#         The test statistic.
+#
+#         (a) If stat == 'mean', the test statistic is (mean(x) - mean(y))
+#             (equivalently, sum(x), since those are monotonically related)
+#         (b) If stat == 't', the test statistic is the two-sample t-statistic--
+#             but the p-value is still estimated by the randomization,
+#             approximating the permutation distribution.
+#             The t-statistic is computed using scipy.stats.ttest_ind
+#         (c) If stat is a function (a callable object), the test statistic is
+#             that function. The function should take two arguments:
+#             given a permutation of the pooled data, the first argument is the
+#             "new" x and the second argument is the "new" y.
+#             For instance, if the test statistic is the Kolmogorov-Smirnov distance
+#             between the empirical distributions of the two samples,
+#             $\max_t |F_x(t) - F_y(t)|$, the test statistic could be written:
+#
+#             f = lambda u, v: np.max( \
+#                 [abs(sum(u<=val)/len(u)-sum(v<=val)/len(v)) for val in np.concatenate([u, v])]\
+#                 )
+#
+#     alternative : {'greater', 'less', 'two-sided'}
+#         The alternative hypothesis to test
+#     keep_dist : bool
+#         flag for whether to store and return the array of values
+#         of the test statistic
+#     seed : RandomState instance or {None, int, RandomState instance}
+#         If None, the pseudorandom number generator is the RandomState
+#         instance used by `np.random`;
+#         If int, seed is the seed used by the random number generator;
+#         If RandomState instance, seed is the pseudorandom number generator
+#     shift : float
+#         The relationship between x and y under the null hypothesis.
+#
+#         (a) A constant scalar shift in the distribution of y. That is, x is equal
+#             in distribution to y + shift.
+#         (b) A tuple containing the function and its inverse $(f, f^{-1})$, so
+#             $x_i = f(y_i)$ and $y_i = f^{-1}(x_i)$
+#     plus1 : bool
+#         flag for whether to add 1 to the numerator and denominator of the
+#         p-value based on the empirical permutation distribution.
+#         Default is True.
+#
+#     Returns
+#     -------
+#     array
+#         the estimated p-value for each test
+#     array
+#         the test statistic
+#     array
+#         The distribution of test statistics.
+#         These values are only returned if `keep_dist` == True
+#     """
+#     # Set up potential outcomes according to shift
+#     if isinstance(shift, float) or isinstance(shift, int):
+#         # Potential outcomes for all units under treatment
+#         pot_outx = np.concatenate([x, y + shift])
+#         # Potential outcomes for all units under control
+#         pot_outy = np.concatenate([x - shift, y])
+#         pot_out_all = np.stack([pot_outx, pot_outy], 2)
+#     elif isinstance(shift, tuple):
+#         assert (callable(shift[0])), "Supply f and finverse in shift tuple"
+#         assert (callable(shift[1])), "Supply f and finverse in shift tuple"
+#         pot_out_all = multitest_potential_outcomes(x, y, shift[0], shift[1])
+#     else:
+#         raise ValueError("Bad input for shift")
+#     # If stat is callable, use it as the test function. Otherwise, look in the
+#     # dictionary
+#     stats = {
+#         'mean': lambda u, v: np.mean(u, axis=0) - np.mean(v, axis=0),
+#         't': lambda u, v: ttest_ind(u, v, axis=0, equal_var=True)[0]
+#     }
+#     if callable(stat):
+#         tst_fun = stat
+#     else:
+#         tst_fun = stats[stat]
+#     # get number of observations
+#     nx = x.shape[0]
+#     # calculate observed statistics for all tests
+#     observed_tst = tst_fun(pot_out_all[:nx, :, 0], pot_out_all[nx:, :, 1])
+#     # call main worker function
+#     res = multitest_two_sample_core(pot_out_all, nx, tst_fun, alternative=alternative,
+#                                     reps=reps, keep_dist=keep_dist, seed=seed, plus1=plus1)
+#     # accommodate user request for returning the distribution
+#     if keep_dist:
+#         return res[0], observed_tst, res[1]
+#     else:
+#         return res, observed_tst
+
+
+
+
+diff --git a/permute/multitest_stratified.py b/permute/multitest_stratified.py
new file mode 100644
index 0000000..ba8d6d2
--- /dev/null
+++ b/permute/multitest_stratified.py
@@ -0,0 +1,450 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Mar  9 15:31:17 2022
+
+@author: Clayton
+"""
+
+import numpy as np
+from scipy.stats import ttest_ind
+
+from .utils import get_prng, permute_within_groups
+
+def multitest_stratified_corrcoef(x, y, group):
+    r"""
+    Calculates the sum of Pearson correlations between x and y,
+    computed separately in each group.  (These are Spearman correlations
+    if x and y are ranks.)
+
+    Parameters
+    ----------
+    x : array-like
+        Variable 1 with shape (observations, tests)
+    y : array-like
+        Variable 2, of the same shape as x
+    group : array-like
+        Group memberships, of the same length as x
+
+    Returns
+    -------
+    array
+        The sum of the correlations across groups, one entry per test
+    """
+    # ensure x and y have the same shape (same number of observations and tests)
+    if x.shape != y.shape:
+        raise ValueError('x and y must have the same shape')
+    # get number of hypotheses tested
+    num_tests = x.shape[1]
+    # np.corrcoef returns the correlation of every pair of columns; build a
+    # mask that keeps only the entries pairing column j of x with column j of y
+    corr_mat_mask = np.zeros((2*num_tests, 2*num_tests), dtype=bool)
+    corr_mat_mask[num_tests + np.arange(num_tests), np.arange(num_tests)] = True
+    # preallocate vector to store the aggregate correlations for each test
+    tst = np.zeros(num_tests)
+    for g in np.unique(group):
+        # create mask for current group
+        gg = group == g
+        # calculate and grab correlation coefficients for current group
+        tst += np.corrcoef(x[gg, :], y[gg, :], rowvar=False)[corr_mat_mask]
+    return tst
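+
+# Mask sketch (illustrative): for num_tests = 2, np.corrcoef on the four
+# stacked columns [x0, x1, y0, y1] returns a 4x4 matrix, and the mask keeps
+# only entries (2, 0) and (3, 1) -- corr(x0, y0) and corr(x1, y1).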
+
+
+
+def multitest_stratified_sim_corr(x, y, group, reps=10**4, alternative='greater', seed=None, plus1=True):
+    r"""
+    Simulate permutation p-value of a stratified correlation test.
+
+    Parameters
+    ----------
+    x : array-like
+        Variable 1 with shape (observations, tests)
+    y : array-like
+        Variable 2, of the same shape as x
+    group : array-like
+        Group memberships, of the same length as x
+    alternative : {'greater', 'less', 'two-sided'}
+        The alternative hypothesis to test
+    reps : int
+        Number of repetitions
+    seed : RandomState instance or {None, int, RandomState instance}
+        If None, the pseudorandom number generator is the RandomState
+        instance used by `np.random`;
+        If int, seed is the seed used by the random number generator;
+        If RandomState instance, seed is the pseudorandom number generator.
+    plus1 : bool
+        flag for whether to add 1 to the numerator and denominator of the
+        p-value based on the empirical permutation distribution.
+        Default is True.
+
+    Returns
+    -------
+    array
+        the estimated p-value for each test
+    array
+        the observed test statistic
+    list
+        the null distribution
+    """
+    # ensure x and y have the same shape (same number of observations and tests)
+    if x.shape != y.shape:
+        raise ValueError('x and y must have the same shape')
+    prng = get_prng(seed)
+    x = x.astype(float)
+    y = y.astype(float)
+    # calculate observed statistic
+    tst = multitest_stratified_corrcoef(x, y, group)
+    # calculate the statistic on each permutation to build the null distribution
+    dist = [multitest_stratified_corrcoef(permute_within_groups(x, group, prng), y, group)
+            for i in range(reps)]
+    # calculate percentile for each test
+    right_pv = np.sum(dist >= tst, axis=0) / (reps+plus1)
+    # create dictionary to store p-value calculations
+    thePvalue = {
+        'greater': lambda p: p + plus1/(reps+plus1),
+        'less': lambda p: 1 - (p + plus1/(reps+plus1)),
+        'two-sided': lambda p: 2 * np.min([p + plus1/(reps+plus1),
+                                           1 - (p + plus1/(reps+plus1))], axis=0)
+    }
+    return thePvalue[alternative](right_pv), tst, dist
+
+
+def multitest_stratified_permutationtest_mean(group, condition, response,
+                                              groups=None, conditions=None):
+    r"""
+    Calculates variability in sample means between treatment conditions,
+    within groups.
+
+    If there are two treatment conditions, the test statistic is the
+    difference in means, aggregated across groups.
+    If there are more than two treatment conditions, the test statistic
+    is the standard deviation of the means, aggregated across groups.
+
+    Parameters
+    ----------
+    group : array-like
+        Group memberships
+    condition : array-like
+        Treatment conditions, of the same length as group
+    response : array-like
+        Responses, of the same length as group. Has shape (observations, tests).
+    groups : array-like
+        Group labels. By default, it is the unique values of group
+    conditions : array-like
+        Condition labels. By default, it is the unique values of condition
+
+
+    Returns
+    -------
+    tst : array
+        The observed test statistic, one entry per test
+    """
+    # get number of hypotheses to test
+    num_tests = response.shape[1]
+    # get the ID for each group
+    if groups is None:
+        groups = np.unique(group)
+    # get the ID for each condition
+    if conditions is None:
+        conditions = np.unique(condition)
+    # preallocate vector to store the aggregate statistic for each test
+    tst = np.zeros(num_tests)
+    # check there are at least 2 groups
+    if len(groups) < 2:
+        raise ValueError('Number of groups must be at least 2.')
+    # with 2 conditions, aggregate the difference in means; with more than 2,
+    # aggregate the standard deviation of the means
+    # TODO ensure this is intended behavior; in stratified.py this is done
+    # with the variable "groups", but that does not seem right
+    elif len(conditions) == 2:
+        stat = lambda u: u[0] - u[1]
+        for g in groups:
+            # create mask for current group
+            gg = group == g
+            # create conjunction mask for group and condition
+            x = [gg & (condition == c) for c in conditions]
+            # aggregate statistic for each group and condition
+            tst += stat([response[x[j], :].mean(axis=0) for j in range(len(x))])
+    elif len(conditions) > 2:
+        for g in groups:
+            # create mask for current group
+            gg = group == g
+            # create conjunction mask for group and condition
+            x = [gg & (condition == c) for c in conditions]
+            # aggregate statistic for each group and condition
+            tst += np.std([response[x[j], :].mean(axis=0) for j in range(len(x))], 0)
+    return tst
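+
+# Statistic sketch (illustrative, two conditions): per test j,
+#
+#     tst[j] = sum over groups g of
+#              (mean response under condition 1 in group g
+#               - mean response under condition 2 in group g)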
+
+
+def multitest_stratified_permutationtest(
+        group,
+        condition,
+        response,
+        alternative='greater',
+        reps=10**5,
+        testStatistic='mean',
+        seed=None,
+        plus1=True):
+    r"""
+    Stratified permutation test based on differences in means.
+
+    The test statistic is
+
+    .. math:: \sum_{g \in \text{groups}} [
+                 f(mean(\text{response for cases in group $g$
+                 assigned to each condition}))].
+
+    The function f is the difference if there are two conditions, and
+    the standard deviation if there are more than two conditions.
+
+    There should be at least one group and at least two conditions.
+    Under the null hypothesis, all assignments to the two conditions that
+    preserve the number of cases assigned to the conditions are equally
+    likely.
+
+    Groups in which all cases are assigned to the same condition are
+    skipped; they do not contribute to the p-value since all randomizations
+    give the same contribution to the difference in means.
+
+    Parameters
+    ----------
+    group : array-like
+        Group memberships
+    condition : array-like
+        Treatment conditions, of the same length as group
+    response : array-like
+        Responses, of the same length as group
+    alternative : {'greater', 'less', 'two-sided'}
+        The alternative hypothesis to test
+    reps : int
+        Number of repetitions
+    testStatistic : {'mean'} or function
+        The test statistic. Either a string or a callable.
+
+        (a) If testStatistic == 'mean', the test statistic is
+            multitest_stratified_permutationtest_mean (default).
+        (b) If testStatistic is a function (a callable object), the test
+            statistic is that function. The function should take a
+            permutation of the data and compute the test function from it.
+            For instance, if the test statistic is the maximum absolute
+            value, $\max_i |z_i|$, the test statistic could be written:
+
+            f = lambda u: np.max(abs(u))
+    seed : RandomState instance or {None, int, RandomState instance}
+        If None, the pseudorandom number generator is the RandomState
+        instance used by `np.random`;
+        If int, seed is the seed used by the random number generator;
+        If RandomState instance, seed is the pseudorandom number generator
+    plus1 : bool
+        flag for whether to add 1 to the numerator and denominator of the
+        p-value based on the empirical permutation distribution.
+        Default is True.
+
+    Returns
+    -------
+    array
+        the estimated p-value for each test
+    array
+        the observed test statistic
+    array
+        the null distribution
+    """
+    prng = get_prng(seed)
+    # get the number of hypotheses to test
+    num_tests = response.shape[1]
+    # get the group IDs
+    groups = np.unique(group)
+    # get the condition IDs
+    conditions = np.unique(condition)
+    # create a dictionary to store the common statistic calculation
+    stats = {
+        'mean': lambda u: multitest_stratified_permutationtest_mean(
+            group,
+            u,
+            response,
+            groups,
+            conditions)}
+    if callable(testStatistic):
+        tst_fun = testStatistic
+    else:
+        tst_fun = stats[testStatistic]
+    # create dictionary to store p-value calculations
+    thePvalue = {
+        'greater': lambda p: p + plus1/(reps+plus1),
+        'less': lambda p: 1 - (p + plus1/(reps+plus1)),
+        'two-sided': lambda p: 2 * np.min([p + plus1/(reps+plus1),
+                                           1 - (p + plus1/(reps+plus1))], axis=0)
+    }
+    if len(conditions) < 2:
+        # TODO would it be more appropriate to raise an error?
+        # raise ValueError('Number of conditions must be at least 2.')
+        return 1.0, np.nan, None
+    else:
+        # calculate observed statistic
+        tst = tst_fun(condition)
+        # preallocate vector to store the null distribution
+        # (2D because each test has its own distribution)
+        dist = np.zeros((reps, num_tests))
+        for i in range(int(reps)):
+            # calculate statistic for current permutation
+            dist[i, :] = tst_fun(permute_within_groups(condition, group, prng))
+        # calculate percentile for each test
+        right_pv = np.sum(dist >= tst, axis=0) / (reps+plus1)
+        return thePvalue[alternative](right_pv), tst, dist
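+
+# Illustrative usage sketch (assumed data):
+#
+#     import numpy as np
+#     from permute.multitest_stratified import multitest_stratified_permutationtest
+#
+#     group = np.repeat([1, 2, 3], 9)
+#     condition = np.repeat([1, 2, 3] * 3, 3)
+#     response = np.zeros((27, 2)); response[:5, :] = 1
+#     p, tst, dist = multitest_stratified_permutationtest(
+#         group, condition, response, reps=10**3, seed=42)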
+
+
+def multitest_stratified_two_sample(
+        group,
+        condition,
+        response,
+        stat='mean',
+        alternative="greater",
+        reps=10**5,
+        keep_dist=False,
+        seed=None,
+        plus1=True):
+    r"""
+    One-sided or two-sided, two-sample permutation test for equality of
+    two means, with p-value estimated by simulated random sampling with
+    reps replications.
+
+    Tests the hypothesis that x and y are a random partition of x,y
+    against the alternative that x comes from a population with mean
+
+    (a) greater than that of the population from which y comes,
+        if side = 'greater'
+    (b) less than that of the population from which y comes,
+        if side = 'less'
+    (c) different from that of the population from which y comes,
+        if side = 'two-sided'
+
+    Permutations are carried out within the given groups.  Under the null
+    hypothesis, observations within each group are exchangeable.
+
+    If ``keep_dist``, return the distribution of values of the test statistic;
+    otherwise, return only the p-value and the test statistic.
+
+    Parameters
+    ----------
+    group : array-like
+        Group memberships
+    condition : array-like
+        Treatment conditions, of the same length as group
+    response : array-like
+        Responses, of the same length as group
+    stat : {'mean', 't', 'mean_within_strata'}
+        The test statistic.
+
+        (a) If stat == 'mean', the test statistic is (mean(x) - mean(y))
+            (equivalently, sum(x), since those are monotonically related),
+            omitting NaNs, which therefore can be used to code non-responders
+        (b) If stat == 't', the test statistic is the two-sample t-statistic--
+            but the p-value is still estimated by the randomization,
+            approximating the permutation distribution.
+            The t-statistic is computed using scipy.stats.ttest_ind
+        (c) If stat == 'mean_within_strata', the test statistic is the
+            difference in means within each stratum, added across strata.
+        (d) If stat is a function (a callable object), the test statistic is
+            that function.  The function should take a permutation of the
+            pooled data and compute the test function from it. For instance,
+            if the test statistic is the Kolmogorov-Smirnov distance between
+            the empirical distributions of the two samples,
+            $\max_t |F_x(t) - F_y(t)|$, the test statistic could be written:
+
+            f = lambda u: np.max( \
+                [abs(sum(u[:len(x)]<=v)/len(x)-sum(u[len(x):]<=v)/len(y))
+                for v in u]\
+                )
+    alternative : {'greater', 'less', 'two-sided'}
+        The alternative hypothesis to test
+    reps : int
+        Number of permutations
+    keep_dist : bool
+        flag for whether to store and return the array of values
+        of the test statistic
+    seed : RandomState instance or {None, int, RandomState instance}
+        If None, the pseudorandom number generator is the RandomState
+        instance used by `np.random`;
+        If int, seed is the seed used by the random number generator;
+        If RandomState instance, seed is the pseudorandom number generator.
+    plus1 : bool
+        flag for whether to add 1 to the numerator and denominator of the
+        p-value based on the empirical permutation distribution.
+        Default is True.
+
+    Returns
+    -------
+    array
+        the estimated p-value for each test
+    array
+        the test statistic
+    array
+        The distribution of test statistics.
+        These values are only returned if `keep_dist` == True
+    """
+
+    prng = get_prng(seed)
+    # get number of hypotheses to test
+    num_tests = response.shape[1]
+    # sort by condition so the first ntreat rows all belong to one
+    # condition; the 'mean' and 't' statistics below rely on this layout
+    ordering = condition.argsort()
+    response = response[ordering]
+    condition = condition[ordering]
+    group = group[ordering]
+    # get number of samples that received the condition with the lowest ID
+    # TODO should we ensure each condition has the same number of samples?
+    ntreat = np.sum(condition == condition[0])
+
+    # get the IDs for each group and condition
+    groups = np.unique(group)
+    conditions = np.unique(condition)
+    # If stat is callable, use it as the test function.  Otherwise, look in
+    # the dictionary.  NaNs are omitted so they can code non-responders.
+    stats = {
+        'mean': lambda u: np.nanmean(u[:ntreat], axis=0) - np.nanmean(u[ntreat:], axis=0),
+        't': lambda u: ttest_ind(u[:ntreat], u[ntreat:],
+                                 axis=0, equal_var=True, nan_policy='omit')[0],
+        'mean_within_strata': lambda u: multitest_stratified_permutationtest_mean(group,
+                                                                                  condition,
+                                                                                  u,
+                                                                                  groups,
+                                                                                  conditions)
+    }
+    if callable(stat):
+        tst_fun = stat
+    else:
+        tst_fun = stats[stat]
+    # create dictionary to store p-value calculations
+    thePvalue = {
+        'greater': lambda p: p + plus1/(reps+plus1),
+        'less': lambda p: 1 - (p + plus1/(reps+plus1)),
+        'two-sided': lambda p: 2 * np.min([p + plus1/(reps+plus1),
+                                           1 - (p + plus1/(reps+plus1))], axis=0)
+    }
+    # get observed statistic
+    observed_tst = tst_fun(response)
+    # account for keep_dist (keep distribution)
+    if keep_dist:
+        # preallocate vector for the null distribution
+        # (2D because we build a null distribution for each test)
+        dist = np.empty((reps, num_tests))
+        for i in range(reps):
+            # calculate statistic for current permutation
+            dist[i, :] = tst_fun(permute_within_groups(
+                response, group, seed=prng))
+        # calculate percentile for each test
+        hits = np.sum(dist >= observed_tst, axis=0)
+        return thePvalue[alternative](hits / (reps+plus1)), observed_tst, dist
+    else:
+        # count how many times the permuted statistic is at least as large
+        # as the observed statistic for each hypothesis
+        hits = np.zeros(num_tests)
+        for i in range(reps):
+            # calculate current statistic
+            curr_tst = tst_fun(permute_within_groups(response, group, seed=prng))
+            hits += curr_tst >= observed_tst
+        return thePvalue[alternative](hits / (reps+plus1)), observed_tst
diff --git a/permute/tests/test_multitest_core.py b/permute/tests/test_multitest_core.py
new file mode 100644
index 0000000..382b0e1
--- /dev/null
+++ b/permute/tests/test_multitest_core.py
@@ -0,0 +1,210 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Mar 21 12:23:07 2022
+
+@author: Clayton
+"""
+
+import pytest
+
+import numpy as np
+from numpy.random import RandomState
+from cryptorandom.cryptorandom import SHA256
+
+from ..multitest_core import (multitest_corr,
+                              multitest_spearman_corr,
+                              multitest_two_sample,
+                              multitest_one_sample)
+# TODO import multitest_two_sample_shift and multitest_two_sample_conf_int
+# here once those functions are finalized (their tests below are commented out)
+
+
+def test_multitest_corr():
+    prng = SHA256(42)
+    num_tests = 2
+    x = prng.randint(0, 5, size=(10, num_tests))
+    y = x
+    res1 = multitest_corr(x, y, seed=prng)
+    res2 = multitest_corr(x, y)
+    assert len(res1) == 3
+    assert len(res2) == 3
+    np.testing.assert_almost_equal(res1[0], np.ones(num_tests), decimal=1)
+    np.testing.assert_almost_equal(res2[0], np.ones(num_tests), decimal=1)
+    np.testing.assert_almost_equal(res1[1], res2[1], decimal=1)
+    print("finished test 1 in test_multitest_corr()")
+
+    y = prng.randint(0, 5, size=(10, num_tests))
+    res1 = multitest_corr(x, y, alternative="less", seed=prng)
+    res2 = multitest_corr(x, y, alternative="less")
+    assert len(res1) == 3
+    assert len(res2) == 3
+    np.testing.assert_almost_equal(res1[0], res2[0], decimal=1)
+    np.testing.assert_almost_equal(res1[1], res2[1], decimal=1)
+    print("finished test 2 in test_multitest_corr()")
+
+    res1 = multitest_corr(x, y, alternative="two-sided", seed=prng)
+    res2 = multitest_corr(x, y, alternative="greater")
+    assert len(res1) == 3
+    assert len(res2) == 3
+    np.testing.assert_almost_equal(res1[0], res2[0], decimal=1)
+    np.testing.assert_almost_equal(res1[1], res2[1]*2, decimal=1)
+    print("finished test 3 in test_multitest_corr()")
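+
+# Illustrative invocation (assumes the 'slow' marker is registered in the
+# project's pytest configuration):
+#
+#     pytest permute/tests/test_multitest_core.py -m "not slow"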
+
+
+def test_multitest_spearman_corr():
+    prng = SHA256(42)
+    num_tests = 2
+    x = (np.array([2, 4, 6, 8, 10])*np.ones((num_tests, 5))).T
+    y = (np.array([1, 3, 5, 6, 9])*np.ones((num_tests, 5))).T
+    xorder = np.array([[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]).T
+    res1 = multitest_corr(xorder, xorder, seed=prng)
+    print("finished test 1 in test_multitest_spearman_corr()")
+
+    prng = SHA256(42)
+    res2 = multitest_spearman_corr(x, y, seed=prng)
+    np.testing.assert_almost_equal(res1[0], res2[0], decimal=1)
+    np.testing.assert_almost_equal(res1[1], res2[1], decimal=1)
+    np.testing.assert_array_equal(res1[2], res2[2])
+    print("finished test 2 in test_multitest_spearman_corr()")
+
+
+@pytest.mark.slow
+def test_multitest_two_sample():
+    prng = RandomState(42)
+    num_samples = 200
+    num_tests = 2
+    # Normal-normal, different means examples
+    x = prng.normal(1, size=(num_samples, num_tests))
+    y = prng.normal(4, size=(num_samples, num_tests))
+    # expected p-values and observed statistics
+    expected = ([[1.0, 1.0], [0, 0]], [[-3, -3], [-30, -30]])
+    plus1 = (True, False)
+    keep_dist = (True, False)
+    alternative = ('greater', 'less', 'two-sided')
+    stats = ('mean', 't')  # TODO add a custom stat lambda here (see the sketch below)
+    num_cases = str(len(plus1)*len(keep_dist)*len(alternative)*len(stats))
+    case_count = 1
+    # go through all combinations of parameters
+    for p in plus1:
+        for k in keep_dist:
+            for a in alternative:
+                for s in stats:
+                    res = multitest_two_sample(x, y, reps=10**4, seed=42, plus1=p,
+                                               keep_dist=k, alternative=a, stat=s)
+                    # check p-values
+                    if a == 'greater':
+                        np.testing.assert_almost_equal(res[0], expected[0][0], decimal=1)
+                    elif a == 'less' or a == 'two-sided':
+                        np.testing.assert_almost_equal(res[0], expected[0][1], decimal=1)
+                    # check observed statistic
+                    if s == 'mean':
+                        np.testing.assert_almost_equal(res[1], expected[1][0], decimal=1)
+                    elif s == 't':
+                        np.testing.assert_almost_equal(res[1], expected[1][1], decimal=0)
+                    # check returned distribution
+                    if k:
+                        assert len(res) == 3                 # if keep_dist, res has length 3
+                        assert len(res[2].shape) == 2        # should get a 2D dist
+                        assert res[2].shape[1] == num_tests  # second dimension matches number of tests
+                    else:
+                        assert len(res) == 2
+                    print("finished test " + str(case_count) + " of " + num_cases + " in test_multitest_two_sample()")
+                    case_count += 1
+
+
+@pytest.mark.slow
+def test_multitest_one_sample():
+    # same as test_multitest_two_sample(), but exercises the one-sample version
+    prng = RandomState(42)
+    num_samples = 200
+    num_tests = 2
+    # Normal-normal, different means examples
+    x = prng.normal(1, size=(num_samples, num_tests))
+    y = prng.normal(4, size=(num_samples, num_tests))
+    # expected p-values and observed statistics; the t-statistic differs from
+    # the two-sample case because it is computed on the paired differences
+    expected = ([[1.0, 1.0], [0, 0]], [[-3, -3], [-28, -28]])
+    plus1 = (True, False)
+    keep_dist = (True, False)
+    alternative = ('greater', 'less', 'two-sided')
+    stats = ('mean', 't')  # TODO add a custom stat lambda here (see the sketch below)
+    num_cases = str(len(plus1)*len(keep_dist)*len(alternative)*len(stats))
+    case_count = 1
+    # go through all combinations of parameters
+    for p in plus1:
+        for k in keep_dist:
+            for a in alternative:
+                for s in stats:
+                    res = multitest_one_sample(x, y, seed=42, reps=10**4, plus1=p,
+                                               keep_dist=k, alternative=a, stat=s)
+                    # check p-values
+                    if a == 'greater':
+                        np.testing.assert_almost_equal(res[0], expected[0][0], decimal=1)
+                    elif a == 'less' or a == 'two-sided':
+                        np.testing.assert_almost_equal(res[0], expected[0][1], decimal=1)
+                    # check observed statistic
+                    if s == 'mean':
+                        np.testing.assert_almost_equal(res[1], expected[1][0], decimal=1)
+                    elif s == 't':
+                        np.testing.assert_almost_equal(res[1], expected[1][1], decimal=0)
+                    # check returned distribution
+                    if k:
+                        assert len(res) == 3                 # if keep_dist, res has length 3
+                        assert len(res[2].shape) == 2        # should get a 2D dist
+                        assert res[2].shape[1] == num_tests  # second dimension matches number of tests
+                    else:
+                        assert len(res) == 2
+                    print("finished test " + str(case_count) + " of " + num_cases + " in test_multitest_one_sample()")
+                    case_count += 1
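+
+# Sketch of the custom statistic the TODOs above mention (illustrative):
+# a callable stat for multitest_two_sample takes the permuted samples
+# (u, v) and must return one value per test/column, e.g. a difference in
+# medians:
+#
+#     median_diff = lambda u, v: np.median(u, axis=0) - np.median(v, axis=0)
+#     res = multitest_two_sample(x, y, stat=median_diff, reps=10**3, seed=42)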
+
+
+# @pytest.mark.slow
+# def test_multitest_two_sample_shift():
+#     f = lambda u: u - 3
+#     finv = lambda u: u + 3
+#     f_err = lambda u: 2 * u
+#     f_err_inv = lambda u: u / 2
+#     prng = RandomState(42)
+#     num_samples = 200
+#     num_tests = 2
+#     # Normal-normal, different means examples
+#     x = prng.normal(1, size=(num_samples, num_tests))
+#     y = prng.normal(4, size=(num_samples, num_tests))
+#     # expected p-values and observed statistics
+#     expected = ([[1.0, 1.0], [0, 0]], [[-3, -3], [-28, -28]])
+#     plus1 = (True, False)
+#     max_correct = (True, False)
+#     keep_dist = (True, False)
+#     alternative = ('greater', 'less', 'two-sided')
+#     stats = ('mean', 't')  # TODO add a custom stat lambda here
+#     shift = (2, (f, finv), (f_err, f_err_inv))
+#     num_cases = str(len(plus1)*len(keep_dist)*len(alternative)*len(stats)*len(shift))
+#     case_count = 1
+#     # go through all combinations of parameters
+#     for p in plus1:
+#         for k in keep_dist:
+#             for a in alternative:
+#                 for s in stats:
+#                     for sh in shift:
+#                         res = multitest_two_sample_shift(x, y, seed=42, shift=sh, plus1=p,
+#                                                          keep_dist=k, alternative=a, stat=s)
+#                         # check p-values
+#                         if a == 'greater':
+#                             np.testing.assert_almost_equal(res[0], expected[0][0], decimal=1)
+#                         elif a == 'less' or a == 'two-sided':
+#                             np.testing.assert_almost_equal(res[0], expected[0][1], decimal=1)
+#                         # check observed statistic
+#                         if s == 'mean':
+#                             np.testing.assert_almost_equal(res[1], expected[1][0], decimal=1)
+#                         elif s == 't':
+#                             np.testing.assert_almost_equal(res[1], expected[1][1], decimal=0)
+#                         # check returned distribution
+#                         if k:
+#                             assert len(res[2].shape) == 2        # should get a 2D dist
+#                             assert res[2].shape[1] == num_tests  # second dimension matches number of tests
+#                         else:
+#                             assert len(res) == 2
+#                         print("finished test " + str(case_count) + " of " + num_cases + " in test_multitest_two_sample_shift()")
+#                         case_count += 1
+
+
+# def test_two_sample_bad_shift():
+#     # Break it with a bad shift
+#     x = np.array(range(5))
+#     y = np.array(range(1, 6))
+#     shift = lambda u: u + 3
+#     pytest.raises(ValueError, multitest_two_sample_shift, x, y, seed=5, shift=shift)
+
diff --git a/permute/tests/test_multitest_stratified.py b/permute/tests/test_multitest_stratified.py
new file mode 100644
index 0000000..36be451
--- /dev/null
+++ b/permute/tests/test_multitest_stratified.py
@@ -0,0 +1,126 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Mar 21 20:55:37 2022
+
+@author: Clayton
+"""
+
+import numpy as np
+from numpy.random import RandomState
+
+import pytest
+
+from ..multitest_stratified import multitest_stratified_permutationtest as spt
+from ..multitest_stratified import multitest_stratified_permutationtest_mean as sptm
+from ..multitest_stratified import (multitest_stratified_corrcoef,
+                                    multitest_stratified_sim_corr,
+                                    multitest_stratified_two_sample)
+
+def test_multitest_stratified_permutationtest():
+    num_tests = 2
+    group = np.repeat([1, 2, 3], 9)
+    condition = np.repeat([1, 2, 3] * 3, 3)
+    response = np.zeros((group.shape[0], num_tests))
+    response[[0, 1, 3, 9, 10, 11, 18, 19, 20], :] = 1
+
+    res = spt(group, condition, response, reps=1000, seed=42)
+    res1 = spt(group, condition, response, alternative='less', reps=1000, seed=42)
+    assert np.all(res[0] < 0.01)
+    assert np.all(res[1] == res1[1])
+    np.testing.assert_almost_equal(res[0], 1-res1[0])
+    res2 = spt(group, condition, response, alternative='two-sided', reps=1000, seed=42)
+    assert np.all(res2[0] < 0.02)
+
+    group = np.array([1, 1, 1])
+    condition = np.array([2, 2, 2])
+    response = np.zeros((group.shape[0], num_tests))
+    # with a single condition the test degenerates to p = 1
+    res2 = spt(group, condition, response, reps=1000, seed=42)
+    assert res2 == (1.0, np.nan, None)
+
+
+def test_multitest_stratified_permutationtest_mean():
+    num_tests = 2
+    group = np.array([1, 2, 1, 2])
+    condition = np.array([1, 1, 2, 2])
+    response = np.zeros((group.shape[0], num_tests))
+    groups = np.unique(group)
+    conditions = np.unique(condition)
+    res = sptm(group, condition, response, groups, conditions)
+    assert np.all(res == (0.0, 0.0))
+    res2 = sptm(group, condition, response)  # check that the defaults work
+    assert np.all(res2 == (0.0, 0.0))
+
+
+def test_multitest_stratified_permutationtest_mean_error():
+    num_tests = 2
+    group = np.array([1, 1, 1])
+    condition = np.array([2, 2, 2])
+    response = np.zeros((group.shape[0], num_tests))
+    groups = np.unique(group)
+    conditions = np.unique(condition)
+    # a single group must raise a ValueError
+    pytest.raises(ValueError, sptm, group, condition, response, groups, conditions)
+
+
+def test_multitest_stratified_corrcoef():
+    num_tests = 2
+    prng = RandomState(42)
+    x = prng.rand(10, num_tests)
+    y = x
+    group = prng.randint(3, size=10)
+    # the statistic is deterministic, so repeated calls must agree
+    res1 = multitest_stratified_corrcoef(x, y, group)
+    res2 = multitest_stratified_corrcoef(x, y, group)
+    assert np.all(res1 == res2)
+
+
+def test_multitest_stratified_sim_corr():
+    num_tests = 2
+    prng = RandomState(42)
+    x = prng.rand(10, num_tests)
+    y = x
+    group = prng.randint(0, 3, size=10)
+    res1 = multitest_stratified_sim_corr(x, y, group, seed=prng, reps=100)
+    res2 = multitest_stratified_sim_corr(x, y, group, seed=prng, alternative='less', reps=100)
+    res3 = multitest_stratified_sim_corr(x, y, group, seed=prng, alternative='two-sided', reps=100)
+
+    np.testing.assert_almost_equal(res1[0], 1-res2[0])
+    assert np.all(res1[1] == res2[1])
+    assert np.all(res1[1] == res3[1])
+    assert np.all(2*res1[0] == res3[0])
+
+
+def test_multitest_stratified_strat_tests_equal():
+    num_tests = 2
+    group = np.repeat([1, 2, 3], 10)
+    condition = np.repeat([1, 2] * 3, 5)
+    response = np.zeros((group.shape[0], num_tests))
+    response[[0, 1, 3, 9, 10, 11, 18, 19, 20], :] = 1
+
+    res1 = spt(group, condition, response, reps=1000, seed=42)
+    res2 = multitest_stratified_two_sample(group, condition, response, reps=1000,
+                                           stat='mean_within_strata', seed=42)
+    assert np.all(res1[1] == res2[1])
+    assert np.all(np.fabs(res1[0]-res2[0]) < 0.05)
+
+
+def test_multitest_stratified_two_sample():
+    num_tests = 2
+    group = np.repeat([1, 2, 3], 10)
+    condition = np.repeat([1, 2] * 3, 5)
+    response = np.zeros((group.shape[0], num_tests))
+    response[[0, 1, 3, 9, 10, 11, 18, 19, 20], :] = 1
+
+    res = multitest_stratified_two_sample(group, condition, response, reps=1000,
+                                          stat='mean', seed=42)
+    np.testing.assert_almost_equal(res[0], 0.245, 2)
+    assert np.all(res[1] == 0.2)
+
+    (p, t, dist) = multitest_stratified_two_sample(group, condition, response, reps=1000,
+                                                   stat='mean', seed=42, keep_dist=True)
+    assert np.all(res[0] == p)
+    assert np.all(res[1] == t)
+
+    stat_fun = lambda u: sptm(group, condition, u, np.unique(group), np.unique(condition))
+    res = multitest_stratified_two_sample(group, condition, response, reps=1000,
+                                          stat=stat_fun, seed=42)
+    # the expected values below differ from test_stratified because the stat
+    # in multitest_stratified_two_sample depends on the conditions rather than
+    # on the number of unique groups.
+    # TODO ensure this is appropriate
+    np.testing.assert_almost_equal(res[0], 0.6733, 3)
+    np.testing.assert_almost_equal(res[1], -0.2, 3)