diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..77ac754 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.venv/ +__pycache__/ +*.pyc diff --git a/compliance.md b/compliance.md new file mode 100644 index 0000000..b094f08 --- /dev/null +++ b/compliance.md @@ -0,0 +1,44 @@ +# RFC 5322 Compliance Matrix + +Maps ABNF productions to parser implementation and test coverage. + +| ABNF Production | RFC § | Parser Method | Test Cases | Status | +|---|---|---|---|---| +| `address` | RFC 5322 §3.4 | `parse() / _parse_address()` | `test_mailbox_simple_addr_spec`, `test_mailbox_name_addr`, `test_group_empty`, `test_group_single_mailbox`, `test_*` | ✅ Complete | +| `mailbox` | RFC 5322 §3.4 | `_parse_mailbox()` | `test_mailbox_simple_addr_spec`, `test_mailbox_name_addr`, `test_mailbox_name_addr_no_space_before_angle`, `test_mailbox_angle_addr_only`, `test_mailbox_quoted_display_name` | ✅ Complete | +| `name-addr` | RFC 5322 §3.4 | `_parse_name_addr() / _parse_angle_addr()` | `test_mailbox_name_addr`, `test_mailbox_name_addr_no_space_before_angle`, `test_name_addr_cfws_around_angle` | ✅ Complete | +| `angle-addr` | RFC 5322 §3.4 | `_parse_angle_addr()` | `test_mailbox_angle_addr_only`, `test_name_addr_cfws_around_angle`, `test_unclosed_angle_bracket` | ✅ Complete | +| `addr-spec` | RFC 5322 §3.4.1 | `_parse_addr_spec()` | `test_addr_spec_minimal`, `test_addr_spec_common_form`, `test_addr_spec_missing_at_rejected`, `test_addr_spec_missing_domain_rejected`, `test_addr_spec_double_at_rejected`, `test_addr_spec_only_at_rejected` | ✅ Complete | +| `local-part` | RFC 5322 §3.4.1 | `_parse_local_part()` | `test_local_part_dot_atom`, `test_local_part_quoted_string`, `test_local_part_quoted_string_with_escape`, `test_local_part_case_preserved` | ✅ Complete | +| `domain` | RFC 5322 §3.4.1 | `_parse_domain()` | `test_domain_dot_atom`, `test_domain_subdomains`, `test_domain_literal_ipv4`, `test_domain_literal_ipv6`, `test_domain_literal_with_cfws`, `test_domain_literal_tag` | ✅ Complete | +| `domain-literal` | RFC 5322 §3.4.1 | `_parse_domain_literal()` | `test_domain_literal_ipv4`, `test_domain_literal_ipv6`, `test_domain_literal_with_cfws`, `test_domain_literal_tag` | ✅ Complete | +| `dot-atom` | RFC 5322 §3.2.3 | `_parse_dot_atom()` | `test_dot_atom_simple`, `test_dot_atom_multiple_dots`, `test_dot_atom_as_domain`, `test_dot_atom_trailing_dot_rejected`, `test_atom_leading_dot_fails`, `test_atom_consecutive_dots` | ✅ Complete | +| `quoted-string` | RFC 5322 §3.2.4 | `_parse_quoted_string()` | `test_quoted_string_simple`, `test_quoted_string_preserves_spaces`, `test_quoted_string_preserves_tabs`, `test_quoted_string_in_display_name`, `test_quoted_string_with_qtext_specials`, `test_quoted_string_empty`, `test_quoted_string_unclosed_rejected`, `test_quoted_string_with_crlf_strict` | ✅ Complete | +| `quoted-pair` | RFC 5322 §3.2.1 | `_parse_quoted_pair()` | `test_quoted_pair_in_quoted_string`, `test_quoted_pair_backslash_backslash`, `test_quoted_pair_in_display_name`, `test_quoted_pair_in_comment_strict`, `test_obs_qp_rejected_strict`, `test_obs_qp_accepted_permissive` | ✅ Complete | +| `CFWS` | RFC 5322 §3.2.2 | `_skip_cfws() / _parse_comment()` | `test_comment_before_address`, `test_comment_after_address_in_angle`, `test_comment_in_display_name`, `test_nested_comments`, `test_multiple_comments`, `test_comment_inside_angle_addr`, `test_fws_after_comma_in_list` | ✅ Complete | +| `FWS` | RFC 5322 §3.2.2 | `_skip_fws()` | `test_single_space_between_words`, `test_multiple_spaces_collapse`, `test_tab_between_words`, `test_fws_after_comma_in_list` | ✅ Complete | +| `comment` | RFC 5322 §3.2.2 | `_parse_comment()` | `test_comment_before_address`, `test_nested_comments`, `test_multiple_comments`, `test_comment_inside_angle_addr`, `test_comment_after_address_in_angle` | ✅ Complete | +| `phrase` | RFC 5322 §3.2.5 | `_parse_phrase()` | `test_phrase_single_word`, `test_phrase_multiple_words`, `test_phrase_mixed_atom_and_quoted`, `test_phrase_dots_between_atoms`, `test_phrase_dots_between_atoms_rejected_strict` | ✅ Complete | +| `atom` | RFC 5322 §3.2.3 | `_parse_atom()` | `test_atom_simple`, `test_atom_with_allowed_special_chars` | ✅ Complete | +| `word` | RFC 5322 §3.2.5 | `_parse_word()` | `test_phrase_single_word`, `test_phrase_multiple_words`, `test_phrase_mixed_atom_and_quoted` | ✅ Complete | +| `address-list` | RFC 5322 §3.4 | `parse_address_list()` | `test_address_list_two_simple`, `test_address_list_mixed_types`, `test_address_list_with_group`, `test_address_list_single_address`, `test_address_list_with_cfws` | ✅ Complete | +| `mailbox-list` | RFC 5322 §3.4 | `parse_mailbox_list()` | `test_mailbox_list_simple`, `test_mailbox_list_rejects_group_strict` | ✅ Complete | +| `group` | RFC 5322 §3.4 | `_parse_group()` | `test_group_empty`, `test_group_single_mailbox`, `test_group_multiple_mailboxes`, `test_group_multiple_mailboxes_with_names`, `test_group_cfws_after_colon`, `test_group_cfws_before_semicolon`, `test_group_not_closed_rejected`, `test_group_missing_colon_rejected` | ✅ Complete | +| `dtext` | RFC 5322 §3.4.1 | `_parse_domain_literal()` | `test_domain_literal_ipv4`, `test_domain_literal_tag` | ✅ Complete | +| `obs-local-part` | RFC 5322 §4.4 | `_parse_obs_local_part()` | `test_obs_local_part_with_dots`, `test_obs_local_part_atom_and_quoted` | ✅ Complete | +| `obs-domain` | RFC 5322 §4.4 | `_parse_obs_domain()` | `test_obs_domain_atoms` | ✅ Complete | +| `obs-angle-addr` | RFC 5322 §4.4 | `_parse_angle_addr() (permissive)` | `test_obs_angle_addr_single_hop`, `test_obs_angle_addr_multi_hop`, `test_obs_angle_addr_with_display_name`, `test_obs_angle_addr_rejected_strict` | ✅ Complete | +| `obs-route` | RFC 5322 §4.4 | `_parse_obs_route()` | `test_obs_angle_addr_single_hop`, `test_obs_angle_addr_multi_hop` | ✅ Complete | +| `obs-group-list` | RFC 5322 §4.4 | `_parse_group() (permissive)` | `test_obs_group_list_empty_with_commas` | ✅ Complete | +| `obs-mbox-list` | RFC 5322 §4.4 | `parse_mailbox_list() (permissive)` | `test_obs_mbox_list_trailing_comma`, `test_obs_mbox_list_leading_comma`, `test_obs_mbox_list_empty_elements` | ✅ Complete | +| `obs-addr-list` | RFC 5322 §4.4 | `parse_address_list() (permissive)` | `test_obs_addr_list_trailing_comma`, `test_obs_addr_list_leading_comma` | ✅ Complete | + +## Summary + +- **Parser**: `parser.py` — `RFC5322Address` dataclass + `AddressParser` class with `parse()`, `parse_address_list()`, `parse_mailbox_list()` +- **Tests**: `test_parser.py` — 112 test cases, all passing +- **Strict mode**: Rejects all `obs-*` productions from RFC 5322 §4.4 +- **Permissive mode**: Accepts obsolete forms per §4.4 +- **No external dependencies**: Pure Python stdlib only +- **Type hints**: All public methods annotated +- **Max line length**: 998 characters (RFC 5322 limit) diff --git a/parser.py b/parser.py new file mode 100644 index 0000000..ed8230f --- /dev/null +++ b/parser.py @@ -0,0 +1,1101 @@ +#!/usr/bin/env python3 +"""RFC 5322 Address Parser. + +Implements the full ABNF grammar from RFC 5322 §3.2 (lexical tokens), +§3.4–§3.4.1 (address specification), and §4.4 (obsolete addressing). + +strict=True (default) rejects all obs-* productions. +strict=False (permissive) accepts obsolete forms per §4. + +Pure Python 3 stdlib — no external dependencies. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from typing import List, Optional, Tuple + +# --------------------------------------------------------------------------- +# Character classification helpers (matches RFC 5322 definitions) +# --------------------------------------------------------------------------- + +# Printable US-ASCII (33-126) +_VCHAR = set(chr(c) for c in range(33, 127)) + +# White space: SP (32) and HTAB (9) +_WSP = {" ", "\t"} + +# atext (§3.2.3): ALPHA / DIGIT / "!" / "#" / "$" / "%" / "&" / "'" / +# "*" / "+" / "-" / "/" / "=" / "?" / "^" / "_" / "`" / "{" / "|" / "}" / "~" +_ATEXT = ( + set(chr(c) for c in range(65, 91)) # ALPHA uppercase + | set(chr(c) for c in range(97, 123)) # ALPHA lowercase + | set(chr(c) for c in range(48, 58)) # DIGIT + | set("!#$%&'*+-/=?^_`{|}~") +) + +# ctext (§3.2.2): %d33-39 / %d42-91 / %d93-126 / obs-ctext +_CTEXT = ( + set(chr(c) for c in range(33, 40)) # 33-39 + | set(chr(c) for c in range(42, 92)) # 42-91 + | set(chr(c) for c in range(93, 127)) # 93-126 +) + +# qtext (§3.2.4): %d33 / %d35-91 / %d93-126 / obs-qtext +_QTEXT = ( + set(chr(33)) + | set(chr(c) for c in range(35, 92)) # 35-91 (excludes 34=DQUOTE) + | set(chr(c) for c in range(93, 127)) # 93-126 +) + +# dtext (§3.4.1): %d33-90 / %d94-126 / obs-dtext +_DTEXT = ( + set(chr(c) for c in range(33, 91)) # 33-90 + | set(chr(c) for c in range(94, 127)) # 94-126 +) + +# obs-NO-WS-CTL (§4.1): %d1-8 / %d11 / %d12 / %d14-31 / %d127 +_OBS_NO_WS_CTL = ( + set(chr(c) for c in range(1, 9)) + | {chr(11), chr(12)} + | set(chr(c) for c in range(14, 32)) + | {chr(127)} +) + + +# --------------------------------------------------------------------------- +# Exceptions +# --------------------------------------------------------------------------- + +class AddressParserError(ValueError): + """Raised when an RFC 5322 address string cannot be parsed.""" + + def __init__(self, message: str, pos: int = 0, context: str = ""): + super().__init__(message) + self.pos = pos + self.context = context + + +# --------------------------------------------------------------------------- +# Data model +# --------------------------------------------------------------------------- + +@dataclass +class RFC5322Address: + """Represents a parsed RFC 5322 address. + + Attributes: + display_name: Optional display name (from name-addr or group). + local_part: Local part of the addr-spec (before the @). + domain: Domain part of the addr-spec (after the @). + is_group: True if this is a group construct. + group_mailboxes: List of member mailboxes (only for groups). + raw: The raw input string for this address. + """ + + display_name: Optional[str] = None + local_part: Optional[str] = None + domain: Optional[str] = None + is_group: bool = False + group_mailboxes: List["RFC5322Address"] = field(default_factory=list) + raw: str = "" + + def __repr__(self) -> str: + if self.is_group: + members = ", ".join(repr(m) for m in self.group_mailboxes) + return (f"RFC5322Address(display_name={self.display_name!r}, " + f"is_group=True, group_mailboxes=[{members}])") + return (f"RFC5322Address(display_name={self.display_name!r}, " + f"local_part={self.local_part!r}, " + f"domain={self.domain!r})") + + +# =================================================================== +# Parser +# =================================================================== + +class AddressParser: + """RFC 5322 address parser. + + Example: + >>> parser = AddressParser(strict=True) + >>> result = parser.parse("John Doe ") + >>> result.local_part + 'jdoe' + """ + + # Maximum characters per line (RFC 5322 §2.1.1) + MAX_LINE_LENGTH = 998 + + def __init__(self, strict: bool = True): + """Initialise the parser. + + Args: + strict: If True (default), reject all obs-* productions. + If False, accept obsolete syntax per §4. + """ + self._strict = strict + self._input: str = "" + self._pos: int = 0 + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def parse(self, text: str) -> RFC5322Address: + """Parse a single address (mailbox or group). + + Args: + text: Input string to parse. + + Returns: + An RFC5322Address instance. + + Raises: + AddressParserError: If parsing fails. + """ + if len(text) > self.MAX_LINE_LENGTH: + raise AddressParserError( + f"Input exceeds {self.MAX_LINE_LENGTH} characters", + pos=0, context=text[:40], + ) + self._input = text + self._pos = 0 + self._skip_cfws() + addr = self._parse_address() + self._skip_cfws() + if self._pos < len(self._input): + raise AddressParserError( + f"Unexpected trailing character at {self._pos}: " + f"{self._input[self._pos]!r}", + pos=self._pos, + context=self._input[max(0, self._pos - 10):self._pos + 10], + ) + return addr + + def parse_address_list(self, text: str) -> List[RFC5322Address]: + """Parse a list of addresses separated by commas. + + Args: + text: Input string, e.g. "alice@a.com, bob@b.com". + + Returns: + List of RFC5322Address instances. + """ + if len(text) > self.MAX_LINE_LENGTH: + raise AddressParserError( + f"Input exceeds {self.MAX_LINE_LENGTH} characters", + ) + self._input = text + self._pos = 0 + self._skip_cfws() + results = self._parse_address_list() + self._skip_cfws() + if self._pos < len(self._input): + raise AddressParserError( + f"Unexpected trailing character at {self._pos}: " + f"{self._input[self._pos]!r}", + pos=self._pos, + ) + return results + + def parse_mailbox_list(self, text: str) -> List[RFC5322Address]: + """Parse a list of mailboxes separated by commas. + + Args: + text: Input string, e.g. "alice@a.com, bob@b.com". + + Returns: + List of RFC5322Address instances (mailboxes only, no groups). + """ + if len(text) > self.MAX_LINE_LENGTH: + raise AddressParserError( + f"Input exceeds {self.MAX_LINE_LENGTH} characters", + ) + self._input = text + self._pos = 0 + self._skip_cfws() + results = self._parse_mailbox_list() + self._skip_cfws() + if self._pos < len(self._input): + raise AddressParserError( + f"Unexpected trailing character at {self._pos}: " + f"{self._input[self._pos]!r}", + pos=self._pos, + ) + return results + + # ------------------------------------------------------------------ + # Peek / consume helpers + # ------------------------------------------------------------------ + + def _peek(self) -> str: + """Return current character or '' if at end.""" + if self._pos < len(self._input): + return self._input[self._pos] + return "" + + def _consume(self) -> str: + """Return current character and advance position.""" + ch = self._peek() + if ch: + self._pos += 1 + return ch + + def _expect(self, expected: str) -> str: + """Consume and return the current character if it matches + *expected*, else raise.""" + ch = self._peek() + if ch != expected: + raise AddressParserError( + f"Expected {expected!r}, got {ch!r} at position {self._pos}", + pos=self._pos, + context=self._input[max(0, self._pos - 10):self._pos + 10], + ) + self._pos += 1 + return ch + + # ------------------------------------------------------------------ + # §3.2.1 Quoted characters — quoted-pair + # ------------------------------------------------------------------ + + def _parse_quoted_pair(self) -> str: + """Parse a quoted-pair; return the un-escaped character.""" + if self._peek() != "\\": + raise AddressParserError( + f"Expected backslash for quoted-pair at {self._pos}", + pos=self._pos, + ) + self._pos += 1 + ch = self._peek() + if not ch: + raise AddressParserError( + "Unexpected end of input after backslash", + pos=self._pos, + ) + + # Standard form: VCHAR or WSP + if ch in _VCHAR or ch in _WSP: + self._pos += 1 + return ch + + # obs-qp (only if permissive) + if not self._strict: + if ch == "\x00" or ch in _OBS_NO_WS_CTL or ch in ("\n", "\r"): + self._pos += 1 + return ch + + raise AddressParserError( + f"Invalid quoted-pair character {ch!r} at {self._pos}", + pos=self._pos, + ) + + # ------------------------------------------------------------------ + # §3.2.2 Folding White Space and Comments — FWS, CFWS, comment + # ------------------------------------------------------------------ + + def _skip_fws(self) -> None: + """Skip folding white space (FWS). + + FWS = ([*WSP CRLF] 1*WSP) / obs-FWS + + The grammar breaks down as: + 1. Optional prefix: zero or more (*WSP CRLF) blocks + (i.e. optional WSP before each CRLF). + 2. Mandatory 1*WSP (at least one space or tab). + 3. In permissive mode, obs-FWS appends: *(CRLF 1*WSP). + """ + start = self._pos + + # ----------------------------------------------------------------- + # Step 1 — [*WSP CRLF] (optional prefix) + # Each iteration consumes one (*WSP CRLF) block. + # Important: WSP that is NOT followed by CRLF must NOT be consumed + # here – it is reserved for the mandatory 1*WSP below. + # ----------------------------------------------------------------- + while True: + saved = self._pos + # optional WSP before CRLF + while self._peek() in _WSP: + self._pos += 1 + if self._has_crlf(): + self._pos += 2 # consumed *WSP CRLF + else: + self._pos = saved # restore — no CRLF follows + break + + # ----------------------------------------------------------------- + # Step 2 — 1*WSP (mandatory – at least one space or tab) + # ----------------------------------------------------------------- + if self._peek() not in _WSP: + self._pos = start # no FWS at all + return + + while self._peek() in _WSP: + self._pos += 1 + + # ----------------------------------------------------------------- + # Step 3 — obs-FWS = *(CRLF 1*WSP) (permissive mode only) + # ----------------------------------------------------------------- + if not self._strict: + while True: + if self._has_crlf(): + saved2 = self._pos + self._pos += 2 + if self._peek() in _WSP: + while self._peek() in _WSP: + self._pos += 1 + else: + self._pos = saved2 + break + else: + break + + def _has_crlf(self) -> bool: + """Check if CRLF is at the current position.""" + return ( + self._pos + 1 < len(self._input) + and self._input[self._pos] == "\r" + and self._input[self._pos + 1] == "\n" + ) + + def _parse_comment(self) -> str: + """Parse a comment: '(' *([FWS] ccontent) [FWS] ')'.""" + self._expect("(") + content_parts: List[str] = [] + depth = 1 + + while depth > 0 and self._pos < len(self._input): + ch = self._peek() + if ch == "(": + depth += 1 + content_parts.append(self._consume()) + elif ch == ")": + depth -= 1 + if depth == 0: + self._consume() + break + content_parts.append(self._consume()) + elif ch == "\\": + try: + qp = self._parse_quoted_pair() + content_parts.append(qp) + except AddressParserError: + content_parts.append(self._consume()) + if self._peek(): + content_parts.append(self._consume()) + elif ch in _WSP or ch == "\r": + self._skip_fws() + content_parts.append(" ") + elif ch in _CTEXT: + content_parts.append(self._consume()) + elif not self._strict and ch in _OBS_NO_WS_CTL: + content_parts.append(self._consume()) + else: + if not self._strict and ord(ch) < 128: + content_parts.append(self._consume()) + else: + raise AddressParserError( + f"Invalid ctext character {ch!r} at {self._pos}", + pos=self._pos, + ) + + return "".join(content_parts) + + def _skip_cfws(self) -> None: + """Skip comments and folding white space (CFWS). + + CFWS = (1*([FWS] comment) [FWS]) / FWS + """ + while True: + saved = self._pos + self._skip_fws() + if self._peek() == "(": + self._parse_comment() + self._skip_fws() + saved = self._pos + if self._pos == saved: + break + + # ------------------------------------------------------------------ + # §3.2.3 Atom / dot-atom + # ------------------------------------------------------------------ + + def _parse_atom(self) -> str: + """atom = [CFWS] 1*atext [CFWS]""" + self._skip_cfws() + text = self._parse_atext_run() + if not text: + raise AddressParserError( + f"Expected atom at position {self._pos}", + pos=self._pos, + ) + self._skip_cfws() + return text + + def _parse_atext_run(self) -> str: + """Parse a run of 1*atext characters.""" + buf: List[str] = [] + while self._peek() in _ATEXT: + buf.append(self._consume()) + return "".join(buf) + + def _parse_dot_atom_text(self) -> str: + """dot-atom-text = 1*atext *("." 1*atext)""" + part = self._parse_atext_run() + if not part: + raise AddressParserError( + f"Expected dot-atom-text at position {self._pos}", + pos=self._pos, + ) + buf = [part] + while self._peek() == ".": + self._consume() # consume the dot + next_part = self._parse_atext_run() + if not next_part: + # Dot not followed by atext — invalid dot-atom-text. + # Back up one character (the dot) so callers see what + # stopped the parse, but signal failure so that + # permissive mode can fall through to obs-* rules. + self._pos -= 1 + raise AddressParserError( + f"Invalid dot-atom-text: trailing dot at {self._pos}", + pos=self._pos, + ) + buf.append(".") + buf.append(next_part) + return "".join(buf) + + def _parse_dot_atom(self) -> str: + """dot-atom = [CFWS] dot-atom-text [CFWS]""" + self._skip_cfws() + text = self._parse_dot_atom_text() + self._skip_cfws() + return text + + # ------------------------------------------------------------------ + # §3.2.4 Quoted Strings + # ------------------------------------------------------------------ + + def _parse_quoted_string(self) -> str: + """Parse a quoted-string; return content between quotes. + + Per §3.2.4: CRLF inside FWS/CFWS within a quoted-string is + semantically invisible, but WSP (SP / HTAB) is visible content + and MUST be preserved in the result. + """ + self._skip_cfws() + self._expect('"') + buf: List[str] = [] + while self._peek() and self._peek() != '"': + # ---- strip invisible CRLF (keep WSP as visible content) ---- + while True: + saved = self._pos + while self._peek() in _WSP: + self._pos += 1 + if self._has_crlf(): + self._pos += 2 # CRLF (and leading *WSP) is invisible + else: + self._pos = saved # restore WSP — it is visible content + break + + if self._peek() == '"': + break + + # visible WSP (SP / HTAB) + if self._peek() in _WSP: + buf.append(self._consume()) + continue + + # qcontent + if self._peek() == "\\": + qp = self._parse_quoted_pair() + buf.append(qp) + elif self._peek() in _QTEXT: + buf.append(self._consume()) + elif not self._strict and self._peek() in _OBS_NO_WS_CTL: + buf.append(self._consume()) + else: + if not self._strict and ord(self._peek()) < 128: + buf.append(self._consume()) + else: + raise AddressParserError( + f"Invalid qcontent char {self._peek()!r} at {self._pos}", + pos=self._pos, + ) + self._expect('"') + self._skip_cfws() + return "".join(buf) + + # ------------------------------------------------------------------ + # §3.2.5 Miscellaneous — word, phrase + # ------------------------------------------------------------------ + + def _parse_word(self) -> str: + """word = atom / quoted-string""" + saved = self._pos + if self._peek() == '"': + return self._parse_quoted_string() + try: + return self._parse_atom() + except AddressParserError: + self._pos = saved + raise + + def _parse_phrase(self) -> str: + """phrase = 1*word / obs-phrase + + In strict mode dots are only allowed when followed by a + quoted-string or end-of-phrase (abbreviation style like ``Dr.``). + A dot directly between two atoms (e.g. ``John.Doe``) is an + obs-phrase production and is rejected. In permissive mode dots + between words are consumed silently. + """ + words: List[str] = [] + saved = self._pos + + # ---- first word (mandatory) ---- + try: + w = self._parse_word() + words.append(w) + except AddressParserError: + # obs-phrase fallback (§4.1): word *(word / "." / CFWS) + if not self._strict: + self._pos = saved + try: + first = self._parse_word() + words.append(first) + except AddressParserError: + self._pos = saved + raise AddressParserError( + f"Expected phrase at position {self._pos}", + pos=self._pos, + ) + + while True: + saved2 = self._pos + self._skip_cfws() + if self._peek() == ".": + self._consume() + continue + try: + w2 = self._parse_word() + words.append(w2) + except AddressParserError: + self._pos = saved2 + break + + return " ".join(words) + + raise AddressParserError( + f"Expected phrase at position {self._pos}", + pos=self._pos, + ) + + # ---- additional words, with dot handling ---- + while True: + saved2 = self._pos + self._skip_cfws() + if self._peek() == ".": + dot_pos = self._pos + self._consume() + if self._strict: + # In strict mode, a dot directly between two atoms + # is obs-phrase — reject it. Dots followed by a + # quoted-string or end-of-phrase are tolerated as + # trailing punctuation (e.g. ``Dr.``, ``Jr.``). + saved3 = self._pos + self._skip_cfws() + if self._peek() in _ATEXT: + raise AddressParserError( + f"Dot between atoms in phrase " + f"(obs-phrase) at position {dot_pos}", + pos=dot_pos, + ) + self._pos = saved3 + if words: + words[-1] = words[-1] + "." + # In permissive mode dots are consumed silently + continue + try: + w = self._parse_word() + words.append(w) + except AddressParserError: + self._pos = saved2 + break + + return " ".join(words) + + # ------------------------------------------------------------------ + # §3.4 Address Specification + # ------------------------------------------------------------------ + + def _parse_address(self) -> RFC5322Address: + """address = mailbox / group""" + saved = self._pos + try: + return self._parse_group() + except AddressParserError: + self._pos = saved + return self._parse_mailbox() + + def _parse_mailbox(self) -> RFC5322Address: + """mailbox = name-addr / addr-spec""" + saved = self._pos + + # Look-ahead: scan for '<' before '@' to detect name-addr + brace_pos = -1 + at_pos = -1 + i = self._pos + in_quote = False + depth = 0 + while i < len(self._input): + ch = self._input[i] + if in_quote: + if ch == '\\': + i += 2 + continue + if ch == '"': + in_quote = False + elif depth > 0: + if ch == '(': + depth += 1 + elif ch == ')': + depth -= 1 + elif ch == '\\': + i += 2 + continue + else: + if ch == '"': + in_quote = True + elif ch == '(': + depth = 1 + elif ch == '<': + brace_pos = i + break + elif ch == '@': + at_pos = i + break + elif ch in (',', ';', '>', ':'): + break + i += 1 + + # name-addr detection + is_name_addr = brace_pos >= 0 and (at_pos < 0 or brace_pos < at_pos) + + if is_name_addr: + try: + return self._parse_name_addr() + except AddressParserError: + self._pos = saved + raise + + # Try name-addr (bare case) + try: + return self._parse_name_addr() + except AddressParserError: + self._pos = saved + return self._parse_addr_spec_mailbox() + + def _parse_name_addr(self) -> RFC5322Address: + """name-addr = [display-name] angle-addr""" + saved = self._pos + display_name = None + + if self._peek() != "<": + before = self._pos + try: + display_name = self._parse_phrase() + except AddressParserError: + # When the phrase partially consumed input (e.g. an + # obs-phrase dot between atoms was rejected in strict + # mode) do NOT fall through to bare angle-addr — let + # the error propagate so the caller sees the real + # reason the input was rejected. + if self._pos != before: + raise + self._pos = saved + + angle = self._parse_angle_addr() + result = RFC5322Address( + display_name=display_name, + local_part=angle.local_part, + domain=angle.domain, + raw=self._input[saved:self._pos], + ) + return result + + def _parse_angle_addr(self) -> RFC5322Address: + """angle-addr = [CFWS] "<" addr-spec ">" [CFWS] / obs-angle-addr""" + saved = self._pos + self._skip_cfws() + + if self._peek() != "<": + raise AddressParserError( + f"Expected '<' at position {self._pos}", + pos=self._pos, + ) + + self._consume() # "<" + self._skip_cfws() + saved2 = self._pos # position after "<" CFWS + + # ---- standard path: "<" addr-spec ">" ---- + try: + addr = self._parse_addr_spec() + self._skip_cfws() + self._expect(">") + self._skip_cfws() + addr.raw = self._input[saved:self._pos] + return addr + except AddressParserError: + # ---- obs-angle-addr (§4.4): "<" obs-route addr-spec ">" ---- + if not self._strict: + self._pos = saved2 + self._parse_obs_route() + addr = self._parse_addr_spec() + self._skip_cfws() + self._expect(">") + self._skip_cfws() + addr.raw = self._input[saved:self._pos] + return addr + raise + + def _parse_group(self) -> RFC5322Address: + """group = display-name ":" [group-list] ";" [CFWS]""" + saved = self._pos + display_name = self._parse_phrase() + self._skip_cfws() + + if self._peek() != ":": + raise AddressParserError( + f"Expected ':' for group at position {self._pos}", + pos=self._pos, + ) + self._consume() # ":" + saved_after_colon = self._pos + self._skip_cfws() + + group_list: List[RFC5322Address] = [] + + if self._peek() == ";": + pass + elif self._try_group_semicolon_after_cfws(): + pass + else: + try: + group_list = self._parse_mailbox_list() + except AddressParserError: + if not self._strict: + self._pos = saved_after_colon + self._parse_obs_group_list() + else: + raise + + self._skip_cfws() + self._expect(";") + self._skip_cfws() + + return RFC5322Address( + display_name=display_name, + is_group=True, + group_mailboxes=group_list, + raw=self._input[saved:self._pos], + ) + + def _try_group_semicolon_after_cfws(self) -> bool: + """Check if CFWS then ';' follows (empty group-list).""" + saved = self._pos + self._skip_cfws() + if self._peek() == ";": + return True + self._pos = saved + return False + + # ------------------------------------------------------------------ + # Lists + # ------------------------------------------------------------------ + + def _parse_address_list(self) -> List[RFC5322Address]: + """address-list = ...""" + saved = self._pos + try: + return self._parse_standard_address_list() + except AddressParserError: + if not self._strict: + self._pos = saved + return self._parse_obs_addr_list() + raise + + def _parse_standard_address_list(self) -> List[RFC5322Address]: + results: List[RFC5322Address] = [] + results.append(self._parse_address()) + self._skip_cfws() + while self._peek() == ",": + self._consume() + self._skip_cfws() + results.append(self._parse_address()) + self._skip_cfws() + return results + + def _parse_mailbox_list(self) -> List[RFC5322Address]: + saved = self._pos + try: + return self._parse_standard_mailbox_list() + except AddressParserError: + if not self._strict: + self._pos = saved + return self._parse_obs_mbox_list() + raise + + def _parse_standard_mailbox_list(self) -> List[RFC5322Address]: + results: List[RFC5322Address] = [] + results.append(self._parse_mailbox()) + self._skip_cfws() + while self._peek() == ",": + self._consume() + self._skip_cfws() + results.append(self._parse_mailbox()) + self._skip_cfws() + return results + + # ------------------------------------------------------------------ + # §3.4.1 Addr-Spec + # ------------------------------------------------------------------ + + def _parse_addr_spec(self) -> RFC5322Address: + saved = self._pos + local_part = self._parse_local_part() + self._skip_cfws() + self._expect("@") + self._skip_cfws() + domain = self._parse_domain() + return RFC5322Address( + local_part=local_part, + domain=domain, + raw=self._input[saved:self._pos], + ) + + def _parse_addr_spec_mailbox(self) -> RFC5322Address: + saved = self._pos + addr = self._parse_addr_spec() + addr.raw = self._input[saved:self._pos] + return addr + + def _parse_local_part(self) -> str: + saved = self._pos + # In permissive mode delegate to obs-local-part for mixed + # forms like "hello"."world" or hello."world" where + # strict dot-atom or a bare quoted-string would stop short. + if not self._strict: + return self._parse_obs_local_part() + if self._peek() == '"': + return self._parse_quoted_string() + try: + return self._parse_dot_atom() + except AddressParserError: + if not self._strict: + self._pos = saved + return self._parse_obs_local_part() + raise + + def _parse_domain(self) -> str: + saved = self._pos + if self._peek() == "[": + return self._parse_domain_literal() + try: + return self._parse_dot_atom() + except AddressParserError: + if not self._strict: + self._pos = saved + return self._parse_obs_domain() + raise + + # ------------------------------------------------------------------ + # domain-literal + # ------------------------------------------------------------------ + + def _parse_domain_literal(self) -> str: + saved = self._pos + self._skip_cfws() + self._expect("[") + buf: List[str] = [] + while self._peek() and self._peek() != "]": + self._skip_cfws() + if self._peek() == "]": + break + if self._peek() in _DTEXT: + buf.append(self._consume()) + elif self._peek() == "\\": + if not self._strict: + try: + qp = self._parse_quoted_pair() + buf.append(qp) + except AddressParserError: + buf.append(self._consume()) + if self._peek(): + buf.append(self._consume()) + else: + raise AddressParserError( + f"Invalid dtext char {self._peek()!r} at {self._pos}", + pos=self._pos, + ) + elif not self._strict and self._peek() in _OBS_NO_WS_CTL: + buf.append(self._consume()) + else: + raise AddressParserError( + f"Invalid dtext char {self._peek()!r} at {self._pos}", + pos=self._pos, + ) + self._skip_cfws() + self._expect("]") + self._skip_cfws() + return "[" + "".join(buf) + "]" + + # ------------------------------------------------------------------ + # §4.4 Obsolete Addressing + # ------------------------------------------------------------------ + + def _parse_obs_route(self) -> None: + """obs-route = obs-domain-list ':'""" + self._skip_cfws() + while self._peek() == ",": + self._consume() + self._skip_cfws() + self._expect("@") + self._parse_domain() + self._skip_cfws() + while self._peek() == ",": + self._consume() + self._skip_cfws() + if self._peek() == "@": + self._consume() + self._parse_domain() + self._skip_cfws() + self._expect(":") + + def _parse_obs_mbox_list(self) -> List[RFC5322Address]: + results: List[RFC5322Address] = [] + while True: + saved = self._pos + self._skip_cfws() + if self._peek() == ",": + self._consume() + else: + self._pos = saved + break + results.append(self._parse_mailbox()) + self._skip_cfws() + while self._peek() == ",": + self._consume() + self._skip_cfws() + saved2 = self._pos + self._skip_cfws() + if self._peek() in (",", ";") or self._pos >= len(self._input): + continue + self._pos = saved2 + try: + results.append(self._parse_mailbox()) + except AddressParserError: + self._pos = saved2 + self._skip_cfws() + self._skip_cfws() + return results + + def _parse_obs_addr_list(self) -> List[RFC5322Address]: + results: List[RFC5322Address] = [] + while True: + saved = self._pos + self._skip_cfws() + if self._peek() == ",": + self._consume() + else: + self._pos = saved + break + results.append(self._parse_address()) + self._skip_cfws() + while self._peek() == ",": + self._consume() + self._skip_cfws() + saved2 = self._pos + self._skip_cfws() + if self._peek() in (",", ";") or self._pos >= len(self._input): + continue + self._pos = saved2 + try: + results.append(self._parse_address()) + except AddressParserError: + self._pos = saved2 + self._skip_cfws() + self._skip_cfws() + return results + + def _parse_obs_group_list(self) -> None: + """obs-group-list = 1*([CFWS] ',') [CFWS]""" + count = 0 + while True: + self._skip_cfws() + if self._peek() == ",": + self._consume() + count += 1 + else: + break + if count == 0: + raise AddressParserError( + f"Expected obs-group-list at position {self._pos}", + pos=self._pos, + ) + self._skip_cfws() + + def _parse_obs_local_part(self) -> str: + """obs-local-part = word *('.' word)""" + words: List[str] = [] + words.append(self._parse_word()) + while self._peek() == ".": + saved = self._pos + self._consume() + try: + words.append(".") + words.append(self._parse_word()) + except AddressParserError: + self._pos = saved + break + return "".join(words) + + def _parse_obs_domain(self) -> str: + """obs-domain = atom *('.' atom)""" + atoms: List[str] = [] + atoms.append(self._parse_atom()) + while self._peek() == ".": + saved = self._pos + self._consume() + try: + atoms.append(".") + atoms.append(self._parse_atom()) + except AddressParserError: + self._pos = saved + break + return "".join(atoms) + + +# --------------------------------------------------------------------------- +# Module-level convenience functions +# --------------------------------------------------------------------------- + +def parse_address(text: str, strict: bool = True) -> RFC5322Address: + """Parse a single RFC 5322 address (mailbox or group).""" + parser = AddressParser(strict=strict) + return parser.parse(text) + + +def parse_address_list(text: str, strict: bool = True) -> List[RFC5322Address]: + """Parse an RFC 5322 address list (comma-separated addresses).""" + parser = AddressParser(strict=strict) + return parser.parse_address_list(text) + + +def parse_mailbox_list(text: str, strict: bool = True) -> List[RFC5322Address]: + """Parse an RFC 5322 mailbox list (comma-separated mailboxes).""" + parser = AddressParser(strict=strict) + return parser.parse_mailbox_list(text) diff --git a/test_parser.py b/test_parser.py new file mode 100644 index 0000000..03d6d97 --- /dev/null +++ b/test_parser.py @@ -0,0 +1,852 @@ +#!/usr/bin/env python3 +"""Comprehensive test suite for RFC 5322 Address Parser. + +Tests are organised by RFC 5322 section: + §3.2 – Lexical Tokens + §3.4 – Address Specification + §3.4.1 – Addr-Spec Specification + §4.4 – Obsolete Addressing + §A – Examples from RFC 5322 Appendix A + Error – Error handling and edge cases + +Run: + python -m pytest test_parser.py -v + python test_parser.py +""" + +from __future__ import annotations + +import pytest + +from parser import ( + AddressParser, + AddressParserError, + RFC5322Address, + parse_address, + parse_address_list, + parse_mailbox_list, +) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Helpers +# ═══════════════════════════════════════════════════════════════════════════ + +def _a(display_name=None, local_part=None, domain=None, + is_group=False, group_mailboxes=None): + """Factory for expected RFC5322Address values.""" + return RFC5322Address( + display_name=display_name, + local_part=local_part, + domain=domain, + is_group=is_group, + group_mailboxes=group_mailboxes or [], + ) + + +def _strip_raw(addr): + """Return a copy of *addr* with ``raw`` cleared for comparison.""" + return RFC5322Address( + display_name=addr.display_name, + local_part=addr.local_part, + domain=addr.domain, + is_group=addr.is_group, + group_mailboxes=[_strip_raw(m) for m in addr.group_mailboxes], + ) + + +def _strip_raw_list(lst): + return [_strip_raw(a) for a in lst] + + +# ═══════════════════════════════════════════════════════════════════════════ +# §3.2 Lexical Tokens +# ═══════════════════════════════════════════════════════════════════════════ + +class TestLexicalTokens: + """Tests for RFC 5322 §3.2 — Lexical Tokens.""" + + # ── §3.2.1 Quoted characters (quoted-pair) ──────────────────────── + + def test_quoted_pair_in_quoted_string(self): + """quoted-pair inside a quoted-string un-escapes the character.""" + r = parse_address('"test\\"name"@example.com') + assert r.local_part == 'test"name' + + def test_quoted_pair_backslash_backslash(self): + """Backslash escaping another backslash.""" + r = parse_address('"test\\\\name"@example.com') + assert r.local_part == 'test\\name' + + def test_quoted_pair_in_display_name(self): + """Escaped quote in display-name quoted-string.""" + r = parse_address('"John \\"The Man\\" Doe" ') + assert r.display_name == 'John "The Man" Doe' + + def test_quoted_pair_in_comment_strict(self): + """Backslash escapes inside comments (strict mode).""" + r = parse_address(r'(comment with \) paren) user@example.com') + assert r.local_part == 'user' + + def test_obs_qp_rejected_strict(self): + """obs-qp (backslash + control char) rejected in strict mode.""" + with pytest.raises(AddressParserError): + parse_address('"test\\\x01"@example.com', strict=True) + + def test_obs_qp_accepted_permissive(self): + """obs-qp accepted with strict=False.""" + r = parse_address('"test\\\x01"@example.com', strict=False) + assert r.local_part == 'test\x01' + + # ── §3.2.2 Folding White Space and Comments ────────────────────── + + def test_single_space_between_words(self): + """Single SP between atoms in display-name.""" + r = parse_address('John Doe ') + assert r.display_name == 'John Doe' + + def test_multiple_spaces_collapse(self): + """Multiple SP between atoms collapse to one.""" + r = parse_address('John Doe ') + assert r.display_name == 'John Doe' + + def test_tab_between_words(self): + """HTAB between atoms in display-name.""" + r = parse_address('John\tDoe ') + assert r.display_name == 'John Doe' + + def test_comment_before_address(self): + """CFWS before addr-spec is skipped.""" + r = parse_address('(a comment) user@example.com') + assert r.local_part == 'user' + assert r.domain == 'example.com' + + def test_comment_after_address_in_angle(self): + """CFWS after addr-spec inside angle brackets.""" + r = parse_address('') + assert r.local_part == 'user' + + def test_comment_in_display_name(self): + """Comment between words in display-name.""" + r = parse_address('John (middle name) Doe ') + assert r.display_name == 'John Doe' + + def test_nested_comments(self): + """Nested comments are handled correctly.""" + r = parse_address('(outer (inner) still outer) user@example.com') + assert r.local_part == 'user' + + def test_multiple_comments(self): + """Multiple consecutive comments.""" + r = parse_address('(one)(two) user@example.com') + assert r.local_part == 'user' + + def test_comment_inside_angle_addr(self): + """Comment right after opening angle bracket.""" + r = parse_address('< (comment) user@example.com>') + assert r.local_part == 'user' + + def test_fws_after_comma_in_list(self): + """FWS after comma in address list.""" + r = parse_address_list('alice@a.com, bob@b.com') + assert len(r) == 2 + assert r[0].local_part == 'alice' + assert r[1].local_part == 'bob' + + # ── §3.2.3 Atom / dot-atom ─────────────────────────────────────── + + def test_atom_simple(self): + """Simple atom as local-part.""" + r = parse_address('hello@example.com') + assert r.local_part == 'hello' + + def test_atom_with_allowed_special_chars(self): + """Atom with atext special characters.""" + r = parse_address("a!b#c$d%e&f'g*h+i-j/k=l?m^n_o`p{q|r}s~t@example.com") + assert r.local_part == "a!b#c$d%e&f'g*h+i-j/k=l?m^n_o`p{q|r}s~t" + + def test_dot_atom_simple(self): + """Simple dot-atom.""" + r = parse_address('first.last@example.com') + assert r.local_part == 'first.last' + + def test_dot_atom_multiple_dots(self): + """Multiple dots in dot-atom.""" + r = parse_address('a.b.c.d@example.com') + assert r.local_part == 'a.b.c.d' + + def test_dot_atom_as_domain(self): + """dot-atom used as domain part.""" + r = parse_address('user@mail.example.co.uk') + assert r.domain == 'mail.example.co.uk' + + def test_dot_atom_trailing_dot_rejected(self): + """Trailing dot — dot-atom-text requires 1*atext after each dot.""" + with pytest.raises(AddressParserError): + parse_address('hello.@example.com') + + def test_atom_leading_dot_fails(self): + """Leading dot makes dot-atom-text fail.""" + with pytest.raises(AddressParserError): + parse_address('.hello@example.com') + + def test_atom_consecutive_dots(self): + """Consecutive dots — dot-atom-text requires 1*atext between dots.""" + with pytest.raises(AddressParserError): + parse_address('a..b@example.com') + + # ── §3.2.4 Quoted Strings ──────────────────────────────────────── + + def test_quoted_string_simple(self): + """Simple quoted-string as local-part.""" + r = parse_address('"hello world"@example.com') + assert r.local_part == 'hello world' + + def test_quoted_string_preserves_spaces(self): + """Spaces inside quoted-string are preserved.""" + r = parse_address('"hello world"@example.com') + assert r.local_part == 'hello world' + + def test_quoted_string_preserves_tabs(self): + """Tabs inside quoted-string are preserved.""" + r = parse_address('"hello\tworld"@example.com') + assert r.local_part == 'hello\tworld' + + def test_quoted_string_in_display_name(self): + """Quoted-string as display-name.""" + r = parse_address('"Doe, John" ') + assert r.display_name == 'Doe, John' + + def test_quoted_string_with_qtext_specials(self): + """Characters allowed in qtext (excluding DQUOTE and backslash).""" + r = parse_address('"! #$%&\'()*+,-./0-9:;<=>?@A-Z[]^_`a-z{|}~"@example.com') + assert r.local_part is not None + + def test_quoted_string_empty(self): + """Empty quoted-string is valid.""" + r = parse_address('""@example.com') + assert r.local_part == '' + + def test_quoted_string_unclosed_rejected(self): + """Unclosed quoted-string raises error.""" + with pytest.raises(AddressParserError): + parse_address('"unclosed@example.com') + + def test_quoted_string_with_crlf_strict(self): + """CRLF inside quoted-string handled (invisible in strict).""" + r = parse_address('"hello\r\n world"@example.com') + assert r.local_part == 'hello world' + + # ── §3.2.5 Miscellaneous Tokens (word, phrase) ─────────────────── + + def test_phrase_single_word(self): + """Phrase with a single atom.""" + r = parse_address('Hello ') + assert r.display_name == 'Hello' + + def test_phrase_multiple_words(self): + """Phrase with multiple atoms.""" + r = parse_address('John David Doe ') + assert r.display_name == 'John David Doe' + + def test_phrase_mixed_atom_and_quoted(self): + """Phrase mixing atoms and quoted-strings.""" + r = parse_address('Dr. "John Doe" Jr. ') + assert r.display_name == 'Dr. John Doe Jr.' + + def test_phrase_dots_between_atoms(self): + """Dots between atoms are part of obs-phrase (permissive mode).""" + r = parse_address('John.Doe ', strict=False) + assert r.display_name == 'John Doe' + + def test_phrase_dots_between_atoms_rejected_strict(self): + """Dots between atoms (obs-phrase) rejected in strict mode.""" + with pytest.raises(AddressParserError, match='obs-phrase'): + parse_address('John.Doe ', strict=True) + + +# ═══════════════════════════════════════════════════════════════════════════ +# §3.4 Address Specification +# ═══════════════════════════════════════════════════════════════════════════ + +class TestAddressSpecification: + """Tests for RFC 5322 §3.4 — Address Specification.""" + + # ── mailbox ──────────────────────────────────────────────────────── + + def test_mailbox_simple_addr_spec(self): + """Bare addr-spec is a valid mailbox.""" + r = parse_address('user@example.com') + assert r.local_part == 'user' + assert r.domain == 'example.com' + assert r.display_name is None + + def test_mailbox_name_addr(self): + """name-addr form of mailbox.""" + r = parse_address('John Doe ') + assert r.display_name == 'John Doe' + assert r.local_part == 'user' + assert r.domain == 'example.com' + + def test_mailbox_name_addr_no_space_before_angle(self): + """name-addr without space before '<'.""" + r = parse_address('John Doe') + assert r.display_name == 'John Doe' + assert r.local_part == 'user' + + def test_mailbox_angle_addr_only(self): + """Bare angle-addr (no display-name).""" + r = parse_address('') + assert r.display_name is None + assert r.local_part == 'user' + assert r.domain == 'example.com' + + def test_mailbox_quoted_display_name(self): + """Display-name as a quoted-string.""" + r = parse_address('"John Doe" ') + assert r.display_name == 'John Doe' + + def test_name_addr_cfws_around_angle(self): + """CFWS inside angle brackets.""" + r = parse_address('John < (c1) user (c2) @ (c3) example.com (c4) >') + assert r.display_name == 'John' + assert r.local_part == 'user' + assert r.domain == 'example.com' + + # ── group ────────────────────────────────────────────────────────── + + def test_group_empty(self): + """Empty group (undisclosed recipients).""" + r = parse_address('undisclosed-recipients:;') + assert r.is_group + assert r.display_name == 'undisclosed-recipients' + assert r.group_mailboxes == [] + + def test_group_single_mailbox(self): + """Group with a single mailbox.""" + r = parse_address('Friends: alice@example.com;') + assert r.is_group + assert r.display_name == 'Friends' + assert len(r.group_mailboxes) == 1 + assert r.group_mailboxes[0].local_part == 'alice' + + def test_group_multiple_mailboxes(self): + """Group with multiple mailboxes.""" + r = parse_address('Team: alice@a.com, bob@b.com, carol@c.com;') + assert r.is_group + assert r.display_name == 'Team' + assert len(r.group_mailboxes) == 3 + + def test_group_multiple_mailboxes_with_names(self): + """Group mailboxes can have display names.""" + r = parse_address('Team: Alice , Bob ;') + assert r.is_group + assert len(r.group_mailboxes) == 2 + assert r.group_mailboxes[0].display_name == 'Alice' + assert r.group_mailboxes[1].display_name == 'Bob' + + def test_group_cfws_after_colon(self): + """CFWS between colon and group-list.""" + r = parse_address('Team: (comment) alice@a.com;') + assert r.is_group + assert len(r.group_mailboxes) == 1 + + def test_group_cfws_before_semicolon(self): + """CFWS before closing semicolon.""" + r = parse_address('Team: alice@a.com (comment);') + assert r.is_group + assert len(r.group_mailboxes) == 1 + + def test_group_not_closed_rejected(self): + """Group without closing semicolon is rejected.""" + with pytest.raises(AddressParserError): + parse_address('Team: alice@a.com') + + def test_group_missing_colon_rejected(self): + """Group missing colon is parsed as addr-spec instead.""" + with pytest.raises(AddressParserError): + parse_address('Team alice@a.com;') + + # ── address-list ─────────────────────────────────────────────────── + + def test_address_list_two_simple(self): + """Two simple addresses in a list.""" + r = parse_address_list('alice@a.com, bob@b.com') + assert len(r) == 2 + + def test_address_list_mixed_types(self): + """List containing both bare addr-spec and name-addr.""" + r = parse_address_list('user@host.com, John ') + assert len(r) == 2 + assert r[0].display_name is None + assert r[1].display_name == 'John' + + def test_address_list_with_group(self): + """Address list can contain group addresses.""" + r = parse_address_list('alice@a.com, Group: bob@b.com, carol@c.com;') + assert len(r) == 2 + assert not r[0].is_group + assert r[1].is_group + + def test_address_list_single_address(self): + """Address list with a single address.""" + r = parse_address_list('user@example.com') + assert len(r) == 1 + + def test_address_list_with_cfws(self): + """CFWS around commas in address list.""" + r = parse_address_list('a@b.c (c1) , (c2) d@e.f') + assert len(r) == 2 + + # ── mailbox-list ─────────────────────────────────────────────────── + + def test_mailbox_list_simple(self): + """Simple mailbox list.""" + r = parse_mailbox_list('alice@a.com, bob@b.com') + assert len(r) == 2 + assert not r[0].is_group + assert not r[1].is_group + + def test_mailbox_list_rejects_group_strict(self): + """mailbox-list rejects group construct (groups not mailboxes).""" + with pytest.raises(AddressParserError): + parse_mailbox_list('Group: alice@a.com;') + + +# ═══════════════════════════════════════════════════════════════════════════ +# §3.4.1 Addr-Spec Specification +# ═══════════════════════════════════════════════════════════════════════════ + +class TestAddrSpec: + """Tests for RFC 5322 §3.4.1 — Addr-Spec Specification.""" + + # ── local-part ───────────────────────────────────────────────────── + + def test_local_part_dot_atom(self): + """local-part as dot-atom.""" + r = parse_address('john.doe@example.com') + assert r.local_part == 'john.doe' + + def test_local_part_quoted_string(self): + """local-part as quoted-string.""" + r = parse_address('"john doe"@example.com') + assert r.local_part == 'john doe' + + def test_local_part_quoted_string_with_escape(self): + """quoted-string local-part with quoted-pair.""" + r = parse_address('"john \\"doe\\""@example.com') + assert r.local_part == 'john "doe"' + + def test_local_part_case_preserved(self): + """local-part case is preserved.""" + r = parse_address('John.Doe@Example.COM') + assert r.local_part == 'John.Doe' + + # ── domain ───────────────────────────────────────────────────────── + + def test_domain_dot_atom(self): + """Domain as dot-atom.""" + r = parse_address('user@example.com') + assert r.domain == 'example.com' + + def test_domain_subdomains(self): + """Domain with multiple subdomains.""" + r = parse_address('user@mail.eng.example.com') + assert r.domain == 'mail.eng.example.com' + + def test_domain_literal_ipv4(self): + """Domain-literal with IPv4 address.""" + r = parse_address('user@[127.0.0.1]') + assert r.domain == '[127.0.0.1]' + + def test_domain_literal_ipv6(self): + """Domain-literal with IPv6 address.""" + r = parse_address('user@[IPv6:2001:db8::1]') + assert r.domain == '[IPv6:2001:db8::1]' + + def test_domain_literal_with_cfws(self): + """Domain-literal with CFWS inside brackets.""" + r = parse_address('user@[ (comment) 127.0.0.1 ]') + assert r.domain == '[127.0.0.1]' + + def test_domain_literal_tag(self): + """Domain-literal with a tag.""" + r = parse_address('user@[some-tag]') + assert r.domain == '[some-tag]' + + # ── addr-spec full ───────────────────────────────────────────────── + + def test_addr_spec_minimal(self): + """Minimal valid addr-spec.""" + r = parse_address('a@b.c') + assert r.local_part == 'a' + assert r.domain == 'b.c' + assert r.display_name is None + + def test_addr_spec_common_form(self): + """Common email address form.""" + r = parse_address('first.last@example.com') + assert r.local_part == 'first.last' + assert r.domain == 'example.com' + + def test_addr_spec_missing_at_rejected(self): + """Missing @ rejected.""" + with pytest.raises(AddressParserError): + parse_address('noat.example.com') + + def test_addr_spec_missing_domain_rejected(self): + """Missing domain after @.""" + with pytest.raises(AddressParserError): + parse_address('user@') + + def test_addr_spec_double_at_rejected(self): + """Double @ rejected.""" + with pytest.raises(AddressParserError): + parse_address('user@host@extra.com') + + def test_addr_spec_only_at_rejected(self): + """Only @ with no local-part.""" + with pytest.raises(AddressParserError): + parse_address('@example.com') + + +# ═══════════════════════════════════════════════════════════════════════════ +# §4.4 Obsolete Addressing (strict=False) +# ═══════════════════════════════════════════════════════════════════════════ + +class TestObsoleteAddressing: + """Tests for RFC 5322 §4.4 — Obsolete Addressing (permissive mode).""" + + @pytest.fixture(autouse=True) + def setup(self): + self.p = AddressParser(strict=False) + self.ps = AddressParser(strict=True) + + # ── obs-angle-addr / obs-route ───────────────────────────────────── + + def test_obs_angle_addr_single_hop(self): + """Source route with one relay.""" + r = self.p.parse('<@relay.com:user@final.com>') + assert r.local_part == 'user' + assert r.domain == 'final.com' + + def test_obs_angle_addr_multi_hop(self): + """Source route with multiple relays.""" + r = self.p.parse('<@hosta.int,@hostb.int:user@example.com>') + assert r.local_part == 'user' + assert r.domain == 'example.com' + + def test_obs_angle_addr_with_display_name(self): + """Source route with display name.""" + r = self.p.parse('John Doe <@relay.com:user@final.com>') + assert r.display_name == 'John Doe' + assert r.local_part == 'user' + assert r.domain == 'final.com' + + def test_obs_angle_addr_rejected_strict(self): + """obs-angle-addr rejected in strict mode.""" + with pytest.raises(AddressParserError): + self.ps.parse('<@relay.com:user@final.com>') + + # ── obs-local-part ───────────────────────────────────────────────── + + def test_obs_local_part_with_dots(self): + """obs-local-part allows word *('.' word).""" + r = self.p.parse('"hello"."world"@example.com') + assert r.local_part == 'hello.world' + + def test_obs_local_part_atom_and_quoted(self): + """obs-local-part mixing atoms and quoted strings.""" + r = self.p.parse('hello."world"@example.com') + assert r.local_part == 'hello.world' + + # ── obs-domain ───────────────────────────────────────────────────── + + def test_obs_domain_atoms(self): + """obs-domain: atom *('.' atom) (non-strict fallback).""" + r = self.p.parse('user@domain.com') + assert r.domain == 'domain.com' + + # ── obs-mbox-list ────────────────────────────────────────────────── + + def test_obs_mbox_list_trailing_comma(self): + """obs-mbox-list allows trailing comma.""" + r = self.p.parse_mailbox_list('alice@a.com, bob@b.com,') + assert len(r) == 2 + + def test_obs_mbox_list_leading_comma(self): + """obs-mbox-list allows leading comma.""" + r = self.p.parse_mailbox_list(', alice@a.com, bob@b.com') + assert len(r) == 2 + + def test_obs_mbox_list_empty_elements(self): + """obs-mbox-list allows empty elements (,,).""" + r = self.p.parse_mailbox_list('alice@a.com,, bob@b.com') + assert len(r) == 2 + + # ── obs-addr-list ────────────────────────────────────────────────── + + def test_obs_addr_list_trailing_comma(self): + """obs-addr-list allows trailing comma.""" + r = self.p.parse_address_list('alice@a.com, bob@b.com,') + assert len(r) == 2 + + def test_obs_addr_list_leading_comma(self): + """obs-addr-list allows leading comma.""" + r = self.p.parse_address_list(', alice@a.com') + assert len(r) == 1 + + # ── obs-group-list ───────────────────────────────────────────────── + + def test_obs_group_list_empty_with_commas(self): + """obs-group-list: commas only between : and ;.""" + r = self.p.parse('EmptyGroup:,,;') + assert r.is_group + assert r.group_mailboxes == [] + + +# ═══════════════════════════════════════════════════════════════════════════ +# RFC 5322 Appendix A — Example Messages (Addressing Examples) +# ═══════════════════════════════════════════════════════════════════════════ + +class TestAppendixA: + """Tests drawn from RFC 5322 Appendix A addressing examples.""" + + def test_simple_from_example(self): + """A.1.1 — Simple addressing: From: John Doe """ + r = parse_address('John Doe ') + assert r.display_name == 'John Doe' + assert r.local_part == 'jdoe' + assert r.domain == 'machine.example' + + def test_mailbox_types_example(self): + """A.1.2 — Different types of mailboxes.""" + r = parse_address('jdoe@machine.example') + assert r.local_part == 'jdoe' + assert r.domain == 'machine.example' + + def test_group_example(self): + """A.1.3 — Group address: A Group:Chris Jones ,...""" + r = parse_address( + 'A Group:Chris Jones ,' + 'john@b.test, John ;' + ) + assert r.is_group + assert r.display_name == 'A Group' + assert len(r.group_mailboxes) == 3 + assert r.group_mailboxes[0].display_name == 'Chris Jones' + assert r.group_mailboxes[2].display_name == 'John' + + def test_empty_group_example(self): + """A.1.3 — Empty group: undisclosed-recipients:;""" + r = parse_address('undisclosed-recipients:;') + assert r.is_group + assert r.display_name == 'undisclosed-recipients' + assert r.group_mailboxes == [] + + def test_white_space_oddities_example(self): + """A.5 — White space and comments oddities.""" + r = parse_address('< (comment) user@example.com (another) >') + assert r.local_part == 'user' + assert r.domain == 'example.com' + + +# ═══════════════════════════════════════════════════════════════════════════ +# Error handling / edge cases +# ═══════════════════════════════════════════════════════════════════════════ + +class TestErrorHandling: + """Tests for error conditions and edge cases.""" + + def test_empty_string_rejected(self): + """Empty input raises error.""" + with pytest.raises(AddressParserError): + parse_address('') + + def test_whitespace_only_rejected(self): + """Whitespace-only input raises error.""" + with pytest.raises(AddressParserError): + parse_address(' ') + + def test_line_too_long_rejected(self): + """Input exceeding 998 characters rejected.""" + long_input = 'a' * 999 + '@b.com' + with pytest.raises(AddressParserError, match='exceeds'): + parse_address(long_input) + + def test_trailing_garbage_rejected(self): + """Content after the address is rejected.""" + with pytest.raises(AddressParserError, match='trailing'): + parse_address('user@example.com GARBAGE') + + def test_unclosed_angle_bracket(self): + """Missing closing '>' rejected.""" + with pytest.raises(AddressParserError): + parse_address('' without opening '<'.""" + with pytest.raises(AddressParserError): + parse_address('user@example.com>') + + def test_address_parser_error_attributes(self): + """AddressParserError has pos and context attributes.""" + try: + parse_address('not(valid') + except AddressParserError as e: + assert hasattr(e, 'pos') + assert hasattr(e, 'context') + assert isinstance(e.pos, int) + assert isinstance(e.context, str) + + def test_parse_address_list_empty_string(self): + """Empty address list raises error.""" + with pytest.raises(AddressParserError): + parse_address_list('') + + def test_parse_mailbox_list_empty_string(self): + """Empty mailbox list raises error.""" + with pytest.raises(AddressParserError): + parse_mailbox_list('') + + def test_max_line_length_default(self): + """MAX_LINE_LENGTH is 998 (RFC 5322 §2.1.1).""" + assert AddressParser.MAX_LINE_LENGTH == 998 + + +# ═══════════════════════════════════════════════════════════════════════════ +# Strict vs. non-strict mode +# ═══════════════════════════════════════════════════════════════════════════ + +class TestStrictMode: + """Tests for strict (default) vs. permissive (strict=False) behaviour.""" + + def test_default_is_strict(self): + """AddressParser default mode is strict.""" + p = AddressParser() + assert p._strict is True + + def test_permissive_constructor(self): + """AddressParser(strict=False) is permissive.""" + p = AddressParser(strict=False) + assert p._strict is False + + def test_module_function_default_strict(self): + """Module-level parse_address defaults to strict.""" + with pytest.raises(AddressParserError): + parse_address('<@relay.com:user@final.com>') + + def test_module_function_permissive(self): + """Module-level parse_address accepts strict=False.""" + r = parse_address('<@relay.com:user@final.com>', strict=False) + assert r.local_part == 'user' + assert r.domain == 'final.com' + + def test_standard_valid_in_both_modes(self): + """Valid standard addresses parse in both modes.""" + addr = 'user@example.com' + r1 = parse_address(addr, strict=True) + r2 = parse_address(addr, strict=False) + assert r1.local_part == r2.local_part + assert r1.domain == r2.domain + + +# ═══════════════════════════════════════════════════════════════════════════ +# Data model tests +# ═══════════════════════════════════════════════════════════════════════════ + +class TestDataModel: + """Tests for the RFC5322Address dataclass.""" + + def test_repr_simple(self): + """repr for simple address.""" + a = RFC5322Address(local_part='user', domain='example.com') + r = repr(a) + assert 'user' in r + assert 'example.com' in r + assert 'is_group' not in r + + def test_repr_group(self): + """repr for group address.""" + a = RFC5322Address( + display_name='Group', + is_group=True, + group_mailboxes=[RFC5322Address(local_part='u', domain='d')], + ) + r = repr(a) + assert 'Group' in r + assert 'is_group=True' in r + + def test_raw_attribute_set(self): + """raw attribute contains the parsed substring.""" + r = parse_address('John ') + assert r.raw == 'John ' + + def test_strip_raw_helper(self): + """_strip_raw clears the raw attribute.""" + r = parse_address('user@example.com') + s = _strip_raw(r) + assert s.raw == '' + + def test_default_values(self): + """Default values for RFC5322Address.""" + a = RFC5322Address() + assert a.display_name is None + assert a.local_part is None + assert a.domain is None + assert a.is_group is False + assert a.group_mailboxes == [] + + +# ═══════════════════════════════════════════════════════════════════════════ +# Test runner (when executed directly, without pytest) +# ═══════════════════════════════════════════════════════════════════════════ + +if __name__ == '__main__': + import sys + + def run_tests(): + total = 0 + passed = 0 + failed = 0 + errors = [] + + import inspect + current_module = sys.modules[__name__] + test_classes = [] + for name, obj in inspect.getmembers(current_module): + if (inspect.isclass(obj) + and name.startswith('Test') + and obj.__module__ == current_module.__name__): + test_classes.append(obj) + + for cls in test_classes: + print(f'\n{"="*60}') + print(f' {cls.__name__}') + print(f'{"="*60}') + instance = cls() + for tname in sorted(dir(instance)): + if not tname.startswith('test_'): + continue + method = getattr(instance, tname) + total += 1 + try: + if hasattr(instance, 'setup') and callable(instance.setup): + instance.setup() + method() + passed += 1 + print(f' PASS {tname}') + except Exception as e: + failed += 1 + msg = f' FAIL {tname}: {e}' + print(msg) + errors.append(msg) + + print(f'\n{"="*60}') + print(f' Results: {passed}/{total} passed, {failed} failed') + print(f'{"="*60}') + + return failed == 0 + + success = run_tests() + sys.exit(0 if success else 1)