Source code for sctools.reader

"""
Sequence File Iterators
=======================

.. currentmodule:: sctools

This module defines a general iterator and some helper functions for iterating over files
that contain sequencing data.

Methods
-------
infer_open(file_: str, mode: str)
    helper function that returns the correct open function for a file, inferring its compression
    type from the file's contents rather than from its extension
zip_readers(*readers, indices=None)
    helper function that iterates over one or more readers, optionally extracting only the records
    that correspond to indices

Classes
-------
Reader          Basic reader that loops over one or more input files.

See Also
--------
sctools.gtf.Reader
sctools.fastq.Reader

"""

import os
import gzip
import bz2
from copy import copy
from functools import partial
from typing import Callable, Iterable, Generator, Set, List


def infer_open(file_: str, mode: str) -> Callable:
    """Pick the open function that matches a file's compression.

    The first three bytes of the file are inspected for the gzip or bz2 magic
    number, so compressed files are recognised even without a ``.gz`` or
    ``.bz2`` extension.

    Parameters
    ----------
    file_ : str
        path of the file to inspect
    mode : {'r', 'rb'}
        requested open mode. 'r' yields strings, 'rb' yields bytes

    Returns
    -------
    open_function : Callable
        ``gzip.open``, ``bz2.open`` or the builtin ``open``, wrapped in
        ``functools.partial`` with ``mode`` already bound

    """
    with open(file_, "rb") as handle:
        magic: bytes = handle.read(3)

    if magic.startswith(b"\x1f\x8b"):  # gzip magic number (two bytes)
        opener: Callable = gzip.open
    elif magic == b"BZh":  # bz2 magic number (three bytes)
        opener = bz2.open
    else:
        # uncompressed: the builtin open takes the requested mode unchanged
        return partial(open, mode=mode)

    # gzip.open and bz2.open read bytes for 'r'; text has to be requested as 'rt'
    compressed_mode: str = "rt" if mode == "r" else mode
    return partial(opener, mode=compressed_mode)
class Reader:
    """Basic reader object that seamlessly loops over multiple input files.

    Is subclassed to create readers for specific file types (e.g. fastq, gtf, etc.)

    Parameters
    ----------
    files : Union[str, List], optional
        The file(s) to read, either one filename or an iterable of filenames
        (default = '-').
        NOTE(review): '-' has been documented as meaning sys.stdin, but this class
        does not special-case it; every name is handed to ``infer_open`` unchanged.
        Confirm where (or whether) stdin is actually supported.
    mode : {'r', 'rb'}, optional
        The open mode for files. If 'r', yield string data, if 'rb', yield bytes data
        (default = 'r').
    header_comment_char : str, optional
        If not None, skip the leading lines of each file that begin with this
        character (default = None). Lines beginning with it that occur after the
        first regular record are not skipped.

    Raises
    ------
    TypeError
        If ``files`` is neither a string nor an iterable of strings.
    ValueError
        If ``mode`` is not 'r' or 'rb'.

    """

    def __init__(self, files="-", mode="r", header_comment_char=None):
        if isinstance(files, str):
            self._files = [files]
        elif isinstance(files, Iterable):
            # materialize first so one-shot iterables can be both checked and stored
            files = list(files)
            if all(isinstance(f, str) for f in files):
                self._files = files
            else:
                raise TypeError("All passed files must be type str")
        else:
            raise TypeError("Files must be a string filename or a list of such names.")

        # set open mode:
        if mode not in {"r", "rb"}:
            raise ValueError("Mode must be one of 'r', 'rb'")
        self._mode = mode

        # records are bytes in 'rb' mode, so the prefix passed to startswith must be too
        if isinstance(header_comment_char, str) and mode == "rb":
            self._header_comment_char = header_comment_char.encode()
        else:
            self._header_comment_char = header_comment_char

    @property
    def filenames(self) -> List[str]:
        """Return the list of file names this reader iterates over."""
        return self._files

    def __len__(self):
        """Return the length of the Reader object.

        Notes
        -----
        This function requires reading the complete file, and should typically not be
        used with sys.stdin, as it will consume the input.

        """
        return sum(1 for _ in self)

    def __iter__(self):
        """Yield every record of every file, in the order the files were given.

        When a header comment character is set, the leading run of lines that start
        with it is dropped from each file. A file that is empty or holds nothing but
        header lines contributes no records.
        """
        comment_char = self._header_comment_char
        for file_ in self._files:
            # the context manager closes the handle even if the consumer stops early
            with infer_open(file_, self._mode)(file_) as f:
                file_iterator = iter(f)
                if comment_char is not None:
                    # A for-loop (not bare next()) so that running out of lines while
                    # still inside the header ends this file quietly instead of
                    # leaking StopIteration out of the generator (PEP 479).
                    for record in file_iterator:
                        if not record.startswith(comment_char):
                            yield record  # avoid loss of first non-comment line
                            break
                # now, run to exhaustion
                yield from file_iterator

    @property
    def size(self) -> int:
        """return the collective size of all files being read in bytes"""
        return sum(os.stat(f).st_size for f in self._files)

    def select_record_indices(self, indices: Set) -> Generator:
        """Iterate over provided indices only, skipping other records.

        Parameters
        ----------
        indices : Set[int]
            indices to include in the output. The passed object is not modified.

        Yields
        ------
        record, str
            records from file corresponding to indices

        """
        # work on a private set: the caller's object stays untouched
        remaining = set(indices)
        if not remaining:
            # nothing requested, so there is no reason to open or scan any file
            return

        for idx, record in enumerate(self):
            if idx in remaining:
                yield record
                remaining.discard(idx)

                # stopping condition: every requested index has been produced
                if not remaining:
                    break
def zip_readers(*readers, indices=None) -> Generator:
    """Iterate over several readers in lockstep, one record from each per step.

    When a non-empty ``indices`` is given, each reader is first restricted to the
    records at those positions through its ``select_record_indices`` method. An
    empty or missing ``indices`` applies no restriction. Iteration stops as soon
    as the shortest reader runs out.

    Parameters
    ----------
    *readers : List[Reader]
        Reader objects to simultaneously iterate over
    indices : Set[int], optional
        indices to include in the output

    Yields
    ------
    records : Tuple[str]
        one record per reader passed

    """
    if not indices:
        # None or an empty collection: walk the readers as they are
        yield from zip(*readers)
        return

    restricted = [reader.select_record_indices(indices) for reader in readers]
    yield from zip(*restricted)