Source code for singlecell_cookbook.tools.analysis.pseudobulk._edger

import pandas as pd
from anndata import AnnData

from .utils import aggregate_and_filter


[docs] def edger_pseudobulk( adata_: AnnData, group_key: str, condition_group: str | list[str] | None = None, reference_group: str | None = None, cell_identity_key: str | None = None, layer: str | None = None, replicas_per_group: int = 10, min_cells_per_group: int = 30, bootstrap_sampling: bool = True, use_cells: dict[str, list[str]] | None = None, aggregate: bool = True, verbosity: int = 0, ) -> dict[str, pd.DataFrame]: """ Fits a model using edgeR and computes top tags for a given condition vs reference group. Parameters ---------- adata_ : AnnData Annotated data matrix. group_key : str Key in AnnData object to use to group cells. condition_group : str | list[str] | None, optional Condition group to compare to reference group. If None, each group will be contrasted to the corresponding reference group. reference_group : str | None, optional Reference group to compare condition group(s) to. If None, the condition group is compared to the rest of the cells. cell_identity_key : str | None, optional If provided, separate contrasts will be computed for each identity. Defaults to None. layer : str | None, optional Layer in AnnData object to use. EdgeR requires raw counts. Defaults to None. replicas_per_group : int, optional Number of replicas to create for each group. Defaults to 10. min_cells_per_group : int, optional Minimum number of cells required for a group to be included. Defaults to 30. bootstrap_sampling : bool, optional Whether to use bootstrap sampling to create replicas. Defaults to True. use_cells : dict[str, list[str]] | None, optional If not None, only use the specified cells. Defaults to None. Dictionary key is a categorical variable in the obs dataframe and the dictionary value is a list of categories to include. aggregate : bool, optional Whether to aggregate cells before fitting the model. EdgeR requires a small number of samples, so if adata_ is a single-cell experiment, the cells should be aggregated. Defaults to True. verbosity : int, optional Verbosity level. Defaults to 0. Returns ------- dict[str, pd.DataFrame] Dictionary of dataframes, one for each contrast, with the following columns: * gene_ids : str Gene IDs. * logFC : float Log2 fold change. * logCPM : float Log2 CPM. * F: float F-statistic. * PValue : float p-value. * FDR : float False discovery rate. * pct_expr_cnd : float Percentage of cells in condition group expressing the gene. * pct_expr_ref : float Percentage of cells in reference group expressing the gene. """ import anndata2ri import rpy2.robjects as robjects from rpy2.rinterface_lib.embedded import RRuntimeError from rpy2.robjects import numpy2ri, pandas2ri from rpy2.robjects.conversion import localconverter numpy2ri.activate() R = robjects.r if aggregate: aggr_adata = aggregate_and_filter( adata_, group_key, cell_identity_key, layer, replicas_per_group, min_cells_per_group, bootstrap_sampling, use_cells, ) else: aggr_adata = adata_.copy() with localconverter(anndata2ri.converter): R.assign("aggr_adata", aggr_adata) # defines the R function for fitting the model with edgeR R(_fit_model_r_script) if condition_group is None: condition_group_list = aggr_adata.obs[group_key].unique() elif isinstance(condition_group, str): condition_group_list = [condition_group] else: condition_group_list = condition_group if cell_identity_key is not None: cids = aggr_adata.obs[cell_identity_key].unique() else: cids = [""] tt_dict = {} for condition_group in condition_group_list: if reference_group is not None and condition_group == reference_group: continue if verbosity > 0: print(f"Fitting model for {condition_group}...") if reference_group is not None: gk = group_key else: gk = f"{group_key}_{condition_group}" try: R(f""" outs <- fit_model(aggr_adata, "{gk}", "{cell_identity_key}", verbosity = {verbosity}) fit <- outs$fit y <- outs$y """) except RRuntimeError as e: print("Error fitting model for", condition_group) print("Error:", e) print("Skipping...", flush=True) continue if reference_group is None: new_contrasts_tuples = [ ( condition_group, # common prefix "", # condition group "not", # reference group cid, # cell identity ) for cid in cids ] else: new_contrasts_tuples = [ ( "", # common prefix condition_group, # condition group reference_group, # reference group cid, # cell identity ) for cid in cids ] new_contrasts = [ f"group{cnd}{prefix}_{cid}".strip("_") + "-" + f"group{ref}{prefix}_{cid}".strip("_") for prefix, cnd, ref, cid in new_contrasts_tuples ] for contrast, contrast_tuple in zip(new_contrasts, new_contrasts_tuples): prefix, cnd, ref, cid = contrast_tuple if ref == "not": cnd, ref = "", "rest" contrast_key = f"{prefix}{cnd}_vs_{ref}" if cid: contrast_key = f"{cell_identity_key}:{cid}|{contrast_key}" if verbosity > 0: print(f"Computing contrast: {contrast_key}... ({contrast})") R(f"myContrast <- makeContrasts('{contrast}', levels = y$design)") R("qlf <- glmQLFTest(fit, contrast=myContrast)") R("tt <- topTags(qlf, n = Inf)$table") tt: pd.DataFrame = pandas2ri.rpy2py(R("tt")) tt.index.name = "gene_ids" genes = tt.index cnd, ref = [c[5:] for c in contrast.split("-")] tt["pct_expr_cnd"] = aggr_adata.var[f"pct_expr_{cnd}"].loc[genes] tt["pct_expr_ref"] = aggr_adata.var[f"pct_expr_{ref}"].loc[genes] tt_dict[contrast_key] = tt return tt_dict
_fit_model_r_script = """ suppressPackageStartupMessages({ library(edgeR) library(MAST) }) fit_model <- function(adata_, group_key, cell_identity_key = "None", verbosity = 0){ if (verbosity > 0){ cat("Group key:", group_key, "\n") cat("Cell identity key:", cell_identity_key, "\n") } # create an edgeR object with counts and grouping factor y <- DGEList(assay(adata_, "X"), group = colData(adata_)[[group_key]]) # filter out genes with low counts if (verbosity > 1){ cat("Dimensions before subsetting:", dim(y), "\n") } keep <- filterByExpr(y) y <- y[keep, , keep.lib.sizes=FALSE] if (verbosity > 1){ cat("Dimensions after subsetting:", dim(y), "\n") } # normalize y <- calcNormFactors(y) # create a vector that is concatentation of condition and cell type that we will later use with contrasts if (cell_identity_key == "None"){ group <- colData(adata_)[[group_key]] } else { group <- paste0(colData(adata_)[[group_key]], "_", colData(adata_)[[cell_identity_key]]) } if (verbosity > 1){ cat("Group(s):", group, "\n") } replica <- colData(adata_)$replica # create a design matrix: here we have multiple replicas so also consider that in the design matrix design <- model.matrix(~ 0 + group + replica) # estimate dispersion y <- estimateDisp(y, design = design) # fit the model fit <- glmQLFit(y, design) return(list("fit"=fit, "design"=design, "y"=y)) } """