From 1e4a4d22accb66ea165b5fcca0c10224003577e1 Mon Sep 17 00:00:00 2001
From: hingbong
Date: Thu, 9 Oct 2025 11:47:40 +0800
Subject: [PATCH] feat: enhance encoding handling with best-effort decoding

---
 src/oxmsg/domain/encodings.py |  8 ++---
 src/oxmsg/properties.py       | 63 +++++++++++++++++++++++++++++++++--
 2 files changed, 64 insertions(+), 7 deletions(-)

diff --git a/src/oxmsg/domain/encodings.py b/src/oxmsg/domain/encodings.py
index 7edb08a..ee80fd5 100644
--- a/src/oxmsg/domain/encodings.py
+++ b/src/oxmsg/domain/encodings.py
@@ -60,12 +60,12 @@ def encoding_from_codepage(codepage: int) -> str:
         866: "cp866",  # -- OEM Russian --
         869: "ibm869",  # -- OEM Modern Greek --
         870: "cp870",  # -- IBM EBCDIC Multilingual/ROECE (Latin 2) --
-        874: "windows-874",  # -- ANSI/OEM Thai (ISO 8859-11) --
+        874: "cp874",  # -- ANSI/OEM Thai (ISO 8859-11) --
         875: "cp875",  # -- IBM EBCDIC Greek Modern --
         932: "shift_jis",  # -- ANSI/OEM Japanese --
-        936: "gb2312",  # -- ANSI/OEM Simplified Chinese (PRC, Singapore) --
-        949: "ks_c_5601-1987",  # -- ANSI/OEM Korean (Unified Hangul Code) --
-        950: "windows-950",  # -- ANSI/OEM Traditional Chinese (Taiwan, Hong Kong SAR, PRC) --
+        936: "gb18030",  # -- ANSI/OEM Simplified Chinese (PRC, Singapore) --
+        949: "cp949",  # -- ANSI/OEM Korean (Unified Hangul Code) --
+        950: "cp950",  # -- ANSI/OEM Traditional Chinese (Taiwan, Hong Kong SAR, PRC) --
         1026: "IBM1026",  # -- IBM EBCDIC Turkish (Latin 5) --
         1047: "cp1047",  # -- IBM EBCDIC Latin 1/Open System --
         1140: "cp1140",  # -- IBM EBCDIC US-Canada (037 + Euro symbol) --
diff --git a/src/oxmsg/properties.py b/src/oxmsg/properties.py
index f5f2f61..8b5e558 100644
--- a/src/oxmsg/properties.py
+++ b/src/oxmsg/properties.py
@@ -408,9 +408,9 @@ def value(self) -> str:
 
         The caller is responsible for determining the encoding and applying it to get a str value.
         """
-        return self._storage.property_stream_bytes(self.pid, self.ptyp).decode(
-            self._body_encoding if self.pid == c.PID_BODY else self._str_prop_encoding
-        )
+        raw_value = self._storage.property_stream_bytes(self.pid, self.ptyp)
+        encoding = self._body_encoding if self.pid == c.PID_BODY else self._str_prop_encoding
+        return _decode_best_effort(raw_value, encoding)
 
 
 class TimeProperty(BaseProperty):
@@ -427,6 +427,63 @@ def value(self) -> dt.datetime:
         return epoch + dt.timedelta(seconds=seconds_since_epoch)
 
 
+def _decode_best_effort(raw_value: bytes, primary_encoding: str) -> str:
+    """Decode `raw_value` using `primary_encoding` with pragmatic fallbacks.
+
+    Some MSG files are observed to declare an encoding but store bytes that do not strictly
+    conform. Rather than fail, try a small set of compatible encodings and, as a last resort,
+    decode with replacement characters so that parsing succeeds.
+    """
+    candidates = _candidate_encodings(primary_encoding)
+    for encoding in candidates:
+        try:
+            return raw_value.decode(encoding)
+        except (LookupError, UnicodeDecodeError):
+            continue
+
+    # No candidate decodes cleanly: keep what the declared encoding can read and mark the
+    # undecodable bytes with U+FFFD instead of reinterpreting the whole value.
+    for encoding in candidates:
+        try:
+            return raw_value.decode(encoding, errors="replace")
+        except LookupError:
+            continue
+
+    # Every candidate names a codec Python does not know; latin-1 maps all 256 byte values.
+    return raw_value.decode("latin-1", errors="replace")
+
+
+def _candidate_encodings(primary_encoding: str) -> tuple[str, ...]:
+    """Return preferred encodings to try for a declared `primary_encoding`.
+
+    The declared encoding comes first, followed by close relatives of it. `latin-1` is added
+    as a fallback only for the UTF-8 family: it decodes any byte sequence, so adding it for
+    every encoding would keep the replacement pass in `_decode_best_effort()` from ever running.
+    """
+    encoding = primary_encoding or "latin-1"
+    lower = encoding.lower()
+
+    choices: list[str] = [encoding]
+
+    if lower in {"utf-8", "utf8", "utf-8-sig"}:
+        choices.extend(["cp1252", "latin-1"])
+    elif lower in {"iso-8859-1", "iso8859-1", "iso-8859-15", "iso8859-15", "latin-1"}:
+        choices.append("cp1252")
+    elif lower in {"gb2312", "gbk", "gb18030", "euc-cn", "euc_cn"}:
+        choices.append("gb18030")
+    elif lower in {"iso-2022-jp", "iso2022_jp", "csiso2022jp"}:
+        choices.extend(["shift_jis", "cp932"])
+    elif lower in {"shift_jis", "sjis"}:
+        choices.append("cp932")
+    elif lower in {"windows-950", "cp950", "big5"}:
+        choices.append("cp950")
+    elif lower in {"windows-874", "cp874"}:
+        choices.append("cp874")
+
+    # Preserve order while de-duplicating
+    return tuple(dict.fromkeys(choices))
+
+
 def _batched_bytes(block: bytes, n: int) -> Iterator[bytes]:
     """Batch bytes from `block` into segments of `n` bytes each.
 