Source code for pygna.output

import csv
import logging
import os
import shutil
import tempfile

import networkx as nx
import numpy as np
import pandas as pd
import scipy
import scipy.stats
import statsmodels.stats.multitest as multi


class Output:
    """
    This class is used to print different data on files.

    Result rows are accumulated in a temporary CSV table (created by one of the
    ``create_*_table_empirical`` methods) which is copied to
    ``output_table_results_file`` by :meth:`close_temporary_table`.
    Gene sets are accumulated in ``GMT_dict`` and written by :meth:`create_GMT_output`.
    """

    def __init__(self, network_filename: str, output_table_results_file: str, analysis: str, geneset_file: str,
                 setnames: list, geneset_file_B: str = None, setnames_B: list = None):
        """
        :param network_filename: the file containing the network
        :param output_table_results_file: the output table that contains the results to use
        :param analysis: the type of analysis performed
        :param geneset_file: the geneset file use
        :param setnames: the names of the first geneset
        :param geneset_file_B: the second geneset file to use
        :param setnames_B: the names of the second geneset
        """
        self.network_filename = network_filename
        self.analysis = analysis
        self.output_table_results = output_table_results_file
        self.output_gmt = None
        self.text = []
        self.geneset_filename = geneset_file
        self.setnames = setnames
        self.geneset_filename_B = geneset_file_B
        self.setnames_B = setnames_B
        self.diffusion_matrix_file = None
        self.GMT_dict = {}
        # Path of the temporary table; stays None until a create_*_table_empirical method is called
        self.table_file_name = None

        if not self.output_table_results.endswith('.csv'):
            logging.warning('The output table is saved as csv file, the name does not match the file extension')

    def set_diffusion_matrix(self, diffusion_matrix_file: str) -> None:
        """
        Set the diffusion matrix file

        :param diffusion_matrix_file: set the diffusion matrix file to use

        Example
        _______
        >>> setnames = ["A", "B", "C"]
        >>> out = Output("networkfile.tsv", "results.csv", "myanalysis", "genset_a.csv", setnames)
        >>> out.set_diffusion_matrix("diffusion_matrix.csv")
        """
        self.diffusion_matrix_file = diffusion_matrix_file

    # Shared helpers for the temporary tables
    def _create_temporary_table(self, header: str) -> None:
        """
        Create a new temporary file, remember its path and write the heading line in it

        :param header: the heading line (newline included) to write
        """
        # delete=False: the file must survive closing, it is reopened for every appended row
        with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as tmp:
            self.table_file_name = tmp.name
            tmp.write(header)

    def _current_table(self) -> str:
        """
        Return the path of the temporary table

        :raises RuntimeError: if no temporary table has been created yet
        """
        table = getattr(self, "table_file_name", None)
        if table is None:
            raise RuntimeError("No temporary table available: call create_st_table_empirical() or "
                               "create_comparison_table_empirical() first")
        return table

    def _append_row(self, values: list) -> None:
        """
        Append one row to the temporary table

        Every value is converted with ``str`` and written through ``csv.writer``, so a value that
        contains a comma or a quote (for example a file path) is quoted instead of shifting columns.

        :param values: the values of the row, in column order
        """
        # The file is opened in plain text mode (as the heading is) so line endings stay consistent
        with open(self._current_table(), "a") as f:
            csv.writer(f, lineterminator="\n").writerow([str(x) for x in values])

    # Tables for stats
    def create_st_table_empirical(self) -> None:
        """
        Create the headings of the table that are going to be written in the csv file

        Example
        _______
        >>> setnames = ["A", "B", "C"]
        >>> out = Output("networkfile.tsv", "results.csv", "myanalysis", "genset_a.csv", setnames)
        >>> out.create_st_table_empirical()
        """
        self._create_temporary_table(
            "analysis,setname,n_mapped,n_geneset,number_of_permutations,observed,empirical_pvalue,mean(null),"
            "var(null),network,geneset\n")

    def close_temporary_table(self) -> None:
        """
        Copy the temporary table to the output table file and remove the temporary file

        :raises RuntimeError: if no temporary table has been created yet

        Example
        _______
        >>> setnames = ["A", "B", "C"]
        >>> out = Output("networkfile.tsv", "results.csv", "myanalysis", "genset_a.csv", setnames)
        >>> out.create_st_table_empirical()
        >>> out.close_temporary_table()
        """
        table = self._current_table()
        shutil.copy(table, self.output_table_results)
        # Removed only after a successful copy, so the rows are not lost if the copy fails
        os.remove(table)

    def update_st_table_empirical(self, setname: str, n_mapped: int, n_geneset: int, number_of_permutations: int,
                                  observed: int, empirical_pvalue: float, mean_null: float,
                                  var_null: float) -> None:
        """
        Update the table content, adding a new line to the file

        :param setname: the name of the geneset (commas are replaced by underscores)
        :param n_mapped: the number of mapped genes
        :param n_geneset: the number of genesets
        :param number_of_permutations: the number of permutations
        :param observed: value of observed genes
        :param empirical_pvalue: value of the empirical p-value
        :param mean_null: mean of the null distribution
        :param var_null: var of the null distribution
        :raises RuntimeError: if no temporary table has been created yet

        Example
        _______
        >>> setnames = ["A", "B", "C"]
        >>> out = Output("networkfile.tsv", "results.csv", "myanalysis", "genset_a.csv", setnames)
        >>> out.create_st_table_empirical()
        >>> out.update_st_table_empirical("A", n_mapped=10, n_geneset=12, number_of_permutations=100, observed=3, empirical_pvalue=0.001, mean_null=0.11, var_null=0.2)
        """
        setname = setname.replace(",", "_")
        self._append_row([self.analysis, setname, n_mapped, n_geneset, number_of_permutations, observed,
                          empirical_pvalue, mean_null, var_null, self.network_filename, self.geneset_filename])

    # Tables for comparisons
    def create_comparison_table_empirical(self) -> None:
        """
        Write the headings for the comparison table

        Example
        _______
        >>> setnames = ["A", "B", "C"]
        >>> out = Output("networkfile.tsv", "results.csv", "myanalysis", "genset_a.csv", setnames)
        >>> out.create_comparison_table_empirical()
        """
        self._create_temporary_table(
            "analysis,setname_A,setname_B,n_geneset_A,n_mapped_A,n_geneset_B,n_mapped_B,n_overlaps,"
            "number_of_permutations,observed,empirical_pvalue,mean(null),var(null),network\n")

    def update_comparison_table_empirical(self, setname_A: str, setname_B: str, n_geneset_A: int, n_mapped_A: int,
                                          n_geneset_B: int, n_mapped_B: int, n_overlaps: int,
                                          number_of_permutations: int, observed: int, empirical_pvalue: float,
                                          mean_null: float, var_null: float) -> None:
        """
        Update the content of the comparison table, adding a new row on the file

        :param setname_A: the name of the geneset A (commas are replaced by underscores)
        :param setname_B: the name of the geneset B (commas are replaced by underscores)
        :param n_geneset_A: the number of genes in the geneset A
        :param n_mapped_A: the number of mapped genes in geneset A
        :param n_geneset_B: the number of genes in the geneset B
        :param n_mapped_B: the number of mapped genes in geneset B
        :param n_overlaps: the number of overlaps
        :param number_of_permutations: number of performed permutations
        :param observed: number of observed genes
        :param empirical_pvalue: value of the empirical pvalue
        :param mean_null: mean of the null distribution
        :param var_null: variance of the null distribution
        :raises RuntimeError: if no temporary table has been created yet

        Example
        _______
        >>> import itertools
        >>> import pygna.command as cm
        >>> import pygna.reading_class as rc
        >>> import pygna.statistical_comparison as sc
        >>> geneset_a = rc.ReadGmt("genset_file").get_geneset("brca")
        >>> setnames = [key for key in geneset_a.keys()]
        >>> network = rc.ReadTsv("network_file.tsv").get_network()
        >>> distance_matrix_filename = "distance_matrix.tsv"
        >>> in_memory = True
        >>> network = nx.Graph(network.subgraph(max(nx.connected_components(network), key=len)))
        >>> sp_diz = {"nodes": cm.read_distance_matrix(distance_matrix_filename, in_memory=in_memory)[0],
        ...           "matrix": cm.read_distance_matrix(distance_matrix_filename, in_memory=in_memory)[1]}
        >>> st_comparison = sc.StatisticalComparison(sc.comparison_shortest_path, network, diz=sp_diz, n_proc=2)
        >>> out = Output("networkfile.tsv", "results.csv", "myanalysis", "genset_a.csv", setnames)
        >>> out.create_comparison_table_empirical()
        >>> for pair in itertools.combinations(setnames, 2):
        ...     observed, pvalue, null_d, a_mapped, b_mapped = st_comparison.comparison_empirical_pvalue(set(geneset_a[pair[0]]), set(geneset_a[pair[1]]), max_iter=number_of_permutations)
        ...     out.update_comparison_table_empirical(pair[0], pair[1], len(set(geneset_a[pair[0]])), a_mapped, len(set(geneset_a[pair[1]])), b_mapped, n_overlaps, number_of_permutations, observed, pvalue, np.mean(null_d), np.var(null_d))
        """
        setname_A = setname_A.replace(",", "_")
        setname_B = setname_B.replace(",", "_")
        self._append_row([self.analysis, setname_A, setname_B, n_geneset_A, n_mapped_A, n_geneset_B, n_mapped_B,
                          n_overlaps, number_of_permutations, observed, empirical_pvalue, mean_null, var_null,
                          self.network_filename])

    def add_GMT_entry(self, key: str, descriptor: str, gene_list: list) -> None:
        """
        Add a gmt entry to the entries that will be written in the GMT file

        If ``key`` is already present the stored entry is kept and a warning is logged.

        :param key: the key name to store
        :param descriptor: the descriptor of the gene list
        :param gene_list: the collection of genes to write (stored as given)

        Example
        _______
        >>> import pygna.reading_class as rc
        >>> geneset = rc.ReadGmt("geneset_file.csv").get_geneset("brca")
        >>> setnames = [key for key in geneset.keys()]
        >>> network = rc.ReadTsv("network_file.tsv").get_network()
        >>> out = Output("networkfile.tsv", "results.csv", "myanalysis", "genset_a.csv", setnames)
        >>> for setname, item in geneset.items():
        ...     item = set(item)
        ...     module = nx.subgraph(network, item)
        ...     lcc = sorted(list(nx.connected_components(module)), key=len, reverse=True)[0]
        ...     out.add_GMT_entry(setname, "topology_module", lcc)
        """
        if key in self.GMT_dict:
            logging.warning("Key Already Exists: " + str(key))
            return
        self.GMT_dict[key] = {"descriptor": descriptor, "genes": gene_list}

    def create_GMT_output(self, output_gmt: str) -> None:
        """
        Write the stored GMT entries on the GMT file

        The writing is delegated to ``print_GMT``, which is expected to be defined at module level.

        :param output_gmt: the GMT file to print

        Example
        _______
        >>> import pygna.reading_class as rc
        >>> geneset = rc.ReadGmt("geneset_file.csv").get_geneset("brca")
        >>> setnames = [key for key in geneset.keys()]
        >>> network = rc.ReadTsv("network_file.tsv").get_network()
        >>> out = Output("networkfile.tsv", "results.csv", "myanalysis", "genset_a.csv", setnames)
        >>> for setname, item in geneset.items():
        ...     item = set(item)
        ...     module = nx.subgraph(network, item)
        ...     lcc = sorted(list(nx.connected_components(module)), key=len, reverse=True)[0]
        ...     out.add_GMT_entry(setname, "topology_module", lcc)
        >>> out.create_GMT_output("output_lcc.gmt")
        """
        self.output_gmt = output_gmt
        print_GMT(self.GMT_dict, self.output_gmt)
