Source code for cgr_gwas_qc.testing.data

"""Testing data access.

The goal of this module is to make test data easily accessible in fixtures
and tests. Here we are hiding some of the complexity and provide a simple API
for copying test data into a working directory and creating a config.yml.

Currently, we have two sources for test data ``TestData`` and ``RealData``.

The ``TestData`` contains a set of very small test datasets that I collected
from the internet. These data are ideal for testing specific functionality
like converting file types and running upstream parts of the workflow.
However, the data are synthetic so they will not work with filtering steps
and various parts of the workflow.

The ``RealData`` contains ~200 real samples. These data are potentially
re-identifiable so they cannot be distributed outside of the network.
However, because they are real data they will behave well for testing
filtering steps and downstream parts of the workflow. This data also includes
all outputs for these samples from the Production workflow. This will allow
regression testing of most parts of the workflow.
"""

import os
import shutil
import subprocess
from abc import ABC, abstractmethod
from collections import defaultdict
from pathlib import Path
from typing import MutableMapping, Optional, TypeVar, Union
from warnings import warn

from cgr_gwas_qc.cli.pre_flight import update_sample_sheet
from cgr_gwas_qc.models.config.reference_files import ReferenceFiles
from cgr_gwas_qc.models.config.software_params import SoftwareParams
from cgr_gwas_qc.models.config.user_files import UserFiles
from cgr_gwas_qc.models.config.workflow_params import WorkflowParams
from cgr_gwas_qc.parsers.sample_sheet import SampleManifest
from cgr_gwas_qc.testing import make_snakefile, make_test_config

DEFAULT_TEST_DATA_SERVER = "cgemsiii.nci.nih.gov"
DEFAULT_TEST_DATA_PATH = "/DCEG/CGF/Bioinformatics/Production/data/cgr_gwas_qc/test_data"

# This is a trick to add a type annotation that says that the method is
# returning the class itself. This will magically update to the subclass too.
T = TypeVar("T", bound="DataRepo")
U = Union["DataRepo", T]  # I had to use the Union instead of `T` to get mypy to pass


