Source code for InPheC_Step2

"""
InPheC_Step2.py

This script is part of the InPheRNo-ChIP project and is responsible for:

    - Estimating the parameters of the model.
    - Running the probabilistic graphical model (PGM).
    - Incorporating enhancements in model Qij to account for inter-lineage dependencies and maintaining numerical stability.

Usage:

    - Run the script from the command line "python InPheC_Step2.py" directly.
    - For detailed argument options, use the help option: `python InPheC_Step2.py --help`

"""
import argparse
import logging
import os
import sys

import cloudpickle
import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt
from scipy.stats import beta, rv_continuous, uniform
from tqdm import tqdm

from utils_logo import print_logo
import warnings 
warnings.filterwarnings("ignore")

# Global seed for reproducibility
DEFAULT_SEED = 1011
np.random.seed(DEFAULT_SEED)
print(f"Running on PyMC v{pm.__version__}")


[docs] class Step2ArgumentParser: """ Parses and handles command-line arguments for the step2_main.py script, ensuring proper validation and configuration of input and output paths and file-related options for RNA-seq and ChIP-seq data processing. """ def __init__(self): """ Initializes the argument parser with a description and adds command-line arguments for data processing. """ self.parser = argparse.ArgumentParser(description="Loading input and output directories for Main_step2.py") self._add_arguments() self.args = self.parser.parse_args() self.validate_args() # check if directories exist def _add_arguments(self): # Directory arguments self.parser.add_argument( "-id_rna", "--in_dir_rna", default="./Data/RNA_seq/", help="address of directory containing gene-pheno association for ALL genes", ) self.parser.add_argument( "-id", "--in_dir", default="./step1/", help="address of input directory containing outputs of InPheC_step1.py", ) self.parser.add_argument( "-od", "--out_dir", default="./step2/", help="address of directory containing intermediate output of InPheC_step2.py", ) # File-related arguments self.parser.add_argument( "-loi", "--lineage_of_interest", default=["h64", "dEN"], help="keyword of lineage we are interested in the chip-seq files. this argument is fixed for now.", ) self.parser.add_argument( "-igp", "--input_gene_pheno", default="RNAseq_pval_gene-pheno_intersect_chip_", help="name of file containing gene-phenotype association p-values for set of interest", ) self.parser.add_argument( "-igp_a", "--input_gene_pheno_all", default="RNAseq_pval_gene-phenotype_gse361-gse981-gse371_allGenes.csv", help="name of file containing gene-phenotype association p-values for ALL genes", ) self.parser.add_argument( "-itgGEx", "--input_TF_gene", default="RNAseq_pval_gene-tf_intersect_chip_", help="name of file containing TF-gene association pseudo p-values generated by InPheC_step1.py", ) self.parser.add_argument( "-itgchip", "--input_TF_gene_chip", default="ChIPseq_pval_gene-tf_minQ_", help="chip-seq data containing pseudo values of TF-gene associations for 2 lineages.", ) ## pgm's parameters self.parser.add_argument( "-sb", "--start_batch", type=int, default=0, help="Start index of the batch. Useful if program gets killed in the middle or for parallelization.", ) self.parser.add_argument( "-ni", "--n_iteration", type=int, default=4000, help="number of iterations for the PGM. Typically, 1000 to 5000 draws per chain are suggested for a single chain.", ) # parser.add_argument('-nb', '--n_burn', type=int, default=100, help='number of iterations to burn.') self.parser.add_argument( "-nc", "--n_chain", type=int, default=4, help="number of chains in pm.sample().", ) self.parser.add_argument( "-nt", "--n_tune", type=int, default=2000, help="A common practice is to use a similar number of tuning iterations as draws, often around 1000 to 5000.", ) self.parser.add_argument( "-bs", "--batch_size", type=int, default=10, help="Number of genes to run before saving to disk. Suggestions: 100 genes per batch. Here set to 10 due to github push limits", ) self.parser.add_argument("-pT", "--Prior_T", default=2 / 22, type=float, help="prior of Tij") def _validate_directory(self, directory_path, description): """ Checks if a specified directory path exists and is a valid directory. :param directory_path: The directory path to validate. :type directory_path: str :param description: A brief description of the directory's intended use, for error messages. :type description: str :raises FileNotFoundError: If the directory does not exist or is not a directory. """ if not os.path.exists(directory_path) or not os.path.isdir(directory_path): raise FileNotFoundError(f"{description} not found: {directory_path}")
[docs] def validate_args(self): """ Validates the existence of directories specified in command-line arguments and creates the output directory if it does not exist. """ self._validate_directory(self.args.in_dir_rna, "Input RNA directory") self._validate_directory(self.args.in_dir, "Input directory") if not os.path.exists(self.args.out_dir): os.makedirs(self.args.out_dir, exist_ok=True) print(f" >> Created output directory: {self.args.out_dir}") self._validate_directory(self.args.out_dir, "Output directory")
[docs] def get_args(self): """ Returns the validated command-line arguments. :return: Parsed command-line arguments. :rtype: argparse.Namespace """ return self.args
[docs] class Step1DataLoader: """ Handles the loading of data from outputs of the first step in the InPheRNo-ChIP project pipeline as well as RNA-seq data. This class is tasked with ensuring data integrity and format correctness as it prepares the data for subsequent processing stages. :param args: Parsed command-line arguments containing paths and file-related settings. :type args: argparse.Namespace """ def __init__(self, args): """ Initializes the data loader with command-line arguments specifying paths and settings. :param args: Parsed command-line arguments. :type args: argparse.Namespace """ self.args = args
[docs] def load_csv(self, file_path, delimiter): """ Loads a CSV file into a pandas DataFrame, using a specified delimiter.""" return pd.read_csv(file_path, sep=delimiter, index_col=0, header=0)
[docs] def load_data(self): """ Loads necessary data files from specified directories for further processing, including gene-TF associations, gene-phenotype associations, and ChIP-seq data. Validates the loading process by printing the shapes of loaded datasets. :return: Tuple containing DataFrames for gene-phenotype associations for all genes, gene-phenotype associations for the set of interest, gene-TF associations, and ChIP-seq gene-TF associations. :rtype: tuple """ delim = "," try: # Load 3 files from Main_step1.py addr_out_pvalue_gt = os.path.join( self.args.in_dir, f"{self.args.input_TF_gene}{'_'.join(self.args.lineage_of_interest)}.csv", ) addr_out_pvalue_gp = os.path.join( self.args.in_dir, f"{self.args.input_gene_pheno}{'_'.join(self.args.lineage_of_interest)}.csv", ) addr_out_chip_gt = os.path.join( self.args.in_dir, f"{self.args.input_TF_gene_chip}{'_'.join(self.args.lineage_of_interest)}.csv", ) rna_gp = self.load_csv(addr_out_pvalue_gp, delim) rna_gt = self.load_csv(addr_out_pvalue_gt, delim) chip_gt = self.load_csv(addr_out_chip_gt, delim) # load 1 file from Data folder addr_gene_pheno_all = os.path.join(self.args.in_dir_rna, self.args.input_gene_pheno_all) rna_gp_all = self.load_csv(addr_gene_pheno_all, delim) except: raise FileNotFoundError("File not found") self._print_shape(rna_gp_all, rna_gp, rna_gt, chip_gt) return rna_gp_all, rna_gp, rna_gt, chip_gt
def _print_shape(self, rna_gp_all, rna_gp, rna_gt, chip_gt): """ Displays the shapes of multiple loaded dataframes to validate their structure and size. :param dataframes: Variable number of dataframes to display shapes for. """ data_shapes = { "File loaded": ["rna gene-pheno ALL", "rna gene-pheno interest", "rna gene-tf", "ChIP gene-tf"], "Shape": [rna_gp_all.shape, rna_gp.shape, rna_gt.shape, chip_gt.shape], } df_shapes = pd.DataFrame(data_shapes) print(df_shapes.to_string(index=False))
[docs] class PGMInputPreparer: """ Prepares and processes input data for the probabilistic graphical model (PGM) as part of the InPheRNo-ChIP project. This includes data masking, estimating parameters, and adding control variables to ensure robust model performance. :param out_tmp_dir: Directory path for saving intermediate outputs. :type out_tmp_dir: str :param rna_tg: DataFrame containing RNA-seq gene-TF associations. :type rna_tg: pandas.DataFrame :param rna_gene_pheno: DataFrame containing gene-phenotype associations. :type rna_gene_pheno: pandas.DataFrame :param chip_tg: DataFrame containing ChIP-seq gene-TF associations. :type chip_tg: pandas.DataFrame :param rna_gene_pheno_all: DataFrame containing gene-phenotype associations for all genes. :type rna_gene_pheno_all: pandas.DataFrame """ def __init__(self, out_tmp_dir, rna_tg, rna_gene_pheno, chip_tg, rna_gene_pheno_all): """ Initializes the PGMInputPreparer with directories and data for processing. """ self.eps = sys.float_info.min self.out_tmp_dir = out_tmp_dir self.rna_tg = rna_tg self.rna_gene_pheno = rna_gene_pheno self.chip_tg = chip_tg self.rna_gene_pheno_all = rna_gene_pheno_all def _mask_elasticNet_rlts(self, pvalue_TF_gene): """ Masks the p-values from Elastic Net results to handle edge cases and improve numerical stability. :param pvalue_TF_gene: DataFrame with p-values to be masked. :type pvalue_TF_gene: pandas.DataFrame :return: DataFrame with masked p-values. :rtype: pandas.DataFrame """ # Cap the minimum p-values to avoid precision issues pvalue_TF_gene[(pvalue_TF_gene < self.eps) & (pvalue_TF_gene > -0.5)] = self.eps # when the largest p-value is exactly 1, it faces distribution issues. We multiply it with a number very close to 1. pvalue_TF_gene = pvalue_TF_gene.applymap(lambda x: x * (1 - 1e-15) if x == 1 else x) # mask to 1 if p-value is negative pvalue_TF_gene = pvalue_TF_gene.applymap(lambda x: (1 - self.eps) if x < -0.5 else x) return pvalue_TF_gene
[docs] class beta_unif_gen(rv_continuous): def _pdf(self, x, a0, b0, c0): return c0 * (beta.pdf(x, a=a0, b=b0)) + (1 - c0) * uniform.pdf(x)
[docs] def estimate_a_prime(self, Pj_arr): """ Estimates the 'a_prime' parameter for the PGM using a hybrid beta-uniform distribution. :param Pj_arr: Array of p-values for estimating the distribution parameter. :type Pj_arr: numpy.ndarray :return: Estimated 'a_prime' parameter. :rtype: float """ np.seterr(divide="ignore", invalid="ignore") beta_unif_rv = self.beta_unif_gen(name="beta_unif", a=0.0, b=1.0) a_gp_tmp, _, c, _, _ = beta_unif_rv.fit( Pj_arr.copy(), fb0=1, floc=0, fscale=1 ) if c > 1: # Refit if the mixture coefficient is improperly estimated logging.info(f"a_prime (c>1) estimated: {a_gp_tmp}") a_gp_tmp, _, _, _ = beta.fit(Pj_arr.copy(), fb=1, floc=0, fscale=1) logging.info(f"newly estimated: {a_gp_tmp}") A_gene = a_gp_tmp return A_gene
[docs] def check_modality_per_gene(self): """ Ensures that each gene meets specific criteria in both RNA-seq and ChIP-seq data for inclusion in the PGM. :return: DataFrames filtered based on modality checks. :rtype: tuple of pandas.DataFrame """ # Filter for genes with at least one TF having p-values < 0.05 in rna_tg selected_genes_rna = self.rna_tg[self.rna_tg < 0.05].dropna(how="all").index # Filter for genes in chip_tg with at least one TF having Distance_P_Value < 0.05 selected_genes_chip = self.chip_tg[self.chip_tg["Distance_P_Value"] < 0.05].index.unique() # Intersection of genes meeting criteria in both dataframes selected_genes = list(set(selected_genes_rna) & set(selected_genes_chip)) print(f" >> [modality check] # genes with at least one TF having p-values < 0.05: {len(selected_genes)}") # Filter both dataframes using the selected_genes list filtered_rna_tg = self.rna_tg.loc[selected_genes] filtered_chip_tg = self.chip_tg.loc[selected_genes] filtered_rna_gp = self.rna_gene_pheno.loc[selected_genes] return filtered_rna_tg, filtered_chip_tg, filtered_rna_gp
def _generate_controls(self, df, gene): """ Generates control transcription factors (TFs) for a given gene to facilitate proper model calibration. This method adds three types of control entries: ref_PIE, ref_Q, and ref_PIE_Q. :param df: DataFrame containing the existing gene and TF associations along with their Pj values. :type df: pandas.DataFrame :param gene: Specific gene for which to generate control TFs. :type gene: str :return: A DataFrame that includes the original data along with newly added control entries for the specified gene. :rtype: pandas.DataFrame """ # Filter the dataframe for the specific gene gene_df = df[df["Gene"] == gene] # Calculate the minimum values min_pi_ij = gene_df["pi_ij"].min() min_Qij = gene_df["Qij"].min() # Common Pj value for the gene common_Pj = gene_df["Pj"].iloc[0] # Creating the new rows for reference PIE, Q, and PIE_Q ref_PIE = pd.DataFrame( [ { "Gene": gene, "TF": "ref_PIE", "pi_ij": 1, "Qij": min_Qij, "Pj": common_Pj, } ] ) ref_Q = pd.DataFrame( [{"Gene": gene, "TF": "ref_Q", "pi_ij": min_pi_ij, "Qij": 1, "Pj": common_Pj}] ) ref_PIE_Q = pd.DataFrame([{"Gene": gene, "TF": "ref_PIE_Q", "pi_ij": 1, "Qij": 1, "Pj": common_Pj}]) # Append new rows to the original dataframe return pd.concat([gene_df, ref_PIE, ref_Q, ref_PIE_Q], ignore_index=True)
[docs] def combine_and_add_controls(self, flt_rna_tg, flt_chip_tg, flt_rna_gp): """ Merges datasets and adds control entries to prepare for PGM analysis. :param flt_rna_tg: Filtered RNA-seq gene-TF associations. :type flt_rna_tg: pandas.DataFrame :param flt_chip_tg: Filtered ChIP-seq gene-TF associations. :type flt_chip_tg: pandas.DataFrame :param flt_rna_gp: Filtered gene-phenotype associations. :type flt_rna_gp: pandas.DataFrame :return: Combined DataFrame with control entries added. :rtype: pandas.DataFrame """ # pi_ij flt_rna_tg = self._mask_elasticNet_rlts(flt_rna_tg) flt_rna_tg = flt_rna_tg.stack().reset_index() flt_rna_tg.columns = ["Gene", "TF", "pi_ij"] # Pj flt_rna_gp = flt_rna_gp.reset_index().rename(columns={"index": "Gene", "PValue": "Pj"}) flt_rna_gp = flt_rna_gp[["Gene", "Pj"]] # Qij flt_chip_tg = flt_chip_tg.reset_index() # Calculate min-over-min min_over_min = ( flt_chip_tg.pivot_table(index=["TF", "Gene_ID"], columns="lineage", values="Distance_P_Value") .min(axis=1) .reset_index() ) min_over_min.columns = ["TF", "Gene", "Qij"] # Take intersection of genes result_df = flt_rna_tg.merge(min_over_min, on=["Gene", "TF"], how="outer") result_df = result_df.merge(flt_rna_gp, on=["Gene"], how="inner") result_df = result_df.fillna(1) # Save the result_df (without controls) to a csv file result_df.to_csv(os.path.join(self.out_tmp_dir, "in2pgm_woCtrl.csv"), index=False) # Add controls unique_genes = result_df["Gene"].unique() final = result_df.copy() print(" >> Adding controls...") for gene in tqdm(unique_genes): gene_df_modified = self._generate_controls(result_df, gene) final = pd.concat([final, gene_df_modified], ignore_index=True) final = final.drop_duplicates(keep="first", subset=["Gene", "TF", "pi_ij", "Pj", "Qij"]) return final.sort_values(by=["Gene", "TF"]).reset_index(drop=True)
[docs] class CorePGM: """ This class sets up the probabilistic graphical model (PGM) configurations, manages batch processing of genes, and handles the execution of the model for each gene. :param in2pgm: Input data and configurations for setting up the PGM. :type in2pgm: dict :param pgm_param: Parameters to control the PGM execution, such as number of iterations and chains. :type pgm_param: dict :param Prior_T: Prior probability for the Bernoulli distribution of Tij. :type Prior_T: float """ def __init__(self, in2pgm, pgm_param, Prior_T): """ Initializes the CorePGM with necessary parameters and input data. :param in2pgm: Data and a_prime value for PGM setup. :param pgm_param: Dictionary containing parameters for the PGM such as number of iterations and chains. :param Prior_T: Prior probability for the Bernoulli distribution of transcription factors' interactions. """ self.A_gene = in2pgm["a_prime"] self.obs_all_genes = in2pgm["obs"] self.ls_genes = self.obs_all_genes.Gene.unique().tolist() self.ls_tf = self.obs_all_genes.TF.unique().tolist() self.Prior_T = Prior_T # should be 2/(num_true_tf), not including controls self.pgm_param = pgm_param
[docs] def logsumexp(self, logp1, logp2, r): """ Computes the log-sum-exp of two log probabilities for numerical stability. :param logp1: Log probability 1. :type logp1: float :param logp2: Log probability 2. :type logp2: float :param r: Mixing ratio. :type r: float :return: Result of log-sum-exp calculation. :rtype: float """ max_logp = pt.maximum(logp1, logp2) return max_logp + pt.log(r * pt.exp(logp1 - max_logp) + (1 - r) * pt.exp(logp2 - max_logp))
[docs] def setup_model(self, n_TF, n_lineages, a_gp): """ Configures the PGM model using the specified parameters and distributions. :param n_TF: Number of transcription factors. :type n_TF: int :param n_lineages: Number of lineages, default to 1, as we are considering the minimum p-value across lineages. :type n_lineages: int :param a_gp: Estimated parameter 'a_prime' for the Beta distribution in phenotype modeling. :type a_gp: float :return: Configured PGM model. :rtype: pymc.Model """ def phenotype_logp(value, T_sum, a_gp): h1_logp = pm.logp(pm.Beta.dist(alpha=a_gp, beta=1.0, shape=(1, 1, 1)), value) h0_logp = pm.logp(pm.Uniform.dist(0, 1, shape=(1, 1, 1)), value) return pt.switch(pt.eq(T_sum, 0), h0_logp, h1_logp) def rna_tg_logp(value, T, r_tg, a_tg1, a_tg0): tg1_logp = pm.logp(pm.Beta.dist(alpha=a_tg1, beta=1, shape=(1, n_TF, 1)), value) tg0_logp = pm.logp(pm.Beta.dist(alpha=a_tg0, beta=1, shape=(1, n_TF, 1)), value) return pt.switch(T, tg1_logp, self.logsumexp(tg1_logp, tg0_logp, r_tg)) def chip_tg_logp(value, T, r_tgp, a_tgp1, a_tgp0): tgp1_logp = pm.logp(pm.Beta.dist(alpha=a_tgp1, beta=1.0, shape=(1, n_TF, n_lineages)), value) tgp0_logp = pm.logp(pm.Beta.dist(alpha=a_tgp0, beta=1.0, shape=(1, n_TF, n_lineages)), value) return pt.switch(T, tgp1_logp, self.logsumexp(tgp1_logp, tgp0_logp, r_tgp)) with pm.Model() as model: # Define MutableData for observations obs_p_gene = pm.MutableData("obs_p_gene", np.zeros((1, 1, 1))) obs_p_TF_gene = pm.MutableData("obs_p_TF_gene", np.zeros((1, n_TF, 1))) obs_chip_tg_both = pm.MutableData("obs_chip_tg_both", np.zeros((1, n_TF, n_lineages))) # ---- params for RNA-seq module ---- a_tg1 = pm.Uniform("a_tg1", lower=0.0, upper=0.5, shape=(1, 1, 1)) a_tg0 = pm.Uniform("a_tg0", lower=0.5, upper=1.0, shape=(1, 1, 1)) r_tg = pm.Uniform("r_tg", lower=0.0, upper=1.0, shape=(1, 1, 1)) # ---- params for ChIP-seq module ---- a_tgp1 = pm.Uniform("a_tgp1", lower=0.0, upper=0.5, shape=(1, 1, 1)) a_tgp0 = pm.Uniform("a_tgp0", lower=0.5, upper=1.0, shape=(1, 1, 1)) r_tgp = pm.Uniform("r_tgp", lower=0.0, upper=1.0, shape=(1, 1, 1)) # ---- params for Tij ---- upper = 3 / (n_TF-3) lower = 1 / (n_TF-3) mu = float(self.Prior_T) sigma = np.sqrt(((upper - mu) ** 2 + (mu - lower) ** 2) / 4) p_T = pm.TruncatedNormal("p_T", mu=mu, sigma=sigma, lower=0.0, upper=4/(n_TF-3), shape=(1, 1, 1)) # [Tij] T = pm.Bernoulli("T", p=p_T, shape=(1, n_TF, 1)) T_sum = pm.Deterministic("T_sum", pt.sum(T, axis=1)) # [phenotype] p_gene = pm.DensityDist("p_gene", T_sum, a_gp, logp=phenotype_logp, observed=obs_p_gene) # [GEx] pi_ij -- elastic net p_TF_gene = pm.DensityDist("p_TF_gene", T, r_tg, a_tg1, a_tg0, logp=rna_tg_logp, observed=obs_p_TF_gene) # [chip-seq] single peak, 2 lineages p_chip_tg = pm.DensityDist( "p_chip_tg", T, r_tgp, a_tgp1, a_tgp0, logp=chip_tg_logp, observed=obs_chip_tg_both ) return model
[docs] def run_model(self, one_gene): """ Executes the PGM for a single gene based on the provided gene data and model parameters. :param one_gene: Data for one gene including its interactions and expressions. :type one_gene: pandas.DataFrame :return: Trace of the model after sampling. :rtype: pymc.backends.base.MultiTrace """ a_gp = float(self.A_gene) n_TF = one_gene.shape[0] # this should be 22 TFs + 3 controls n_lineages = 1 # 2 model = self.setup_model(n_TF, n_lineages, a_gp) # Extract observations obs_p_gene = np.reshape(one_gene["Pj"].unique(), (1, 1, 1)) obs_rna_tg = np.reshape(one_gene["pi_ij"].values, (1, n_TF, 1)) # Stacking to match shape (n_TF, n_lineages) obs_q_ij = one_gene["Qij"].values obs_q_ij = np.reshape(np.stack([obs_q_ij], axis=1), (1, n_TF, n_lineages)) pm.set_data( { "obs_p_gene": obs_p_gene, "obs_p_TF_gene": obs_rna_tg, "obs_chip_tg_both": obs_q_ij, }, model=model, ) with model: trace = pm.sample( draws=self.pgm_param["ni"], chains=self.pgm_param["nc"], tune=self.pgm_param["nt"], random_seed=self.pgm_param["s"], cores=4, nuts={"target_accept": 0.95}, progressbar=True, return_inferencedata=True, ) return trace
[docs] def run(self, out_dir, n_chain, start_batch, batch_size): """ Processes genes in batches, executing the PGM for each batch and managing the output. :param out_dir: Directory to save the output files. :type out_dir: str :param n_chain: Number of chains for MCMC sampling. :type n_chain: int :param start_batch: Starting index of the batch processing. :type start_batch: int :param batch_size: Number of genes processed in each batch. :type batch_size: int """ n_batches = len(self.ls_genes) // batch_size + (len(self.ls_genes) % batch_size > 0) for batch_id in range(start_batch, n_batches): # Initialize traces and info_per_batch traces = {} info_per_batch = {} # Extract genes for this batch start_idx = batch_id * batch_size end_idx = min(start_idx + batch_size, len(self.ls_genes)) batch_genes = self.ls_genes[start_idx:end_idx] print(f" >> Running batch {batch_id} with {len(batch_genes)} genes...") for gene in tqdm(batch_genes): RANDOM_SEED = (np.random.rand(n_chain) * 12345 + DEFAULT_SEED).astype(int).tolist() self.pgm_param["s"] = RANDOM_SEED # the seed is gene-specific info_per_batch[gene] = self.obs_all_genes[self.obs_all_genes["Gene"] == gene] traces[gene] = self.run_model(info_per_batch[gene]) # Save traces to pickle file model_fpath = os.path.join( out_dir, "RnCnP_vary-Tp%s_nc%s_ni%i_nt%s_noburn_batch%s.pkl" % ( np.round(self.Prior_T, 4), self.pgm_param["nc"], self.pgm_param["ni"], self.pgm_param["nt"], batch_id, ), ) with open(model_fpath, "wb") as fp: cloudpickle.dump({"trace": traces, "info_per_batch": info_per_batch}, fp) print(f" >> Saved batch {batch_id} to {model_fpath}") # Clear traces for the next batch traces.clear() return None
[docs] def step2_main(): """ Entry point for executing the step 2 of the InPheRNo-ChIP project's pipeline. This function orchestrates the loading of data, preprocessing, and execution of the probabilistic graphical model (PGM) for gene regulatory network (GRN) inference. Steps: - Parses command-line arguments for configuration settings. - Loads data produced by step 1, including gene-TF and gene-phenotype associations. - Prepares input for the PGM, including data cleaning, modality checks, and control integration. - Configures and runs the PGM to infer regulatory networks. The function manages file directories, initializes data preparation, sets PGM parameters, and handles the batch processing of genes, ensuring all outputs are appropriately saved and managed. """ in2pgm = {} # Dictionary to store inputs required for PGM setup # Parse arguments parser = Step2ArgumentParser() args = parser.get_args() # Retrieve command-line arguments # Load data from the output of step 1 data_loader = Step1DataLoader(args) rna_gp_all, rna_gp, rna_gt, chip_gt = data_loader.load_data() # Prepare input for PGM tmp_out_dir = os.path.join(args.out_dir, "tmp") os.makedirs(tmp_out_dir, exist_ok=True) # save intermediate file # Initialize data preparer and estimate a_prime from the data preparer = PGMInputPreparer(tmp_out_dir, rna_gt, rna_gp, chip_gt, rna_gp_all) in2pgm["a_prime"] = preparer.estimate_a_prime(rna_gp_all) # Perform modality checks and prepare the data for PGM input ## e.g., 1.4k genes (from step 1) --> 1.2k genes after modality check flt_rna_tg, flt_chip_tg, flt_rna_gp = preparer.check_modality_per_gene() # Combine and add controls in2pgm["obs"] = preparer.combine_and_add_controls( flt_rna_tg, flt_chip_tg, flt_rna_gp ) # Set parameters for the PGM execution pgm_param = {"ni": args.n_iteration, "nc": args.n_chain, "nt": args.n_tune} # Initialize and run the PGM pgm = CorePGM(in2pgm, pgm_param, args.Prior_T) pgm.run(args.out_dir, args.n_chain, args.start_batch, args.batch_size)
if __name__ == "__main__": print_logo("Running step 2") step2_main()