def apply_multiple_testing_correction(table_file: str, pval_col: str = "empirical_pvalue", method: str = "fdr_bh",
                                      threshold: float = 0.1) -> None:
    """
    Apply the multiple testing correction to a results table and overwrite the csv file with the outcome

    The table is read from ``table_file``, four columns are added from the values returned by
    ``statsmodels.stats.multitest.multipletests`` and the rows, sorted by ``bh_pvalue``, are written
    back to the same file:

    * ``rejects``: whether the hypothesis is rejected at the given threshold
    * ``bh_pvalue``: the corrected p-value (the column keeps this name whatever ``method`` is used)
    * ``k``: the third returned value (the Sidak-corrected alpha according to the statsmodels documentation)
    * ``bonf``: the fourth returned value (the Bonferroni-corrected alpha according to the statsmodels documentation)

    :param table_file: the name of the file to read and overwrite
    :param pval_col: the name column containing the empirical pvalue
    :param method: the correction method to use
    :param threshold: the threshold to use in the method

    Example
    _______
    >>> table_filename = "pygna_comparison_results.csv"
    >>> apply_multiple_testing_correction(table_filename, pval_col="empirical_pvalue", method="fdr_bh", threshold=0.1)
    """
    with open(table_file, "r+") as handle:
        results = pd.read_csv(handle)

    # multipletests is evaluated in full before any column is added to the table
    correction = multi.multipletests(results[pval_col].values, alpha=float(threshold), method=method)
    for column, values in zip(("rejects", "bh_pvalue", "k", "bonf"), correction):
        results[column] = values

    results.sort_values(by="bh_pvalue").to_csv(table_file, index=False)