class DataRepo(ABC):
    """Abstract Base Class for Test Data.

    My goal is to create a set of classes to easily copy test datasets to the
    test's working directory. The general functionality is very similar for
    different sources of tests so I opted to use class inheritance instead of
    duplicating code/logic.

    Similar to a regular class, an Abstract Base Class (ABC), allows you to
    create attributes/properties that are inheritable by subclasses. However,
    the ABC also allows you to define the attributes/methods that your
    subclasses need but not implement them.
    """

    # Required Class Attributes, make sure to define in your subclass.
    _data_type: str  # This will be the `project_name` in the config
    _data_path: Path  # Path to the data repository
    _sample_sheet: str  # name of the sample sheet relative to the data repository
    _illumina_manifest_file: str  # name of the BPM file
    _thousand_genome_vcf: str  # name of the VCF file
    _thousand_genome_tbi: str  # name of the VCF.TBI file
    _gtc_pattern: str  # GTC file name pattern
    _idat_red_pattern: str  # Red IDAT file name pattern
    _idat_green_pattern: str  # Green IDAT file name pattern
    _ped: str  # Name of the aggregate samples PED
    _map: str  # Name of the aggregate samples MAP
    _bed: str  # Name of the aggregate samples BED
    _bim: str  # Name of the aggregate samples BIM
    _fam: str  # Name of the aggregate samples FAM
    _num_snps: int  # Number of SNPs in the array
    _subject_id_column: str  # The column containing subject information
    _expected_sex_column: str  # The column containing sex information
    _case_control_column: str  # The column containing Case/Control information
    _reference_fasta: str  # fasta reference file for GCT to VCF

    def __init__(self, working_dir: Optional[Path] = None):
        self.working_dir = working_dir
        self.ss = SampleManifest(self / self._sample_sheet)
        self._config: MutableMapping = defaultdict(dict)

        self._config["project_name"] = self.ss.header["Project Name"].split(";")[0]
        self._config["snp_array"] = self.ss.manifests["snp_array"]
        self._config["num_snps"] = self._num_snps

        self._config["sample_sheet"] = (self / self._sample_sheet).resolve()
        self._config["num_samples"] = self.ss.data.shape[0]

        self._config["reference_files"] = ReferenceFiles(
            illumina_manifest_file=(self / self._illumina_manifest_file).resolve(),
            thousand_genome_vcf=(self / self._thousand_genome_vcf).resolve(),
            thousand_genome_tbi=(self / self._thousand_genome_tbi).resolve(),
            reference_fasta=(self / self._reference_fasta).resolve(),
        ).dict()

        self._config["user_files"] = UserFiles().dict()

        self._config["software_params"] = SoftwareParams().dict()

        self._config["workflow_params"] = WorkflowParams(
            subject_id_column=self._subject_id_column,
            expected_sex_column=self._expected_sex_column,
            case_control_column=self._case_control_column,
        ).dict()

    @abstractmethod
    def _add_gtcs(self):
        """Each subclass must define this method.

        Handling of copying GTCs is very specific to the source of the test
        data. So I require that each subclass defines this method.
        """
        raise NotImplementedError

    @abstractmethod
    def _add_idats(self):
        """Each subclass must define this method.

        Handling of copying IDATs is very specific to the source of the test
        data. So I require that each subclass defines this method.
        """
        raise NotImplementedError

    def make_cgr_sample_sheet(self, **kwargs) -> U:
        """Parse the manifest file and create the cgr sample sheet.

        Parses the sample manifest and creates a sample sheet saved to
        ``working_dir/cgr_sample_sheet.csv``. Similar steps are performed
        during ``cgr preflight``.

        Args:
            problem_sample_ids: A list of Sample IDs to mark for exclusion. Defaults to None.
            kwargs: Any valid ``cgr_gwas_qc.models.config.workflow_params.WorkflowPrams``.

        Raises:
            ValueError: You must have provided a working directory when
                        instantiating the object.

        Returns:
            Creates a file ``working_dir/cgr_sample_sheet.csv`` and returns
            the ``DataRepo`` object.
        """
        if self.working_dir:
            params = WorkflowParams(**{**self._config["workflow_params"], **kwargs})
            sample_sheet = SampleManifest(self._config["sample_sheet"]).data
            update_sample_sheet(
                sample_sheet,
                params.subject_id_column,
                params.expected_sex_column,
                params.case_control_column,
                [],
                2,
            ).to_csv(self.working_dir / "cgr_sample_sheet.csv", index=False)
            return self
        else:
            raise ValueError("You need to have set ``self.working_dir``.")

    def add_user_files(self, entry_point: str = "bed", copy: bool = True) -> U:
        """Add user provided files.

        The user files can either be copied into the working directory or
        have their full path referenced in the config. If a working directory is
        given (``self.working_dir``) then they will be copied otherwise they
        will be referenced in the config.

        Args:
            entry_point: Which entry point to use ("gtc", "ped", or "bed").
                            Defaults to "bed".

        """
        if self.working_dir and copy:
            self._copy_user_files(entry_point)
        else:
            self._link_user_files(entry_point)

        return self

    def _link_user_files(self, entry_point: str):
        # Don't copy. Add full path to user files in the data repository.
        if entry_point == "gtc":
            self._config["user_files"] = {
                "gtc_pattern": (self._data_path.absolute() / self._gtc_pattern).as_posix(),
                "idat_pattern": {
                    "red": (self._data_path.absolute() / self._idat_red_pattern).as_posix(),
                    "green": (self._data_path.absolute() / self._idat_green_pattern).as_posix(),
                },
            }

        elif entry_point == "ped":
            self._config["user_files"] = {
                "ped": (self / self._ped).absolute(),
                "map": (self / self._map).absolute(),
            }

        else:
            self._config["user_files"] = {
                "bed": (self / self._bed).absolute(),
                "bim": (self / self._bim).absolute(),
                "fam": (self / self._fam).absolute(),
            }

    def _copy_user_files(self, entry_point: str):
        if entry_point == "gtc":
            self._config["user_files"] = {
                "gtc_pattern": self._gtc_pattern,
                "idat_pattern": {"red": self._idat_red_pattern, "green": self._idat_green_pattern},
            }
            self._add_gtcs()
            self._add_idats()

        elif entry_point == "ped":
            self._config["user_files"] = {
                "ped": "samples.ped",
                "map": "samples.map",
            }
            self.copy(self._ped, self._config["user_files"]["ped"])
            self.copy(self._map, self._config["user_files"]["map"])

        elif entry_point == "bed":
            self._config["user_files"] = {
                "bed": "samples.bed",
                "bim": "samples.bim",
                "fam": "samples.fam",
            }
            self.copy(self._bed, self._config["user_files"]["bed"])
            self.copy(self._bim, self._config["user_files"]["bim"])
            self.copy(self._fam, self._config["user_files"]["fam"])

    def make_config(self, **kwargs) -> U:
        """Create a config.yml in ``self.working_dir``.

        The config.yml will be based on which objects were added using the
        ``self.add_*`` methods.

        Args:
            kwargs: Config options that you would like to set when creating
              the config.yml
        """
        # Update config with passed items
        for k, v in kwargs.items():
            if isinstance(v, dict):
                self._config[k].update(v)
            else:
                self._config[k] = v

        if self.working_dir:
            make_test_config(self.working_dir, **self._config)
        else:
            raise ValueError("You need to have set ``self.working_dir``.")
        return self

    def make_snakefile(self, contents: str) -> U:
        """Convenience wrapper of ``cgr_gwas_qc.testing.make_snakefile``."""
        if self.working_dir:
            make_snakefile(self.working_dir, contents)
        else:
            raise ValueError("You need to have set ``self.working_dir``.")
        return self

    def copy(self, source: str, destination: str) -> U:
        """Copy folder or file to a new directory

        Args:
            source: The name of a folder or file in the repository
              ``self._data_path`` that you want to copy to
              ``self.working_dir``.
            destination: The name where you want to copy the file or
              folder in ``self.working_dir``. Default None.

        Raises:
            ValueError: If the ``source`` does not exist in the repository.
        """
        if not self.working_dir:
            raise ValueError("You need to have set ``self.working_dir``.")

        source_path = (self._data_path / source).absolute()
        destination_path = (self.working_dir / destination).absolute()
        destination_path.parent.mkdir(exist_ok=True, parents=True)  # Make parent dirs if needed
        if source_path.is_dir():
            shutil.copytree(source_path, destination_path, copy_function=shutil.copy)
        elif source_path.is_file():
            shutil.copyfile(source_path, destination_path)
        else:
            raise ValueError(f"{source_path.as_posix()} is not a file or directory.")

        return self

    def __truediv__(self, value) -> Path:
        """Override division operator to build paths."""
        pth = self._data_path / value
        if pth.exists():
            return pth
        raise FileNotFoundError(pth.as_posix())


