Source code for manamodeller.results_processing

import os
import re
import glob
import pandas as pd

from multiprocessing import JoinableQueue
from .utils import launch_multi_proc

[docs] def concatenate_reaction_div_enum(path_concat_rxn_enum,path_concat_div_enum, out_dir,col_index="",single_csv=False,ncpus=1): """concatenate_reaction_div_enum. Parameters ---------- path_concat_rxn_enum : str path to the concatenated reaction enum directory path_concat_div_enum : str path to the concatenated diversity enum directory out_dir : str path to the csvs output directory col_index : str column name of a column to be used as index (optional) single_csv : boolean option for the concatenate_csv function, if True all solutions will be stored in a single csv file ncpus : int the number of cpus allocated, will enable parallel processing Returns ------- a JoinableQueue Object """ #source : https://www.freecodecamp.org/news/how-to-combine-multiple- #csv-files-with-8-lines-of-code-265183e0854/ q = JoinableQueue() list_dir = glob.glob(path_concat_rxn_enum+"/*.csv") for rxn_enum_file in list_dir: files = glob.glob(path_concat_div_enum+'/'+os.path.basename(rxn_enum_file).split('_')[0]+'*_solutions.csv') files.append(rxn_enum_file) if ncpus == 1: concatenate_csv(files,out_dir,col_index,single_csv) else: q.put((concatenate_csv,(files,out_dir,col_index,single_csv))) if ncpus == 1: pass else: launch_multi_proc(ncpus,q) return q
[docs] def concatenate_solutions(csv_dir,out_dir,col_index="",single_csv=False,ncpus=1, restart=False): """concatenate_solutions. Parameters ---------- csv_dir : str path to the csvs to concatenate directory out_dir : str path to the csvs output directory col_index : str column name of a column to be used as index (optional) single_csv : boolean option for the concatenate_csv function, if True all solutions will be stored in a single csv file ncpus : int the number of cpus allocated, will enable parallel processing Returns ------- a JoinableQueue Object """ #source : https://www.freecodecamp.org/news/how-to-combine-multiple- #csv-files-with-8-lines-of-code-265183e0854/ if (os.path.basename(out_dir) == "full_rxn_enum_set"): index_suffix = '_renum' elif (os.path.basename(out_dir) == "full_div_enum_set"): index_suffix = '_rdivers' else: return "Wrong outdir name" q = JoinableQueue() list_dir = list(set([i.split('_')[0] for i in os.listdir(csv_dir)])) for i in range(0,len(list_dir)): if restart: #search if a concatenated csv exist for this file id in the full rxn set rgx = re.compile(csv_dir+list_dir[i].split('_')[0]+'.*') match_res = list(filter(rgx.match,os.path.basename(csv_dir))) if len(match_res) == 0: files = glob.glob(csv_dir+list_dir[i].split('_')[0]+'*_solutions.csv') if ncpus == 1: concatenate_csv(files,out_dir,col_index,single_csv,index_suffix) else: q.put((concatenate_csv,(files,out_dir,col_index,single_csv,index_suffix))) else: files = glob.glob(csv_dir+list_dir[i].split('_')[0]+'*_solutions.csv') if ncpus == 1: concatenate_csv(files,out_dir,col_index,single_csv,index_suffix) else: q.put((concatenate_csv,(files,out_dir,col_index,single_csv,index_suffix))) if ncpus == 1: pass else: launch_multi_proc(ncpus,q) return q
[docs] def concatenate_csv(filenames,out_dir,col_index,single_csv,index_suffix=""): """concatenate_csv. Parameters ---------- filenames : str list of csv files to concatenate into one csv file out_dir : str path to the concatenated csv output directory col_index : str column name of a column to be used as index (optional) single_csv : boolean option for the concatenate_csv function, if True all solutions will be stored in a single csv file index_suffix : str suffix to add to csv's row index Returns ------- write the concatenated csv in the """ list_csvs = [] index = [] nrenum = 0 if col_index == "": col_index = list(pd.read_csv(filenames[0]).columns) for i in range(len(filenames)): tmp = pd.read_csv(filenames[i]) #check that the number of columns match and get colnames of the first file if len(col_index) != len(tmp.columns): print("Error") else: tmp.columns = col_index if len(os.path.basename(filenames[i]).split('_')) == 2: file_id = os.path.basename(filenames[i]).split('_')[0]+'_full_rxn_enum' nrenum = tmp.shape[0] else: file_id = os.path.basename(filenames[i]).split('_')[0]+'_'+os.path.basename(filenames[i]).split('_')[3] index.append(file_id) list_csvs.append(tmp) combined_csv = pd.concat(list_csvs,ignore_index=False) if nrenum > 0: #Modify index after reaction_enum solutions line_count = pd.RangeIndex(0,combined_csv.shape[0],1) index_list = list(os.path.basename(filenames[0]).split('_')[0]+'_' + line_count.astype(str) + str(index_suffix)) index_list[0:nrenum] = list(combined_csv[0:nrenum]['Solutions_IDS']) combined_csv.index = index_list else: line_count = pd.RangeIndex(0,combined_csv.shape[0],1) combined_csv.index = os.path.basename(filenames[0]).split('_')[0]+'_' + line_count.astype(str) + str(index_suffix) combined_csv.drop(combined_csv.columns[0],axis=1,inplace=True) combined_csv.drop_duplicates(inplace=True) #remove identical solutions if single_csv: combined_csv.to_csv(out_dir+'/all_solutions.csv', mode='a', encoding='utf-8-sig') else: combined_csv.to_csv(out_dir+'/'+os.path.basename(filenames[0]).split('_')[0]+'_solutions.csv', encoding='utf-8-sig') return
[docs] def remove_done_batchs(batch_dir,result_dir,launch_undone = True,relax_param = False,enum_type="reaction_enum", para_batch=False, env="MANA"): """remove_done_batchs. Parameters ---------- batch_dir : str path to the batchs directory result_dir : str path to the modelling result directory launch_undone : boolean If True, write the master bash file to launch all failed batchs relax_param : boolean If True, relax the mipgap tolerance parameter enum_type : str string indicating which type of enumeration is being processed (optional) para_batch : boolean if True, launch each batch file independantly (instead of parallel on conditions, parallel on batch) env : str name of the anaconda environment to be activated Returns ------- a list with failed batch names """ removed_batchs = [] results = glob.glob(result_dir+'/*solutions.csv') for file in results: #reconstruct the batch name, with a regex to be able to use it for reaction enum and diversity enum cleanfile = os.path.basename(file) item = str(cleanfile.split('_')[0])+'_'+str(cleanfile.split('_')[3])+'_.*_enum.sh' #look if item match with a batch file,(meaning that the batch is done) batchs = os.listdir(batch_dir) for batch in batchs: if re.search(item,batch): os.remove(batch_dir+'/'+batch) removed_batchs.append(batch) if relax_param == True: #listdir again because we do not want to iterate over removed batchs batchs = os.listdir(batch_dir) for batch in batchs: #read current batch with open(batch_dir+batch,'r') as f: content = f.read() #replace mipgap with open(batch_dir+batch,'w') as f: f.write(re.sub(r'--mipgap .*','--mipgap 0.01',content)) if para_batch == True: batchs = os.listdir(batch_dir) for batch in batchs: #read current batch with open(batch_dir+batch,'r') as f: content = f.read() if '#!/bin/bash' in content: continue with open(batch_dir+batch,'w') as f: f.write('#!/bin/bash\n#SBATCH -p workq\n#SBATCH --mem=12G\n#SBATCH --cpus-per-task=4\n#SBATCH -t 48:00:00\n#SBATCH -J '+enum_type+'\n#SBATCH -o log_dir/runout_relaunch.out\n#SBATCH ' '-e log_dir/runerr_relaunch.out\nsource activate '+env+'\n'+content) if launch_undone == True: with open(batch_dir.split('/')[0]+"/launch_failed_batch_"+enum_type+".sh", "w+") as f: f.write('#!/bin/bash\n#SBATCH -p workq\n#SBATCH --mem=12G\n#SBATCH --cpus-per-task=12\n#SBATCH -t 48:00:00\n#SBATCH -J '+enum_type+'\n#SBATCH -o log_dir/runout_relaunch.out\n#SBATCH ' '-e log_dir/runerr_relaunch.out\nsource activate '+env+'\n ls '+batch_dir+'*enum.sh|xargs -n 1 -P 1 bash') return removed_batchs
[docs] def remove_zerobiomass_solutions(enum_dir,reaction_list,separator=','): """remove_zerobiomass_solutions. Parameters ---------- enum_dir : str the path to the enumeration directory reaction_list : str the path to the reaction_list directory separator : str the character used to separate columns in the file Returns ------- overwrite the csv file without solutions with 0 flux in biomass reaction """ col_list = ['Solutions_IDS'] + list(pd.read_csv(reaction_list).iloc[:,0]) for file in os.listdir(enum_dir): tmp_file = pd.read_csv(enum_dir+'/'+file,sep=separator) tmp_file.columns = col_list #drop rows where biomass_reaction equals 0 tmp_file.drop(tmp_file[tmp_file['biomass_reaction'] == 0].index,axis=0,inplace=True) #write modified solution file tmp_file.to_csv(enum_dir+'/'+file,sep=separator,index=False)