import os
import string
from functools import partial
from itertools import product
import pytest
from .. import fastq, consts
from ..reader import zip_readers
# Shared module-level paths and parameter grids used by the tests below.
#
# The directory is resolved from the absolute location of this file, so it does
# not collapse to the filesystem root ("/data/") when __file__ carries no
# directory component. The trailing "" makes os.path.join end the path with a
# separator, because file paths are built as `data_dir + filename`.
data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "")

# the i7 test file under its plain, .gz and .bz2 file names
_i7_files = [
    data_dir + f for f in ("test_i7.fastq", "test_i7.fastq.gz", "test_i7.fastq.bz2")
]

# i7 / r1 / r2 file triples, one list per file-name suffix
_files = [data_dir + f for f in ("test_i7.fastq", "test_r1.fastq", "test_r2.fastq")]
_gz_files = [
    data_dir + f for f in ("test_i7.fastq.gz", "test_r1.fastq.gz", "test_r2.fastq.gz")
]
_bz2_files = [
    data_dir + f
    for f in ("test_i7.fastq.bz2", "test_r1.fastq.bz2", "test_r2.fastq.bz2")
]

# open modes exercised by the reader tests
_modes = ("r", "rb")

# every (single i7 file, mode) and (file triple, mode) combination
_files_and_modes = list(product(_i7_files, _modes))
_multifiles_and_modes = list(product((_files, _gz_files, _bz2_files), _modes))

# maps an open mode to a callable that converts a str literal into the type
# (str or bytes) that the tests compare record fields against for that mode
_map_encoder = {"r": str, "rb": partial(bytes, encoding="utf-8")}
# TEST READER
@pytest.fixture(scope="module", params=_files_and_modes)
def i7_files_compressions_and_modes(request):
    """Provide one (i7 file path, open mode) pair per fixture parameter."""
    filename, mode = request.param[:2]
    return filename, mode
@pytest.fixture(scope="module", params=_multifiles_and_modes)
def reader_all_compressions(request):
    """Build a fastq.Reader for each (file list, open mode) fixture parameter."""
    files = request.param[0]
    mode = request.param[1]
    return fastq.Reader(files, mode)
@pytest.fixture(scope="module")
def bytes_fastq_record():
    """Return the four lines of a single fastq record as bytes objects."""
    name = b"@name\n"
    sequence = b"ACTACAAT\n"
    name2 = b"+\n"
    quality = b"%%%%AAAA\n"
    return [name, sequence, name2, quality]
@pytest.fixture(scope="module")
def string_fastq_record():
    """Return the four lines of a single fastq record as str objects."""
    name = "@name\n"
    sequence = "ACTACAAT\n"
    name2 = "+\n"
    quality = "%%%%AAAA\n"
    return [name, sequence, name2, quality]
def test_reader_stores_filenames():
    """A Reader exposes the file names it was given through `filenames`."""
    expected_names = ["notreal", "fake"]
    reader = fastq.Reader(files=expected_names)
    assert reader.filenames == expected_names
def test_reader_reads_first_record(reader_all_compressions):
    """The first record from each reader is a Record with the expected sequence.

    The expected sequence is compared as str or bytes, depending on the type
    of the sequence the reader produced.
    """
    # Fetch the first record with next() rather than a for/break loop: a
    # reader that yields nothing now raises StopIteration and fails the test
    # instead of skipping every assertion and passing vacuously.
    record = next(iter(reader_all_compressions))

    assert isinstance(record, fastq.Record)
    expected_result = (
        "NCACAATG\n" if isinstance(record.sequence, str) else b"NCACAATG\n"
    )
    assert record.sequence == expected_result
def test_reader_reads_correct_number_of_records_across_multiple_files(
    reader_all_compressions,
):
    """`len()` of a reader over three files reports 300 records."""
    n_records = len(reader_all_compressions)
    assert n_records == 300  # 3 files
def test_mixed_filetype_read_gets_correct_record_number():
    """A reader over one .gz and one .bz2 file reports 200 records."""
    mixed_files = [_gz_files[0], _bz2_files[0]]
    reader = fastq.Reader(mixed_files, mode="r", header_comment_char="#")
    assert len(reader) == 200
def test_non_string_filename_raises_typeerror():
    """Constructing a Reader with an int as the file name raises TypeError."""
    not_a_filename = 10
    with pytest.raises(TypeError):
        fastq.Reader(not_a_filename, "r")
def test_non_string_filename_in_iterable_raises_typeerror():
    """An int inside the iterable of file names raises TypeError."""
    mixed_names = ("works", 10)
    with pytest.raises(TypeError):
        fastq.Reader(mixed_names, "r")
