From c5bcece035f5fdd08d07806330a34f67df70bf07 Mon Sep 17 00:00:00 2001 From: Chayashree B R Date: Tue, 26 May 2026 19:39:39 +0530 Subject: [PATCH] Enforce max_frames in TLVParser and LegacyParser Resolve TLV merge conflict Add regression tests for max_frames handling --- sigflow/parsers/legacy.py | 16 +++++++++-- sigflow/parsers/tlv.py | 16 ++++++++++- tests/test_parsers.py | 56 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 84 insertions(+), 4 deletions(-) diff --git a/sigflow/parsers/legacy.py b/sigflow/parsers/legacy.py index 0edc46c..b51b3bb 100644 --- a/sigflow/parsers/legacy.py +++ b/sigflow/parsers/legacy.py @@ -14,16 +14,28 @@ def parse(self, data: bytes, context: ExecutionContext) -> list[Frame]: frames = [] offset = 0 seq = 0 + while offset + LEGACY.size <= len(data): + if len(frames) >= self.max_frames: + context.warn("frame-limit", "frame limit reached", offset) + break + stream_id, length = LEGACY.unpack_from(data, offset) offset += LEGACY.size + if length > self.max_payload: context.warn("legacy-large", "legacy payload exceeded limit", offset) break + payload = self.require(data, offset, length) - frames.append(Frame(stream_id, seq, payload, version=0, offset=offset)) + frames.append( + Frame(stream_id, seq, payload, version=0, offset=offset) + ) + offset += length seq += 1 + if offset != len(data): context.warn("legacy-tail", "ignored trailing legacy bytes", offset) - return frames + + return frames \ No newline at end of file diff --git a/sigflow/parsers/tlv.py b/sigflow/parsers/tlv.py index c05746c..7410875 100644 --- a/sigflow/parsers/tlv.py +++ b/sigflow/parsers/tlv.py @@ -13,17 +13,31 @@ def parse(self, data: bytes, context: ExecutionContext) -> list[Frame]: frames = [] offset = 0 seq = 0 + while offset < len(data): if len(data) - offset < TLV.size: context.warn("tlv-truncated", "partial tlv header", offset) break + + if len(frames) >= self.max_frames: + context.warn("frame-limit", "frame limit reached", offset) + break + tag, length = TLV.unpack_from(data, offset) offset += TLV.size + if length > self.max_payload: context.warn("tlv-large", f"tag {tag} exceeds max payload", offset) break + + if offset + length > len(data): + context.warn("tlv-truncated", "truncated tlv value", offset) + break + value = self.require(data, offset, length) frames.append(Frame(tag, seq, value, metadata={"tag": tag})) + seq += 1 offset += length - return frames + + return frames \ No newline at end of file diff --git a/tests/test_parsers.py b/tests/test_parsers.py index 638db7a..0ceb5bb 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -1,7 +1,16 @@ +import struct import pytest from sigflow.core.context import ExecutionContext from sigflow.core.exceptions import ParseError from sigflow.parsers.binary import BinaryFrameParser +<<<<<<< HEAD +======= +from sigflow.parsers.tlv import TLVParser +from sigflow.parsers.legacy import LegacyParser + +TLV = struct.Struct(">HH") +LEGACY = struct.Struct(">HI") +>>>>>>> 0d5af8c (Add regression tests for max_frames handling) def test_binary_parser_reads_frames(sample_stream): @@ -12,10 +21,55 @@ def test_binary_parser_reads_frames(sample_stream): def test_binary_parser_reports_truncation(sample_stream): ctx = ExecutionContext({}) frames = BinaryFrameParser().parse(sample_stream[:-2], ctx) + assert frames - assert any(d.code in {"truncated-header", "truncated-payload", "checksum"} for d in ctx.diagnostics) + assert any( + d.code in {"truncated-header", "truncated-payload", "checksum"} + for d in ctx.diagnostics + ) def test_invalid_magic_raises(): with pytest.raises(ParseError): +<<<<<<< HEAD BinaryFrameParser().parse(b"NOPE" + b"0" * 32, ExecutionContext({})) +======= + BinaryFrameParser().parse( + b"NOPE" + b"0" * 32, + ExecutionContext({}) + ) + + +def test_tlv_parser_stops_at_max_frames(): + data = ( + TLV.pack(1, 1) + b"a" + + TLV.pack(2, 1) + b"b" + + TLV.pack(3, 1) + b"c" + ) + + parser = TLVParser() + parser.max_frames = 2 + + ctx = ExecutionContext({}) + frames = parser.parse(data, ctx) + + assert len(frames) == 2 + assert any(d.code == "frame-limit" for d in ctx.diagnostics) + + +def test_legacy_parser_stops_at_max_frames(): + data = ( + LEGACY.pack(1, 1) + b"a" + + LEGACY.pack(2, 1) + b"b" + + LEGACY.pack(3, 1) + b"c" + ) + + parser = LegacyParser() + parser.max_frames = 2 + + ctx = ExecutionContext({}) + frames = parser.parse(data, ctx) + + assert len(frames) == 2 + assert any(d.code == "frame-limit" for d in ctx.diagnostics) +>>>>>>> 0d5af8c (Add regression tests for max_frames handling)