"""
Nucleotide Barcode Manipulation Tools
=====================================
.. currentmodule:: sctools
This module contains tools to characterize oligonucleotide barcodes and a simple hamming-base
error-correction approach which corrects barcodes within a specified distance of a "whitelist" of
expected barcodes.
Classes
-------
Barcodes Class to characterize a set of barcodes
ErrorsToCorrectBarcodesMap Class to carry out error correction routines
"""
import itertools
from collections import Counter
from typing import Mapping, Iterator, List, Tuple, Iterable
import numpy as np
import pysam
from . import consts
from .encodings import TwoBit
from .stats import base4_entropy
[docs]class Barcodes:
"""Container for a set of nucleotide barcodes.
Contained barcodes are encoded in 2bit representation for fast operations. Instances of this
class can optionally be constructed from an iterable where barcodes can be present multiple
times. In these cases, barcodes are analyzed based on their observed frequencies.
Parameters
----------
barcodes: Mapping[str, int]
dictionary-like mapping barcodes to the number of times they were observed
barcode_length: int
the length of all barcodes in the set. Different-length barcodes are not supported.
See Also
--------
sctools.encodings.TwoBit
"""
def __init__(self, barcodes: Mapping[str, int], barcode_length: int):
if not isinstance(barcodes, Mapping):
raise TypeError(
'The argument "barcodes" must be a dict-like object mapping barcodes to counts'
)
self._mapping: Mapping[str, int] = barcodes
if not isinstance(barcode_length, int) and barcode_length > 0:
raise ValueError('The argument "barcode_length" must be a positive integer')
self._barcode_length: int = barcode_length
def __contains__(self, item) -> bool:
return item in self._mapping
def __iter__(self) -> Iterator[str]:
return iter(self._mapping)
def __len__(self) -> int:
return len(self._mapping)
def __getitem__(self, item) -> int:
return self._mapping[item]
[docs] def summarize_hamming_distances(self) -> Mapping[str, float]:
"""Returns descriptive statistics on hamming distances between pairs of barcodes.
Returns
-------
descriptive_statistics : Mapping[str, float]
minimum, 25th percentile, median, 75th percentile, maximum, and average hamming
distance between all pairs of barcodes
References
----------
https://en.wikipedia.org/wiki/Hamming_distance
"""
distances: List = []
for a, b in itertools.combinations(self, 2):
distances.append(TwoBit.hamming_distance(a, b))
keys: Tuple = (
"minimum",
"25th percentile",
"median",
"75th percentile",
"maximum",
"average",
)
values: List = list(np.percentile(distances, [0, 25, 50, 75, 100]))
values.append(np.mean(distances))
return dict(zip(keys, values))
[docs] def base_frequency(self, weighted=False) -> np.ndarray:
"""return the frequency of each base at each position in the barcode set
Notes
-----
weighting is currently not supported, and must be set to False or base_frequency will raise
NotImplementedError # todo fix
Parameters
----------
weighted: bool, optional
if True, each barcode is counted once for each time it was observed (default = False)
Returns
-------
frequencies : np.array
barcode_length x 4 2d numpy array
Raises
------
NotImplementedError
if weighted is True
"""
base_counts_by_position: np.ndarray = np.zeros(
(self._barcode_length, 4), dtype=np.uint64
)
keys: np.ndarray = np.fromiter(self._mapping.keys(), dtype=np.uint64)
for i in reversed(range(self._barcode_length)):
binary_base_representations, counts = np.unique(
keys & 3, return_counts=True
)
if weighted:
raise NotImplementedError
else:
base_counts_by_position[i, binary_base_representations] = counts
# finished with this nulceotide, move two bits forward to the next one
keys >>= 2
return base_counts_by_position
[docs] def effective_diversity(self, weighted=False) -> np.ndarray:
"""Returns the effective base diversity of the barcode set by position.
maximum diversity for each position is 1, and represents a perfect split of 25% per base at
a given position.
Parameters
----------
weighted : bool, optional
if True, each barcode is counted once for each time it was observed (default = False)
Returns
-------
effective_diversity : np.array[float]
1-d array of size barcode_length containing floats in [0, 1]
"""
return base4_entropy(self.base_frequency(weighted=weighted))
[docs] @classmethod
def from_whitelist(cls, file_: str, barcode_length: int):
"""Creates a barcode set from a whitelist file.
Parameters
----------
file_ : str
location of the whitelist file. Should be formatted one barcode per line. Barcodes
should be encoded in plain text (UTF-8, ASCII), not bit-encoded. Each barcode will be
assigned a count of 1.
barcode_length : int
Length of the barcodes in the file.
Returns
-------
barcodes : Barcodes
class object containing barcodes from a whitelist file
"""
tbe = TwoBit(barcode_length)
with open(file_, "rb") as f:
return cls(
Counter(tbe.encode(barcode[:-1]) for barcode in f), barcode_length
)
[docs] @classmethod
def from_iterable_encoded(cls, iterable: Iterable[int], barcode_length: int):
"""Construct an ObservedBarcodeSet from an iterable of encoded barcodes.
Parameters
----------
iterable : Iterable[int]
iterable of barcodes encoded in TwoBit representation
barcode_length : int
the length of the barcodes in `iterable`
Returns
-------
barcodes : Barcodes
class object containing barcodes from a whitelist file
"""
return cls(Counter(iterable), barcode_length=barcode_length)
[docs] @classmethod
def from_iterable_strings(cls, iterable: Iterable[str], barcode_length: int):
"""Construct an ObservedBarcodeSet from an iterable of string barcodes.
Parameters
----------
iterable : Iterable[str]
iterable of barcodes encoded in TwoBit representation
barcode_length : int
the length of the barcodes in `iterable`
Returns
-------
barcodes : Barcodes
class object containing barcodes from a whitelist file
"""
tbe: TwoBit = TwoBit(barcode_length)
return cls(
Counter(tbe.encode(b.encode()) for b in iterable),
barcode_length=barcode_length,
)
[docs] @classmethod
def from_iterable_bytes(cls, iterable: Iterable[bytes], barcode_length: int):
"""Construct an ObservedBarcodeSet from an iterable of bytes barcodes.
Parameters
----------
iterable : Iterable[bytes]
iterable of barcodes in bytes representation
barcode_length : int
the length of the barcodes in `iterable`
Returns
-------
barcodes : Barcodes
class object containing barcodes from a whitelist file
"""
tbe: TwoBit = TwoBit(barcode_length)
return cls(
Counter(tbe.encode(b) for b in iterable), barcode_length=barcode_length
)
[docs]class ErrorsToCorrectBarcodesMap:
"""Correct any barcode that is within one hamming distance of a whitelisted barcode
Parameters
----------
errors_to_barcodes : Mapping[str, str]
dict-like mapping 1-base errors to the whitelist barcode that they could be generated from
Methods
-------
get_corrected_barcode(barcode: str)
Return a barcode if it is whitelist, or the corrected version if within edit distance 1
correct_bam(bam_file: str, output_bam_file: str)
correct barcodes in a bam file, given a whitelist
References
----------
https://en.wikipedia.org/wiki/Hamming_distance
"""
def __init__(self, errors_to_barcodes: Mapping[str, str]):
if not isinstance(errors_to_barcodes, Mapping):
raise TypeError(
f'The argument "errors_to_barcodes" must be a mapping of erroneous barcodes to correct '
f"barcodes, not {type(errors_to_barcodes)}"
)
self._map = errors_to_barcodes
[docs] def get_corrected_barcode(self, barcode: str) -> str:
"""Return a barcode if it is whitelist, or the corrected version if within edit distance 1
Parameters
----------
barcode : str
the barcode to return the corrected version of. If the barcode is in the whitelist,
the input barcode is returned unchanged.
Returns
-------
corrected_barcode : str
corrected version of the barcode
Raises
------
KeyError
if the passed barcode is not within 1 hamming distance of any whitelist barcode
References
----------
https://en.wikipedia.org/wiki/Hamming_distance
"""
return self._map[barcode]
@staticmethod
def _prepare_single_base_error_hash_table(
barcodes: Iterable[str],
) -> Mapping[str, str]:
"""Generate a map of correct barcodes and single base error codes to whitelist barcodes
Parameters
----------
barcodes : Iterable[str]
:param Iterable barcodes: iterable of string barcodes
:return dict: mapping between erroneous barcodes with single-base mutations and the barcode
they were generated from
"""
error_map = {}
for barcode in barcodes:
# include correct barcode
error_map[barcode] = barcode
# include all single-base errors
for i, nucleotide in enumerate(barcode):
errors = set("ACGTN")
errors.discard(nucleotide)
for e in errors:
error_map[barcode[:i] + e + barcode[i + 1 :]] = barcode
return error_map
[docs] @classmethod
def single_hamming_errors_from_whitelist(cls, whitelist_file: str):
"""Factory method to generate instance of class from a file containing "correct" barcodes.
Parameters
----------
whitelist_file : str
Text file containing barcode per line.
Returns
-------
errors_to_barcodes_map : ErrorsToCorrectBarcodesMap
instance of cls, built from whitelist
"""
with open(whitelist_file, "r") as f:
return cls(
cls._prepare_single_base_error_hash_table((line[:-1] for line in f))
)
[docs] def correct_bam(self, bam_file: str, output_bam_file: str) -> None:
"""Correct barcodes in a (potentially unaligned) bamfile, given a whitelist.
Parameters
----------
bam_file : str
BAM format file in same order as the fastq files
output_bam_file : str
BAM format file containing cell, umi, and sample tags.
"""
with pysam.AlignmentFile(bam_file, "rb") as fin, pysam.AlignmentFile(
output_bam_file, "wb", template=fin
) as fout:
for alignment in fin:
try:
tag = self.get_corrected_barcode(alignment.get_tag("CR"))
except KeyError: # pass through the uncorrected barcode.
tag = alignment.get_tag(consts.RAW_CELL_BARCODE_TAG_KEY)
alignment.set_tag(
tag=consts.CELL_BARCODE_TAG_KEY, value=tag, value_type="Z"
)
fout.write(alignment)