Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions sigflow/parsers/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,28 @@ def parse(self, data: bytes, context: ExecutionContext) -> list[Frame]:
frames = []
offset = 0
seq = 0

while offset + LEGACY.size <= len(data):
if len(frames) >= self.max_frames:
context.warn("frame-limit", "frame limit reached", offset)
break

stream_id, length = LEGACY.unpack_from(data, offset)
offset += LEGACY.size

if length > self.max_payload:
context.warn("legacy-large", "legacy payload exceeded limit", offset)
break

payload = self.require(data, offset, length)
frames.append(Frame(stream_id, seq, payload, version=0, offset=offset))
frames.append(
Frame(stream_id, seq, payload, version=0, offset=offset)
)

offset += length
seq += 1

if offset != len(data):
context.warn("legacy-tail", "ignored trailing legacy bytes", offset)
return frames

return frames
11 changes: 7 additions & 4 deletions sigflow/parsers/tlv.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,31 @@ def parse(self, data: bytes, context: ExecutionContext) -> list[Frame]:
frames = []
offset = 0
seq = 0

while offset < len(data):
# Need at least the TLV header (tag + length)
if len(data) - offset < TLV.size:
context.warn("tlv-truncated", "partial tlv header", offset)
break

if len(frames) >= self.max_frames:
context.warn("frame-limit", "frame limit reached", offset)
break

tag, length = TLV.unpack_from(data, offset)
offset += TLV.size

if length > self.max_payload:
context.warn("tlv-large", f"tag {tag} exceeds max payload", offset)
break

# If the TLV header is present, but the declared value bytes are truncated,
# downgrade to a recoverable diagnostic (consistent with BinaryFrameParser).
if offset + length > len(data):
context.warn("tlv-truncated", "truncated tlv value", offset)
break

value = self.require(data, offset, length)
frames.append(Frame(tag, seq, value, metadata={"tag": tag}))

seq += 1
offset += length
return frames

return frames
57 changes: 50 additions & 7 deletions tests/test_parsers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import struct
import pytest
from sigflow.core.context import ExecutionContext
from sigflow.core.exceptions import ParseError
from sigflow.parsers.binary import BinaryFrameParser
from sigflow.parsers.tlv import TLVParser
from sigflow.parsers.legacy import LegacyParser

TLV = struct.Struct(">HH")
LEGACY = struct.Struct(">HI")


def test_binary_parser_reads_frames(sample_stream):
Expand All @@ -13,23 +18,61 @@ def test_binary_parser_reads_frames(sample_stream):
def test_binary_parser_reports_truncation(sample_stream):
ctx = ExecutionContext({})
frames = BinaryFrameParser().parse(sample_stream[:-2], ctx)

assert frames
assert any(d.code in {"truncated-header", "truncated-payload", "checksum"} for d in ctx.diagnostics)
assert any(
d.code in {"truncated-header", "truncated-payload", "checksum"}
for d in ctx.diagnostics
)


def test_invalid_magic_raises():
with pytest.raises(ParseError):
BinaryFrameParser().parse(b"NOPE" + b"0" * 32, ExecutionContext({}))
BinaryFrameParser().parse(
b"NOPE" + b"0" * 32,
ExecutionContext({})
)


def test_tlv_parser_stops_at_max_frames():
data = (
TLV.pack(1, 1) + b"a" +
TLV.pack(2, 1) + b"b" +
TLV.pack(3, 1) + b"c"
)

parser = TLVParser()
parser.max_frames = 2

ctx = ExecutionContext({})
frames = parser.parse(data, ctx)

assert len(frames) == 2
assert any(d.code == "frame-limit" for d in ctx.diagnostics)


def test_legacy_parser_stops_at_max_frames():
data = (
LEGACY.pack(1, 1) + b"a" +
LEGACY.pack(2, 1) + b"b" +
LEGACY.pack(3, 1) + b"c"
)

parser = LegacyParser()
parser.max_frames = 2

ctx = ExecutionContext({})
frames = parser.parse(data, ctx)

assert len(frames) == 2
assert any(d.code == "frame-limit" for d in ctx.diagnostics)


def test_tlv_truncated_value_is_recoverable():
# TLV format: >HH (tag, length)
# Provide header declaring length=4 but only 2 bytes for value.

data = b"\x00\x01" + b"\x00\x04" + b"AB"
ctx = ExecutionContext({})
frames = TLVParser().parse(data, ctx)

# No ParseError should be raised; parsing should stop and emit a diagnostic.
assert frames == []
assert any(d.code == "tlv-truncated" for d in ctx.diagnostics)

assert any(d.code == "tlv-truncated" for d in ctx.diagnostics)
Loading