Source code for pygna.statistical_comparison

import logging
import networkx as nx
import numpy as np
import sys
import multiprocessing
import pygna.statistical_test as st
from pygna.utils import get_sampling_p, get_node_bins_map

[docs]class StatisticalComparison: """ This class implements the statistical analysis comparison between two genesets. Please refer to the single method documentation for the returning values """ def __init__(self, comparison_statistic, network: nx.Graph, n_proc: int = 1, diz: dict = {}, degree_bins = 1): """ :param comparison_statistic: the method do be applied to perform the statistical analysis :param network: the network to be used during the analysis :param n_proc: the number of CPU used to perform the elaboration :param diz: the dictionary used for the genesets """ self.__comparison_statistic = comparison_statistic self.__network = network self.__diz = diz self.__n_proc = n_proc self.degree_bins = degree_bins self.node_2_bin_map = None self.bins = None if (type(self.__network) is nx.Graph) or (type(self.__network) is nx.DiGraph): self.__universe = list(set(self.__network.nodes())) elif type(self.__network) is dict: self.__universe = list(set(self.__network.keys())) else: logging.error("Unknown network type: %s" % type(self.__network)) sys.exit(-1) # If number of bins is specified, the mapping is generated if (degree_bins>1): if type(self.__network) is dict: logging.error('Cannot do degree corrected sampling without the whole network') degree= list(self.__network.degree(nbunch = self.__universe)) [nn,dd] = list(zip(*degree)) # We make sure that later the universe corresponds to the mapping self.__universe = list(nn) histogram, bins_edge = np.histogram(list(dd), bins = degree_bins, density = False) self.bins = {int(i):{'nk':histogram[i], 'range':(bins_edge[i],bins_edge[i+1])} for i in range(len(histogram))} self.bins[len(self.bins.keys())-1]['range'] = (self.bins[len(self.bins.keys())-1]['range'][0], self.bins[len(self.bins.keys())-1]['range'][1]+1) self.node_2_bin_map = get_node_bins_map(degree, self.bins)
[docs] def comparison_empirical_pvalue(self, genesetA: set, genesetB: set, alternative: str = "less", max_iter: int = 100, keep: bool = False) -> [int, float, float, int, int]: """ Calculate the empirical value between two genesets :param genesetA: the first geneset to compare :param genesetB: the second geneset to compare :param alternative: the pvalue selection of the observed genes :param max_iter: the maximum number of iterations :param keep: if the geneset B should not be kept :return observed, pvalue, null_distribution, len(mapped_genesetA), len(mapped_genesetB): the list with the data calculated """ # mapping genesets mapped_genesetA = sorted(list(set(genesetA) & set(self.__universe))) mapped_genesetB = sorted(list(set(genesetB) & set(self.__universe))) logging.info("Mapped %d genes out of %d from genesetA" % (len(mapped_genesetA), len(genesetA))) logging.info("Mapped %d genes out of %d from genesetB" % (len(mapped_genesetB), len(genesetB))) if (len(mapped_genesetA) == 0) or (len(mapped_genesetB) == 0): return 0, 1, np.array([0]), 0, 0 else: sampling_p_a = None sampling_p_b = None if self.degree_bins > 1: # if more than one bin is specified, the sampling probability is generated sampling_p_a = get_sampling_p(mapped_genesetA, self.__network, self.bins, self.node_2_bin_map) if (~keep): sampling_p_b = get_sampling_p(mapped_genesetB, self.__network, self.bins, self.node_2_bin_map) # Observed statistical values observed = self.__comparison_statistic(self.__network, mapped_genesetA, mapped_genesetB, self.__diz) # iterations null_distribution = StatisticalComparison.get_comparison_null_distribution_mp(self, mapped_genesetA, mapped_genesetB, max_iter, keep, sampling_p_a = sampling_p_a, sampling_p_b = sampling_p_b, ) pvalue = 1 if alternative == "greater": pvalue = (np.sum(null_distribution >= observed) + 1) / (float(len(null_distribution) + 1)) else: pvalue = (np.sum(null_distribution <= observed) + 1) / (float(len(null_distribution) + 1)) return observed, pvalue, null_distribution, len(mapped_genesetA), len(mapped_genesetB)
[docs] def get_comparison_null_distribution_mp(self, genesetA: list, genesetB: list, max_iter: int = 100, keep: bool = False, sampling_p_a = None, sampling_p_b = None) -> np.ndarray: """ Calculate the null distribution between two genesets with multiple CPUs :param genesetA: the first geneset to compare :param genesetB: the second geneset to compare :param max_iter: maximum number of iteration to perform :param keep: if the geneset B should not be kept :param sampling_p_a: random sampling probability for geneset a :param sampling_p_b: random sampling probability for geneset b :return: the array with null distribution """ n_trial = int(max_iter / self.__n_proc) logging.info("n_proc = %d, each computing %d permutations " % (int(self.__n_proc), n_trial)) # Don't set up the multicore architecture if only one core is given if self.__n_proc == 1: null_distribution = StatisticalComparison.get_comparison_null_distribution(self, genesetA, genesetB, max_iter, keep, sampling_p_a = sampling_p_a, sampling_p_b = sampling_p_b) else: p = multiprocessing.Pool(self.__n_proc) results = [p.apply_async(StatisticalComparison.get_comparison_null_distribution, args=(self, genesetA, genesetB, n_trial, keep, sampling_p_a, sampling_p_b)) for w in list(range(1, self.__n_proc + 1))] null_distribution = np.array([]) for r in results: null_distribution = np.hstack((null_distribution, np.array(r.get()))) p.close() return np.asarray(null_distribution)
[docs] def get_comparison_null_distribution(self, genesetA: list, genesetB: list, n_samples: int, keep: bool, sampling_p_a = None, sampling_p_b = None) -> list: """ Calculate the null distribution between two genesets with single CPU :param genesetA: the first geneset to compare :param genesetB: the second geneset to compare :param n_samples: the number of samples to be taken :param keep: if the geneset B should not be kept :return: the random distribution calculated """ np.random.seed() random_dist = [] # association statistic, B kept, A bootstrapped if keep: for i in range(n_samples): random_sample_A = np.random.choice(self.__universe, len(genesetA), replace=False, p = sampling_p_a) random_dist.append(self.__comparison_statistic(self.__network, set(random_sample_A), set(genesetB), self.__diz)) # association statistic, B kept, A bootstrapped else: for i in range(n_samples): random_sample_A = np.random.choice(self.__universe, len(genesetA), replace=False, p = sampling_p_a) random_sample_B = np.random.choice(self.__universe, len(genesetB), replace=False, p = sampling_p_b) random_dist.append(self.__comparison_statistic(self.__network, set(random_sample_A), set(random_sample_B), self.__diz)) return random_dist
############################################################################### ### TEST STATISTICS FOR COMPARISONS ######################################### ###############################################################################
[docs]def comparison_shortest_path(network: nx.Graph, genesetA: set, genesetB: set, diz: dict) -> float: """ Evaluate the shortest path between two genesets :param network: the graph representing the network :param genesetA: the first geneset list :param genesetB: the second geneset list :param diz: the dictionary containing the nodes name and index """ n = np.array([diz["nodes"].index(i) for i in genesetA]) m = np.array([diz["nodes"].index(i) for i in genesetB]) if len(n) == 0 or len(m) == 0: logging.info("Geneset length is equal to 0") sys.exit() cum_sum = calculate_sum(n, m, diz) cum_sum += calculate_sum(m, n, diz) d_AB = cum_sum / float(len(genesetA) + len(genesetB)) d_A = st.geneset_localisation_statistic(network, genesetA, diz) d_B = st.geneset_localisation_statistic(network, genesetB, diz) return d_AB - (d_A + d_B) / 2
[docs]def calculate_sum(n: np.ndarray, m: np.ndarray, diz: dict) -> np.ndarray: """ Evaluate the sum of the columns of two matrices :param n: the first column :param m: the second column :param diz: the dictionary containing the data """ diz = diz["matrix"] sub_matrix = diz[n[:, None], m] sub_matrix = np.where(sub_matrix != np.inf, sub_matrix, 0) min_columns = np.amin(sub_matrix, axis=0) sum_columns = np.sum(min_columns) return sum_columns
[docs]def comparison_random_walk(network: nx.Graph, genesetA: list, genesetB: list, diz: dict = {}) -> float: """ Evaluate the random walk on two genesets :param network: the graph representing the network :param genesetA: the first geneset list :param genesetB: the second geneset list :param diz: the dictionary containing the nodes name and index """ try: diz["matrix"] except KeyError: print("The dictionary doesnt have a matrix key") raise genesetA_index = [diz["nodes"].index(i) for i in genesetA] genesetB_index = [diz["nodes"].index(i) for i in genesetB] if len(genesetA_index) == 0 or len(genesetB_index) == 0: sys.exit() probAB = [diz["matrix"][i, j] for i in genesetA_index for j in genesetB_index] probBA = [diz["matrix"][i, j] for i in genesetB_index for j in genesetA_index] prob = np.sum(probAB) + np.sum(probBA) return prob