def _graph_info(graph: nx.Graph) -> str:
    """Return the short textual description of ``graph`` printed after "Info:"."""
    # nx.info was removed in NetworkX 3.0; str(graph) is used when it is not available
    info = getattr(nx, "info", None)
    return info(graph) if info is not None else str(graph)


def _graph_properties(graph: nx.Graph) -> list:
    """Return the formatted property lines (density, degree statistics, clustering) of ``graph``."""
    degree = np.array([d for _, d in nx.degree(graph)])
    n_nodes = nx.number_of_nodes(graph)
    n_edges = nx.number_of_edges(graph)
    # A single node has no possible edge: report a density of 0 instead of dividing by zero
    possible_edges = n_nodes * (n_nodes - 1)
    density = (2 * n_edges) / possible_edges if possible_edges else 0.0
    return [
        "\n\t- Density: " + str(density),
        "\n\t- min degree = " + str(np.min(degree)),
        "\n\t- max degree = " + str(np.max(degree)),
        "\n\t- median degree = " + str(np.median(degree)),
        "\n\t- degree mode = " + str(scipy.stats.mode(degree)),
        "\n\t- disconnected nodes = " + str(np.sum(degree == 0)),
        "\n\t- average clustering = " + str(nx.average_clustering(graph)),
    ]


