"""
InPheC_Step2.py
This script is part of the InPheRNo-ChIP project and is responsible for:
- Estimating the parameters of the model.
- Running the probabilistic graphical model (PGM).
- Incorporating enhancements in model Qij to account for inter-lineage dependencies and maintaining numerical stability.
Usage:
- Run the script from the command line "python InPheC_Step2.py" directly.
- For detailed argument options, use the help option: `python InPheC_Step2.py --help`
"""
import argparse
import logging
import os
import sys
import cloudpickle
import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt
from scipy.stats import beta, rv_continuous, uniform
from tqdm import tqdm
from utils_logo import print_logo
import warnings
warnings.filterwarnings("ignore")
# Global seed for reproducibility
DEFAULT_SEED = 1011
np.random.seed(DEFAULT_SEED)
print(f"Running on PyMC v{pm.__version__}")
[docs]
class Step2ArgumentParser:
"""
Parses and handles command-line arguments for the step2_main.py script, ensuring proper validation and configuration
of input and output paths and file-related options for RNA-seq and ChIP-seq data processing.
"""
def __init__(self):
"""
Initializes the argument parser with a description and adds command-line arguments for data processing.
"""
self.parser = argparse.ArgumentParser(description="Loading input and output directories for Main_step2.py")
self._add_arguments()
self.args = self.parser.parse_args()
self.validate_args() # check if directories exist
def _add_arguments(self):
# Directory arguments
self.parser.add_argument(
"-id_rna",
"--in_dir_rna",
default="./Data/RNA_seq/",
help="address of directory containing gene-pheno association for ALL genes",
)
self.parser.add_argument(
"-id",
"--in_dir",
default="./step1/",
help="address of input directory containing outputs of InPheC_step1.py",
)
self.parser.add_argument(
"-od",
"--out_dir",
default="./step2/",
help="address of directory containing intermediate output of InPheC_step2.py",
)
# File-related arguments
self.parser.add_argument(
"-loi",
"--lineage_of_interest",
default=["h64", "dEN"],
help="keyword of lineage we are interested in the chip-seq files. this argument is fixed for now.",
)
self.parser.add_argument(
"-igp",
"--input_gene_pheno",
default="RNAseq_pval_gene-pheno_intersect_chip_",
help="name of file containing gene-phenotype association p-values for set of interest",
)
self.parser.add_argument(
"-igp_a",
"--input_gene_pheno_all",
default="RNAseq_pval_gene-phenotype_gse361-gse981-gse371_allGenes.csv",
help="name of file containing gene-phenotype association p-values for ALL genes",
)
self.parser.add_argument(
"-itgGEx",
"--input_TF_gene",
default="RNAseq_pval_gene-tf_intersect_chip_",
help="name of file containing TF-gene association pseudo p-values generated by InPheC_step1.py",
)
self.parser.add_argument(
"-itgchip",
"--input_TF_gene_chip",
default="ChIPseq_pval_gene-tf_minQ_",
help="chip-seq data containing pseudo values of TF-gene associations for 2 lineages.",
)
## pgm's parameters
self.parser.add_argument(
"-sb",
"--start_batch",
type=int,
default=0,
help="Start index of the batch. Useful if program gets killed in the middle or for parallelization.",
)
self.parser.add_argument(
"-ni",
"--n_iteration",
type=int,
default=4000,
help="number of iterations for the PGM. Typically, 1000 to 5000 draws per chain are suggested for a single chain.",
)
# parser.add_argument('-nb', '--n_burn', type=int, default=100, help='number of iterations to burn.')
self.parser.add_argument(
"-nc",
"--n_chain",
type=int,
default=4,
help="number of chains in pm.sample().",
)
self.parser.add_argument(
"-nt",
"--n_tune",
type=int,
default=2000,
help="A common practice is to use a similar number of tuning iterations as draws, often around 1000 to 5000.",
)
self.parser.add_argument(
"-bs",
"--batch_size",
type=int,
default=10,
help="Number of genes to run before saving to disk. Suggestions: 100 genes per batch. Here set to 10 due to github push limits",
)
self.parser.add_argument("-pT", "--Prior_T", default=2 / 22, type=float, help="prior of Tij")
def _validate_directory(self, directory_path, description):
"""
Checks if a specified directory path exists and is a valid directory.
:param directory_path: The directory path to validate.
:type directory_path: str
:param description: A brief description of the directory's intended use, for error messages.
:type description: str
:raises FileNotFoundError: If the directory does not exist or is not a directory.
"""
if not os.path.exists(directory_path) or not os.path.isdir(directory_path):
raise FileNotFoundError(f"{description} not found: {directory_path}")
[docs]
def validate_args(self):
"""
Validates the existence of directories specified in command-line arguments and creates the output directory if it does not exist.
"""
self._validate_directory(self.args.in_dir_rna, "Input RNA directory")
self._validate_directory(self.args.in_dir, "Input directory")
if not os.path.exists(self.args.out_dir):
os.makedirs(self.args.out_dir, exist_ok=True)
print(f" >> Created output directory: {self.args.out_dir}")
self._validate_directory(self.args.out_dir, "Output directory")
[docs]
def get_args(self):
"""
Returns the validated command-line arguments.
:return: Parsed command-line arguments.
:rtype: argparse.Namespace
"""
return self.args
[docs]
class Step1DataLoader:
"""
Handles the loading of data from outputs of the first step in the InPheRNo-ChIP project pipeline as well as RNA-seq data.
This class is tasked with ensuring data integrity and format correctness as it prepares the data for subsequent processing stages.
:param args: Parsed command-line arguments containing paths and file-related settings.
:type args: argparse.Namespace
"""
def __init__(self, args):
"""
Initializes the data loader with command-line arguments specifying paths and settings.
:param args: Parsed command-line arguments.
:type args: argparse.Namespace
"""
self.args = args
[docs]
def load_csv(self, file_path, delimiter):
""" Loads a CSV file into a pandas DataFrame, using a specified delimiter."""
return pd.read_csv(file_path, sep=delimiter, index_col=0, header=0)
[docs]
def load_data(self):
"""
Loads necessary data files from specified directories for further processing, including gene-TF associations,
gene-phenotype associations, and ChIP-seq data. Validates the loading process by printing the shapes of loaded datasets.
:return: Tuple containing DataFrames for gene-phenotype associations for all genes, gene-phenotype associations for the set of interest,
gene-TF associations, and ChIP-seq gene-TF associations.
:rtype: tuple
"""
delim = ","
try:
# Load 3 files from Main_step1.py
addr_out_pvalue_gt = os.path.join(
self.args.in_dir,
f"{self.args.input_TF_gene}{'_'.join(self.args.lineage_of_interest)}.csv",
)
addr_out_pvalue_gp = os.path.join(
self.args.in_dir,
f"{self.args.input_gene_pheno}{'_'.join(self.args.lineage_of_interest)}.csv",
)
addr_out_chip_gt = os.path.join(
self.args.in_dir,
f"{self.args.input_TF_gene_chip}{'_'.join(self.args.lineage_of_interest)}.csv",
)
rna_gp = self.load_csv(addr_out_pvalue_gp, delim)
rna_gt = self.load_csv(addr_out_pvalue_gt, delim)
chip_gt = self.load_csv(addr_out_chip_gt, delim)
# load 1 file from Data folder
addr_gene_pheno_all = os.path.join(self.args.in_dir_rna, self.args.input_gene_pheno_all)
rna_gp_all = self.load_csv(addr_gene_pheno_all, delim)
except:
raise FileNotFoundError("File not found")
self._print_shape(rna_gp_all, rna_gp, rna_gt, chip_gt)
return rna_gp_all, rna_gp, rna_gt, chip_gt
def _print_shape(self, rna_gp_all, rna_gp, rna_gt, chip_gt):
"""
Displays the shapes of multiple loaded dataframes to validate their structure and size.
:param dataframes: Variable number of dataframes to display shapes for.
"""
data_shapes = {
"File loaded": ["rna gene-pheno ALL", "rna gene-pheno interest", "rna gene-tf", "ChIP gene-tf"],
"Shape": [rna_gp_all.shape, rna_gp.shape, rna_gt.shape, chip_gt.shape],
}
df_shapes = pd.DataFrame(data_shapes)
print(df_shapes.to_string(index=False))
[docs]
class CorePGM:
"""
This class sets up the probabilistic graphical model (PGM) configurations, manages batch processing of genes, and handles the execution of the model for each gene.
:param in2pgm: Input data and configurations for setting up the PGM.
:type in2pgm: dict
:param pgm_param: Parameters to control the PGM execution, such as number of iterations and chains.
:type pgm_param: dict
:param Prior_T: Prior probability for the Bernoulli distribution of Tij.
:type Prior_T: float
"""
def __init__(self, in2pgm, pgm_param, Prior_T):
"""
Initializes the CorePGM with necessary parameters and input data.
:param in2pgm: Data and a_prime value for PGM setup.
:param pgm_param: Dictionary containing parameters for the PGM such as number of iterations and chains.
:param Prior_T: Prior probability for the Bernoulli distribution of transcription factors' interactions.
"""
self.A_gene = in2pgm["a_prime"]
self.obs_all_genes = in2pgm["obs"]
self.ls_genes = self.obs_all_genes.Gene.unique().tolist()
self.ls_tf = self.obs_all_genes.TF.unique().tolist()
self.Prior_T = Prior_T # should be 2/(num_true_tf), not including controls
self.pgm_param = pgm_param
[docs]
def logsumexp(self, logp1, logp2, r):
"""
Computes the log-sum-exp of two log probabilities for numerical stability.
:param logp1: Log probability 1.
:type logp1: float
:param logp2: Log probability 2.
:type logp2: float
:param r: Mixing ratio.
:type r: float
:return: Result of log-sum-exp calculation.
:rtype: float
"""
max_logp = pt.maximum(logp1, logp2)
return max_logp + pt.log(r * pt.exp(logp1 - max_logp) + (1 - r) * pt.exp(logp2 - max_logp))
[docs]
def setup_model(self, n_TF, n_lineages, a_gp):
"""
Configures the PGM model using the specified parameters and distributions.
:param n_TF: Number of transcription factors.
:type n_TF: int
:param n_lineages: Number of lineages, default to 1, as we are considering the minimum p-value across lineages.
:type n_lineages: int
:param a_gp: Estimated parameter 'a_prime' for the Beta distribution in phenotype modeling.
:type a_gp: float
:return: Configured PGM model.
:rtype: pymc.Model
"""
def phenotype_logp(value, T_sum, a_gp):
h1_logp = pm.logp(pm.Beta.dist(alpha=a_gp, beta=1.0, shape=(1, 1, 1)), value)
h0_logp = pm.logp(pm.Uniform.dist(0, 1, shape=(1, 1, 1)), value)
return pt.switch(pt.eq(T_sum, 0), h0_logp, h1_logp)
def rna_tg_logp(value, T, r_tg, a_tg1, a_tg0):
tg1_logp = pm.logp(pm.Beta.dist(alpha=a_tg1, beta=1, shape=(1, n_TF, 1)), value)
tg0_logp = pm.logp(pm.Beta.dist(alpha=a_tg0, beta=1, shape=(1, n_TF, 1)), value)
return pt.switch(T, tg1_logp, self.logsumexp(tg1_logp, tg0_logp, r_tg))
def chip_tg_logp(value, T, r_tgp, a_tgp1, a_tgp0):
tgp1_logp = pm.logp(pm.Beta.dist(alpha=a_tgp1, beta=1.0, shape=(1, n_TF, n_lineages)), value)
tgp0_logp = pm.logp(pm.Beta.dist(alpha=a_tgp0, beta=1.0, shape=(1, n_TF, n_lineages)), value)
return pt.switch(T, tgp1_logp, self.logsumexp(tgp1_logp, tgp0_logp, r_tgp))
with pm.Model() as model:
# Define MutableData for observations
obs_p_gene = pm.MutableData("obs_p_gene", np.zeros((1, 1, 1)))
obs_p_TF_gene = pm.MutableData("obs_p_TF_gene", np.zeros((1, n_TF, 1)))
obs_chip_tg_both = pm.MutableData("obs_chip_tg_both", np.zeros((1, n_TF, n_lineages)))
# ---- params for RNA-seq module ----
a_tg1 = pm.Uniform("a_tg1", lower=0.0, upper=0.5, shape=(1, 1, 1))
a_tg0 = pm.Uniform("a_tg0", lower=0.5, upper=1.0, shape=(1, 1, 1))
r_tg = pm.Uniform("r_tg", lower=0.0, upper=1.0, shape=(1, 1, 1))
# ---- params for ChIP-seq module ----
a_tgp1 = pm.Uniform("a_tgp1", lower=0.0, upper=0.5, shape=(1, 1, 1))
a_tgp0 = pm.Uniform("a_tgp0", lower=0.5, upper=1.0, shape=(1, 1, 1))
r_tgp = pm.Uniform("r_tgp", lower=0.0, upper=1.0, shape=(1, 1, 1))
# ---- params for Tij ----
upper = 3 / (n_TF-3)
lower = 1 / (n_TF-3)
mu = float(self.Prior_T)
sigma = np.sqrt(((upper - mu) ** 2 + (mu - lower) ** 2) / 4)
p_T = pm.TruncatedNormal("p_T", mu=mu, sigma=sigma, lower=0.0, upper=4/(n_TF-3), shape=(1, 1, 1))
# [Tij]
T = pm.Bernoulli("T", p=p_T, shape=(1, n_TF, 1))
T_sum = pm.Deterministic("T_sum", pt.sum(T, axis=1))
# [phenotype]
p_gene = pm.DensityDist("p_gene", T_sum, a_gp, logp=phenotype_logp, observed=obs_p_gene)
# [GEx] pi_ij -- elastic net
p_TF_gene = pm.DensityDist("p_TF_gene", T, r_tg, a_tg1, a_tg0, logp=rna_tg_logp, observed=obs_p_TF_gene)
# [chip-seq] single peak, 2 lineages
p_chip_tg = pm.DensityDist(
"p_chip_tg", T, r_tgp, a_tgp1, a_tgp0, logp=chip_tg_logp, observed=obs_chip_tg_both
)
return model
[docs]
def run_model(self, one_gene):
"""
Executes the PGM for a single gene based on the provided gene data and model parameters.
:param one_gene: Data for one gene including its interactions and expressions.
:type one_gene: pandas.DataFrame
:return: Trace of the model after sampling.
:rtype: pymc.backends.base.MultiTrace
"""
a_gp = float(self.A_gene)
n_TF = one_gene.shape[0] # this should be 22 TFs + 3 controls
n_lineages = 1 # 2
model = self.setup_model(n_TF, n_lineages, a_gp)
# Extract observations
obs_p_gene = np.reshape(one_gene["Pj"].unique(), (1, 1, 1))
obs_rna_tg = np.reshape(one_gene["pi_ij"].values, (1, n_TF, 1))
# Stacking to match shape (n_TF, n_lineages)
obs_q_ij = one_gene["Qij"].values
obs_q_ij = np.reshape(np.stack([obs_q_ij], axis=1), (1, n_TF, n_lineages))
pm.set_data(
{
"obs_p_gene": obs_p_gene,
"obs_p_TF_gene": obs_rna_tg,
"obs_chip_tg_both": obs_q_ij,
},
model=model,
)
with model:
trace = pm.sample(
draws=self.pgm_param["ni"],
chains=self.pgm_param["nc"],
tune=self.pgm_param["nt"],
random_seed=self.pgm_param["s"],
cores=4,
nuts={"target_accept": 0.95},
progressbar=True,
return_inferencedata=True,
)
return trace
[docs]
def run(self, out_dir, n_chain, start_batch, batch_size):
"""
Processes genes in batches, executing the PGM for each batch and managing the output.
:param out_dir: Directory to save the output files.
:type out_dir: str
:param n_chain: Number of chains for MCMC sampling.
:type n_chain: int
:param start_batch: Starting index of the batch processing.
:type start_batch: int
:param batch_size: Number of genes processed in each batch.
:type batch_size: int
"""
n_batches = len(self.ls_genes) // batch_size + (len(self.ls_genes) % batch_size > 0)
for batch_id in range(start_batch, n_batches):
# Initialize traces and info_per_batch
traces = {}
info_per_batch = {}
# Extract genes for this batch
start_idx = batch_id * batch_size
end_idx = min(start_idx + batch_size, len(self.ls_genes))
batch_genes = self.ls_genes[start_idx:end_idx]
print(f" >> Running batch {batch_id} with {len(batch_genes)} genes...")
for gene in tqdm(batch_genes):
RANDOM_SEED = (np.random.rand(n_chain) * 12345 + DEFAULT_SEED).astype(int).tolist()
self.pgm_param["s"] = RANDOM_SEED # the seed is gene-specific
info_per_batch[gene] = self.obs_all_genes[self.obs_all_genes["Gene"] == gene]
traces[gene] = self.run_model(info_per_batch[gene])
# Save traces to pickle file
model_fpath = os.path.join(
out_dir,
"RnCnP_vary-Tp%s_nc%s_ni%i_nt%s_noburn_batch%s.pkl"
% (
np.round(self.Prior_T, 4),
self.pgm_param["nc"],
self.pgm_param["ni"],
self.pgm_param["nt"],
batch_id,
),
)
with open(model_fpath, "wb") as fp:
cloudpickle.dump({"trace": traces, "info_per_batch": info_per_batch}, fp)
print(f" >> Saved batch {batch_id} to {model_fpath}")
# Clear traces for the next batch
traces.clear()
return None
[docs]
def step2_main():
"""
Entry point for executing the step 2 of the InPheRNo-ChIP project's pipeline. This function orchestrates
the loading of data, preprocessing, and execution of the probabilistic graphical model (PGM) for gene
regulatory network (GRN) inference.
Steps:
- Parses command-line arguments for configuration settings.
- Loads data produced by step 1, including gene-TF and gene-phenotype associations.
- Prepares input for the PGM, including data cleaning, modality checks, and control integration.
- Configures and runs the PGM to infer regulatory networks.
The function manages file directories, initializes data preparation, sets PGM parameters, and
handles the batch processing of genes, ensuring all outputs are appropriately saved and managed.
"""
in2pgm = {} # Dictionary to store inputs required for PGM setup
# Parse arguments
parser = Step2ArgumentParser()
args = parser.get_args() # Retrieve command-line arguments
# Load data from the output of step 1
data_loader = Step1DataLoader(args)
rna_gp_all, rna_gp, rna_gt, chip_gt = data_loader.load_data()
# Prepare input for PGM
tmp_out_dir = os.path.join(args.out_dir, "tmp")
os.makedirs(tmp_out_dir, exist_ok=True) # save intermediate file
# Initialize data preparer and estimate a_prime from the data
preparer = PGMInputPreparer(tmp_out_dir, rna_gt, rna_gp, chip_gt, rna_gp_all)
in2pgm["a_prime"] = preparer.estimate_a_prime(rna_gp_all)
# Perform modality checks and prepare the data for PGM input
## e.g., 1.4k genes (from step 1) --> 1.2k genes after modality check
flt_rna_tg, flt_chip_tg, flt_rna_gp = preparer.check_modality_per_gene()
# Combine and add controls
in2pgm["obs"] = preparer.combine_and_add_controls(
flt_rna_tg, flt_chip_tg, flt_rna_gp
)
# Set parameters for the PGM execution
pgm_param = {"ni": args.n_iteration, "nc": args.n_chain, "nt": args.n_tune}
# Initialize and run the PGM
pgm = CorePGM(in2pgm, pgm_param, args.Prior_T)
pgm.run(args.out_dir, args.n_chain, args.start_batch, args.batch_size)
if __name__ == "__main__":
print_logo("Running step 2")
step2_main()