Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 64 additions & 55 deletions seqBackupLib/illumina.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,24 @@
from pathlib import Path


class IlluminaFastq:
MACHINE_TYPES = {
"VH": "Illumina-NextSeq",
"D": "Illumina-HiSeq",
"M": "Illumina-MiSeq",
"A": "Illumina-NovaSeq",
"NB": "Illumina-MiniSeq",
"LH": "Illumina-NovaSeqX",
}
# Lookup table from instrument code to sequencer model name. The key is the
# instrument ID with all digits removed (what extract_instrument_code returns);
# a code that is missing here is rejected as invalid by the run-name parser.
MACHINE_TYPES = {
"VH": "Illumina-NextSeq",
"D": "Illumina-HiSeq",
"M": "Illumina-MiSeq",
"A": "Illumina-NovaSeq",
"NB": "Illumina-MiniSeq",
"LH": "Illumina-NovaSeqX",
}

def __init__(self, f: TextIOWrapper):
self.file = f
self.fastq_info = self._parse_header()
self.folder_info = self._parse_folder()

def __str__(self):
return "_".join(
[
self.fastq_info["instrument"],
self.fastq_info["run_number"],
self.fastq_info["flowcell_id"],
self.fastq_info["lane"],
]
)
def extract_instrument_code(instrument: str) -> str:
    """Return ``instrument`` with every digit character removed.

    For example ``"VH00123"`` yields ``"VH"``. The remaining characters keep
    their original order; a string made only of digits yields ``""``.
    """
    return "".join(char for char in instrument if not char.isdigit())

def is_same_run(self, other: "IlluminaFastq") -> bool:
keys = ["run_number", "instrument", "flowcell_id"]
return all(self.fastq_info[k] == other.fastq_info[k] for k in keys)

def _parse_header(self) -> dict[str, str]:
line = next(self.file).strip()
if not line.startswith("@"):
raise ValueError("Not a FASTQ header line")
# Remove first character, @
line = line[1:]
word1, _, word2 = line.partition(" ")

keys1 = ("instrument", "run_number", "flowcell_id", "lane")
vals1 = dict((k, v) for k, v in zip(keys1, word1.split(":")))

keys2 = ("read", "is_filtered", "control_number", "index_reads")
vals2 = dict((k, v) for k, v in zip(keys2, word2.split(":")))

vals1.update(vals2)
return vals1
class IlluminaDir:
def __init__(self, run_name: str):
    """Store the run folder name and parse it into ``folder_info``.

    Args:
        run_name: Name of the sequencing run directory.

    Raises:
        ValueError: Propagated from ``_parse_folder`` when the run name
            contains an invalid date or an unknown instrument code.
    """
    # run_name is assigned first; presumably _parse_folder reads it
    # (part of that method's body is not visible here -- confirm).
    self.run_name = run_name
    self.folder_info = self._parse_folder()

def _parse_folder(self) -> dict[str, str]:
# Extract directory name info
Expand All @@ -62,8 +35,9 @@ def _parse_folder(self) -> dict[str, str]:
raise ValueError(f"Invalid date format in run name: {date}")

instrument = parts[1]
if self._extract_instrument_code(instrument) not in self.MACHINE_TYPES:
if extract_instrument_code(instrument) not in MACHINE_TYPES:
raise ValueError(f"Invalid instrument code in run name: {instrument}")
self.machine_type = MACHINE_TYPES[extract_instrument_code(instrument)]

run_number = parts[2]
if not run_number.isdigit():
Expand All @@ -89,25 +63,56 @@ def _parse_folder(self) -> dict[str, str]:
):
vals1["flowcell_id"] = vals1["flowcell_id"][1:]

return vals1


class IlluminaFastq:
def __init__(self, f: TextIOWrapper):
    """Wrap an open FASTQ file and collect header and folder metadata.

    Args:
        f: Open text-mode file object for the FASTQ file. Its first line
            is consumed while parsing the header.
    """
    self.file = f
    self.fastq_info = self._parse_header()

    # Start from the fields parsed out of the run directory name, then
    # overlay the fields parsed out of the FASTQ file name.
    run_dir = IlluminaDir(self.run_name)
    self.folder_info = run_dir.folder_info
    filename_fields = self._parse_fastq_file()
    self.folder_info.update(filename_fields)