def test_invalid_open_mode_raises_valueerror():
    """Constructing a Reader with an unsupported open mode raises ValueError."""
    bad_mode = "not_acceptable_open_mode"
    with pytest.raises(ValueError):
        fastq.Reader("works", bad_mode)
def test_fastq_returns_correct_filesize_for_single_and_multiple_files():
    """`Reader.size` matches the expected value for one file and for three files."""
    # the open mode is irrelevant for both readers
    single_reader = fastq.Reader(_i7_files[0], mode="r", header_comment_char="#")
    assert single_reader.size == 7774

    multi_reader = fastq.Reader(_i7_files, mode="r", header_comment_char="#")
    expected_total = 7774 + 853 + 802  # three file sizes
    assert multi_reader.size == expected_total
def test_reader_properly_subsets_based_on_indices():
    """`select_record_indices` yields exactly one record per requested index."""
    reader = fastq.Reader(_i7_files[0], mode="r")
    wanted = {0, 5, 10, 12}
    selected = list(reader.select_record_indices(wanted))
    assert len(selected) == len(wanted)
def test_zipping_readers_generates_expected_output():
    """`zip_readers` pairs up Record objects from two readers.

    Both readers are opened on the same file, so the first pair must carry
    the same, known sequence.
    """
    rd1 = fastq.Reader(_files[0], "r")
    rd2 = fastq.Reader(_files[0], "r")

    # Take the first pair with next() rather than a for/break loop, so that an
    # empty result raises StopIteration and fails the test instead of passing
    # without executing a single assertion.
    r1, r2 = next(iter(zip_readers(rd1, rd2)))

    assert isinstance(r1, fastq.Record)
    assert isinstance(r2, fastq.Record)
    expected_result = "NCACAATG\n"
    assert r1.sequence == r2.sequence == expected_result
def test_zipping_readers_with_indices_generates_expected_output():
    """`zip_readers` with `indices` pairs up Record objects from two readers.

    Both readers are opened on the same file and index 0 is requested, so the
    first pair must carry the same, known sequence.
    """
    rd1 = fastq.Reader(_files[0], "r")
    rd2 = fastq.Reader(_files[0], "r")
    indices = {0, 1, 2, 3}

    # Take the first pair with next() rather than a for/break loop, so that an
    # empty result raises StopIteration and fails the test instead of passing
    # without executing a single assertion.
    r1, r2 = next(iter(zip_readers(rd1, rd2, indices=indices)))

    assert isinstance(r1, fastq.Record)
    assert isinstance(r2, fastq.Record)
    expected_result = "NCACAATG\n"
    assert r1.sequence == r2.sequence == expected_result
def test_printing_bytes_record_generates_valid_fastq_record(bytes_fastq_record):
    """str() and bytes() of a Record reproduce the concatenated input lines."""
    record = fastq.Record(bytes_fastq_record)
    joined = b"".join(bytes_fastq_record)
    assert str(record) == joined.decode()
    assert bytes(record) == joined
def test_bytes_fastq_record_quality_score_parsing(bytes_fastq_record):
    """`average_quality()` of the bytes test record equals 18."""
    average = fastq.Record(bytes_fastq_record).average_quality()
    assert average == 18
def test_printing_string_record_generates_valid_fastq_record(string_fastq_record):
    """str() and bytes() of a StrRecord reproduce the concatenated input lines."""
    record = fastq.StrRecord(string_fastq_record)
    joined = "".join(string_fastq_record)
    assert str(record) == joined
    assert bytes(record) == joined.encode()
def test_string_fastq_record_quality_score_parsing(string_fastq_record):
    """`average_quality()` of the str test record equals 18."""
    average = fastq.StrRecord(string_fastq_record).average_quality()
    assert average == 18
# TEST RECORD
def test_fields_populate_properly(reader_all_compressions):
    """Every record's name, sequence, name2 and quality fields look valid.

    Expected values are converted with the encoder registered for the
    reader's open mode, so they have the same type as the record fields.
    """
    encode = _map_encoder[reader_all_compressions._mode]
    expected_prefix = encode("@")
    expected_name2 = encode("+\n")
    nucleotides = set(encode("ACGTN"))
    printable_chars = set(encode(string.printable))

    for record in reader_all_compressions:
        assert record.name.startswith(expected_prefix)
        # subset check: every symbol of the stripped field is an allowed one
        assert set(record.sequence.strip()).issubset(nucleotides)
        assert record.name2 == expected_name2
        assert set(record.quality.strip()).issubset(printable_chars)
