diff --git a/sigflow/parsers/legacy.py b/sigflow/parsers/legacy.py index 0edc46c..b51b3bb 100644 --- a/sigflow/parsers/legacy.py +++ b/sigflow/parsers/legacy.py @@ -14,16 +14,28 @@ def parse(self, data: bytes, context: ExecutionContext) -> list[Frame]: frames = [] offset = 0 seq = 0 + while offset + LEGACY.size <= len(data): + if len(frames) >= self.max_frames: + context.warn("frame-limit", "frame limit reached", offset) + break + stream_id, length = LEGACY.unpack_from(data, offset) offset += LEGACY.size + if length > self.max_payload: context.warn("legacy-large", "legacy payload exceeded limit", offset) break + payload = self.require(data, offset, length) - frames.append(Frame(stream_id, seq, payload, version=0, offset=offset)) + frames.append( + Frame(stream_id, seq, payload, version=0, offset=offset) + ) + offset += length seq += 1 + if offset != len(data): context.warn("legacy-tail", "ignored trailing legacy bytes", offset) - return frames + + return frames \ No newline at end of file diff --git a/sigflow/parsers/tlv.py b/sigflow/parsers/tlv.py index ae00549..7410875 100644 --- a/sigflow/parsers/tlv.py +++ b/sigflow/parsers/tlv.py @@ -13,12 +13,16 @@ def parse(self, data: bytes, context: ExecutionContext) -> list[Frame]: frames = [] offset = 0 seq = 0 + while offset < len(data): - # Need at least the TLV header (tag + length) if len(data) - offset < TLV.size: context.warn("tlv-truncated", "partial tlv header", offset) break + if len(frames) >= self.max_frames: + context.warn("frame-limit", "frame limit reached", offset) + break + tag, length = TLV.unpack_from(data, offset) offset += TLV.size @@ -26,15 +30,14 @@ def parse(self, data: bytes, context: ExecutionContext) -> list[Frame]: context.warn("tlv-large", f"tag {tag} exceeds max payload", offset) break - # If the TLV header is present, but the declared value bytes are truncated, - # downgrade to a recoverable diagnostic (consistent with BinaryFrameParser). if offset + length > len(data): context.warn("tlv-truncated", "truncated tlv value", offset) break value = self.require(data, offset, length) frames.append(Frame(tag, seq, value, metadata={"tag": tag})) + seq += 1 offset += length - return frames + return frames \ No newline at end of file diff --git a/tests/test_parsers.py b/tests/test_parsers.py index 68c0e50..189acbe 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -1,8 +1,13 @@ +import struct import pytest from sigflow.core.context import ExecutionContext from sigflow.core.exceptions import ParseError from sigflow.parsers.binary import BinaryFrameParser from sigflow.parsers.tlv import TLVParser +from sigflow.parsers.legacy import LegacyParser + +TLV = struct.Struct(">HH") +LEGACY = struct.Struct(">HI") def test_binary_parser_reads_frames(sample_stream): @@ -13,23 +18,61 @@ def test_binary_parser_reads_frames(sample_stream): def test_binary_parser_reports_truncation(sample_stream): ctx = ExecutionContext({}) frames = BinaryFrameParser().parse(sample_stream[:-2], ctx) + assert frames - assert any(d.code in {"truncated-header", "truncated-payload", "checksum"} for d in ctx.diagnostics) + assert any( + d.code in {"truncated-header", "truncated-payload", "checksum"} + for d in ctx.diagnostics + ) def test_invalid_magic_raises(): with pytest.raises(ParseError): - BinaryFrameParser().parse(b"NOPE" + b"0" * 32, ExecutionContext({})) + BinaryFrameParser().parse( + b"NOPE" + b"0" * 32, + ExecutionContext({}) + ) + + +def test_tlv_parser_stops_at_max_frames(): + data = ( + TLV.pack(1, 1) + b"a" + + TLV.pack(2, 1) + b"b" + + TLV.pack(3, 1) + b"c" + ) + + parser = TLVParser() + parser.max_frames = 2 + + ctx = ExecutionContext({}) + frames = parser.parse(data, ctx) + + assert len(frames) == 2 + assert any(d.code == "frame-limit" for d in ctx.diagnostics) + + +def test_legacy_parser_stops_at_max_frames(): + data = ( + LEGACY.pack(1, 1) + b"a" + + LEGACY.pack(2, 1) + b"b" + + LEGACY.pack(3, 1) + b"c" + ) + + parser = LegacyParser() + parser.max_frames = 2 + + ctx = ExecutionContext({}) + frames = parser.parse(data, ctx) + + assert len(frames) == 2 + assert any(d.code == "frame-limit" for d in ctx.diagnostics) def test_tlv_truncated_value_is_recoverable(): - # TLV format: >HH (tag, length) - # Provide header declaring length=4 but only 2 bytes for value. + data = b"\x00\x01" + b"\x00\x04" + b"AB" ctx = ExecutionContext({}) frames = TLVParser().parse(data, ctx) - # No ParseError should be raised; parsing should stop and emit a diagnostic. assert frames == [] - assert any(d.code == "tlv-truncated" for d in ctx.diagnostics) - + assert any(d.code == "tlv-truncated" for d in ctx.diagnostics) \ No newline at end of file