def __str__(self):
return "_".join(
[
self.fastq_info["instrument"],
self.fastq_info["run_number"],
self.fastq_info["flowcell_id"],
self.fastq_info["lane"],
]
)

def is_same_run(self, other: "IlluminaFastq") -> bool:
keys = ["run_number", "instrument", "flowcell_id"]
return all(self.fastq_info[k] == other.fastq_info[k] for k in keys)

def _parse_header(self) -> dict[str, str]:
line = next(self.file).strip()
if not line.startswith("@"):
raise ValueError("Not a FASTQ header line")
# Remove first character, @
line = line[1:]
word1, _, word2 = line.partition(" ")

keys1 = ("instrument", "run_number", "flowcell_id", "lane")
vals1 = dict((k, v) for k, v in zip(keys1, word1.split(":")))

keys2 = ("read", "is_filtered", "control_number", "index_reads")
vals2 = dict((k, v) for k, v in zip(keys2, word2.split(":")))

vals1.update(vals2)
return vals1

def _parse_fastq_file(self) -> dict[str, str]:
# Extract file name info
matches = re.match(
"Undetermined_S0_L00([1-8])_([RI])([12])_001.fastq.gz", self.filepath.name
)
keys2 = ("lane", "read_or_index", "read")
vals2 = dict((k, v) for k, v in zip(keys2, matches.groups()))

vals1.update(vals2)
return vals1

@staticmethod
def _extract_instrument_code(instrument: str) -> str:
return "".join(filter(lambda x: not x.isdigit(), instrument))

@property
def machine_type(self):
return self.MACHINE_TYPES[
self._extract_instrument_code(self.fastq_info["instrument"])
]
return vals2

@property
def lane(self) -> str:
Expand All @@ -117,14 +122,18 @@ def lane(self) -> str:
def filepath(self) -> Path:
    """Return the name of the wrapped file object as a ``Path``."""
    return Path(self.file.name)

@property
def machine_type(self) -> str:
    """Sequencer model name for the instrument named in the FASTQ header.

    Raises:
        KeyError: If the instrument code is not a key of ``MACHINE_TYPES``.
    """
    instrument_code = extract_instrument_code(self.fastq_info["instrument"])
    return MACHINE_TYPES[instrument_code]

@property
def run_name(self) -> str:
for part in self.filepath.parts:
segments = part.split("_")
if (
len(segments) >= 4
and segments[0].isdigit()
and self._extract_instrument_code(segments[1]) in self.MACHINE_TYPES
and extract_instrument_code(segments[1]) in MACHINE_TYPES
and segments[2].isdigit()
):
return part
Expand Down
19 changes: 16 additions & 3 deletions test/test_illumina.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest
from pathlib import Path
from seqBackupLib.backup import DEFAULT_MIN_FILE_SIZE
from seqBackupLib.illumina import IlluminaFastq
from seqBackupLib.illumina import IlluminaDir, IlluminaFastq, MACHINE_TYPES


machine_fixtures = {
Expand All @@ -15,7 +15,7 @@
}


@pytest.mark.parametrize("machine_type", IlluminaFastq.MACHINE_TYPES.keys())
@pytest.mark.parametrize("machine_type", MACHINE_TYPES.keys())
def test_illumina_fastq(machine_type, request):
fixture_name = machine_fixtures.get(machine_type)
if not fixture_name:
Expand All @@ -29,8 +29,21 @@ def test_illumina_fastq(machine_type, request):
r1 = IlluminaFastq(f)

print("FASTQ info: ", r1.fastq_info, "\nFolder info: ", r1.folder_info)
assert r1.machine_type == IlluminaFastq.MACHINE_TYPES[machine_type]
assert r1.machine_type == MACHINE_TYPES[machine_type]
assert r1.check_fp_vs_content()[0], r1.check_fp_vs_content()
assert not r1.check_file_size(DEFAULT_MIN_FILE_SIZE)
assert r1.check_file_size(100)
assert r1.check_index_read_exists()


@pytest.mark.parametrize("machine_type", MACHINE_TYPES.keys())
def test_illumina_dir(machine_type, request):
fixture_name = machine_fixtures.get(machine_type)
if not fixture_name:
raise ValueError(
f"All supported machine types must be tested. Missing: {machine_type}"
)

fp = request.getfixturevalue(fixture_name)

d = IlluminaDir(fp.name)