import os
import re
import numpy as np
import pandas as pd
from cobra import core
from ast import And, BoolOp, Expression, Name, Or
from tqdm.auto import tqdm
from .utils import make_pickle, make_csvs
[docs]
def get_GPR_reactions(model):
"""get_GPR_reactions.
Parameters
----------
model : cobra model
A cobra object, loaded with the cobra library
Returns
-------
pandas dataframe
a dataframe with all the reactions in the model having a GPR
"""
gpr = pd.DataFrame([], columns=['Reaction ID','Reaction Name','GPR'])
for r in range(len(model.reactions)):
reaction = model.reactions[r]
if reaction.gene_reaction_rule != '':
line = [reaction.id,reaction.name,reaction.gene_reaction_rule]
gpr.loc[len(gpr)] = line
return gpr
[docs]
def eval_gpr_activity(expr, gh, gl):
"""eval_gpr_activity.
This is an adaptation of the eval_gpr function available in cobrapy.
Instead of evaluating if a GPR is active according to a list of knockout
genes, it will evaluate if the GPR is regulated by a list of genes
Exemple of usage :
provide the list of Highly expressed genes and Lowly expressed genes
Return all expressions that are True, thus Highly expressed
evaluate compiled ast of gene_reaction_rule with list of active genes
Parameters
----------
expr : Expression
The ast of the gene reaction rule
gh : list
list of highly expressed genes
gl : list
list of lowly expressed genes
Returns
-------
bool
True if the reaction is active with the given gh and gl lists
otherwise false
"""
if isinstance(expr, Expression):
return eval_gpr_activity(expr.body, gh, gl)
elif isinstance(expr, Name):
if expr.id in gh:
return 1
elif expr.id in gl:
return -1
else:
return 0
elif isinstance(expr, BoolOp):
op = expr.op
if isinstance(op, Or):
return max(eval_gpr_activity(i, gh, gl) for i in expr.values)
elif isinstance(op, And):
return min(eval_gpr_activity(i, gh, gl) for i in expr.values)
else:
raise TypeError("unsupported operation " + op.__class__.__name__)
elif expr is None:
return False
else:
raise TypeError("unsupported operation " + repr(expr))
[docs]
def find_reactions_expression_levels(gprs,gh,gl):
"""find_reactions_expression_levels.
Parameters
----------
gprs : cobra gpr
A cobra gpr object, obtained from a cobra model
gh : list
list of highly expressed genes
gl : list
list of lowly expressed genes
Returns
-------
tuple
a tuple of three list.
rh, is the list of reactions identified as active according to gene expression data
rl, is the list of reactions identified as inactive according to gene expression data
rn, is the list of reactions not constrained by gene expression data
"""
rh = []
rl = []
rn = []
for i in range(len(gprs)):
res = eval_gpr_activity(core.gene.GPR().from_string(gprs.iloc[i].GPR).body,gh,gl)
if res > 0:
rh.append(gprs.iloc[i][0])
elif res < 0:
rl.append(gprs.iloc[i][0])
else:
rn.append(gprs.iloc[i][0])
return rh,rl,rn
[docs]
def get_reactions_ids(model):
"""get_reactions_ids.
Parameters
----------
model : cobra model
A cobra object, loaded with the cobra library
Returns
-------
pandas dataframe
a dataframe with all the reactions and eventually their GPR
"""
all_reactions_ids = pd.DataFrame([],columns=['Reaction ID','Reaction Name','GPR'])
for r in range(len(model.reactions)):
reaction = model.reactions[r]
if reaction.gene_reaction_rule == '':
line = [reaction.id,reaction.name,'No GPR']
all_reactions_ids.loc[len(all_reactions_ids)] = line
else:
line = [reaction.id,reaction.name,reaction.gene_reaction_rule]
all_reactions_ids.loc[len(all_reactions_ids)] = line
return all_reactions_ids
[docs]
def get_gene_list(model):
"""get_gene_list.
Parameters
----------
model : cobra model
A cobra object, loaded with the cobra library
Returns
-------
list
a list containing all the genes in the model
"""
gene_list = []
for gene in model.genes:
gene_list.append(gene.id)
return gene_list
[docs]
def identify_model_gene_ids(model):
"""identify_model_gene_ids.
Parameters
----------
model : cobra model
A cobra object, loaded with the cobra library
Returns
-------
str
The type of gene identifiers used in the model
"""
model_genes = get_gene_list(model)
if 'HGNC:' in model_genes[0]:
return 'HGNC ID'
elif 'ENSG' in model_genes[0]:
return 'Ensembl gene ID'
elif '_AT' in model_genes[0]:
return 'NCBI Gene ID'
else:
return 'model not implemented'
[docs]
def fullname_equation(reaction):
"""fullname_equation.
Parameters
----------
reaction : cobra reaction
A cobra reaction object, obtained from a cobra model
Returns
-------
str
the reaction with complete metabolites names
"""
reaction_metabolites = list(reaction.metabolites)
#generate metabolites dict
metabolites_dict = {}
for met in reaction_metabolites:
metabolites_dict[met.id] = met.name
splitted_equation = reaction.reaction.split(' ')
fullname_equation = ""
for elem in splitted_equation:
if re.match(".*\[.\].*",elem):
fullname_equation = fullname_equation+metabolites_dict[elem]+" "
else:
fullname_equation = fullname_equation+elem+" "
return fullname_equation
[docs]
def map_single_column(data,hgnc_data,col_to_add):
"""map_single_column.
Parameters
----------
data : pandas dataframe
a gene expression dataset
hgnc_data : pandas dataframe
a hgnc database annotation dataframe
col_to_add : str
the name of the identifier column to map
Returns
-------
pandas dataframe
the gene expressed dataframe with the mapped identifier added
"""
mapped_ids = []
genes = data['ENTREZID']
for i in range(len(genes)):
id = hgnc_data.loc[hgnc_data['NCBI Gene ID'] == str(genes[i])][col_to_add]
if len(id) == 0:
mapped_ids.append('NA')
else:
mapped_ids.append(id.iloc[0])
if len(mapped_ids) == len(data):
data.insert(2,col_to_add,mapped_ids)
return data
[docs]
def find_high_low_exprs(uarray_data,threshold_dw_perc=25,threshold_up_perc=75):
"""find_high_low_exprs.
Parameters
----------
uarray_data : pandas dataframe
Description of parameter `uarray_data`.
threshold_dw_perc : int
The percentile below which we consider that genes are not expresed.
threshold_up_perc : int
The percentile above which we consider that genes are highly expressed.
Returns
-------
list
Return list of highly/lowly expressed gene .
"""
# threshold_dw_perc : Low expression percentile
# threshold_up_perc : High expression percentile
if pd.DataFrame(uarray_data).shape[1] > 1:
raise ValueError("More than one column provided, check duplicates")
if (0 <= threshold_up_perc <= 100) & (0 <= threshold_dw_perc <= 100) & \
(threshold_dw_perc < threshold_up_perc):
threshold_dw = np.percentile(uarray_data,threshold_dw_perc)
threshold_up = np.percentile(uarray_data,threshold_up_perc)
low_exprs = uarray_data.loc[uarray_data.iloc[:,] <= threshold_dw]
high_exprs = uarray_data.loc[uarray_data.iloc[:,] >= threshold_up]
return [high_exprs,low_exprs]
[docs]
def preprocess_data(data,gene_id_col,model,pickle_path="",csvs_path="", threshold_dw_perc = 25, threshold_up_perc = 75):
"""preprocess_data.
Parameters
----------
data : pandas dataframe
a gene expression dataset
gene_id_col : str
the name of the gene identifier column
model : cobra model
A cobra object, loaded with the cobra library
pickle : str
if not empty, write pkl files containing categorized reactions activity at given location
csvs : str
if not empty, write csv files containing categorized reactions activity at given location
threshold_dw_perc : int
The percentile below which we consider that genes are not expresed.
threshold_up_perc : int
The percentile above which we consider that genes are highly expressed.
Returns
-------
Write a csv or a pickle with for categorized reactions activity
"""
pbar = tqdm(total=len(data.filter(like=".CEL",axis=1).columns))
suffix = '_rh_rl_zscores_'+str(threshold_dw_perc)+'_'+str(threshold_up_perc)
gprs = get_GPR_reactions(model)
for i in range(len(data.columns)):
uarray = data.columns[i]
if ('.CEL' in uarray):
uarray_data = data[uarray]
uarray_data.index = data[gene_id_col]
gh,gl = find_high_low_exprs(uarray_data,threshold_dw_perc,threshold_up_perc)
rh,rl,rn = find_reactions_expression_levels(gprs,gh,gl)
if pickle_path != "":
picklef = uarray+suffix
if os.path.exists(pickle_path):
make_pickle([rh,rl,rn],pickle_path+'/'+picklef+'.pkl')
else:
os.mkdir(pickle_path)
make_pickle([rh,rl,rn],pickle_path+'/'+picklef+'.pkl')
if csvs_path != "":
csvsf = uarray+suffix
if os.path.exists(csvs_path):
make_csvs([rh,rl,rn],csvs_path+"/",csvsf)
else:
os.mkdir(csvs_path)
make_csvs([rh,rl,rn],csvs_path+"/",csvsf)
pbar.update(1)