[docs]class FakeData(DataRepo): """A repository of synthetic data stored in ``tests/data``. Example: >>> cache = FakeData(working_dir) >>> cache / existing_file.txt # If file exists then returns path to the file Path(".../existing_file.txt") >>> cache / non_existing_file.txt # If file does not exist then raises exception FileNotFoundError: ".../non_existing_file.txt" >>> cache.make_cgr_sample_sheet() # copy sample sheet to working dir >>> cache.add_user_files() # add BED/BIM/FAM to working dir >>> cache.make_config() # make the config for files added using cache """ _data_type = "Fake Data" _data_path = (Path(__file__).parents[3] / "tests/data").absolute() _sample_sheet = "example_sample_sheet.csv" _illumina_manifest_file = "illumina/bpm/small_manifest.bpm" _thousand_genome_vcf = "1KG/small_1KG.vcf.gz" _thousand_genome_tbi = "1KG/small_1KG.vcf.gz.tbi" _reference_fasta = "" _gtc_pattern = "{Sample_ID}.gtc" _idat_red_pattern = "{SentrixBarcode_A}_{SentrixPosition_A}_Red.idat" _idat_green_pattern = "{SentrixBarcode_A}_{SentrixPosition_A}_Grn.idat" _ped = "plink/samples.ped" _map = "plink/samples.map" _bed = "plink/samples.bed" _bim = "plink/samples.bim" _fam = "plink/samples.fam" _test_gtc = "illumina/gtc/small_genotype.gtc" _test_idat = "illumina/idat/small_intensity.idat" _snp_array = "Fake-GSA" _num_snps = 2_000 _subject_id_column = "LIMS_Individual_ID" _expected_sex_column = "Expected_Sex" _case_control_column = "Case/Control_Status" def add_user_files(self, entry_point: str = "bed", copy: bool = True) -> U: if (self.working_dir is None or not copy) and entry_point == "gtc": # TestData only has 1 GTC file that I copy multiple times to # simulate samples. So this entry point must be copied into the # working directory. raise ValueError("Test GTC files must be copied to the working directory.") return super().add_user_files(entry_point, copy) def _add_gtcs(self): for _, r in self.ss.data.iterrows(): self.copy(self._test_gtc, self._gtc_pattern.format(**dict(r))) def _add_idats(self): for _, r in self.ss.data.iterrows(): self.copy(self._test_idat, self._idat_red_pattern.format(**dict(r))) self.copy(self._test_idat, self._idat_green_pattern.format(**dict(r)))
def _make_cgr_cache(name) -> Path: """Create cache folder.""" suffix = "cgr_gwas_qc/" + name if os.environ.get("XDG_CACHE_HOME"): _cache_path = Path(os.environ["XDG_CACHE_HOME"]) / suffix else: _cache_path = Path(__file__).absolute().parents[3] / ".cache" / suffix _cache_path.mkdir(parents=True, exist_ok=True) return _cache_path
[docs]class RealData(DataRepo): """A repository of real data stored on cgems. Example: >>> cache = ReadData(working_dir, sync=True) # automatically symlink (on cgems) or rsync data locally >>> cache / existing_file.txt # If file exists then returns path to the file Path("../../../.cache/cgr_gwas_qc/test_data/existing_file.txt") >>> cache / non_existing_file.txt # If file does not exist then raises exception FileNotFoundError: "../../../.cache/cgr_gwas_qc/test_data/non_existing_file.txt" >>> cache.make_cgr_sample_sheet() # copy sample sheet to working dir >>> cache.add_user_files(copy=False) # add full path of user files to config >>> cache.make_config() # make the config for files added using cache """ _data_type = "Real Data" _data_path = _make_cgr_cache("test_data") _sample_sheet = "original_data/manifest_short.csv" _illumina_manifest_file = "reference_data/GSAMD-24v1-0_20011747_A1.bpm" _thousand_genome_vcf = ( "reference_data/ALL.wgs.phase3_shapeit2_mvncall_integrated_v5.20130502.sites.vcf.gz" ) _thousand_genome_tbi = ( "reference_data/ALL.wgs.phase3_shapeit2_mvncall_integrated_v5.20130502.sites.vcf.gz.tbi" ) _gtc_pattern = "original_data/{SentrixBarcode_A}_{SentrixPosition_A}.gtc" _idat_red_pattern = "original_data/{SentrixBarcode_A}_{SentrixPosition_A}_Red.idat" _idat_green_pattern = "original_data/{SentrixBarcode_A}_{SentrixPosition_A}_Grn.idat" _ped = "production_outputs/plink_start/samples.ped" _map = "production_outputs/plink_start/samples.map" _bed = "production_outputs/plink_start/samples.bed" _bim = "production_outputs/plink_start/samples.bim" _fam = "production_outputs/plink_start/samples.fam" _snp_array = "GSAMD-24v1-0" _num_snps = 700078 _subject_id_column = "PI_Subject_ID" _expected_sex_column = "Expected_Sex" _case_control_column = "Case/Control_Status" def __init__( self, working_dir: Optional[Path] = None, full_sample_sheet: bool = True, sync: bool = False, GRCh_version: int = 37, ): """A real test data repository of ~200 samples. These data are stored on cgems. If you are running this on a computer with access to cgems then providing ``sync=True`` will rsync the data into a local cache. If you are running this on cgems then ``sync=True`` will create a symlink into the cache folder. Args: sync: If True we will try to cache data locally [Default False]. GRCh_version: Which version of the human genome to use [Default 37]. If it is 38 then override the Illumina manifest and thousand genomes files. """ if full_sample_sheet: self._sample_sheet = "original_data/manifest_full.csv" if sync: self._sync_data() if GRCh_version == 38: # Override reference file sto GRCh38 warn("User files are still only available for GRCh37.") self._illumina_manifest_file = "reference_data/GSAMD-24v1-0_20011747_A2.bpm" self._thousand_genome_vcf = ( "reference_data/ALL.wgs.shapeit2_integrated_v1a.GRCh38.20181129.sites.vcf.gz" ) self._thousand_genome_tbi = ( "reference_data/ALL.wgs.shapeit2_integrated_v1a.GRCh38.20181129.sites.vcf.gz.tbi" ) super().__init__(working_dir) def _add_gtcs(self): target_folder = self.working_dir / "original_data" target_folder.mkdir(exist_ok=True) for file_name in (self / "original_data").glob("*.gtc"): shutil.copy(file_name, target_folder) def _add_idats(self): target_folder = self.working_dir / "original_data" target_folder.mkdir(exist_ok=True) for file_name in (self / "original_data").glob("*.idat"): shutil.copy(file_name, target_folder) def _sync_data(self): """Symlink or Rsync data to cache folder.""" user = os.environ.get("TEST_DATA_USER", None) or os.environ.get("USER") server = os.environ.get("TEST_DATA_SERVER", DEFAULT_TEST_DATA_SERVER) remote_path = Path(os.environ.get("TEST_DATA_PATH", DEFAULT_TEST_DATA_PATH)) if remote_path.exists(): self._data_path.symlink_to(remote_path) return # Rsync from remote server cmd = " ".join( [ "rsync", "-aL", "--exclude=.snakemake", "--exclude=logs", "--exclude=gwas_qc_log.*", f"{user}@{server}:{remote_path.as_posix()}/", f"{self._data_path.as_posix()}/", ] ) try: subprocess.run( cmd, shell=True, check=True, stderr=subprocess.STDOUT, executable="/bin/bash" ) except subprocess.SubprocessError: warn( f"Could not connect to {user}@{server}:{remote_path}. To " "change set the environmental variable `TEST_DATA_USER`, " "`TEST_DATA_SERVER`, and `TEST_DATA_PATH." )