import networkx as nx
import math
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
import logging
from pygna import output
[docs]class DegreeModel(object):
"""
This class is used to calculate a degree model
"""
def __init__(self, network_prob: float = 0.5, vip_prob: float = 1, n_nodes: int = 10,
vip_percentage: float = 0.1):
self.n_nodes = n_nodes
self.graph = nx.Graph()
self.network_prob = network_prob
self.vip_prob = vip_prob
self.n_vip = math.ceil(vip_percentage * self.n_nodes)
self.nodes = ["N" + str(i) for i in range(n_nodes)]
self.cluster_dict = {}
[docs] def set_nodes(self, nodes_names: list) -> None:
"""
Set the name of the nodes and calculate the lenght of the list
:param nodes_names: the list with the nodes name
Example
_______
>>> nodes = list("A", "B", "C")
>>> dm = DegreeModel()
>>> dm.set_nodes(nodes)
"""
self.nodes = nodes_names
self.n_nodes = len(nodes_names)
[docs] def create_graph(self) -> None:
"""
Create a graph from the nodes
Example
_______
>>> dm = DegreeModel()
>>> dm.create_graph()
"""
reject = True
logging.info("Reject=" + str(reject))
while reject:
graph = generate_graph_vip(self.n_nodes, self.n_vip, network_prob=self.network_prob, vip_prob=self.vip_prob,
node_names=self.nodes)
LCC = max(nx.connected_components(graph), key=len)
reject = len(LCC) != self.n_nodes
logging.info("Reject=" + str(reject))
logging.info("Nodes: %d, in LCC: %d" % (self.n_nodes, len(LCC)))
self.graph = graph
[docs] def write_network(self, output_file: str) -> None:
"""
Write on file the network as an edge list
:param output_file: the file path where to save the network
Example
_______
>>> dm = DegreeModel()
>>> dm.write_network("myoutputfile.tsv")
"""
self.network_file = output_file
logging.info("Network written on %s" % output_file)
if output_file.endswith(".tsv"):
nx.write_edgelist(self.graph, output_file, data=False, delimiter="\t")
else:
logging.error("output file format unknown")
[docs] def write_genelist(self, output_file: str) -> None:
"""
Write the GMT gene list on a specific file
:param output_file: the file path where to save the gene list
Example
_______
>>> dm = DegreeModel()
>>> dm.write_genelist("myoutput.gmt")
"""
self.genelist_file = output_file
clusters = nx.get_node_attributes(self.graph, "cluster")
for i in set(clusters.values()):
c = "cluster_" + str(i)
self.cluster_dict[c] = {}
self.cluster_dict[c]["descriptor"] = "cluster"
self.cluster_dict[c]["genes"] = [
str(j) for j in clusters.keys() if clusters[j] == i
]
if output_file.endswith(".gmt"):
output.print_GMT(self.cluster_dict, self.genelist_file)
else:
logging.error("output file format unknown")
[docs]def generate_graph_vip(n_nodes: int, n_vip: int, network_prob: float = 0.5, vip_prob: float = 1,
node_names: list = None) -> nx.Graph:
"""
This function creates a graph with n_nodes number of vertices and a matrix block_model that describes the intra e inter-block connectivity.
The nodes_in_block is parameter, list, to control the number of nodes in each cluster
:param n_nodes: number of nodes in the network
:param n_vip: number of VIP to create
:param network_prob: probability of connection in the network
:param vip_prob: probability of connection of the vip
:param node_names: list of nodes for the network
Example
_______
>>> n_nodes = 100
>>> n_vip = 10
>>> network_prob = 0.5
>>> vip_prob = 1
>>> nodes = ["A", "B", "C"]
>>> graph = generate_graph_vip(n_nodes, n_vip, network_prob=network_prob, vip_prob=vip_prob, node_names=nodes)
"""
if not node_names:
node_names = range(n_nodes)
edges = []
G = nx.Graph()
list_temp = [(n_nodes - n_vip) * [0]]
list_temp.append(n_vip * [1])
cluster = np.array([val for sublist in list_temp for val in sublist])
np.random.shuffle(cluster)
prob = [network_prob, vip_prob]
p = [prob[i] for i in cluster]
for i in range(n_nodes):
G.add_node(node_names[i], cluster=cluster[i], prob=p[i])
for i in range(n_nodes):
for j in range(i + 1, n_nodes):
if np.random.binomial(1, p[i]) == 1:
edges.append((node_names[i], node_names[j]))
G.add_edges_from(edges)
return G
[docs]def plot_vip_graph(graph: nx.Graph, output_folder: str = None) -> None:
"""
Plot the VIP graph in the specific folder
:param graph: the graph to plot
:param output_folder: the folder path where to save the file
Example
_______
>>> g = nx.complete_graph(100)
>>> plot_vip_graph(g, "./")
"""
nodes = graph.nodes()
colors = ["#b15928", "#1f78b4"]
cluster = nx.get_node_attributes(graph, "cluster")
labels = [colors[cluster[n]] for n in nodes]
layout = nx.spring_layout(graph)
plt.figure(figsize=(13.5, 5))
plt.subplot(1, 3, 1)
nx.draw(
graph,
nodelist=nodes,
pos=layout,
node_color="#636363",
node_size=50,
edge_color="#bdbdbd",
)
plt.title("Observed network")
plt.subplot(1, 3, 2)
plt.imshow(nx.adjacency_matrix(graph).toarray(), cmap="OrRd")
plt.title("Adjacency Matrix")
plt.subplot(1, 3, 3)
legend = []
for ix, c in enumerate(colors):
legend.append(mpatches.Patch(color=c, label="C%d" % ix))
nx.draw(
graph,
nodelist=nodes,
pos=layout,
node_color=labels,
node_size=50,
edge_color="#bdbdbd",
)
plt.legend(handles=legend, ncol=len(colors), mode="expand", borderaxespad=0)
plt.title("VIP clustering")
plt.savefig(output_folder + "VIP.pdf", bbox_inches="tight")
[docs]def plot_adjacency(graph: nx.Graph, output_folder: str, prefix: str) -> None:
"""
Plot the adjacency matrix on file
:param graph: the graph to plot
:param output_folder: the folder where to save the file
:param prefix: the prefix to give to the file
Example
_______
>>> g = nx.complete_graph(100)
>>> plot_adjacency(g, "./","adj_matrix_")
"""
plt.figure(figsize=(13.5, 5))
plt.subplot(1, 1, 1)
plt.imshow(nx.adjacency_matrix(graph).toarray(), cmap="OrRd")
plt.title("Adjacency Matrix")
plt.savefig(output_folder + prefix + "VIP.png")
def generate_hdn_network(output_folder: 'the output folder path',
prefix:'the prefix of the file to be saved',
n_nodes: 'the number of nodes in the network' = 1000,
network_prob: 'probability of connection in the network' = 0.005,
hdn_probability: 'probability of connection of the HDNs'= 0.3,
hdn_percentage: 'percentage of HDNs' = 0.05,
number_of_simulations: int = 5):
"""
This function generates a simulated network using the VIP model
"""
dm = DegreeModel(
network_prob=network_prob,
vip_prob=hdn_probability,
n_nodes=n_nodes,
vip_percentage=hdn_percentage,
)
for i in range(number_of_simulations):
dm.create_graph()
dm.write_network(output_folder + prefix + "_s_" + str(i) + "_network.tsv")
dm.write_genelist(output_folder + prefix + "_s_" + str(i) + "_genes.gmt")
plot_adjacency(dm.graph, output_folder, prefix=prefix + "_s_" + str(i))
def hdn_add_branching(input_geneset_file:'input geneset containing the sets to be merged',
network_file: 'network filename',
hdn_set:'setname of the HDNs in the geneset'='cluster_1',
general_set:'other setname in the geneset'='cluster_0',
number_of_hdns:'number of seed HDNs to start adding nodes'=3,
number_of_reps: 'number of new genesets created for each condition'=3,
output_geneset_file:'if no output gmt filename is passed, the data is added to the input file'=None,
output_graph_file:'if graphml file is passed the network with labels is written' = None ):
"""
Creates new genesets from the vip list, new genesets are created adding 1 step
nodes to vips. The new genes are created as branches.
"""
if output_geneset_file:
logging.info('output written to: %s ' %output_geneset_file)
else:
output_geneset_file=input_geneset_file
logging.info('output written on the same file as the input %s' %output_geneset_file)
geneset = ps.GMTParser().read(input_geneset_file, read_descriptor=True)
network = ps.__load_network(network_file)
print(type(network))
# If graphml output is set, we create a MultiGraph from the original network
if output_graph_file:
MG = nx.MultiDiGraph()
MG.add_nodes_from(network)
print(MG.nodes())
try:
vips=set(geneset[hdn_set]['genes'])
general=set(geneset[general_set]['genes'])
except:
sys.exit('The vip setname could not be read')
if output_graph_file:
# add annotation
vip_dict = { i:1 if i in vips else 0 for i in list(network.nodes()) }
nx.set_node_attributes(MG, vip_dict, 'vip')
# new_geneset={}
# new_geneset[hdn_set]={}
# new_geneset[hdn_set]['descriptor']='original clusters'
# new_geneset[hdn_set]['genes']=list(vips)
# new_geneset[general_set]={}
# new_geneset[general_set]['descriptor']='original clusters'
# new_geneset[general_set]['genes']=list(general)
new_geneset=geneset
# A geneset is created for each repetition and combination of
# of first layer connections and length of the branch
for rep in range(number_of_reps):
for first_layer in [1]:
for branch_length in [5, 10, 15]:
new_name='branching_'+str(first_layer)+'_'+str(branch_length)+'_'+str(rep)
new_set=[]
for k in range(number_of_hdns):
seed=random.choice(list(vips))
new_set.append(seed)
for fl in range(first_layer):
# From the seed node, get a first connection node
seed_edges=[i[1] for i in network.edges(seed)]
new_node=random.choice(seed_edges)
while new_node in new_set:
new_node=random.choice(seed_edges)
new_set.append(new_node)
#if output_graph_file:
# MG.add_edges_from([(seed,new_node,{'branch':new_name, 'number':0})])
# nx.set_node_attributes(MG, {seed:'1'}, 'in_set')
for br in range(branch_length-1):
new_node_edges=[i[1] for i in network.edges(new_node)]
if len(set(new_node_edges).difference(set(new_set)))>0:
old_node=new_node
new_node=random.choice(list(set(new_node_edges).difference(set(new_set))))
new_set.append(new_node)
kkk=nx.shortest_path_length(network, source=old_node, target=new_node)
if kkk>1:
print('WARNING path length: %d' %kkk)
if output_graph_file:
MG.add_edges_from([(old_node,new_node,{'branch':new_name, 'number':br+1})])
nx.set_node_attributes(MG, {old_node:'1'}, 'in_set')
if output_graph_file:
nx.set_node_attributes(MG, {new_node:'1'}, 'in_set')
new_geneset[new_name]={}
new_geneset[new_name]['descriptor']='branching'
new_geneset[new_name]['genes']=new_set
all_nodes = set()
for k,diz in new_geneset.items():
if diz['descriptor']=='branching':
all_nodes=all_nodes.union(set(diz['genes']) )
for g in diz['genes']:
all_nodes=all_nodes.union(set(network[g].keys()))
ed=[(g,i) for i in network[g].keys()]
if output_graph_file:
MG.add_edges_from(ed)
universe=set(network.nodes())
difference= universe.difference(all_nodes)
out.print_GMT(new_geneset, output_geneset_file)
if output_graph_file:
MG.remove_nodes_from(difference)
nx.write_graphml(MG, output_graph_file+ ".graphml")
def hdn_add_partial(input_geneset_file:'input geneset containing the sets to be merged',
hdn_set:'setname of the HDNs in the geneset'='cluster_1',
general_set:'other setname in the geneset'='cluster_0',
reps: 'number of new genesets made of part of vips' = 3,
percentage_partial_vips: 'percentage of HDNs in the new geneset' = '0.1,0.2,0.5',
output_geneset_file:'if no output gmt filename is passed, the data is added to the input file'=None):
"""
Creates new genesets from the vip list, number of genesets and portion of
genes can be specified by input.
"""
if type(percentage_partial_vips)==str:
percentage_partial_vips=percentage_partial_vips.replace('[','').replace(']','').replace(' ','')
percentage_partial_vips=[float(i) for i in percentage_partial_vips.split(',')]
if output_geneset_file:
logging.info('output written to: %s ' %output_geneset_file)
else:
output_geneset_file=input_geneset_file
logging.info('output written on the same file as the input %s' %output_geneset_file)
geneset = ps.GMTParser().read(input_geneset_file, read_descriptor=True)
try:
vips=set(geneset[hdn_set]['genes'])
general=set(geneset[general_set]['genes'])
except:
sys.exit('The vip setname could not be read')
# Genesets made of part of the vips
for perc in percentage_partial_vips:
for rep in range(reps):
new_name='Partial_vip_'+str(perc)+'_'+str(rep)
new_set=np.random.choice(list(vips), int(perc*len(vips)))
geneset[new_name]={}
geneset[new_name]['descriptor']='partial vip list'
geneset[new_name]['genes']=list(new_set)
out.print_GMT(geneset, output_geneset_file)
def hdn_add_extended(input_geneset_file:'input geneset containing the sets to be merged',
hdn_set:'setname of the HDNs in the geneset'='cluster_1',
general_set:'other setname in the geneset'='cluster_0',
reps: 'number of new genesets made of part of HDNs' = 3,
percentage_extended_vips: 'percentage of HDNs in the new genesett' = '[0.2]',
ratio_others: 'ratio of genes to add to HDNs' = '[2,2.5,3,4]',
output_geneset_file:'if no output gmt filename is passed, the data is added to the input file'=None):
"""
Creates new genesets from the vip list, number of genesets and portion of genes
can be specified by input. The final new geneset is going to be formed by:
percentage ev*HDN_total + ratio*percentage ev*vips total.
"""
if type(percentage_extended_vips)==str:
percentage_extended_vips=percentage_extended_vips.replace('[','').replace(']','').replace(' ','')
percentage_extended_vips=[float(i) for i in percentage_extended_vips.split(',')]
if type(ratio_others)==str:
ratio_others=ratio_others.replace('[','').replace(']','').replace(' ','')
ratio_others=[float(i) for i in ratio_others.split(',')]
if output_geneset_file:
logging.info('output written to: %s ' %output_geneset_file)
else:
output_geneset_file=input_geneset_file
logging.info('output written on the same file as the input %s' %output_geneset_file)
geneset = ps.GMTParser().read(input_geneset_file, read_descriptor=True)
try:
vips=set(geneset[hdn_set]['genes'])
general=set(geneset[general_set]['genes'])
except:
sys.exit('The vip setname could not be read')
# VIPs and other genes
for rep in range(reps):
for perc_ext in percentage_extended_vips:
new_vips=np.random.choice(list(vips), int(perc_ext*len(vips)))
for add in ratio_others:
new_name='Extend_vip_'+str(perc_ext)+'_'+str(add)+'_'+str(rep)
# Add a number of genes smaller than the total of generals
new_genes_number=min(len(general), int(add*np.ceil(perc_ext*len(vips))) )
new_genes=np.random.choice(list(general),new_genes_number )
new_set=set(new_vips).union(set(new_genes))
geneset[new_name]={}
geneset[new_name]['descriptor']='extended vip and other genes list'
geneset[new_name]['genes']=list(new_set)
out.print_GMT(geneset, output_geneset_file)