Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions sigflow/parsers/tlv.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,27 @@ def parse(self, data: bytes, context: ExecutionContext) -> list[Frame]:
offset = 0
seq = 0
while offset < len(data):
# Need at least the TLV header (tag + length)
if len(data) - offset < TLV.size:
context.warn("tlv-truncated", "partial tlv header", offset)
break

tag, length = TLV.unpack_from(data, offset)
offset += TLV.size

if length > self.max_payload:
context.warn("tlv-large", f"tag {tag} exceeds max payload", offset)
break

# If the TLV header is present, but the declared value bytes are truncated,
# downgrade to a recoverable diagnostic (consistent with BinaryFrameParser).
if offset + length > len(data):
context.warn("tlv-truncated", "truncated tlv value", offset)
break

value = self.require(data, offset, length)
frames.append(Frame(tag, seq, value, metadata={"tag": tag}))
seq += 1
offset += length
return frames

14 changes: 14 additions & 0 deletions tests/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from sigflow.core.context import ExecutionContext
from sigflow.core.exceptions import ParseError
from sigflow.parsers.binary import BinaryFrameParser
from sigflow.parsers.tlv import TLVParser


def test_binary_parser_reads_frames(sample_stream):
Expand All @@ -19,3 +20,16 @@ def test_binary_parser_reports_truncation(sample_stream):
def test_invalid_magic_raises():
with pytest.raises(ParseError):
BinaryFrameParser().parse(b"NOPE" + b"0" * 32, ExecutionContext({}))


def test_tlv_truncated_value_is_recoverable():
# TLV format: >HH (tag, length)
# Provide header declaring length=4 but only 2 bytes for value.
data = b"\x00\x01" + b"\x00\x04" + b"AB"
ctx = ExecutionContext({})
frames = TLVParser().parse(data, ctx)

# No ParseError should be raised; parsing should stop and emit a diagnostic.
assert frames == []
assert any(d.code == "tlv-truncated" for d in ctx.diagnostics)

Loading