def write_graph_summary(graph: nx.Graph, output_file: str, net_name: str = None) -> None:
    """
    This function takes a graph as input and writes the network properties in a text file

    Two sections are written: one for the whole graph and one for its largest connected component.
    The whole text is computed before the file is opened, so a failure while computing a statistic
    does not leave a partially written file.

    :param graph: the graph to print
    :param output_file: the name of the file to print
    :param net_name: the name of the network, 'network' is used when it is not given
    :raises ValueError: if the graph has no nodes (there is no largest connected component)

    Example
    _______
    >>> import pygna.reading_class as rc
    >>> text_output = "My summary stats"
    >>> network = rc.ReadTsv("mynetwork.tsv").get_network()
    >>> write_graph_summary(network, text_output, "mynetwork.tsv")
    """
    if not net_name:
        net_name = 'network'

    largest_cc = nx.Graph(graph.subgraph(max(nx.connected_components(graph), key=len)))

    sections = [
        "Network Summary for %s " % str(net_name),
        "\n---------------------------------------------------\n",
        "\nInfo: " + _graph_info(graph),
        "\nOther Properties::\n ",
        *_graph_properties(graph),
        "\n\nLargest Connected Component Summary for %s \n " % str(net_name),
        "-----------------------------------------------------------------\n",
        "\nInfo: " + _graph_info(largest_cc),
        "\nLargest Connected Component::\n",
        *_graph_properties(largest_cc),
    ]

    with open(output_file, "w") as summary_file:
        summary_file.write("".join(sections))