Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions python_otbr_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
}


_PASCAL_TO_CAMEL: dict[str, str] = {v: k for k, v in _CAMEL_TO_PASCAL.items()}


def _normalize_keys(data: Any) -> Any:
"""Normalize camelCase JSON keys to PascalCase.

Expand All @@ -59,6 +62,21 @@ def _normalize_keys(data: Any) -> Any:
}


def _to_camel_keys(data: dict) -> dict:
    """Convert PascalCase JSON keys to camelCase for serialization.

    The OTBR REST API expects camelCase keys (per the OpenAPI spec in
    ot-br-posix). This function converts the internal PascalCase keys to
    camelCase. Unknown keys and non-dict values pass through unchanged.
    """
    converted: dict = {}
    for key, value in data.items():
        # Recurse into nested objects so inner keys are converted too.
        if isinstance(value, dict):
            value = _to_camel_keys(value)
        converted[_PASCAL_TO_CAMEL.get(key, key)] = value
    return converted


@dataclass
class Timestamp:
"""Timestamp."""
Expand All @@ -84,7 +102,7 @@ def as_json(self) -> dict:
result["Seconds"] = self.seconds
if self.ticks is not None:
result["Ticks"] = self.ticks
return result
return _to_camel_keys(result)

@classmethod
def from_json(cls, json_data: Any) -> Timestamp:
Expand Down Expand Up @@ -151,7 +169,7 @@ def as_json(self) -> dict:
result["Routers"] = self.routers
if self.to_ble_link is not None:
result["TobleLink"] = self.to_ble_link
return result
return _to_camel_keys(result)

@classmethod
def from_json(cls, json_data: Any) -> SecurityPolicy:
Expand Down Expand Up @@ -225,7 +243,7 @@ def as_json(self) -> dict:
result["PSKc"] = self.psk_c
if self.security_policy is not None:
result["SecurityPolicy"] = self.security_policy.as_json()
return result
return _to_camel_keys(result)

@classmethod
def from_json(cls, json_data: Any) -> ActiveDataSet:
Expand Down Expand Up @@ -278,7 +296,7 @@ def as_json(self) -> dict:
result["Delay"] = self.delay
if self.pending_timestamp is not None:
result["PendingTimestamp"] = self.pending_timestamp.as_json()
return result
return _to_camel_keys(result)

@classmethod
def from_json(cls, json_data: Any) -> PendingDataSet:
Expand Down
93 changes: 48 additions & 45 deletions tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,31 @@ async def test_get_active_dataset(aioclient_mock: AiohttpClientMocker):
DATASET_JSON["PSKc"],
security_policy,
)
assert active_dataset.as_json() == DATASET_JSON
# as_json() now emits camelCase keys matching the OTBR REST API spec
camel_dataset = {
"activeTimestamp": {"authoritative": False, "seconds": 1, "ticks": 0},
"channelMask": 134215680,
"channel": 15,
"extPanId": "8478E3379E047B92",
"meshLocalPrefix": "fd89:bde7:42ed:a901::/64",
"networkKey": "96271D6ECC78749114AB6A591E0D06F1",
"networkName": "OpenThread HA",
"panId": 33991,
"pskc": "9760C89414D461AC717DCD105EB87E5B",
"securityPolicy": {
"autonomousEnrollment": False,
"commercialCommissioning": False,
"externalCommissioning": True,
"nativeCommissioning": True,
"networkKeyProvisioning": False,
"nonCcmRouters": False,
"obtainNetworkKey": True,
"rotationTime": 672,
"routers": True,
"tobleLink": True,
},
}
assert active_dataset.as_json() == camel_dataset


async def test_get_active_dataset_empty(aioclient_mock: AiohttpClientMocker):
Expand Down Expand Up @@ -245,7 +269,7 @@ async def test_create_active_dataset(aioclient_mock: AiohttpClientMocker):
assert aioclient_mock.call_count == 2
assert aioclient_mock.mock_calls[-1][0] == "PUT"
assert aioclient_mock.mock_calls[-1][1].path == "/node/dataset/active"
assert aioclient_mock.mock_calls[-1][2] == {"NetworkName": "OpenThread HA"}
assert aioclient_mock.mock_calls[-1][2] == {"networkName": "OpenThread HA"}

await otbr.create_active_dataset(
python_otbr_api.ActiveDataSet(network_name="OpenThread HA", channel=15)
Expand All @@ -254,8 +278,8 @@ async def test_create_active_dataset(aioclient_mock: AiohttpClientMocker):
assert aioclient_mock.mock_calls[-1][0] == "PUT"
assert aioclient_mock.mock_calls[-1][1].path == "/node/dataset/active"
assert aioclient_mock.mock_calls[-1][2] == {
"NetworkName": "OpenThread HA",
"Channel": 15,
"networkName": "OpenThread HA",
"channel": 15,
}


Expand Down Expand Up @@ -295,11 +319,11 @@ async def test_create_pending_dataset(aioclient_mock: AiohttpClientMocker):
assert aioclient_mock.mock_calls[-1][0] == "PUT"
assert aioclient_mock.mock_calls[-1][1].path == "/node/dataset/pending"
assert aioclient_mock.mock_calls[-1][2] == {
"ActiveDataset": {
"NetworkName": "OpenThread HA",
"activeDataset": {
"networkName": "OpenThread HA",
},
"Delay": 12345,
"PendingTimestamp": {},
"delay": 12345,
"pendingTimestamp": {},
}

await otbr.create_pending_dataset(
Expand All @@ -312,11 +336,11 @@ async def test_create_pending_dataset(aioclient_mock: AiohttpClientMocker):
assert aioclient_mock.mock_calls[-1][0] == "PUT"
assert aioclient_mock.mock_calls[-1][1].path == "/node/dataset/pending"
assert aioclient_mock.mock_calls[-1][2] == {
"ActiveDataset": {
"Channel": 15,
"NetworkName": "OpenThread HA",
"activeDataset": {
"channel": 15,
"networkName": "OpenThread HA",
},
"Delay": 23456,
"delay": 23456,
}


Expand All @@ -340,24 +364,17 @@ async def test_set_channel(aioclient_mock: AiohttpClientMocker) -> None:
aioclient_mock.get(f"{BASE_URL}/node/dataset/active", json=DATASET_JSON)
aioclient_mock.put(f"{BASE_URL}/node/dataset/pending", status=HTTPStatus.CREATED)
new_channel = 16
expected_active_timestamp = DATASET_JSON["ActiveTimestamp"] | {"Seconds": 2}
expected_pending_dataset = {
"ActiveDataset": DATASET_JSON
| {
"ActiveTimestamp": expected_active_timestamp,
"Channel": new_channel,
},
"Delay": 1234,
}

assert new_channel != DATASET_JSON["Channel"]
await otbr.set_channel(new_channel, 1234)
assert aioclient_mock.call_count == 2
assert aioclient_mock.mock_calls[0][0] == "GET"
assert aioclient_mock.mock_calls[0][1].path == "/node/dataset/active"
assert aioclient_mock.mock_calls[1][0] == "PUT"
assert aioclient_mock.mock_calls[1][1].path == "/node/dataset/pending"
assert aioclient_mock.mock_calls[1][2] == expected_pending_dataset
pending = aioclient_mock.mock_calls[1][2]
assert pending["delay"] == 1234
assert pending["activeDataset"]["channel"] == new_channel
assert pending["activeDataset"]["activeTimestamp"]["seconds"] == 2


async def test_set_channel_default_delay(aioclient_mock: AiohttpClientMocker) -> None:
Expand All @@ -367,24 +384,17 @@ async def test_set_channel_default_delay(aioclient_mock: AiohttpClientMocker) ->
aioclient_mock.get(f"{BASE_URL}/node/dataset/active", json=DATASET_JSON)
aioclient_mock.put(f"{BASE_URL}/node/dataset/pending", status=HTTPStatus.CREATED)
new_channel = 16
expected_active_timestamp = DATASET_JSON["ActiveTimestamp"] | {"Seconds": 2}
expected_pending_dataset = {
"ActiveDataset": DATASET_JSON
| {
"ActiveTimestamp": expected_active_timestamp,
"Channel": new_channel,
},
"Delay": 300000,
}

assert new_channel != DATASET_JSON["Channel"]
await otbr.set_channel(new_channel)
assert aioclient_mock.call_count == 2
assert aioclient_mock.mock_calls[0][0] == "GET"
assert aioclient_mock.mock_calls[0][1].path == "/node/dataset/active"
assert aioclient_mock.mock_calls[1][0] == "PUT"
assert aioclient_mock.mock_calls[1][1].path == "/node/dataset/pending"
assert aioclient_mock.mock_calls[1][2] == expected_pending_dataset
pending = aioclient_mock.mock_calls[1][2]
assert pending["delay"] == 300000
assert pending["activeDataset"]["channel"] == new_channel
assert pending["activeDataset"]["activeTimestamp"]["seconds"] == 2


async def test_set_channel_no_timestamp(aioclient_mock: AiohttpClientMocker) -> None:
Expand All @@ -397,24 +407,17 @@ async def test_set_channel_no_timestamp(aioclient_mock: AiohttpClientMocker) ->
aioclient_mock.get(f"{BASE_URL}/node/dataset/active", json=dataset_json)
aioclient_mock.put(f"{BASE_URL}/node/dataset/pending", status=HTTPStatus.CREATED)
new_channel = 16
expected_active_timestamp = {"Authoritative": False, "Seconds": 1, "Ticks": 0}
expected_pending_dataset = {
"ActiveDataset": DATASET_JSON
| {
"ActiveTimestamp": expected_active_timestamp,
"Channel": new_channel,
},
"Delay": 300000,
}

assert new_channel != DATASET_JSON["Channel"]
await otbr.set_channel(new_channel)
assert aioclient_mock.call_count == 2
assert aioclient_mock.mock_calls[0][0] == "GET"
assert aioclient_mock.mock_calls[0][1].path == "/node/dataset/active"
assert aioclient_mock.mock_calls[1][0] == "PUT"
assert aioclient_mock.mock_calls[1][1].path == "/node/dataset/pending"
assert aioclient_mock.mock_calls[1][2] == expected_pending_dataset
pending = aioclient_mock.mock_calls[1][2]
assert pending["delay"] == 300000
assert pending["activeDataset"]["channel"] == new_channel
assert pending["activeDataset"]["activeTimestamp"]["seconds"] == 1


async def test_set_channel_invalid_channel(aioclient_mock: AiohttpClientMocker) -> None:
Expand Down
60 changes: 60 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test data models."""