# TEST EmbeddedBarcodeGenerator AND BarcodeGeneratorWithCorrectedCellBarcodes
@pytest.fixture(scope="function")
def embedded_barcode_generator():
    """Build an EmbeddedBarcodeGenerator over the gzipped r1 test file.

    Two embedded barcodes are configured: a cell barcode spanning 0-16 and a
    molecule barcode spanning 16-26.
    """
    fastq_path = data_dir + "test_r1.fastq.gz"
    cell = fastq.EmbeddedBarcode(
        start=0,
        end=16,
        quality_tag=consts.QUALITY_CELL_BARCODE_TAG_KEY,
        sequence_tag=consts.RAW_CELL_BARCODE_TAG_KEY,
    )
    molecule = fastq.EmbeddedBarcode(
        start=16,
        end=26,
        quality_tag=consts.QUALITY_MOLECULE_BARCODE_TAG_KEY,
        sequence_tag=consts.RAW_MOLECULE_BARCODE_TAG_KEY,
    )
    embedded_barcodes = [cell, molecule]
    return fastq.EmbeddedBarcodeGenerator(fastq_path, embedded_barcodes)
@pytest.fixture(scope="function")
def barcode_generator_with_corrected_cell_barcodes():
    """Build a BarcodeGeneratorWithCorrectedCellBarcodes over the r1 test file.

    The cell barcode (0-16) is passed on its own together with the
    "1k-august-2016.txt" file from the data directory; the molecule barcode
    (16-26) is passed in a separate list.
    """
    fastq_path = data_dir + "test_r1.fastq.gz"
    whitelist_path = data_dir + "1k-august-2016.txt"
    cell = fastq.EmbeddedBarcode(
        start=0,
        end=16,
        quality_tag=consts.QUALITY_CELL_BARCODE_TAG_KEY,
        sequence_tag=consts.RAW_CELL_BARCODE_TAG_KEY,
    )
    molecule = fastq.EmbeddedBarcode(
        start=16,
        end=26,
        quality_tag=consts.QUALITY_MOLECULE_BARCODE_TAG_KEY,
        sequence_tag=consts.RAW_MOLECULE_BARCODE_TAG_KEY,
    )
    return fastq.BarcodeGeneratorWithCorrectedCellBarcodes(
        fastq_path,
        cell,
        whitelist_path,
        [molecule],
    )
def test_embedded_barcode_generator_produces_outputs_of_expected_size(
    embedded_barcode_generator,
):
    """The first item of the generator holds four well-formed tag tuples.

    Each tuple is indexed as (tag key, value, type code). Sequence values must
    consist of nucleotide letters, quality values of printable characters, and
    every value must have the configured barcode length.
    """
    # correct values
    correct_cell_barcode_length = 16
    correct_umi_length = 10

    # Just the first item is checked. It is fetched with next() rather than a
    # for/break loop so that an empty generator raises StopIteration and fails
    # the test instead of skipping every assertion.
    cell_seq, cell_qual, umi_seq, umi_qual = next(iter(embedded_barcode_generator))

    # note that all barcodes are strings and therefore should get 'Z' values

    # test cell tags
    assert cell_seq[0] == consts.RAW_CELL_BARCODE_TAG_KEY
    assert len(cell_seq[1]) == correct_cell_barcode_length
    assert all(v in "ACGTN" for v in cell_seq[1])
    assert cell_seq[2] == "Z"
    assert cell_qual[0] == consts.QUALITY_CELL_BARCODE_TAG_KEY
    assert len(cell_qual[1]) == correct_cell_barcode_length
    assert all(v in string.printable for v in cell_qual[1])
    # check the quality tuple's own type code (previously re-checked cell_seq)
    assert cell_qual[2] == "Z"

    # test umi tags
    assert umi_seq[0] == consts.RAW_MOLECULE_BARCODE_TAG_KEY
    assert len(umi_seq[1]) == correct_umi_length
    assert all(v in "ACGTN" for v in umi_seq[1])
    assert umi_seq[2] == "Z"
    assert umi_qual[0] == consts.QUALITY_MOLECULE_BARCODE_TAG_KEY
    assert len(umi_qual[1]) == correct_umi_length
    assert all(v in string.printable for v in umi_qual[1])
    # check the quality tuple's own type code (previously re-checked umi_seq)
    assert umi_qual[2] == "Z"
def test_corrects_barcodes(barcode_generator_with_corrected_cell_barcodes):
    """At least one generated tag tuple carries the CELL_BARCODE_TAG_KEY tag.

    Each item of the generator is iterated as a collection of tuples whose
    first element is the tag key.
    """
    # any() stops at the first match. The earlier nested loop only broke out
    # of the inner loop and kept consuming the whole generator after success.
    success = any(
        barcode_set[0] == consts.CELL_BARCODE_TAG_KEY
        for barcode_sets in barcode_generator_with_corrected_cell_barcodes
        for barcode_set in barcode_sets
    )
    assert success