## Copyright (c) 2016, Illumina
## All rights reserved.
##
## Redistribution and use in source and binary forms, with or without
## modification, are permitted provided that the following conditions are met:
##
## 1. Redistributions of source code must retain the above copyright notice, this
## list of conditions and the following disclaimer.
## 2. Redistributions in binary form must reproduce the above copyright notice,
## this list of conditions and the following disclaimer in the documentation
## and/or other materials provided with the distribution.
##
## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
## ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
## WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
## DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
## ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
## (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
## LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
## ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
## (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
## SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
##
## The views and conclusions contained in the software and documentation are those
## of the authors and should not be interpreted as representing official policies,
## either expressed or implied, of the FreeBSD Project.
# This file is a copy of
# https://github.com/Illumina/GTCtoVCF/IlluminaBeadArraysFiles.py with the
# following changes:
# - Reformated code with black
# - Added comment to disable flake8
# - Refactored GenotypeCalls.get_base_calls* to use generators
# flake8: noqa
__version__ = "1.2.0"
import struct
from typing import Generator
from numpy import arctan2, cos, dtype, float32, frombuffer, int32, pi, seterr, sin, uint16
seterr(divide="ignore", invalid="ignore")
nan = float32("nan")
code2genotype = [
"NC",
"AA",
"AB",
"BB",
"NULL",
"A",
"B",
"AAA",
"AAB",
"ABB",
"BBB",
"AAAA",
"AAAB",
"AABB",
"ABBB",
"BBBB",
"AAAAA",
"AAAAB",
"AAABB",
"AABBB",
"ABBBB",
"BBBBB",
"AAAAAA",
"AAAAAB",
"AAAABB",
"AAABBB",
"AABBBB",
"ABBBBB",
"BBBBBB",
"AAAAAAA",
"AAAAAAB",
"AAAAABB",
"AAAABBB",
"AAABBBB",
"AABBBBB",
"ABBBBBB",
"BBBBBBB",
"AAAAAAAA",
"AAAAAAAB",
"AAAAAABB",
"AAAAABBB",
"AAAABBBB",
"AAABBBBB",
"AABBBBBB",
"ABBBBBBB",
"BBBBBBBB",
]
NC = chr(0)
AA = chr(1)
AB = chr(2)
BB = chr(3)
[docs]class GenotypeCalls:
"""Class to parse gtc files as produced by Illumina AutoConvert
and AutoCall software.
Attributes:
supported_versions: Supported file versions as a list of integers
"""
__ID_NUM_SNPS = 1
__ID_PLOIDY = 2
__ID_PLOIDY_TYPE = 3
__ID_SAMPLE_NAME = 10
__ID_SAMPLE_PLATE = 11
__ID_SAMPLE_WELL = 12
__ID_CLUSTER_FILE = 100
__ID_SNP_MANIFEST = 101
__ID_IMAGING_DATE = 200
__ID_AUTOCALL_DATE = 201
__ID_AUTOCALL_VERSION = 300
__ID_NORMALIZATION_TRANSFORMS = 400
__ID_CONTROLS_X = 500
__ID_CONTROLS_Y = 501
__ID_RAW_X = 1000
__ID_RAW_Y = 1001
__ID_GENOTYPES = 1002
__ID_BASE_CALLS = 1003
__ID_GENOTYPE_SCORES = 1004
__ID_SCANNER_DATA = 1005
__ID_CALL_RATE = 1006
__ID_GENDER = 1007
__ID_LOGR_DEV = 1008
__ID_GC10 = 1009
__ID_GC50 = 1011
__ID_B_ALLELE_FREQS = 1012
__ID_LOGR_RATIOS = 1013
__ID_PERCENTILES_X = 1014
__ID_PERCENTILES_Y = 1015
__ID_SLIDE_IDENTIFIER = 1016
supported_version = [3, 4, 5]
def __init__(self, filename, ignore_version=False, check_write_complete=True):
"""Constructor
Args:
filename (string): GTC filename
ignore_version (bool): boolean to ignore automated checks on
file version, not recommended (default: False)
Returns:
GenotypeCalls object
"""
self.filename = filename
with open(self.filename, "rb") as gtc_handle:
identifier = gtc_handle.read(3)
if identifier != b"gtc":
raise Exception("GTC format error: bad format identifier")
self.version = read_byte(gtc_handle)
if self.version not in GenotypeCalls.supported_version and not ignore_version:
raise Exception("Unsupported GTC File version (" + str(self.version) + ")")
number_toc_entries = read_int(gtc_handle)
#
# Parse the table of contents and map the toc entry
# to the lookup
#
self.toc_table = {}
for toc_idx in range(number_toc_entries):
(id, offset) = struct.unpack("<hI", gtc_handle.read(6))
self.toc_table[id] = offset
if check_write_complete and not self.is_write_complete():
raise Exception("GTC file is incomplete")
def __get_generic(self, toc_entry, parse_function):
"""Internal helper function to access a data element in a
generic fashion.
Args:
toc_entry (int): Identifier for entry in table of contents
parse_function (function): Used to parse the value
from a file handle
Returns:
A single value parsed from the file (type dependent on
parse_function)
"""
with open(self.filename, "rb") as gtc_handle:
gtc_handle.seek(self.toc_table[toc_entry])
return parse_function(gtc_handle)
def __get_generic_array(self, toc_entry, parse_function):
"""Internal helper function to access a data array in a generic
fashion.
Args:
toc_entry (int): Identifier for entry in table of contents
parse_function (function): A function used to parse the value
from a file handle
Returns:
An array parsed from the file (type dependent on parse_function)
"""
with open(self.filename, "rb") as gtc_handle:
gtc_handle.seek(self.toc_table[toc_entry])
num_entries = read_int(gtc_handle)
result = []
for idx in range(num_entries):
result.append(parse_function(gtc_handle))
return result
def __get_generic_array_numpy(self, toc_entry, numpy_type):
"""Internal helper function to access a data array in a generic
fashion.
Args:
toc_entry (int): Identifier for entry in table of contents
parse_function (function): A function used to parse the value
from a file handle
Returns:
An array parsed from the file (type dependent on parse_function)
"""
numpy_type = dtype(numpy_type)
with open(self.filename, "rb") as gtc_handle:
gtc_handle.seek(self.toc_table[toc_entry])
num_entries = read_int(gtc_handle)
return frombuffer(gtc_handle.read(num_entries * numpy_type.itemsize), dtype=numpy_type)
[docs] def get_num_snps(self):
"""Returns:
The number of SNPs in the file as an integer
"""
return self.toc_table[GenotypeCalls.__ID_NUM_SNPS]
[docs] def get_ploidy(self):
"""Returns:
The ploidy of the sample
"""
return self.toc_table[GenotypeCalls.__ID_PLOIDY]
[docs] def get_ploidy_type(self):
"""Returns:
The ploidy type of the sample
"""
return self.toc_table[GenotypeCalls.__ID_PLOIDY_TYPE]
[docs] def get_sample_name(self):
"""Returns:
The name of the sample as a string
"""
return self.__get_generic(GenotypeCalls.__ID_SAMPLE_NAME, read_string)
[docs] def get_slide_identifier(self):
"""Returns:
The name of the sample as a string
"""
return self.__get_generic(GenotypeCalls.__ID_SLIDE_IDENTIFIER, read_string)
[docs] def get_sample_plate(self):
"""Returns:
The name of the sample plate as a string
"""
return self.__get_generic(GenotypeCalls.__ID_SAMPLE_PLATE, read_string)
[docs] def get_sample_well(self):
"""Returns:
The name of the sample well as a string
"""
return self.__get_generic(GenotypeCalls.__ID_SAMPLE_WELL, read_string)
[docs] def get_cluster_file(self):
"""Returns:
The name of the cluster file used for genotyping as a string
"""
return self.__get_generic(GenotypeCalls.__ID_CLUSTER_FILE, read_string)
[docs] def get_snp_manifest(self):
"""Returns:
The name of the manifest used for genotyping as a string
"""
return self.__get_generic(GenotypeCalls.__ID_SNP_MANIFEST, read_string)
[docs] def get_imaging_date(self):
"""Returns:
The imaging date of scanning as a string
For example
Monday, December 01, 2014 4:51:47 PM
"""
return self.__get_generic(GenotypeCalls.__ID_IMAGING_DATE, read_string)
[docs] def get_autocall_date(self):
"""Returns:
The imaging date of scanning as a string
For example
2/17/2015 1:47 PM
"""
return self.__get_generic(GenotypeCalls.__ID_AUTOCALL_DATE, read_string)
[docs] def get_autocall_version(self):
"""Returns:
The version of AutoCall used for genotyping as a string
For example
1.6.2.2
"""
return self.__get_generic(GenotypeCalls.__ID_AUTOCALL_VERSION, read_string)
[docs] def get_genotypes(self):
"""Returns:
A byte list (string) of genotypes. See code2genotype for mapping
"""
return self.__get_generic_array(GenotypeCalls.__ID_GENOTYPES, read_byte)
[docs] def get_base_calls_generic(
self, snps, strand_annotations, report_strand, unknown_annotation
) -> Generator[str, None, None]:
"""
Get base calls on arbitrary strand
Args:
snps (list<string>) : A list of string representing the snp on the design strand for the loci (e.g. [A/C])
strand_annotations (list<int>) : A list of strand annotations for the loci
report_strand (int) : The strand to use for reporting (must match encoding of strand_annotations)
unknown_annotation (int) : The encoding used in strand annotations for an unknown strand
Yields:
The genotype basecalls on the report strand as a string.
The characters are A, C, G, T, or - for a no-call/null.
"""
genotypes = self.get_genotypes()
if len(genotypes) != len(snps):
raise ValueError("The number of SNPs must match the number of loci in the GTC file")
if len(genotypes) != len(strand_annotations):
raise ValueError(
"The number of reference strand annotations must match the number of loci in the GTC file"
)
for genotype, snp, strand_annotation in zip(genotypes, snps, strand_annotations):
ab_genotype = code2genotype[genotype]
a_nucleotide = snp[1]
b_nucleotide = snp[-2]
if (
a_nucleotide == "N"
or b_nucleotide == "N"
or strand_annotation == unknown_annotation
or ab_genotype in ["NC", "NULL"]
):
yield "-"
else:
report_strand_nucleotides = []
for ab_allele in ab_genotype:
nucleotide_allele = a_nucleotide if ab_allele == "A" else b_nucleotide
report_strand_nucleotides.append(
nucleotide_allele
if strand_annotation == report_strand
else complement(nucleotide_allele)
)
yield "".join(report_strand_nucleotides)
[docs] def get_base_calls_plus_strand(
self, snps, ref_strand_annotations
) -> Generator[str, None, None]:
"""
Get base calls on plus strand of genomic reference. If you only see no-calls returned from this method,
please verify that the reference strand annotations passed as argument are not unknown (RefStrand.Unknown)
Args:
snps (list<string>) : A list of string representing the snp on the design strand for the loci (e.g. [A/C])
ref_strand_annotations (list<int>) : A list of strand annotations for the loci (e.g., RefStrand.Plus)
Yields:
The genotype basecalls on the report strand as a string.
The characters are A, C, G, T, or - for a no-call/null.
"""
yield from self.get_base_calls_generic(
snps, ref_strand_annotations, RefStrand.Plus, RefStrand.Unknown
)
[docs] def get_base_calls_forward_strand(
self, snps, forward_strand_annotations
) -> Generator[str, None, None]:
"""
Get base calls on the forward strand.
Args:
snps (list<string>) : A list of string representing the snp on the design strand for the loci (e.g. [A/C])
forward_strand_annotations (list<int>) : A list of strand annotations for the loci (e.g., SourceStrand.Forward)
Yields:
The genotype basecalls on the report strand as a string.
The characters are A, C, G, T, or - for a no-call/null.
"""
yield from self.get_base_calls_generic(
snps, forward_strand_annotations, SourceStrand.Forward, RefStrand.Unknown
)
[docs] def get_base_calls(self) -> Generator[str, None, None]:
"""Yields:
The genotype basecalls as a string.
The characters are A, C, G, T, or - for a no-call/null.
The calls are relative to the top strand.
"""
try:
ploidy_type = self.get_ploidy_type()
except:
ploidy_type = 1
genotypes = self.get_genotypes() if ploidy_type != 1 else None
with open(self.filename, "rb") as gtc_handle:
gtc_handle.seek(self.toc_table[GenotypeCalls.__ID_BASE_CALLS])
num_entries = read_int(gtc_handle)
for idx in range(num_entries):
if ploidy_type == 1:
yield gtc_handle.read(2).decode("utf-8")
else:
data_string = gtc_handle.read(2).decode("utf-8")
ab_genotype = code2genotype[genotypes[idx]]
if ab_genotype in ["NC", "NULL"]:
yield "-"
else:
yield "".join(
[
data_string[0] if allele == "A" else data_string[1]
for allele in ab_genotype
]
)
[docs] def get_genotype_scores(self):
"""Returns:
The genotype scores as a list of floats
"""
return self.__get_generic_array_numpy(GenotypeCalls.__ID_GENOTYPE_SCORES, float32)
[docs] def get_scanner_data(self):
"""Returns:
Information about scanner as ScannerData object
"""
return self.__get_generic(GenotypeCalls.__ID_SCANNER_DATA, read_scanner_data)
[docs] def get_control_x_intensities(self):
"""Returns:
The x intensities of control bead types as a list of integers
"""
return self.__get_generic_array_numpy(GenotypeCalls.__ID_CONTROLS_X, uint16)
[docs] def get_control_y_intensities(self):
"""Returns:
The y intensities of control bead types as a list of integers
"""
return self.__get_generic_array_numpy(GenotypeCalls.__ID_CONTROLS_Y, uint16)
[docs] def get_raw_x_intensities(self):
"""Returns:
The raw x intensities of assay bead types as a list of integers
"""
return self.__get_generic_array_numpy(GenotypeCalls.__ID_RAW_X, uint16)
[docs] def get_raw_y_intensities(self):
"""Returns:
The raw y intensities of assay bead types as a list of integers
"""
return self.__get_generic_array_numpy(GenotypeCalls.__ID_RAW_Y, uint16)
[docs] def get_call_rate(self):
"""Returns:
The call rate as a float
"""
return self.__get_generic(GenotypeCalls.__ID_CALL_RATE, read_float)
[docs] def get_gender(self):
"""Returns:
The gender as a char
M - Male, F - Female, U-Unknown
"""
return self.__get_generic(GenotypeCalls.__ID_GENDER, read_char)
[docs] def get_logr_dev(self):
"""Returns:
The logR deviation as a float
"""
return self.__get_generic(GenotypeCalls.__ID_LOGR_DEV, read_float)
[docs] def get_gc10(self):
"""Returns:
The GC10 (GenCall score - 10th percentile) as a float
"""
return self.__get_generic(GenotypeCalls.__ID_GC10, read_float)
[docs] def get_gc50(self):
"""Returns:
The GC50 (GenCall score - 50th percentile) as a float
"""
return self.__get_generic(GenotypeCalls.__ID_GC50, read_float)
[docs] def get_num_calls(self):
"""Returns:
The number of calls as an integer
"""
with open(self.filename, "rb") as gtc_handle:
gtc_handle.seek(self.toc_table[GenotypeCalls.__ID_GC50] + 4)
return read_int(gtc_handle)
[docs] def get_num_no_calls(self):
"""Returns:
The number of no calls as an integer
"""
with open(self.filename, "rb") as gtc_handle:
gtc_handle.seek(self.toc_table[GenotypeCalls.__ID_GC50] + 8)
return read_int(gtc_handle)
[docs] def get_num_intensity_only(self):
"""Returns:
The number of intensity only SNPs
"""
with open(self.filename, "rb") as gtc_handle:
gtc_handle.seek(self.toc_table[GenotypeCalls.__ID_GC50] + 12)
return read_int(gtc_handle)
[docs] def get_ballele_freqs(self):
"""Returns:
The B allele frequencies as a list of floats
"""
if self.version < 4:
raise Exception(
"B allele frequencies unavailable in GTC File version (" + str(self.version) + ")"
)
return self.__get_generic_array_numpy(GenotypeCalls.__ID_B_ALLELE_FREQS, float32)
[docs] def get_logr_ratios(self):
"""Returns:
The logR ratios as a list of floats
"""
if self.version < 4:
raise Exception(
"LogR ratios unavailable in GTC File version (" + str(self.version) + ")"
)
return self.__get_generic_array_numpy(GenotypeCalls.__ID_LOGR_RATIOS, float32)
[docs] def get_percentiles_x(self):
"""Returns:
An array of length three representing 5th, 50th and 95th percentiles for
x intensity
"""
if self.version < 5:
raise Exception(
"Percentile intensities unavailable in GTC File version (" + str(self.version) + ")"
)
result = []
with open(self.filename, "rb") as gtc_handle:
gtc_handle.seek(self.toc_table[GenotypeCalls.__ID_PERCENTILES_X])
result = []
for idx in range(3):
result.append(read_ushort(gtc_handle))
return result
[docs] def get_percentiles_y(self):
"""Returns:
An array of length three representing 5th, 50th and 95th percentiles for
y intensity
"""
if self.version < 5:
raise Exception(
"Percentile intensities unavailable in GTC File version (" + str(self.version) + ")"
)
result = []
with open(self.filename, "rb") as gtc_handle:
gtc_handle.seek(self.toc_table[GenotypeCalls.__ID_PERCENTILES_Y])
result = []
for idx in range(3):
result.append(read_ushort(gtc_handle))
return result
[docs] def get_normalized_intensities(self, normalization_lookups):
"""Calculate and return the normalized intensities
Args:
normalization_lookups (list of ints): Map from each SNP to a normalization transform.
This list can be obtained from the BeadPoolManifest object.
Return:
The normalized intensities for the sample as a list of (x,y) float tuples
"""
normalization_transforms = self.get_normalization_transforms()
return [
normalization_transforms[lookup].normalize_intensities(x_raw, y_raw)
for (x_raw, y_raw, lookup) in zip(
self.get_raw_x_intensities(), self.get_raw_y_intensities(), normalization_lookups
)
]
[docs] def is_write_complete(self):
"""Check for last item written to GTC file to verify that write
has successfully completed
Args:
None
Returns
Whether or not write is complete (bool)
"""
if self.version == 3:
try:
self.get_num_intensity_only()
return True
except:
return False
elif self.version == 4:
try:
self.get_logr_ratios()
return True
except:
return False
elif self.version == 5:
try:
self.get_slide_identifier()
return True
except:
return False
else:
raise Exception(
"Unable to test for write completion on version " + str(self.version) + " GTC file"
)
class NormalizationTransform:
def __init__(self, buffer):
"""Constructor
Args:
buffer (string): byte string containing binary data for transform
Returns:
NormalizationTransform object
"""
(self.version,) = struct.unpack("<i", buffer[:4])
(
self.offset_x,
self.offset_y,
self.scale_x,
self.scale_y,
self.shear,
self.theta,
) = frombuffer(buffer[4:28], dtype=float32)
@staticmethod
def read_normalization_transform(handle):
"""Static helper function to read normalization transform from file handle
Args:
handle (file handle): File handle with position at start of normalization transform entry
Returns:
NormalizationTransform object
"""
return NormalizationTransform(handle.read(52))
@staticmethod
def rect_to_polar(x_y):
"""Converts normalized x,y intensities to (pseudo) polar coordinates (R, theta)
Args:
x_y (float, float): Normalized x,y intensities for probe
Returns:
(R,theta) polar values as tuple of floats
"""
x, y = x_y
if x == 0 and y == 0:
return (nan, nan)
return (x + y, arctan2(y, x) * 2.0 / pi)
def normalize_intensities(self, x, y, threshold=True):
"""Apply this normalization transform to raw intensities
Args:
x (int): Raw x intensities
y (int): Raw y intensities
Returns:
(xn, yn) normalized intensities as tuple of floats
"""
if x <= 0 and y <= 0:
return (nan, nan)
tempx = x - self.offset_x
tempy = y - self.offset_y
tempx2 = cos(self.theta) * tempx + sin(self.theta) * tempy
tempy2 = -sin(self.theta) * tempx + cos(self.theta) * tempy
tempx3 = tempx2 - self.shear * tempy2
tempy3 = tempy2
xn = tempx3 / self.scale_x
yn = tempy3 / self.scale_y
if threshold:
xn = 0 if 0 > xn else xn
yn = 0 if 0 > yn else yn
return (xn, yn)
class ScannerData:
def __init__(self, name, pmt_green, pmt_red, version, user):
"""Constructor
Args:
name (string): scanner identifier
pmt_green (int): gain setting (green channel)
pmt_red (int): gain setting (red channel)
version (string): version of scanner software
user (string): user of the scanner software
Returns:
ScannerData object
"""
self.name = name
self.pmt_green = pmt_green
self.pmt_red = pmt_red
self.version = version
self.user = user
[docs]class BeadPoolManifest:
"""Class for parsing binary (BPM) manifest file.
Attributes:
names (list of strings): Names of loci from manifest
snps (list of strings) : SNP values of loci from manifest
chroms (list of string) : Chromosome values for loci
map_infos = (list of ints) : Map info values for loci
addresses (list of ints): AddressA IDs of loci from manifest
normalization_lookups (list of ints): Normalization lookups from manifest. This indexes into
list of normalization transforms read from GTC file
ref_strands (list of ints) : Reference strand annotation for loci (see RefStrand class)
source_strands (list of ints) : Source strand annotations for loci (see SourceStrand class)
num_loci (int): Number of loci in manifest
manifest_name (string): Name of manifest
control_config (string): Control description from manifest
"""
def __init__(self, filename):
"""Constructor
Args:
filename (string): Locations of BPM (bead pool manifest) file
Returns:
BeadPoolManifest
"""
self.names = []
self.snps = []
self.chroms = []
self.map_infos = []
self.addresses = []
self.normalization_ids = []
self.assay_types = []
self.normalization_lookups = []
self.ref_strands = []
self.source_strands = []
self.num_loci = 0
self.manifest_name = ""
self.control_config = ""
self.__parse_file(filename)
def __parse_file(self, manifest_file):
"""Helper function to initialize this object from a file.
Args:
manifest_file (string): Location of BPM (bead pool manifest) file
Returns:
None
"""
with open(manifest_file, "rb") as manifest_handle:
header = manifest_handle.read(3)
if len(header) != 3 or header != b"BPM":
raise Exception("Invalid BPM format")
version = read_byte(manifest_handle)
if version != 1:
raise Exception("Unknown BPM version (" + str(ord(version)) + ")")
version = read_int(manifest_handle)
version_flag = 0x1000
if version & version_flag == version_flag:
version = version ^ version_flag
if version > 5 or version < 3:
raise Exception("Unsupported BPM version (" + str(version) + ")")
self.manifest_name = read_string(manifest_handle)
if version > 1:
self.control_config = read_string(manifest_handle)
self.num_loci = read_int(manifest_handle)
manifest_handle.seek(4 * self.num_loci, 1)
name_lookup = {}
for idx in range(self.num_loci):
self.names.append(read_string(manifest_handle))
name_lookup[self.names[-1]] = idx
for idx in range(self.num_loci):
normalization_id = read_byte(manifest_handle)
if normalization_id >= 100:
raise Exception("Manifest format error: read invalid normalization ID")
self.normalization_ids.append(normalization_id)
self.assay_types = [0] * self.num_loci
self.addresses = [0] * self.num_loci
self.snps = [""] * self.num_loci
self.chroms = [""] * self.num_loci
self.map_infos = [0] * self.num_loci
self.ref_strands = [RefStrand.Unknown] * self.num_loci
self.source_strands = [SourceStrand.Unknown] * self.num_loci
for idx in range(self.num_loci):
locus_entry = LocusEntry(manifest_handle)
self.assay_types[name_lookup[locus_entry.name]] = locus_entry.assay_type
self.addresses[name_lookup[locus_entry.name]] = locus_entry.address_a
self.snps[name_lookup[locus_entry.name]] = locus_entry.snp
self.chroms[name_lookup[locus_entry.name]] = locus_entry.chrom
self.map_infos[name_lookup[locus_entry.name]] = locus_entry.map_info
self.ref_strands[name_lookup[locus_entry.name]] = locus_entry.ref_strand
self.source_strands[name_lookup[locus_entry.name]] = locus_entry.source_strand
if len(self.normalization_ids) != len(self.assay_types):
raise Exception("Manifest format error: read invalid number of assay entries")
all_norm_ids = set()
for locus_idx in range(self.num_loci):
self.normalization_ids[locus_idx] += 100 * self.assay_types[locus_idx]
# To mimic the byte-wrapping behavior from GenomeStudio, AutoCall, IAAP take the mod of 256
self.normalization_ids[locus_idx] %= 256
all_norm_ids.add(self.normalization_ids[locus_idx])
sorted_norm_ids = sorted(all_norm_ids)
lookup_dictionary = {}
for idx in range(len(sorted_norm_ids)):
lookup_dictionary[sorted_norm_ids[idx]] = idx
self.normalization_lookups = [
lookup_dictionary[normalization_id] for normalization_id in self.normalization_ids
]
class SourceStrand:
Unknown = 0
Forward = 1
Reverse = 2
@staticmethod
def to_string(source_strand):
"""Get an integer representation of source strand annotation
Args:
source_strand (str) : string representation of source strand annotation (e.g., "F")
Returns:
int : int representation of source strand annotation (e.g. SourceStrand.Forward)
"""
if source_strand == SourceStrand.Unknown:
return "U"
elif source_strand == SourceStrand.Forward:
return "F"
elif source_strand == SourceStrand.Reverse:
return "R"
else:
raise Exception("Unexpected value for source strand " + source_strand)
@staticmethod
def from_string(source_strand):
"""Get a string representation of source strand annotation
Args:
source_strand (int) : int representation of source strand (e.g., SourceStrand.Forward)
Returns:
str : string representation of source strand annotation
"""
if source_strand == "U" or source_strand == "":
return SourceStrand.Unknown
if source_strand == "F":
return SourceStrand.Forward
elif source_strand == "R":
return SourceStrand.Reverse
else:
raise Exception("Unexpected value for source strand " + source_strand)
class RefStrand:
Unknown = 0
Plus = 1
Minus = 2
@staticmethod
def to_string(ref_strand):
"""Get a string reprensetation of ref strand annotation
Args:
ref_strand (int) : int representation of ref strand (e.g., RefStrand.Plus)
Returns:
str : string representation of reference strand annotation
"""
if ref_strand == RefStrand.Unknown:
return "U"
elif ref_strand == RefStrand.Plus:
return "+"
elif ref_strand == RefStrand.Minus:
return "-"
else:
raise Exception("Unexpected value for reference strand " + ref_strand)
@staticmethod
def from_string(ref_strand):
"""Get an integer representation of ref strand annotation
Args:
ref_strand (str) : string representation of reference strand annotation (e.g., "+")
Returns:
int : int representation of reference strand annotation (e.g. RefStrand.Plus)
"""
if ref_strand == "U" or ref_strand == "":
return RefStrand.Unknown
if ref_strand == "+":
return RefStrand.Plus
elif ref_strand == "-":
return RefStrand.Minus
else:
raise Exception("Unexpected value for reference strand " + ref_strand)
[docs]class LocusEntry:
"""Helper class representing a locus entry within a bead pool manifest. Current only support version
6,7, and 8.
Attributes:
ilmn_id (string) : IlmnID (probe identifier) of locus
name (string): Name (variant identifier) of locus
snp (string) : SNP value for locus (e.g., [A/C])
chrom (string) : Chromosome for the locus (e.g., XY)
map_info (int) : Mapping location of locus
assay_type (int) : Identifies type of assay (0 - Infinium II , 1 - Infinium I (A/T), 2 - Infinium I (G/C)
address_a (int) : AddressA ID of locus
address_b (int) : AddressB ID of locus (0 if none)
ref_strand (int) : See RefStrand class
source_strand (int) : See SourceStrand class
"""
def __init__(self, handle):
"""Constructor
Args:
handle (file handle): File handle at start of locus entry record
Returns:
LocusEntry
"""
self.ilmn_id = ""
self.name = ""
self.snp = ""
self.chrom = ""
self.map_info = -1
self.assay_type = -1
self.address_a = -1
self.address_b = -1
self.ref_strand = RefStrand.Unknown
self.source_strand = SourceStrand.Unknown
self.__parse_file(handle)
def __parse_file(self, handle):
"""Helper function to initialize this object from a file handle
Args:
handle (file handle): File handle at start of locus entry record
Returns:
None
"""
version = read_int(handle)
if version == 6:
self.__parse_locus_version_6(handle)
elif version == 7:
self.__parse_locus_version_7(handle)
elif version == 8:
self.__parse_locus_version_8(handle)
else:
raise Exception(
"Manifest format error: unknown version for locus entry (" + str(version) + ")"
)
def __parse_locus_version_6(self, handle):
"""Helper function to parse version 6 locus entry
Args:
handle (file handle): File handle at start of locus entry record
Returns:
None
"""
self.ilmn_id = read_string(handle)
self.source_strand = SourceStrand.from_string(self.ilmn_id.split("_")[-2])
self.name = read_string(handle)
for idx in range(3):
read_string(handle)
handle.read(4)
for idx in range(2):
read_string(handle)
self.snp = read_string(handle)
self.chrom = read_string(handle)
for idx in range(2):
read_string(handle)
self.map_info = int(read_string(handle))
for idx in range(2):
read_string(handle)
self.address_a = read_int(handle)
self.address_b = read_int(handle)
for idx in range(7):
read_string(handle)
handle.read(3)
self.assay_type = read_byte(handle)
if self.assay_type not in [0, 1, 2]:
raise Exception("Format error in reading assay type from locus entry")
if self.address_b == 0:
if self.assay_type != 0:
raise Exception("Manifest format error: Assay type is inconsistent with address B")
else:
if self.assay_type == 0:
raise Exception("Manifest format error: Assay type is inconsistent with address B")
def __parse_locus_version_7(self, handle):
"""Helper function to parse version 7 locus entry
Args:
handle (file handle): File handle at start of locus entry record
Returns:
None
"""
self.__parse_locus_version_6(handle)
handle.read(4 * 4)
def __parse_locus_version_8(self, handle):
"""Helper function to parse version 8 locus entry
Args:
handle (file handle): File handle at start of locus entry record
Returns:
None
"""
self.__parse_locus_version_7(handle)
self.ref_strand = RefStrand.from_string(read_string(handle))
complement_map = {"A": "T", "T": "A", "C": "G", "G": "C", "D": "D", "I": "I"}
[docs]def complement(nucleotide):
"""Complement a single nucleotide. Complements of D(eletion) and I(nsertion) are D and I, respectively.
Args:
nucleotide (string) : Nucleotide, must be A, C, T, G, D, or I
Returns:
str : Complemented nucleotide
"""
if nucleotide in complement_map:
return complement_map[nucleotide]
raise ValueError("Nucleotide must be one of A, C, T, G, D, or I")
[docs]def read_char(handle):
"""Helper function to parse character from file handle
Args:
handle (file handle): File handle
Returns:
char value read from handle
"""
return handle.read(1)
[docs]def read_ushort(handle):
"""Helper function to parse ushort from file handle
Args:
handle (file handle): File handle
Returns:
numpy.int16 value read from handle
"""
return frombuffer(handle.read(2), dtype=uint16)[0]
[docs]def read_int(handle):
"""Helper function to parse int from file handle
Args:
handle (file handle): File handle
Returns:
numpy.int32 value read from handle
"""
return struct.unpack("<i", handle.read(4))[0]
[docs]def read_float(handle):
"""Helper function to parse float from file handle
Args:
handle (file handle): File handle
Returns:
numpy.float32 value read from handle
"""
return frombuffer(handle.read(4), dtype=float32)[0]
[docs]def read_byte(handle):
"""Helper function to parse byte from file handle
Args:
handle (file handle): File handle
Returns:
byte value read from handle
"""
return struct.unpack("<B", handle.read(1))[0]
[docs]def read_string(handle):
"""Helper function to parse string from file handle. See https://msdn.microsoft.com/en-us/library/yzxa6408(v=vs.100).aspx
for additional details on string format.
Args:
handle (file handle): File handle
Returns:
string value read from handle
"""
total_length = 0
partial_length = read_byte(handle)
num_bytes = 0
while partial_length & 0x80 > 0:
total_length += (partial_length & 0x7F) << (7 * num_bytes)
partial_length = ord(struct.unpack("c", handle.read(1))[0])
num_bytes += 1
total_length += partial_length << (7 * num_bytes)
result = handle.read(total_length)
if len(result) < total_length:
raise Exception("Failed to read complete string")
else:
return result.decode()
[docs]def read_scanner_data(handle):
"""Helper function to parse ScannerData object from file handle.
Args:
handle (file handle): File handle
Returns:
ScannerData value read from handle
"""
name = read_string(handle)
pmt_green = read_int(handle)
pmt_red = read_int(handle)
scanner_version = read_string(handle)
imaging_user = read_string(handle)
return ScannerData(name, pmt_green, pmt_red, scanner_version, imaging_user)