Source code for sctools.gtf
"""
GTF Records and Iterators
=========================
.. currentmodule:: sctools
This module defines a GTF record class and a Reader class to iterate over GTF-format files
Classes
-------
Record Data class that exposes GTF record fields by name
Reader GTF file reader that yields GTF Records
References
----------
https://useast.ensembl.org/info/website/upload/gff.html
"""
import logging
import string
import re
from typing import List, Dict, Generator, Iterable, Union, Set
from . import reader
_logger = logging.getLogger(__name__)
[docs]class GTFRecord:
"""Data class for storing and interacting with GTF records
Subclassed to produce exon, transcript, and gene-specific record types.
A GTF record has 8 fixed fields which are followed by optional fields separated by ;\t, which
are stored by this class in the attributes field and accessible by get_attribute. Fixed fields
are accessible by name.
Parameters
----------
record : str
an unparsed GTF record
Attributes
----------
seqname : str
The name of the sequence (often chromosome) this record is found on.
chromosome : str
Synonym for seqname.
source : str
The group responsible for generating this annotation.
feature : str
The type of record (e.g. gene, exon, ...).
start : str
The start position of this feature relative to the beginning of seqname.
end : str
The end position of this feature relative to the beginning of seqname....
score : str
The annotation score. Rarely used.
strand : {'+', '-'}
The strand of seqname that this annotation is found on
frame : {'0', '1', '2'}
'0' indicates that the first base of the feature is the first base of a codon,
'1' that the second base is the first base of a codon, and so on
size : int
the number of nucleotides spanned by this feature
Methods
-------
get_attribute(key: str)
attempt to retrieve a variable field with name equal to `key`
set_attribute(key: str, value: str)
set variable field `key` equal to `value`. Overwrites `key` if already present.
"""
__slots__ = ["_fields", "_attributes"]
_del_letters: str = string.ascii_letters
_del_non_letters: str = "".join(
set(string.printable).difference(string.ascii_letters)
)
def __init__(self, record: str):
fields: List[str] = record.strip(";\n").split("\t")
self._fields: List[str] = fields[:8]
self._attributes: Dict[str, str] = {}
for field in fields[8].split(";"):
try:
key, _, value = field.strip().partition(" ")
self._attributes[key] = value.strip('"')
except Exception:
raise RuntimeError(
f'Error parsing field "{field}" of GTF record "{record}"'
)
def __repr__(self):
return "<Record: %s>" % self.__str__()
def __bytes__(self):
return self.__str__().encode()
def __str__(self):
return "\t".join(self._fields) + self._format_attribute() + "\n"
def __hash__(self) -> int:
return hash(self.__str__())
def _format_attribute(self):
return " ".join('%s "%s";' % (k, v) for k, v in self._attributes.items())
@property
def seqname(self) -> str:
return self._fields[0]
@property
def chromosome(self) -> str:
return self._fields[0] # synonym for seqname
@property
def source(self) -> str:
return self._fields[1]
@property
def feature(self) -> str:
return self._fields[2]
@property
def start(self) -> int:
return int(self._fields[3])
@property
def end(self) -> int:
return int(self._fields[4])
@property
def score(self) -> str:
return self._fields[5]
@property
def strand(self) -> str:
return self._fields[6]
@property
def frame(self) -> str:
return self._fields[7]
@property
def size(self) -> int:
size = self.end - self.start
if size < 0:
raise ValueError(f"Invalid record: negative size {size} (start > end)")
else:
return size
[docs] def get_attribute(self, key) -> str:
"""access an item from the attribute field of a GTF file.
Parameters
----------
key : str
Item to retrieve
Returns
-------
value : str
Contents of variable attribute `key`
Raises
------
KeyError
if there is no variable attribute `key` associated with this record
"""
return self._attributes.get(key)
[docs] def set_attribute(self, key, value) -> None:
"""Set variable attribute `key` equal to `value`
If attribute `key` is already set for this record, its contents are overwritten by `value`
Parameters
----------
key : str
attribute name
value : str
attribute content
"""
self._attributes[key] = value
def __eq__(self, other):
return hash(self) == hash(other)
def __ne__(self, other):
return not self.__eq__(other)
[docs]class Reader(reader.Reader):
"""GTF file iterator
Parameters
----------
files : Union[str, List], optional
File(s) to read. If '-', read sys.stdin (default = '-')
mode : {'r', 'rb'}, optional
Open mode. If 'r', read strings. If 'rb', read bytes (default = 'r').
header_comment_char : str, optional
lines beginning with this character are skipped (default = '#')
Methods
-------
filter(retain_types: Iterable[str])
Iterate over a GTF file, only yielding records in `retain_types`.
__iter__()
iterate over GTF records in file, yielding `Record` objects
See Also
--------
sctools.reader.Reader
"""
def __init__(self, files="-", mode="r", header_comment_char="#"):
super().__init__(
files, mode, header_comment_char
) # has different default args from super
[docs] def filter(self, retain_types: Iterable[str]) -> Generator:
"""Iterate over a GTF file, returning only record whose feature type is in retain_types.
Features are stored in GTF field 2.
Parameters
----------
retain_types : Iterable[str]
Record feature types to retain.
Yields
------
gtf_record : Record
gtf `Record` object
"""
retain_types = set(retain_types)
for record in self:
if record.feature in retain_types:
yield record
# todo this lenient behavior is deemed to change in the future (warning -> exception)
def _resolve_multiple_gene_names(gene_name: str):
_logger.warning(
f'Multiple entries encountered for "{gene_name}". Please validate the input GTF file(s). '
f"Skipping the record for now; in the future, this will be considered as a "
f"malformed GTF file."
)
[docs]def get_mitochondrial_gene_names(
files: Union[str, List[str]] = "-", mode: str = "r", header_comment_char: str = "#"
) -> Set[str]:
"""Extract mitocholdrial gene names from GTF file(s) and returns a set of mitochondrial
gene id occurrence in the given file(s).
Parameters
----------
files : Union[str, List], optional
File(s) to read. If '-', read sys.stdin (default = '-')
mode : {'r', 'rb'}, optional
Open mode. If 'r', read strings. If 'rb', read bytes (default = 'r').
header_comment_char : str, optional
lines beginning with this character are skipped (default = '#')
Returns
-------
Set(str)
A set of the mitochondrial gene ids
"""
mitochondrial_gene_ids: Set[str] = set()
for record in Reader(files, mode, header_comment_char).filter(
retain_types=["gene"]
):
gene_name = record.get_attribute("gene_name")
gene_id = record.get_attribute("gene_id")
if gene_name is None:
raise ValueError(
f"Malformed GTF file detected. Record is of type gene but does not have a "
f'"gene_name" field: {record}'
)
if re.match("^mt-", gene_name, re.IGNORECASE):
if gene_id not in mitochondrial_gene_ids:
mitochondrial_gene_ids.add(gene_id)
return mitochondrial_gene_ids
[docs]def extract_gene_names(
files: Union[str, List[str]] = "-", mode: str = "r", header_comment_char: str = "#"
) -> Dict[str, int]:
"""Extract gene names from GTF file(s) and returns a map from gene names to their corresponding
occurrence orders in the given file(s).
Parameters
----------
files : Union[str, List], optional
File(s) to read. If '-', read sys.stdin (default = '-')
mode : {'r', 'rb'}, optional
Open mode. If 'r', read strings. If 'rb', read bytes (default = 'r').
header_comment_char : str, optional
lines beginning with this character are skipped (default = '#')
Returns
-------
Dict[str, int]
A map from gene names to their linear index
"""
gene_name_to_index: Dict[str, int] = dict()
gene_index = 0
for record in Reader(files, mode, header_comment_char).filter(
retain_types=["gene"]
):
gene_name = record.get_attribute("gene_name")
if gene_name is None:
raise ValueError(
f"Malformed GTF file detected. Record is of type gene but does not have a "
f'"gene_name" field: {record}'
)
if gene_name in gene_name_to_index:
_resolve_multiple_gene_names(gene_name)
continue
gene_name_to_index[gene_name] = gene_index
gene_index += 1
return gene_name_to_index
[docs]def extract_extended_gene_names(
files: Union[str, List[str]] = "-", mode: str = "r", header_comment_char: str = "#"
) -> Dict[str, List[tuple]]:
"""Extract extended gene names from GTF file(s) and returns a map from gene names to their corresponding
occurrence locations the given file(s).
Parameters
----------
files : Union[str, List], optional
File(s) to read. If '-', read sys.stdin (default = '-')
mode : {'r', 'rb'}, optional
Open mode. If 'r', read strings. If 'rb', read bytes (default = 'r').
header_comment_char : str, optional
lines beginning with this character are skipped (default = '#')
Returns
-------
Dict[str, List[tuple]]
A dictionary of chromosome names mapping to a List of tuples, each containing
a range as the the first element and a gene name as the second.
Dict[str, List(Tuple((start,end), gene)))
"""
gene_name_to_start_end = dict()
for record in Reader(files, mode, header_comment_char).filter(
retain_types=["gene"]
):
gene_name = record.get_attribute("gene_name")
if gene_name is None:
raise ValueError(
f"Malformed GTF file detected. Record is of type gene but does not have a "
f'"gene_name" field: {record}'
)
# find gene collisions
if gene_name in gene_name_to_start_end:
_resolve_multiple_gene_names(gene_name)
continue
if record.chromosome not in gene_name_to_start_end:
gene_name_to_start_end[record.chromosome] = dict()
gene_name_to_start_end[record.chromosome][gene_name] = (
record.start,
record.end,
)
gene_locations = dict()
# For each chromosome invert the map to be in List[( (start,end), genename )] and sort it by start
for chromosome in gene_name_to_start_end:
gene_locations[chromosome] = [
(locs, key) for key, locs in gene_name_to_start_end[chromosome].items()
]
# Sort by starting location
gene_locations[chromosome].sort(key=lambda x: x[0])
return gene_locations
[docs]def extract_gene_exons(
files: Union[str, List[str]] = "-", mode: str = "r", header_comment_char: str = "#"
) -> Dict[str, List[tuple]]:
"""Extract extended gene names from GTF file(s) and returns a map from gene names to the the
list of exons in the ascending order of the start positions file(s).
Parameters
----------
files : Union[str, List], optional
File(s) to read. If '-', read sys.stdin (default = '-')
mode : {'r', 'rb'}, optional
Open mode. If 'r', read strings. If 'rb', read bytes (default = 'r').
header_comment_char : str, optional
lines beginning with this character are skipped (default = '#')
Returns
-------
Dict[str, List[tuple]]
A dictionary of chromosome names mapping to a List of tuples, each containing
a the exons in the ascending order of the start positions.
Dict[str, List(Tuple((start,end), gene)))
"""
gene_name_to_start_end = dict()
for record in Reader(files, mode, header_comment_char).filter(
retain_types=["exon"]
):
gene_name = record.get_attribute("gene_name")
if gene_name is None:
raise ValueError(
f"Malformed GTF file detected. Record is of type gene but does not have a "
f'"gene_name" field: {record}'
)
if record.chromosome not in gene_name_to_start_end:
gene_name_to_start_end[record.chromosome] = dict()
if gene_name not in gene_name_to_start_end[record.chromosome]:
gene_name_to_start_end[record.chromosome][gene_name] = []
gene_name_to_start_end[record.chromosome][gene_name].append(
(record.start, record.end)
)
gene_locations_exons = dict()
# For each chromosome invert the map to be in List[( (start,end), genename )] and sort it by start
for chromosome in gene_name_to_start_end:
gene_locations_exons[chromosome] = [
(locs, key) for key, locs in gene_name_to_start_end[chromosome].items()
]
# Sort by starting location
gene_locations_exons[chromosome].sort(key=lambda x: x[0])
return gene_locations_exons