Source code for metaDMG.utils

#%%
import re
import warnings
from functools import partial
from itertools import islice
from pathlib import Path
from typing import Iterable, Iterator, Optional, Union

import numpy as np
import pandas as pd
import psutil
import typer
import yaml
from logger_tt import logger
from scipy.stats import betabinom as sp_betabinom


#%%


class Config(dict):
    """Config contains the parameters related to a specific alignment file."""

    pass


class Configs(dict):
    """Configs contains the parameters related to the config file.

    Inherits from dict. Implements iteration.
    """

    def __iter__(self) -> Iterator[Config]:
        """Iterate over the individual sample configs.

        Yields
        ------
        Iterator[Config]
            Allow for iteration
        """
        dir_lca = self["output_dir"] / "lca"
        dir_pmd = self["output_dir"] / "pmd"
        samples = self["samples"].keys()
        for sample in samples:
            config = Config(self)
            config["sample"] = sample
            config["bam"] = config["samples"][sample]
            config["path_mismatches_txt"] = dir_lca / f"{sample}.mismatches.txt.gz"
            if config["damage_mode"] == "lca":
                config["path_mismatches_stat"] = (
                    dir_lca / f"{sample}.mismatches.stat.txt.gz"
                )
            else:
                config["path_mismatches_stat"] = dir_lca / f"{sample}.stat.txt"
            config["path_lca"] = dir_lca / f"{sample}.lca.txt.gz"
            config["path_lca_log"] = dir_lca / f"{sample}.log.txt"
            config["path_tmp"] = config["output_dir"] / "tmp" / sample
            config["path_pmd"] = dir_pmd / f"{sample}.pmd.txt.gz"
            yield config

    def get_nth(self, n: int) -> Config:
        """Get the n'th config.

        Parameters
        ----------
        n
            The index

        Returns
        -------
        Config
            A single configuration
        """
        return next(islice(self, n, None))

    def get_first(self) -> Config:
        """Get the first config.

        Returns
        -------
        Config
            A single configuration
        """
        return self.get_nth(n=0)

    def __len__(self) -> int:
        """The number of configs.

        Returns
        -------
        int
            The number of configs
        """
        return len(self["samples"])

    def check_number_of_jobs(self) -> None:
        """Compare the total number of jobs to the number of available cores."""

        parallel_samples = min(self["parallel_samples"], len(self["samples"]))
        cores_per_sample = self["cores_per_sample"]
        N_jobs = parallel_samples * cores_per_sample
        max_cores = psutil.cpu_count(logical=True)
        max_cores_real = psutil.cpu_count(logical=False)

        if N_jobs > max_cores:
            logger.warning(
                f"The total number of jobs ({N_jobs}) is higher "
                f"than the number of available cores ({max_cores}). "
                f"Do not do this unless you know what you are doing. "
                f"Try decreasing either 'parallel_samples' or 'cores_per_sample'."
            )
        elif N_jobs > max_cores_real:
            logger.info(
                f"The total number of jobs ({N_jobs}) is higher "
                f"than the number of physical, non-logical cores ({max_cores_real}). "
                f"This might decrease performance."
            )
def make_configs(
    config_file: Optional[Path],
    log_port: Optional[int] = None,
    log_path: Optional[str] = None,
    force: bool = False,
) -> Configs:
    """Create an instance of Configs from a config file.

    Parameters
    ----------
    config_file
        The config file to load
    log_port
        Optional log port, by default None
    log_path
        Optional log path, by default None
    force
        Whether or not the computations are forced, by default False

    Returns
    -------
    Configs
        An instance of Configs

    Raises
    ------
    typer.Abort
        If not a proper config file
    """

    if config_file is None:
        config_file = Path("config.yaml")

    if not config_file.exists():
        logger.error("Error! Please select a proper config file!")
        raise typer.Abort()

    logger.info(f"Using {config_file} as config file.")

    with open(config_file, "r") as file:
        d = yaml.safe_load(file)

    d = update_old_config(d)

    d["log_port"] = log_port
    d["log_path"] = log_path

    d.setdefault("forward_only", False)
    d.setdefault("cores_per_sample", 1)
    d.setdefault("damage_mode", "lca")
    d.setdefault("min_reads", 0)

    d["force"] = force

    paths = ["names", "nodes", "acc2tax", "output_dir", "config_file"]
    for path in paths:
        if d[path]:
            d[path] = Path(d[path])

    for key, val in d["samples"].items():
        d["samples"][key] = Path(val)

    # convert numeric strings to ints
    for key, val in d.items():
        if isinstance(val, str) and val.isdigit():
            d[key] = int(val)

    d["custom_database"] = 0 if d["custom_database"] else 1

    return Configs(d)
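# A minimal usage sketch (hypothetical file names, not part of metaDMG):
# load the config file, sanity-check the job count, and iterate over the
# per-sample Config objects yielded by Configs.__iter__ above.
#
#     configs = make_configs(Path("config.yaml"))
#     configs.check_number_of_jobs()
#     for config in configs:
#         print(config["sample"], config["bam"], config["path_mismatches_txt"])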
#%%


def update_old_config(d: dict) -> dict:

    if "version" in d:
        # new version, not changing anything
        return d

    logger.warning(
        "Using an old version of the config file. Please remake the config file."
    )

    # mapping from old config keys to their new names
    d_old2new = {
        "metaDMG-lca": "metaDMG_cpp",
        "minmapq": "min_mapping_quality",
        "editdistmin": "min_edit_dist",
        "editdistmax": "max_edit_dist",
        "simscorelow": "min_similarity_score",
        "simscorehigh": "max_similarity_score",
        "weighttype": "weight_type",
        "storage_dir": "output_dir",
        "dir": "output_dir",
        "fix_ncbi": "custom_database",
        "cores": "parallel_samples",
        "cores_per_sample": "cores_per_sample",
        "config_path": "config_file",
    }

    d_new = {}
    for key, value in d.items():
        if key in d_old2new:
            key = d_old2new[key]
        d_new[key] = value

    # drop the old 'forced' key; the default avoids a KeyError when it is absent
    d_new.pop("forced", None)

    return d_new


#%%

# def remove_file(file: Path | str, missing_ok: bool = False) -> None:
def remove_file(file: Union[Path, str], missing_ok: bool = False) -> None:
    Path(file).unlink(missing_ok=missing_ok)


# def remove_directory(path: Path | str, missing_ok: bool = False) -> None:
def remove_directory(path: Union[Path, str], missing_ok: bool = False) -> None:
    """Remove a directory and everything in it.

    Parameters
    ----------
    path
        Directory to be deleted
    missing_ok
        If True, do not raise when the directory does not exist, by default False
    """
    try:
        path = Path(path)
        for child in path.iterdir():
            if child.is_file():
                remove_file(child)
            else:
                remove_directory(child)
        path.rmdir()
    except FileNotFoundError:
        if not missing_ok:
            raise
#%%
def split_string(s: str) -> list[str]:
    """Split a string by comma, space, or both.

    Parameters
    ----------
    s
        Input string

    Returns
    -------
    List of strings
    """
    return re.findall(r"[^,\s]+", s)
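# Example (hypothetical input): commas, spaces, or a mix all work.
# split_string("sample1,sample2 sample3, sample4")
# -> ['sample1', 'sample2', 'sample3', 'sample4']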
#%%


def check_metaDMG_fit():
    try:
        import metaDMG.fit
    except ModuleNotFoundError:
        print("""The 'fit' extras has to be installed: pip install "metaDMG[fit]" """)
        raise typer.Abort()


def check_metaDMG_viz():
    try:
        import metaDMG.viz
    except ModuleNotFoundError:
        print("""The 'viz' extras has to be installed: pip install "metaDMG[viz]" """)
        raise typer.Abort()


#%%
def get_results_dir(
    config_file: Optional[Path] = None,
    results_dir: Optional[Path] = None,
) -> Path:
    """Helper function that gets the results directory from either
    the config file or the results directory directly.

    Parameters
    ----------
    config_file
        Config file, by default None
    results_dir
        Results directory, by default None

    Returns
    -------
    Path
        Path to the results directory

    Raises
    ------
    AssertionError
        If both the config file and the results directory are set
    """

    if config_file is not None and results_dir is not None:
        raise AssertionError("'config_file' and 'results_dir' cannot both be set")

    if results_dir:
        return results_dir

    if config_file is None:
        config_file = Path("config.yaml")

    configs = make_configs(config_file)
    return configs["output_dir"] / "results"
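# A minimal usage sketch (hypothetical paths, not part of metaDMG):
# pass either a config file or a results directory, never both.
#
#     get_results_dir(results_dir=Path("data/results"))  # -> data/results
#     get_results_dir(config_file=Path("config.yaml"))   # -> <output_dir>/results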
#%%


def get_single_fit_prediction(df_results):

    # Bayesian results carry a 'damage' column; otherwise fall back to the
    # maximum a posteriori (MAP_*) columns
    Bayesian = "damage" in df_results.columns
    prefix = "" if Bayesian else "MAP_"

    # reverse-strand counts (k-1, ...) are absent when fitting forward-only
    forward_only = "k-1" not in df_results.columns

    A = df_results[f"{prefix}A"].values
    q = df_results[f"{prefix}q"].values
    c = df_results[f"{prefix}c"].values
    phi = df_results[f"{prefix}phi"].values

    max_position = max(
        int(name.split("+")[1])
        for name in df_results.columns
        if name.startswith("k+")
    )

    if forward_only:
        x = np.arange(max_position) + 1
    else:
        x = np.hstack(
            [np.arange(max_position) + 1, np.arange(-1, -max_position - 1, -1)]
        )
    x = x.reshape((-1, 1))

    mask_N = [
        (name.startswith("N+") or name.startswith("N-"))
        for name in df_results.columns
    ]
    N = df_results.iloc[:, mask_N].values

    # damage model: D(x) = A * (1 - q)**(|x| - 1) + c
    Dx = A * (1 - q) ** (np.abs(x) - 1) + c
    alpha = Dx * phi
    beta = (1 - Dx) * phi

    dist = sp_betabinom(n=N, a=alpha.T, b=beta.T)

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore")
        std = dist.std() / N
    std[np.isnan(std)] = 0

    df_Dx = pd.concat(
        (
            # pd.DataFrame(df_results.tax_id, columns=["tax_id"]),
            pd.DataFrame(Dx.T, columns=[f"Dx{xi:+}" for xi in x.flatten()]),
            pd.DataFrame(std, columns=[f"Dx_std{xi:+}" for xi in x.flatten()]),
        ),
        axis=1,
    )

    return df_Dx


def append_fit_predictions(df_results):
    df_Dx = get_single_fit_prediction(df_results)
    return pd.concat((df_results.reset_index(drop=True), df_Dx), axis=1)


#%%
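# A minimal sketch (hypothetical parameter values, not metaDMG output) of the
# damage model evaluated in get_single_fit_prediction above:
# D(x) = A * (1 - q)**(|x| - 1) + c, so damage is largest at the read terminus
# (|x| = 1) and decays geometrically towards the background rate c.
def _example_damage_curve() -> np.ndarray:
    A, q, c = 0.3, 0.4, 0.01  # hypothetical fit parameters
    x = np.arange(1, 6)  # positions 1..5 from the 5' end of the read
    Dx = A * (1 - q) ** (np.abs(x) - 1) + c
    # Dx[0] = A + c = 0.31 at the terminus; each step inwards shrinks the
    # A-term by a factor of (1 - q) = 0.6
    return Dx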
def run_PMD(config: Config):
    """Run the PMD command from metaDMG-cpp and output the result
    to the gzipped txt file given by config["path_pmd"].

    Parameters
    ----------
    config
        Config containing the alignment file ('bam') to compute the
        PMD scores on and the (gzipped) output txt file ('path_pmd')
    """

    import gzip
    import shlex
    import subprocess

    txt_out = config["path_pmd"]
    txt_out.parent.mkdir(parents=True, exist_ok=True)

    with gzip.open(f"{txt_out}", "wt") as zip_out:
        # compute the PMD scores with metaDMG-cpp ...
        cpp = subprocess.Popen(
            shlex.split(f"{config['metaDMG_cpp']} pmd {config['bam']}"),
            stdout=subprocess.PIPE,
        )
        # ... and pipe them through gzip into the output file
        zip_proc = subprocess.Popen(
            ["gzip"],
            stdin=cpp.stdout,
            stdout=zip_out,
        )
        zip_proc.communicate()