import python_otbr_api
from python_otbr_api.models import SecurityPolicy


def test_deserialize_pending_dataset():
Expand Down Expand Up @@ -66,3 +67,62 @@ def test_deserialize_pending_dataset_camelcase():
12345,
python_otbr_api.Timestamp(),
)


def test_serialize_active_dataset_camelcase():
    """Test that as_json() emits camelCase keys matching the OTBR REST API."""
    dataset = python_otbr_api.ActiveDataSet(
        active_timestamp=python_otbr_api.Timestamp(
            authoritative=False, seconds=1, ticks=0
        ),
        network_key="00112233445566778899aabbccddeeff",
        network_name="OpenThread-1234",
        extended_pan_id="dead00beef00cafe",
        mesh_local_prefix="fd11:2222:3333::/64",
        pan_id=12345,
        channel=15,
        psk_c="aabbccddeeff00112233445566778899",
        security_policy=SecurityPolicy(
            rotation_time=672, obtain_network_key=True, routers=True
        ),
        channel_mask=134215680,
    )
    result = dataset.as_json()
    # All top-level keys must be camelCase
    top_level_keys = (
        "activeTimestamp",
        "networkKey",
        "networkName",
        "extPanId",
        "meshLocalPrefix",
        "panId",
        "channel",
        "pskc",
        "securityPolicy",
        "channelMask",
    )
    for key in top_level_keys:
        assert key in result
    # Nested keys must also be camelCase
    assert "seconds" in result["activeTimestamp"]
    for nested_key in ("rotationTime", "obtainNetworkKey"):
        assert nested_key in result["securityPolicy"]


def test_serialize_pending_dataset_camelcase():
    """Test that PendingDataSet.as_json() emits camelCase keys."""
    serialized = python_otbr_api.PendingDataSet(
        active_dataset=python_otbr_api.ActiveDataSet(network_name="OpenThread HA"),
        delay=30000,
        pending_timestamp=python_otbr_api.Timestamp(seconds=2, ticks=0),
    ).as_json()
    # Top-level keys and the nested dataset key must all be camelCase.
    for key in ("activeDataset", "delay", "pendingTimestamp"):
        assert key in serialized
    assert "networkName" in serialized["activeDataset"]


def test_roundtrip_camelcase():
    """Test that from_json(as_json(x)) preserves data."""
    dataset = python_otbr_api.ActiveDataSet(
        network_name="Test", channel=15, pan_id=4660
    )
    # Serializing to camelCase and parsing back must be lossless.
    assert python_otbr_api.ActiveDataSet.from_json(dataset.as_json()) == dataset