From 4dc20234f190ea339b71ca7c4a3256f9a12ded62 Mon Sep 17 00:00:00 2001 From: "Richard T. Carback III" Date: Wed, 24 Jun 2026 09:14:24 -0400 Subject: [PATCH 1/5] feat(client): per-topology difficulty + mineable-topology query/decode helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - scale_codec: add _decode_h256_vec (compact-u32 length + N×32-byte hashes) - client: query_difficulty now takes optional topology_hash; resolves DefaultTopology when None and queries QuantumPow.Difficulties map - client: add query_mineable_topologies (QuantumPowApi_mineable_topologies) - client: add query_difficulty_for (QuantumPowApi_difficulty_for, Option-decode) - client: add add_mineable_topology / remove_mineable_topology extrinsic helpers - pool_client / pool: wire up new read ops as idempotent ops - tests: update query_difficulty callers for new signature; add unit tests for _decode_h256_vec and monkeypatched query_mineable_topologies / query_difficulty_for --- substrate/client.py | 302 +++++++++++++++++++++++++----- substrate/pool.py | 57 +++--- substrate/pool_client.py | 15 +- substrate/scale_codec.py | 87 +++++---- tests/test_pool_client.py | 16 +- tests/test_scale_codec_helpers.py | 205 ++++++++++++++++++++ tests/test_substrate_client.py | 31 +-- 7 files changed, 591 insertions(+), 122 deletions(-) create mode 100644 tests/test_scale_codec_helpers.py diff --git a/substrate/client.py b/substrate/client.py index d57bf6c3..95edcc12 100644 --- a/substrate/client.py +++ b/substrate/client.py @@ -47,6 +47,7 @@ (`module_id` / `pallet_name`, etc.) so a future renaming on the chain-side surfaces as "no match" rather than a crash. """ + from __future__ import annotations import asyncio @@ -62,6 +63,7 @@ from shared.hybrid_signer import HybridSigner from shared.logging_config import get_logger + # Aliased: `topology_hash` is also a local param/field name throughout this # module, so import the canonical-hash function under a distinct name. 
from shared.topology_hash import topology_hash as compute_topology_hash @@ -89,6 +91,7 @@ _build_hybrid_signed_extrinsic, _decode_account_id, _decode_difficulty_config, + _decode_h256_vec, _decode_hash, _decode_job_mode, _decode_mining_snapshot, @@ -170,8 +173,11 @@ def matches(self) -> bool: def _count_map_keys( - iface, storage_module: str, storage_function: str, - *, page_size: int = _KEYS_PAGE_SIZE, + iface, + storage_module: str, + storage_function: str, + *, + page_size: int = _KEYS_PAGE_SIZE, ) -> int: """Count the keys of a storage map via ``state_getKeysPaged``. @@ -180,7 +186,8 @@ def _count_map_keys( in ``_run``. Returns ``0`` for an empty map. """ prefix = iface.generate_storage_hash( - storage_module=storage_module, storage_function=storage_function, + storage_module=storage_module, + storage_function=storage_function, ) total = 0 start = None @@ -342,11 +349,14 @@ async def ensure_connected(self) -> bool: await self.connect() return False try: - await self._raw_run( - lambda: self._iface.rpc_request("system_health", []) - ) + await self._raw_run(lambda: self._iface.rpc_request("system_health", [])) return True - except (WebSocketException, ConnectionError, OSError, json.JSONDecodeError) as exc: + except ( + WebSocketException, + ConnectionError, + OSError, + json.JSONDecodeError, + ) as exc: logger.warning( "ensure_connected: health probe failed on %s (%s: %s); " "reconnecting before submit", @@ -393,12 +403,14 @@ async def has_call(self, module: str, function: str) -> bool: any exception as "not present" so an unreachable metadata cache falls back rather than crashing the caller. """ + def _probe() -> bool: try: self._iface.get_metadata_call_function(module, function) return True except Exception: return False + return await self._run(_probe) async def has_storage(self, module: str, function: str) -> bool: @@ -409,11 +421,16 @@ async def has_storage(self, module: str, function: str) -> bool: "not present". 
Used to pick the right winning-solution counter across the ``WinningSolutions``→``QBlockCount`` rename (quip-protocol-rs MR !35). """ + def _probe() -> bool: try: - return self._iface.get_metadata_storage_function(module, function) is not None + return ( + self._iface.get_metadata_storage_function(module, function) + is not None + ) except Exception: return False + return await self._run(_probe) async def get_finalized_head(self) -> bytes: @@ -551,7 +568,8 @@ async def resolve_topology_binding( """ head = await self.get_head() snapshot = await self.get_mining_snapshot( - at=head, miner_account_bytes=miner_account_bytes, + at=head, + miner_account_bytes=miner_account_bytes, ) if snapshot is None: raise NoRegisteredTopology("chain has no registered topology") @@ -602,19 +620,45 @@ async def query_proofs_submitted(self, account: bytes) -> Optional[int]: return None return info.proofs_submitted - async def query_difficulty(self) -> Optional[SubstrateDifficulty]: - """Return the raw `QuantumPow.Difficulty` storage value. + async def query_difficulty( + self, topology_hash: Optional[bytes] = None + ) -> Optional[SubstrateDifficulty]: + """Return the raw ``QuantumPow.Difficulties[topology_hash]`` map entry. + + This is the *post-adjust baseline* — the value recorded after the most + recent winning proof for the given topology, **without** decay applied. + Use :meth:`query_current_difficulty` for the live threshold a fresh + proof would have to clear right now, or :meth:`query_difficulty_for` + for the decayed value from the runtime API. + + When ``topology_hash`` is ``None``, the chain's ``DefaultTopology`` + storage value is read first; if that is also absent the method returns + ``None`` without querying the map. - This is the *post-adjust baseline* — the value set after the most - recent winning proof, **without** decay applied. Use - :meth:`query_current_difficulty` for the live threshold a fresh - proof would have to clear right now. 
+ Suitable for "is the chain seeded?" checks and historical comparisons; + not suitable for deciding what difficulty a miner currently faces. - Suitable for "is the chain seeded?" checks and historical - comparisons; not suitable for deciding what difficulty a miner - currently faces. + Args: + topology_hash: 32-byte topology hash to look up, or ``None`` to + resolve from ``QuantumPow.DefaultTopology``. + + Returns: + The stored difficulty config, or ``None`` if the map entry is + absent or no default topology has been set. """ - result = await self._run(lambda: self._iface.query("QuantumPow", "Difficulty")) + if topology_hash is None: + default_result = await self._run( + lambda: self._iface.query("QuantumPow", "DefaultTopology") + ) + resolved = _storage_value(default_result, check_found=True) + if resolved is None: + return None + topology_hash = _decode_hash(resolved) + result = await self._run( + lambda: self._iface.query( + "QuantumPow", "Difficulties", ["0x" + topology_hash.hex()] + ) + ) v = _storage_value(result, check_found=True) if v is None: return None @@ -643,19 +687,174 @@ async def query_current_difficulty( encoded = await self._state_call( "QuantumPowApi_current_difficulty", "0x", block_hash ) + if encoded is None: + raise RuntimeError("state_call current_difficulty returned no result") + data = ScaleBytes(encoded) + difficulty = _decode_difficulty_config(data) + if data.get_remaining_length() != 0: + raise ValueError( + "trailing bytes after current_difficulty decode: " + f"{data.get_remaining_length()} bytes" + ) + return difficulty + + async def query_mineable_topologies( + self, *, at: Optional[bytes] = None + ) -> list[bytes]: + """Call ``QuantumPowApi_mineable_topologies`` for the set of mineable hashes. + + Returns the full list of topology hashes that are currently whitelisted + for mining. On a fresh chain with no topologies registered this is an + empty list (not an error). + + Args: + at: Optional 32-byte block hash to query at. 
``None`` uses the + current best head. + + Returns: + List of 32-byte topology hashes, in the order the runtime returns them. + """ + block_hash = _hex(at) if at is not None else None + encoded = await self._state_call( + "QuantumPowApi_mineable_topologies", "0x", block_hash + ) if encoded is None: raise RuntimeError( - "state_call current_difficulty returned no result" + "state_call QuantumPowApi_mineable_topologies returned no result" ) data = ScaleBytes(encoded) + hashes = _decode_h256_vec(data) + if data.get_remaining_length() != 0: + raise ValueError( + "trailing bytes after mineable_topologies decode: " + f"{data.get_remaining_length()} bytes" + ) + return hashes + + async def query_difficulty_for( + self, topology_hash: bytes, *, at: Optional[bytes] = None + ) -> Optional[SubstrateDifficulty]: + """Call ``QuantumPowApi_difficulty_for(topology_hash)`` for the decayed value. + + Unlike :meth:`query_difficulty`, which reads the raw map entry, this + method calls the runtime API so decay is applied for elapsed blocks + since the last winning proof. Returns ``None`` when the topology is + not registered. + + The SCALE parameter is the raw 32-byte H256 topology hash (no compact + prefix — the runtime API receives it as a fixed-size value). + + Args: + topology_hash: 32-byte topology hash to query. + at: Optional 32-byte block hash to query at. ``None`` uses the + current best head. + + Returns: + Decayed difficulty config, or ``None`` if the topology is absent. 
+ """ + if len(topology_hash) != 32: + raise ValueError( + f"topology_hash must be 32 bytes, got {len(topology_hash)}" + ) + block_hash = _hex(at) if at is not None else None + encoded = await self._state_call( + "QuantumPowApi_difficulty_for", + "0x" + topology_hash.hex(), + block_hash, + ) + if encoded is None: + raise RuntimeError( + "state_call QuantumPowApi_difficulty_for returned no result" + ) + data = ScaleBytes(encoded) + tag = data.get_next_bytes(1) + if not tag: + raise ValueError("empty response from QuantumPowApi_difficulty_for") + if tag[0] == 0x00: + if data.get_remaining_length() != 0: + raise ValueError( + "trailing bytes after difficulty_for Option::None: " + f"{data.get_remaining_length()} bytes" + ) + return None + if tag[0] != 0x01: + raise ValueError( + f"unknown Option variant tag for difficulty_for: 0x{tag[0]:02x}" + ) difficulty = _decode_difficulty_config(data) if data.get_remaining_length() != 0: raise ValueError( - "trailing bytes after current_difficulty decode: " + "trailing bytes after difficulty_for decode: " f"{data.get_remaining_length()} bytes" ) return difficulty + async def add_mineable_topology( + self, + topology_hash: bytes, + signer: "Signer", + *, + wait_for: "WaitFor" = "inblock", + ) -> "ExtrinsicReceipt": + """Submit ``QuantumPow.add_mineable_topology(topology_hash)`` via sudo. + + Adds ``topology_hash`` to the on-chain whitelist of mineable topologies. + Requires a root/sudo key. + + Args: + topology_hash: 32-byte hash of the topology to whitelist. + signer: Root/sudo signer for the extrinsic. + wait_for: Submission stage — ``"sent"``, ``"inblock"``, or + ``"finalized"``. + + Returns: + Extrinsic receipt at the requested stage. 
+ """ + if len(topology_hash) != 32: + raise ValueError( + f"topology_hash must be 32 bytes, got {len(topology_hash)}" + ) + return await self.submit_extrinsic( + call_module="QuantumPow", + call_function="add_mineable_topology", + call_params={"topology_hash": "0x" + topology_hash.hex()}, + signer=signer, + wait_for=wait_for, + ) + + async def remove_mineable_topology( + self, + topology_hash: bytes, + signer: "Signer", + *, + wait_for: "WaitFor" = "inblock", + ) -> "ExtrinsicReceipt": + """Submit ``QuantumPow.remove_mineable_topology(topology_hash)`` via sudo. + + Removes ``topology_hash`` from the on-chain whitelist of mineable + topologies. Requires a root/sudo key. + + Args: + topology_hash: 32-byte hash of the topology to remove. + signer: Root/sudo signer for the extrinsic. + wait_for: Submission stage — ``"sent"``, ``"inblock"``, or + ``"finalized"``. + + Returns: + Extrinsic receipt at the requested stage. + """ + if len(topology_hash) != 32: + raise ValueError( + f"topology_hash must be 32 bytes, got {len(topology_hash)}" + ) + return await self.submit_extrinsic( + call_module="QuantumPow", + call_function="remove_mineable_topology", + call_params={"topology_hash": "0x" + topology_hash.hex()}, + signer=signer, + wait_for=wait_for, + ) + async def query_pow_constants(self) -> PowConstants: """Read the four ``pallet_quantum_pow`` constants needed for decay. @@ -700,9 +899,7 @@ async def query_winning_solution( clients from running BLAKE3 over `(parent_hash, miner, block, salt)`. 
""" if not 0 <= block_number < 2**32: - raise ValueError( - f"block_number must fit in u32, got {block_number}" - ) + raise ValueError(f"block_number must fit in u32, got {block_number}") block_hash = _hex(at) if at is not None else None scale_param = "0x" + block_number.to_bytes(4, "little").hex() encoded = await self._state_call( @@ -783,7 +980,9 @@ async def query_winning_solution_count(self) -> int: return int(value) if value is not None else 0 return await self._run( lambda: _count_map_keys( - self._iface, "QuantumPow", "WinningSolutions", + self._iface, + "QuantumPow", + "WinningSolutions", ) ) @@ -807,9 +1006,7 @@ async def query_solver(self, account: bytes) -> Optional[MempoolSolverInfo]: if len(account) != 32: raise ValueError(f"account must be 32 bytes, got {len(account)}") result = await self._run( - lambda: self._iface.query( - "QuantumComputeMempool", "Solvers", [account] - ) + lambda: self._iface.query("QuantumComputeMempool", "Solvers", [account]) ) v = _storage_value(result) if v is None: @@ -858,9 +1055,7 @@ async def query_job_order(self, order_id: int) -> Optional[JobOrder]: without a follow-on query. 
""" result = await self._run( - lambda: self._iface.query( - "QuantumComputeMempool", "JobOrders", [order_id] - ) + lambda: self._iface.query("QuantumComputeMempool", "JobOrders", [order_id]) ) v = _storage_value(result) if v is None: @@ -922,7 +1117,9 @@ async def get_events_at(self, block_hash: bytes) -> list[dict]: out: list[dict] = [] for er in raw or []: value = getattr(er, "value", er) - event_field = value.get("event", value) if isinstance(value, dict) else value + event_field = ( + value.get("event", value) if isinstance(value, dict) else value + ) if not isinstance(event_field, dict): logger.debug( "get_events_at: skipping non-dict event record: %s", @@ -942,13 +1139,16 @@ async def get_events_at(self, block_hash: bytes) -> list[dict]: if attrs is None and module_id: logger.debug( "get_events_at: no attributes/params for %s.%s", - module_id, event_id, + module_id, + event_id, ) - out.append({ - "module_id": str(module_id) if module_id is not None else "", - "event_id": str(event_id) if event_id is not None else "", - "attributes": attrs, - }) + out.append( + { + "module_id": str(module_id) if module_id is not None else "", + "event_id": str(event_id) if event_id is not None else "", + "attributes": attrs, + } + ) return out # ------------------------------------------------------------------ @@ -971,7 +1171,10 @@ async def submit_extrinsic( ship the hex through ``ValidatorPool`` — see ``PoolClient``. 
""" ext_hex = await self.build_signed_extrinsic( - call_module, call_function, call_params, signer, + call_module, + call_function, + call_params, + signer, ) return await self.submit_signed_extrinsic(ext_hex, wait_for=wait_for) @@ -1009,11 +1212,19 @@ async def build_signed_extrinsic( kind = signer.signature_kind() if kind == "Sr25519": return await self._build_sr25519_extrinsic_hex( - call_module, call_function, call_params, signer, tip=tip, + call_module, + call_function, + call_params, + signer, + tip=tip, ) if kind == "Hybrid": return await self._build_hybrid_extrinsic_hex( - call_module, call_function, call_params, signer, tip=tip, + call_module, + call_function, + call_params, + signer, + tip=tip, ) raise NotImplementedError( f"build_signed_extrinsic does not support signature_kind={kind}" @@ -1037,7 +1248,9 @@ def _build() -> str: call_params=call_params, ) extrinsic: GenericExtrinsic = self._iface.create_signed_extrinsic( - call=call, keypair=keypair, tip=tip, + call=call, + keypair=keypair, + tip=tip, ) return extrinsic.data.to_hex() @@ -1194,7 +1407,6 @@ def _result_handler(message, update_nr, subscription_id): # noqa: ARG001 error=error_msg, ) - # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ @@ -1432,9 +1644,7 @@ def _fetch_extrinsic_dispatch_error( continue event = v.get("event") or v module_id = ( - event.get("module_id") - or event.get("pallet") - or event.get("pallet_name") + event.get("module_id") or event.get("pallet") or event.get("pallet_name") ) event_id = ( event.get("event_id") diff --git a/substrate/pool.py b/substrate/pool.py index 5bf46395..29278cf3 100644 --- a/substrate/pool.py +++ b/substrate/pool.py @@ -17,6 +17,7 @@ saying "no topology registered") pass through unchanged. The pool only swaps on connection-class failures. 
""" + from __future__ import annotations import asyncio @@ -45,25 +46,29 @@ # Idempotent operations the pool may auto-retry across swaps. # Anything not in this set raises ValidatorSwapped to the caller instead. -_IDEMPOTENT_OPS = frozenset({ - "ensure_connected", - "get_head", - "get_block_number", - "get_finalized_head", - "get_mining_snapshot", - "query_miner", - "query_proofs_submitted", - "query_difficulty", - "query_current_difficulty", - "query_last_proof_block_number", - "query_pow_constants", - "query_balance", - "query_solver", - "query_job_order", - "query_winning_solution", - "query_winning_solution_count", - "get_events_at", -}) +_IDEMPOTENT_OPS = frozenset( + { + "ensure_connected", + "get_head", + "get_block_number", + "get_finalized_head", + "get_mining_snapshot", + "query_miner", + "query_proofs_submitted", + "query_difficulty", + "query_current_difficulty", + "query_mineable_topologies", + "query_difficulty_for", + "query_last_proof_block_number", + "query_pow_constants", + "query_balance", + "query_solver", + "query_job_order", + "query_winning_solution", + "query_winning_solution_count", + "get_events_at", + } +) class ValidatorPool: @@ -89,17 +94,17 @@ def __init__( # Accept tuples/lists; normalise to list for internal mutation. urls_list = list(urls) if urls is not None else [] if not urls_list: - raise ValueError( - "ValidatorPool requires at least one validator URL" - ) + raise ValueError("ValidatorPool requires at least one validator URL") self._urls = urls_list # Sensible defaults so legacy callers `ValidatorPool(urls=...)` keep # working. Tests inject custom failover/handle_factory for isolation. 
if failover is None: failover = SubstrateUrlFailover(urls_list) if handle_factory is None: + def handle_factory(url: str) -> ValidatorHandle: # noqa: E306 return ValidatorHandle(url=url) + self._failover = failover self._handle_factory = handle_factory self._max_swap_retries = max_swap_retries @@ -150,7 +155,9 @@ async def send(self, op: str, args: dict[str, Any]) -> Any: except _CONNECTION_ERRORS as conn_exc: logger.warning( "pool: connection-class error on %s op=%s: %s; swapping", - handle.url, op, conn_exc, + handle.url, + op, + conn_exc, ) all_down = await self._swap_after_failure(handle.url) if op not in _IDEMPOTENT_OPS: @@ -161,7 +168,9 @@ async def send(self, op: str, args: dict[str, Any]) -> Any: if all_down or attempts >= self._max_swap_retries: logger.error( "pool: idempotent op %s exhausted retries (all_down=%s attempts=%d)", - op, all_down, attempts, + op, + all_down, + attempts, ) raise # loop and try on the new active handle diff --git a/substrate/pool_client.py b/substrate/pool_client.py index b97f0e7e..4041a457 100644 --- a/substrate/pool_client.py +++ b/substrate/pool_client.py @@ -15,6 +15,7 @@ with ``SubstrateClient.build_signed_extrinsic`` so key material never crosses the mp.Queue IPC boundary. 
""" + from __future__ import annotations from typing import TYPE_CHECKING, Optional @@ -100,12 +101,20 @@ async def query_proofs_submitted(self, account: bytes): """ return await self._pool.send("query_proofs_submitted", {"account": account}) - async def query_difficulty(self): - return await self._pool.send("query_difficulty", {}) + async def query_difficulty(self, topology_hash=None): + return await self._pool.send( + "query_difficulty", {"topology_hash": topology_hash} + ) async def query_current_difficulty(self, at_block: Optional[int] = None): + return await self._pool.send("query_current_difficulty", {"at_block": at_block}) + + async def query_mineable_topologies(self) -> list: + return await self._pool.send("query_mineable_topologies", {}) + + async def query_difficulty_for(self, topology_hash: bytes): return await self._pool.send( - "query_current_difficulty", {"at_block": at_block} + "query_difficulty_for", {"topology_hash": topology_hash} ) async def query_last_proof_block_number(self): diff --git a/substrate/scale_codec.py b/substrate/scale_codec.py index a06f1407..e9f9699e 100644 --- a/substrate/scale_codec.py +++ b/substrate/scale_codec.py @@ -8,6 +8,7 @@ them out of ``client.py`` lets that module stay focused on RPC/session concerns. ``SubstrateClient`` and the decode-path tests import these helpers directly. 
""" + from __future__ import annotations import hashlib @@ -59,7 +60,7 @@ def _decode_job_mode(value) -> JobMode: return JobMode.open() raise ValueError(f"unrecognized JobMode variant: {value!r}") if isinstance(value, dict) and len(value) == 1: - (tag, inner), = value.items() + ((tag, inner),) = value.items() if tag == "Open": return JobMode.open() if tag == "Bid": @@ -86,7 +87,7 @@ def _decode_result_delivery(value) -> ResultDelivery: return ResultDelivery.on_chain_only() raise ValueError(f"unrecognized ResultDelivery variant: {value!r}") if isinstance(value, dict) and len(value) == 1: - (tag, inner), = value.items() + ((tag, inner),) = value.items() if tag == "OnChainOnly": return ResultDelivery.on_chain_only() endpoint = inner.get("endpoint") if isinstance(inner, dict) else inner @@ -198,33 +199,33 @@ def _build_hybrid_signed_extrinsic( # (AuthorizeCall / CheckNonZeroSender / CheckSpecVersion / CheckTxVersion / # CheckGenesis / CheckWeight / WeightReclaim) encode to 0 bytes. extra = ( - b"" # AuthorizeCall - + b"" # CheckNonZeroSender - + b"" # CheckSpecVersion - + b"" # CheckTxVersion - + b"" # CheckGenesis - + b"\x00" # CheckMortality: Era::immortal - + _encode_compact_u32(int(nonce)) # CheckNonce - + b"" # CheckWeight - + _encode_compact_u128(tip) # ChargeTransactionPayment tip - + b"\x00" # CheckMetadataHash: Mode::Disabled - + b"" # WeightReclaim + b"" # AuthorizeCall + + b"" # CheckNonZeroSender + + b"" # CheckSpecVersion + + b"" # CheckTxVersion + + b"" # CheckGenesis + + b"\x00" # CheckMortality: Era::immortal + + _encode_compact_u32(int(nonce)) # CheckNonce + + b"" # CheckWeight + + _encode_compact_u128(tip) # ChargeTransactionPayment tip + + b"\x00" # CheckMetadataHash: Mode::Disabled + + b"" # WeightReclaim ) # 4. Signed-extension additional_signed, in metadata order. CheckMortality # with an immortal era uses the genesis hash here. 
additional = ( - b"" # AuthorizeCall - + b"" # CheckNonZeroSender - + spec_version.to_bytes(4, "little") # CheckSpecVersion - + tx_version.to_bytes(4, "little") # CheckTxVersion - + genesis_bytes # CheckGenesis - + genesis_bytes # CheckMortality (immortal -> genesis) - + b"" # CheckNonce - + b"" # CheckWeight - + b"" # ChargeTransactionPayment - + b"\x00" # CheckMetadataHash: Option::None - + b"" # WeightReclaim + b"" # AuthorizeCall + + b"" # CheckNonZeroSender + + spec_version.to_bytes(4, "little") # CheckSpecVersion + + tx_version.to_bytes(4, "little") # CheckTxVersion + + genesis_bytes # CheckGenesis + + genesis_bytes # CheckMortality (immortal -> genesis) + + b"" # CheckNonce + + b"" # CheckWeight + + b"" # ChargeTransactionPayment + + b"\x00" # CheckMetadataHash: Option::None + + b"" # WeightReclaim ) # 5. Sign payload = call || extra || additional. Blake2_256 if > 256 bytes @@ -242,9 +243,9 @@ def _build_hybrid_signed_extrinsic( # 7. Assemble the wire body and length-prefix the whole extrinsic. 
body = ( - bytes([0x84]) # v4 | 0x80 signed flag - + b"\x00" # MultiAddress::Id discriminator - + account # AccountId32 (32 bytes) + bytes([0x84]) # v4 | 0x80 signed flag + + b"\x00" # MultiAddress::Id discriminator + + account # AccountId32 (32 bytes) + hybrid_sig_scale + extra + call_bytes @@ -266,9 +267,7 @@ def _read_exact(data: ScaleBytes, n: int) -> bytes: """ chunk = data.get_next_bytes(n) if len(chunk) != n: - raise ValueError( - f"short read: wanted {n} bytes, got {len(chunk)}" - ) + raise ValueError(f"short read: wanted {n} bytes, got {len(chunk)}") return bytes(chunk) @@ -354,13 +353,30 @@ def _decode_compact_u32(data: ScaleBytes) -> int: if mode == 0: return first >> 2 if mode == 1: - return ((first >> 2) | (_read_exact(data, 1)[0] << 6)) + return (first >> 2) | (_read_exact(data, 1)[0] << 6) if mode == 2: rest = _read_exact(data, 3) return (first >> 2) | (rest[0] << 6) | (rest[1] << 14) | (rest[2] << 22) raise ValueError("compact big-integer mode not valid for u32 length prefix") +def _decode_h256_vec(data: ScaleBytes) -> list[bytes]: + """Decode a SCALE ``Vec<H256>`` (compact-u32 length + N×32-byte hashes). + + Each element is a raw 32-byte hash. The compact length prefix is decoded + with :func:`_decode_compact_u32`; each hash is read with + :func:`_read_exact` so a truncated buffer surfaces immediately. + + Args: + data: SCALE buffer positioned at the start of the compact length prefix. + + Returns: + List of raw 32-byte hash blobs, in order. + """ + n = _decode_compact_u32(data) + return [_read_exact(data, 32) for _ in range(n)] + + def _decode_mining_snapshot(encoded_hex: str) -> Optional[dict]: """Decode SCALE ``Option<MiningSnapshot<...>>`` from the runtime API. 
@@ -395,10 +411,13 @@ def _decode_mining_snapshot(encoded_hex: str) -> Optional[dict]: "last_proof_block_hash", data, lambda d: _read_exact(d, 32) ) difficulty = _decode_difficulty_config(data) - topology_hash = _decode_field("topology_hash", data, lambda d: _read_exact(d, 32)) + topology_hash = _decode_field( + "topology_hash", data, lambda d: _read_exact(d, 32) + ) nodes_len = _decode_field("nodes_len", data, _decode_compact_u32) - nodes = [_decode_field("nodes[%d]" % i, data, _decode_u32) - for i in range(nodes_len)] + nodes = [ + _decode_field("nodes[%d]" % i, data, _decode_u32) for i in range(nodes_len) + ] edges_len = _decode_field("edges_len", data, _decode_compact_u32) edges = [ ( diff --git a/tests/test_pool_client.py b/tests/test_pool_client.py index 0a5b131f..5147c351 100644 --- a/tests/test_pool_client.py +++ b/tests/test_pool_client.py @@ -5,6 +5,7 @@ ``(method_name, kwargs)`` correctly; everything interesting happens in the pool. """ + from __future__ import annotations from typing import Any @@ -126,7 +127,17 @@ async def test_query_difficulty_forwards_no_args(): pool.scripted["query_difficulty"] = "diff" client = PoolClient(pool) assert await client.query_difficulty() == "diff" - assert pool.calls == [("query_difficulty", {})] + assert pool.calls == [("query_difficulty", {"topology_hash": None})] + + +@pytest.mark.asyncio +async def test_query_difficulty_forwards_topology_hash(): + pool = _RecordingPool() + pool.scripted["query_difficulty"] = "diff-topo" + client = PoolClient(pool) + h = b"\xab" * 32 + assert await client.query_difficulty(h) == "diff-topo" + assert pool.calls == [("query_difficulty", {"topology_hash": h})] @pytest.mark.asyncio @@ -162,7 +173,8 @@ async def test_submit_signed_extrinsic_forwards_hex_and_wait_for(): pool.scripted["submit_signed_extrinsic"] = "receipt-sentinel" client = PoolClient(pool) result = await client.submit_signed_extrinsic( - "0xdeadbeef", wait_for="finalized", + "0xdeadbeef", + wait_for="finalized", ) assert 
result == "receipt-sentinel" assert pool.calls == [ diff --git a/tests/test_scale_codec_helpers.py b/tests/test_scale_codec_helpers.py new file mode 100644 index 00000000..fa48e982 --- /dev/null +++ b/tests/test_scale_codec_helpers.py @@ -0,0 +1,205 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +# Copyright (C) 2025 QUIP Protocol Contributors +"""Unit tests for stateless SCALE helpers in ``substrate.scale_codec``. + +These tests are purely offline — no chain connection required. They build +hand-crafted SCALE byte buffers and verify the decoders produce the correct +output. Compact-encoding helpers are copied from ``test_mining_snapshot_decode`` +rather than imported so this file has no cross-test dependencies. +""" + +from __future__ import annotations + +import pytest +from scalecodec.base import ScaleBytes + +from substrate.scale_codec import _decode_h256_vec + + +def _encode_compact_u32(n: int) -> bytes: + """Reference SCALE compact encoder for u32. Mirrors the decoder under test.""" + if n < 0: + raise ValueError(f"compact u32 must be non-negative, got {n}") + if n < 0x40: + return bytes([n << 2]) + if n < 0x4000: + return ((n << 2) | 0b01).to_bytes(2, "little") + if n < 0x4000_0000: + return ((n << 2) | 0b10).to_bytes(4, "little") + raise NotImplementedError("big-integer compact mode not exercised by these tests") + + +def _build_h256_vec_bytes(hashes: list[bytes]) -> bytes: + """Encode a Vec as compact-u32 length + concatenated 32-byte hashes.""" + out = _encode_compact_u32(len(hashes)) + for h in hashes: + assert len(h) == 32, f"each hash must be 32 bytes, got {len(h)}" + out += h + return out + + +# --------------------------------------------------------------------------- +# _decode_h256_vec +# --------------------------------------------------------------------------- + + +def test_decode_h256_vec_empty(): + """A zero-length Vec decodes to an empty list.""" + buf = ScaleBytes(_build_h256_vec_bytes([])) + result = _decode_h256_vec(buf) + assert result 
== [] + assert buf.get_remaining_length() == 0 + + +def test_decode_h256_vec_single(): + """A single-element Vec decodes to a one-element list.""" + h = bytes(range(32)) + buf = ScaleBytes(_build_h256_vec_bytes([h])) + result = _decode_h256_vec(buf) + assert result == [h] + assert buf.get_remaining_length() == 0 + + +def test_decode_h256_vec_three_hashes(): + """Three distinct hashes decode in order.""" + hashes = [bytes([i] * 32) for i in range(3)] + buf = ScaleBytes(_build_h256_vec_bytes(hashes)) + result = _decode_h256_vec(buf) + assert result == hashes + assert buf.get_remaining_length() == 0 + + +def test_decode_h256_vec_preserves_order(): + """The output list preserves the wire order of the hashes.""" + hashes = [bytes([0xAA] * 32), bytes([0xBB] * 32), bytes([0xCC] * 32)] + buf = ScaleBytes(_build_h256_vec_bytes(hashes)) + result = _decode_h256_vec(buf) + assert result[0] == bytes([0xAA] * 32) + assert result[1] == bytes([0xBB] * 32) + assert result[2] == bytes([0xCC] * 32) + + +def test_decode_h256_vec_trailing_bytes_left_for_caller(): + """Bytes after the vec are left in the buffer for the caller to check.""" + h = bytes([0x01] * 32) + raw = _build_h256_vec_bytes([h]) + b"\xde\xad" + buf = ScaleBytes(raw) + result = _decode_h256_vec(buf) + assert result == [h] + # Caller is responsible for asserting get_remaining_length() == 0. + assert buf.get_remaining_length() == 2 + + +def test_decode_h256_vec_short_read_raises(): + """A truncated buffer surfaces a short-read error.""" + # Claim 2 hashes but only provide 1. 
+ raw = _encode_compact_u32(2) + bytes([0x55] * 32) + buf = ScaleBytes(raw) + with pytest.raises(ValueError, match="short read"): + _decode_h256_vec(buf) + + +def test_decode_h256_vec_compact_two_byte_length(): + """Compact two-byte length prefix (n ≥ 64) decodes correctly.""" + hashes = [bytes([i % 256] * 32) for i in range(64)] + buf = ScaleBytes(_build_h256_vec_bytes(hashes)) + result = _decode_h256_vec(buf) + assert len(result) == 64 + assert result[63] == bytes([63 % 256] * 32) + assert buf.get_remaining_length() == 0 + + +# --------------------------------------------------------------------------- +# query_mineable_topologies / query_difficulty_for — offline decode tests +# These test the SCALE decode logic by monkeypatching _state_call so no +# chain connection is needed. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_query_mineable_topologies_empty(monkeypatch): + """An empty Vec from the runtime API returns [].""" + from substrate.client import SubstrateClient + + raw_hex = "0x" + _build_h256_vec_bytes([]).hex() + client = SubstrateClient.__new__(SubstrateClient) + + async def _fake_state_call(method, param, block_hash): + assert method == "QuantumPowApi_mineable_topologies" + assert param == "0x" + return raw_hex + + monkeypatch.setattr(client, "_state_call", _fake_state_call) + result = await client.query_mineable_topologies() + assert result == [] + + +@pytest.mark.asyncio +async def test_query_mineable_topologies_two_hashes(monkeypatch): + """Two hashes from the runtime API decode correctly.""" + from substrate.client import SubstrateClient + + hashes = [bytes([0xAA] * 32), bytes([0xBB] * 32)] + raw_hex = "0x" + _build_h256_vec_bytes(hashes).hex() + client = SubstrateClient.__new__(SubstrateClient) + + async def _fake_state_call(method, param, block_hash): + return raw_hex + + monkeypatch.setattr(client, "_state_call", _fake_state_call) + result = await 
client.query_mineable_topologies() + assert result == hashes + + +@pytest.mark.asyncio +async def test_query_difficulty_for_none(monkeypatch): + """Option::None (0x00) from the runtime API returns Python None.""" + from substrate.client import SubstrateClient + + client = SubstrateClient.__new__(SubstrateClient) + + async def _fake_state_call(method, param, block_hash): + assert method == "QuantumPowApi_difficulty_for" + return "0x00" + + monkeypatch.setattr(client, "_state_call", _fake_state_call) + result = await client.query_difficulty_for(b"\xab" * 32) + assert result is None + + +@pytest.mark.asyncio +async def test_query_difficulty_for_some(monkeypatch): + """Option::Some(DifficultyConfig) decodes to a SubstrateDifficulty.""" + from substrate.client import SubstrateClient + from substrate.types import SubstrateDifficulty + + # Build Option::Some(DifficultyConfig { min_solutions: 3, max_energy_milli: -500, + # min_diversity_milli: 200 }) + inner = ( + b"\x01" # Option::Some tag + + (3).to_bytes(4, "little") + + (-500).to_bytes(8, "little", signed=True) + + (200).to_bytes(4, "little") + ) + raw_hex = "0x" + inner.hex() + client = SubstrateClient.__new__(SubstrateClient) + + async def _fake_state_call(method, param, block_hash): + return raw_hex + + monkeypatch.setattr(client, "_state_call", _fake_state_call) + result = await client.query_difficulty_for(b"\xcd" * 32) + assert isinstance(result, SubstrateDifficulty) + assert result.min_solutions == 3 + assert result.max_energy_milli == -500 + assert result.min_diversity_milli == 200 + + +@pytest.mark.asyncio +async def test_query_difficulty_for_bad_topology_hash_length(): + """Passing a hash of wrong length raises ValueError before any RPC.""" + from substrate.client import SubstrateClient + + client = SubstrateClient.__new__(SubstrateClient) + with pytest.raises(ValueError, match="32 bytes"): + await client.query_difficulty_for(b"\xab" * 16) diff --git a/tests/test_substrate_client.py 
b/tests/test_substrate_client.py index 2f942cad..f15bfeaf 100644 --- a/tests/test_substrate_client.py +++ b/tests/test_substrate_client.py @@ -8,6 +8,7 @@ storage). Extrinsic submission is covered by Phase 2's bootstrap tests because it requires a funded signing account. """ + from __future__ import annotations import asyncio @@ -116,20 +117,24 @@ async def test_query_miner_unregistered_account(client): async def test_query_difficulty_either_returns_or_none(client): - """`StorageValue<_, DifficultyConfig>` quirks: substrate-interface returns - the `Default::default()` value (all zeros) when storage is empty rather - than `None`. `query_difficulty` must honor `meta_info[result_found]` - so the bootstrap idempotency check stays correct on a fresh chain.""" + """`query_difficulty` reads ``QuantumPow.Difficulties[DefaultTopology]``. + + On a fresh chain with no ``DefaultTopology`` set, it returns ``None``. + After bootstrap the map entry is populated and a real + ``SubstrateDifficulty`` is returned. Either outcome is correct, but we + must never see an all-zeros struct (the pre-map-query default-masking bug). + """ difficulty = await client.query_difficulty() - # On a freshly-built chain `Difficulty` is unset and we expect None. After - # Phase 2 bootstrap (or any prior sudo set_difficulty) the storage is - # populated and we expect a real SubstrateDifficulty. Either is correct, - # but we must never see the all-zeros default-struct case. + # On a freshly-built chain DefaultTopology is unset → None. After Phase 2 + # bootstrap or any prior sudo set_difficulty the map entry is populated. + # Both are correct; we just must never see the all-zeros default-struct. if difficulty is not None: # If we got a value, at least one field must be non-zero — otherwise # we're back to the "default returned for empty storage" bug. 
- assert any([ - difficulty.min_solutions, - difficulty.max_energy_milli, - difficulty.min_diversity_milli, - ]), "query_difficulty returned all-zeros struct; storage is empty" + assert any( + [ + difficulty.min_solutions, + difficulty.max_energy_milli, + difficulty.min_diversity_milli, + ] + ), "query_difficulty returned all-zeros struct; storage is empty" From a9efd92ea5e59a05e3b8de1178271fbdb061395b Mon Sep 17 00:00:00 2001 From: "Richard T. Carback III" Date: Wed, 24 Jun 2026 09:27:58 -0400 Subject: [PATCH 2/5] feat(client): thread topology_hash through set_difficulty callers + register_advantage2 --mineable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit miner_bootstrap._maybe_seed_chain: register topology BEFORE set_difficulty (runtime now ensure!s topology is in RegisteredTopologies); pass topology_hash in set_difficulty params; derive seed_topology_hash from the snapshot returned after registration (or from the existing snapshot). query_difficulty is now called with the resolved topology hash. register_advantage2.main: restructure to the required order — 1. compute target_hash 2. register_topology (idempotent) 3. add_mineable_topology if not already whitelisted (required; runtime rejects set_default_topology with TopologyNotMineable otherwise) 4. set_difficulty with {topology_hash, difficulty} (per-topology) 5. set_default_topology (topology registered + whitelisted) download_and_validate_wins: no change needed — already per-topology via _topology_for(topology_hash) and snapshot.difficulty throughout. 
--- substrate/miner_bootstrap.py | 118 ++++++----- tools/register_advantage2.py | 399 +++++++++++++++++++++++++++++++++++ 2 files changed, 469 insertions(+), 48 deletions(-) create mode 100644 tools/register_advantage2.py diff --git a/substrate/miner_bootstrap.py b/substrate/miner_bootstrap.py index cc00ef57..ed57a623 100644 --- a/substrate/miner_bootstrap.py +++ b/substrate/miner_bootstrap.py @@ -29,6 +29,7 @@ baked in, and ad-hoc runtime config goes through ops tooling that holds the real sudo key. """ + from __future__ import annotations import asyncio @@ -122,8 +123,8 @@ def _resolve_dev_signer(uri: str) -> HybridSigner: # operators should tune via `quip-miner set-difficulty` once that lands. DEFAULT_SEED_DIFFICULTY = SubstrateDifficulty( min_solutions=5, - max_energy_milli=-2_500_000, # -2500.0 - min_diversity_milli=200, # 0.2 + max_energy_milli=-2_500_000, # -2500.0 + min_diversity_milli=200, # 0.2 ) # Minimum balance before bootstrap considers the account funded. Miner @@ -249,9 +250,7 @@ async def bootstrap(config: BootstrapConfig) -> BootstrapResult: topology_seeded = False difficulty_seeded = False if config.seed_chain: - topology_seeded, difficulty_seeded = await _maybe_seed_chain( - client, config - ) + topology_seeded, difficulty_seeded = await _maybe_seed_chain(client, config) balance = await ensure_funded(client, keystore, config) # Idempotent: returns whether it *newly* registered; either way the @@ -300,45 +299,21 @@ async def _maybe_seed_chain( Idempotent: returns `(topology_seeded, difficulty_seeded)` reflecting whether *this* call did the seeding. Both are False on re-runs. + + Order matters: topology MUST be registered before set_difficulty because + the runtime's `set_difficulty` now `ensure!`s the topology hash is in + `RegisteredTopologies`. The topology hash is resolved from the snapshot + after registration (or from the existing snapshot if already present). 
""" await _assert_dev_chain(client) sudo_signer = _resolve_dev_signer(config.sudo_key_uri) - difficulty_seeded = False - needs_seed = ( - await client.query_difficulty() is None or config.force_reseed_difficulty - ) - if needs_seed: - if config.force_reseed_difficulty: - # Defense in depth: `_assert_dev_chain` above already refused - # non-dev chains, but log a loud warning when we're about to - # overwrite an existing on-chain difficulty so the action is - # never invisible. Production CLI doesn't expose the flag. - logger.warning( - "force-reseeding QuantumPow.Difficulty via sudo: " - "min_solutions=%d max_energy_milli=%d min_diversity_milli=%d", - config.seed_difficulty.min_solutions, - config.seed_difficulty.max_energy_milli, - config.seed_difficulty.min_diversity_milli, - ) - else: - logger.info("seeding QuantumPow.Difficulty via sudo (first time)") - await _sudo_call( - client, - sudo_signer, - "QuantumPow", - "set_difficulty", - {"difficulty": _difficulty_to_dict(config.seed_difficulty)}, - ) - difficulty_seeded = True - else: - logger.info("QuantumPow.Difficulty already set; skipping seed") - + # --- Step 1: register topology (must precede set_difficulty) ----------- topology_seeded = False # Probe whether DefaultTopology is set by asking for the snapshot. If the - # runtime API returns None despite Difficulty being set, that's the - # missing-topology case — see pallets/quantum-pow/src/lib.rs:457 where - # mining_snapshot bails on DefaultTopology::get()? returning None. + # runtime API returns None, that's the missing-topology case — see + # pallets/quantum-pow/src/lib.rs:457 where mining_snapshot bails on + # DefaultTopology::get()? returning None. snapshot = await client.get_mining_snapshot( miner_account_bytes=b"\x00" * 32, # placeholder; we just want the call ) @@ -369,12 +344,58 @@ async def _maybe_seed_chain( }, ) topology_seeded = True + # Fetch the snapshot again so we have the topology hash for set_difficulty. 
+ snapshot = await client.get_mining_snapshot( + miner_account_bytes=b"\x00" * 32, + ) + if snapshot is None: + raise RuntimeError( + "register_topology succeeded but mining_snapshot is still None; " + "the topology may have failed inner-call validation." + ) else: logger.info( "QuantumPow topology already registered (hash=0x%s); skipping seed", snapshot.topology_hash.hex(), ) + seed_topology_hash = snapshot.topology_hash + + # --- Step 2: seed difficulty (topology must already be registered) ----- + difficulty_seeded = False + needs_seed = ( + await client.query_difficulty(seed_topology_hash) is None + or config.force_reseed_difficulty + ) + if needs_seed: + if config.force_reseed_difficulty: + # Defense in depth: `_assert_dev_chain` above already refused + # non-dev chains, but log a loud warning when we're about to + # overwrite an existing on-chain difficulty so the action is + # never invisible. Production CLI doesn't expose the flag. + logger.warning( + "force-reseeding QuantumPow.Difficulty via sudo: " + "min_solutions=%d max_energy_milli=%d min_diversity_milli=%d", + config.seed_difficulty.min_solutions, + config.seed_difficulty.max_energy_milli, + config.seed_difficulty.min_diversity_milli, + ) + else: + logger.info("seeding QuantumPow.Difficulty via sudo (first time)") + await _sudo_call( + client, + sudo_signer, + "QuantumPow", + "set_difficulty", + { + "topology_hash": "0x" + seed_topology_hash.hex(), + "difficulty": _difficulty_to_dict(config.seed_difficulty), + }, + ) + difficulty_seeded = True + else: + logger.info("QuantumPow.Difficulty already set; skipping seed") + return topology_seeded, difficulty_seeded @@ -417,7 +438,9 @@ def _difficulty_to_dict(d: SubstrateDifficulty) -> dict: } -def _build_seed_topology(mt: Tuple[int, int]) -> Tuple[List[int], List[Tuple[int, int]]]: +def _build_seed_topology( + mt: Tuple[int, int], +) -> Tuple[List[int], List[Tuple[int, int]]]: """Generate a Zephyr Z(m,t) graph using sampler-compatible node labels. 
The labels are whatever `dwave_networkx.zephyr_graph` assigns (linear ints, @@ -437,10 +460,7 @@ def _build_seed_topology(mt: Tuple[int, int]) -> Tuple[List[int], List[Tuple[int m, t = mt g = dnx.zephyr_graph(m=m, t=t) nodes = sorted(int(n) for n in g.nodes()) - edges = sorted( - (min(int(u), int(v)), max(int(u), int(v))) - for u, v in g.edges() - ) + edges = sorted((min(int(u), int(v)), max(int(u), int(v))) for u, v in g.edges()) return nodes, edges @@ -495,9 +515,7 @@ async def ensure_funded_via_faucet( account = keystore.signer.account_id_bytes() balance = await client.query_balance(account) if balance >= min_balance: - logger.info( - "miner account already funded: balance=%d plancks", balance - ) + logger.info("miner account already funded: balance=%d plancks", balance) return balance if faucet_url is None: @@ -528,7 +546,8 @@ async def ensure_funded_via_faucet( if balance >= min_balance: logger.info( "faucet funded after %d attempt(s): balance=%d plancks", - attempt, balance, + attempt, + balance, ) return balance if budget <= 0: @@ -537,7 +556,10 @@ async def ensure_funded_via_faucet( logger.warning( "faucet not funded yet (%s); attempt %d, retrying in %.1fs " "(%.0fs budget left)", - last_note, attempt, wait, budget, + last_note, + attempt, + wait, + budget, ) await asyncio.sleep(wait) budget -= wait diff --git a/tools/register_advantage2.py b/tools/register_advantage2.py new file mode 100644 index 00000000..d4847b9b --- /dev/null +++ b/tools/register_advantage2.py @@ -0,0 +1,399 @@ +"""Register the full Advantage2_system1 topology on a running QUIP chain via sudo. + +The built-in ``quip-miner bootstrap --seed-chain`` path can't do this: it +refuses any chain whose name isn't a dev prefix (this chain reports +"Quip Testnet"), and it only seeds a *synthetic* Zephyr ``Z(m,t)`` graph. +This script submits the sudo calls in the required order: + + 1. Compute ``target_hash`` (before any topology operations). + 2. 
``Sudo.sudo(QuantumPow.register_topology(...))`` — the real 4,578-node / + 41,531-edge Advantage2_system1 hardware graph (fits the runtime's + 5,000-node / 50,000-edge BoundedVec bounds). + 3. ``Sudo.sudo(QuantumPow.add_mineable_topology(...))`` — whitelist the + topology (required; ``set_default_topology`` rejects non-whitelisted + topologies with ``TopologyNotMineable``). + 4. ``Sudo.sudo(QuantumPow.set_difficulty(...))`` — easy test config; must + pass ``topology_hash`` now that difficulty is per-topology. + 5. ``Sudo.sudo(QuantumPow.set_default_topology(...))`` — repoint the active + mining problem (topology must be registered AND whitelisted). + +It registers the *exact* node/edge labels that ``load_topology( +"Advantage2_system1")`` returns — the same object a miner loads — so the +chain's ``hash_topology`` and the miner's ``compute_topology_hash`` agree by +construction. Finally it re-reads the snapshot through the miner's own +``resolve_topology_binding`` and asserts the local↔chain hashes match. + +DEV/OPS ONLY. This deliberately skips ``_assert_dev_chain`` (which would +reject "Quip Testnet"); the //Alice dev sudo key only works because this is a +chain you own. Do not point it at anything you don't control. + +Run (this is a state-mutating sudo write — review first): + + PYTHONPATH=. .quip/bin/python tools/register_advantage2.py \ + --validator ws://localhost:9944 + +Idempotent: each step (register, whitelist, difficulty, default repoint) is +skipped when already applied on chain; a re-run just re-asserts the binding. 
+""" + +from __future__ import annotations + +import argparse +import asyncio +from pathlib import Path + +from bip39 import bip39_to_mini_secret +from dwave_topologies.topologies.json_loader import load_topology +from shared.hybrid_signer import HybridSigner +from shared.logging_config import get_logger +from substrate.client import SubstrateClient +from substrate.miner_bootstrap import ( + _DEFAULT_ALLOWED_H, + _DEFAULT_ALLOWED_J, + _DEFAULT_ALLOWED_SPIN, + _difficulty_to_dict, + _resolve_dev_signer, + _sudo_call, +) +from shared.allowed_value_spec import AllowedValueSet, scale_dict +from shared.topology_hash import topology_hash as compute_topology_hash +from substrate.types import SubstrateDifficulty + +logger = get_logger(__name__) + +# Selectable linear-field (h) specs. "ternary" is the legacy v0.2 default +# (h ∈ {-1,0,+1}); "zero" is the J-only zero-field class (h ≡ 0) the testnet +# is moving to — its GSE curve is the protocol's baseline, so the difficulty +# energy target tracks what the QPU actually lands. Requires rc11+ on miners +# (zero-field gauge-fix + h-aware difficulty band) before repointing. +ALLOWED_H_SPECS = { + "ternary": _DEFAULT_ALLOWED_H, # AllowedValueSet((-1000, 0, 1000)) + "zero": AllowedValueSet((0,)), # h ≡ 0 +} + +# Difficulty seeded when none exists or --force-difficulty is set. min_solutions +# and the diversity floor stay permissive; only the energy target varies (via +# --max-energy-milli) because that's what must track the active topology's GSE +# landscape. Default -2500.0 (easy). For h=0 use a value just easier than the +# zero-field reachable floor (e.g. -14_000_000 = -14000.0). 
+_DEFAULT_MAX_ENERGY_MILLI = -2_500_000 + + +def _seed_difficulty(max_energy_milli: int) -> SubstrateDifficulty: + return SubstrateDifficulty( + min_solutions=1, + max_energy_milli=max_energy_milli, + min_diversity_milli=0, # diversity gate disabled + ) + + +# Placeholder account for the read-only snapshot probe; the call just needs +# 32 bytes, the value doesn't affect what's registered. +PROBE_ACCOUNT = bytes(32) + + +def _resolve_signer(sudo_uri: str, mnemonic_file: str | None) -> HybridSigner: + """Build the sudo HybridSigner from a mnemonic file or a dev URI. + + A real testnet (e.g. "Quip Testnet") configures ``pallet_sudo::Key`` to a + quip-owned account, so the dev ``//Alice`` table doesn't apply. When + ``mnemonic_file`` is given, derive the signer the same way substrate does: + ``bip39_to_mini_secret(mnemonic)`` yields the 32-byte master seed that + :meth:`HybridSigner.from_master_seed` HKDF-expands into the classical + PQ + sub-keys. Verified to reproduce operator-1's ``5GZMoWFM…`` account. + """ + if mnemonic_file: + mnem = Path(mnemonic_file).expanduser().read_text().strip() + master_seed = bytes(bip39_to_mini_secret(mnem, "")) + return HybridSigner.from_master_seed(master_seed) + return _resolve_dev_signer(sudo_uri) + + +async def _assert_is_sudo(client: SubstrateClient, signer: HybridSigner) -> None: + """Refuse to submit unless the signer is the chain's ``Sudo.Key``. + + Without this, a wrong key sails through ``_sudo_call`` (the inner call + fails with ``RequireSudo`` but ``Sudo.sudo`` succeeds at the extrinsic + level, so nothing raises) and the operator thinks the op worked when it + silently did nothing. + """ + key = await client._run(lambda: client._iface.query("Sudo", "Key")) # noqa: SLF001 + onchain = key.value if key else None + mine = signer.ss58_address() + if onchain != mine: + raise SystemExit( + f"signer {mine} is NOT the chain's Sudo.Key ({onchain}); " + "sudo calls would fail with RequireSudo. 
Point --sudo-mnemonic at " + "the operator key that owns Sudo.Key." + ) + logger.info("sudo signer verified: %s == on-chain Sudo.Key", mine) + + +async def _topology_registered(client: SubstrateClient, topo_hash: bytes) -> bool: + """True if ``topo_hash`` exists in the ``RegisteredTopologies`` map. + + Topologies are keyed by hash and coexist; ``register_topology`` errors + ``TopologyAlreadyRegistered`` on a repeat. We check first so a re-run is + idempotent rather than relying on the (swallowed) sudo inner error. + """ + res = await client._run( # noqa: SLF001 + lambda: client._iface.query( + "QuantumPow", "RegisteredTopologies", ["0x" + topo_hash.hex()] + ) + ) + return res is not None and res.value is not None + + +async def _default_topology_hash(client: SubstrateClient) -> bytes | None: + """Current ``DefaultTopology`` pointer (the active mining topology), or None.""" + res = await client._run( # noqa: SLF001 + lambda: client._iface.query("QuantumPow", "DefaultTopology") + ) + if res is None or res.value is None: + return None + return bytes.fromhex(str(res.value).removeprefix("0x")) + + +def _normalize(topo) -> tuple[list[int], list[tuple[int, int]]]: + """Sorted node ids + canonical (min,max) sorted edges. + + Mirrors ``miner_bootstrap._build_seed_topology`` exactly. ``topology_hash`` + canonicalizes the same way, so ordering doesn't change the hash — this just + keeps the registered payload tidy and matches the proven seed path. Node + labels are kept as-is (NOT remapped to 0..n-1): the miner's sampler keys h/J + dicts by these labels, so remapping would break proof verification. 
+ """ + nodes = sorted(int(n) for n in topo.nodes) + edges = sorted((min(int(u), int(v)), max(int(u), int(v))) for u, v in topo.edges) + return nodes, edges + + +async def main( + url: str, + sudo_uri: str, + mnemonic_file: str | None, + force_difficulty: bool, + h_spec: str, + max_energy_milli: int, +) -> None: + client = SubstrateClient(url=url) + await client.connect() + try: + chain = await client._run(lambda: client._iface.chain) # noqa: SLF001 + logger.info("connected to %s (chain=%r)", url, chain) + signer = _resolve_signer(sudo_uri, mnemonic_file) + await _assert_is_sudo(client, signer) + + # --- 1. Compute target hash (must precede all topology operations) -- + # The hash is needed to check registration, whitelist, difficulty, and + # set_default_topology — compute it once up front. + allowed_h = ALLOWED_H_SPECS[h_spec] + topo = load_topology("Advantage2_system1") + nodes, edges = _normalize(topo) + # The hash the chain will compute and a miner will independently derive + # — same recipe as the runtime's hash_topology / the miner's binding. + target_hash = compute_topology_hash( + nodes, edges, allowed_h, _DEFAULT_ALLOWED_J, _DEFAULT_ALLOWED_SPIN + ) + logger.info( + "target topology: Advantage2_system1 h_spec=%s %d nodes %d edges hash=0x%s", + h_spec, + len(nodes), + len(edges), + target_hash.hex(), + ) + + # --- 2. Register topology (required before set_difficulty) ---------- + # set_difficulty now ensure!s the topology is in RegisteredTopologies. 
+ if await _topology_registered(client, target_hash): + logger.info("topology already registered; skipping register_topology") + else: + logger.info( + "registering topology via sudo (~%d KB extrinsic)...", + (len(nodes) * 4 + len(edges) * 8) // 1024, + ) + await _sudo_call( + client, + signer, + "QuantumPow", + "register_topology", + { + "nodes": (nodes,), + "edges": (edges,), + "allowed_h_values": scale_dict(allowed_h), + "allowed_j_values": scale_dict(_DEFAULT_ALLOWED_J), + "allowed_spin_values": scale_dict(_DEFAULT_ALLOWED_SPIN), + }, + ) + if not await _topology_registered(client, target_hash): + raise SystemExit( + "register_topology did not take effect — the sudo extrinsic " + "was included but its inner call failed (check the Sudid " + "event, e.g. InvalidTopology / spec validation)." + ) + logger.info("register_topology landed") + + # --- 3. Whitelist the target (required before set_default_topology) - + # set_default_topology now rejects topologies not in MineableTopologies. + # The first registered topology is auto-whitelisted by the runtime, but + # subsequent topologies must be explicitly added. We always check and + # add if absent — it is idempotent from the operator's perspective. + mineable = await client.query_mineable_topologies() + if target_hash in mineable: + logger.info("topology already in MineableTopologies; skipping whitelist") + else: + logger.info( + "whitelisting topology 0x%s via add_mineable_topology", + target_hash.hex(), + ) + await _sudo_call( + client, + signer, + "QuantumPow", + "add_mineable_topology", + {"topology_hash": "0x" + target_hash.hex()}, + ) + mineable_after = await client.query_mineable_topologies() + if target_hash not in mineable_after: + raise SystemExit( + "add_mineable_topology did not take effect — inner call " + "failed (TopologyNotRegistered?). Whitelist unchanged." + ) + logger.info("topology whitelisted") + + # --- 4. 
Difficulty (topology must be registered; hash is now known) -- + seed = _seed_difficulty(max_energy_milli) + existing = await client.query_difficulty(target_hash) + if existing is None or force_difficulty: + logger.info( + "setting difficulty via sudo: min_solutions=%d " + "max_energy_milli=%d min_diversity_milli=%d", + seed.min_solutions, + seed.max_energy_milli, + seed.min_diversity_milli, + ) + await _sudo_call( + client, + signer, + "QuantumPow", + "set_difficulty", + { + "topology_hash": "0x" + target_hash.hex(), + "difficulty": _difficulty_to_dict(seed), + }, + ) + after = await client.query_difficulty(target_hash) + if after is None or after.max_energy_milli != max_energy_milli: + raise SystemExit( + "set_difficulty did not take effect — the sudo extrinsic " + "was included but its inner call failed (check the Sudid " + f"event). Wanted max_energy_milli={max_energy_milli}, " + f"chain has {after.max_energy_milli if after else None}." + ) + logger.info("difficulty set (max_energy_milli=%d)", max_energy_milli) + else: + logger.info( + "difficulty already set (max_energy_milli=%d); pass " + "--force-difficulty to overwrite", + existing.max_energy_milli, + ) + + # --- 5. Repoint DefaultTopology (topology registered + whitelisted) -- + # This is the consensus-affecting step: it changes the active mining + # problem for the whole network and re-bases the difficulty energy curve. 
+ current_default = await _default_topology_hash(client) + if current_default == target_hash: + logger.info("DefaultTopology already points at target; no repoint") + else: + logger.info( + "repointing DefaultTopology 0x%s -> 0x%s via sudo", + current_default.hex() if current_default else "none", + target_hash.hex(), + ) + await _sudo_call( + client, + signer, + "QuantumPow", + "set_default_topology", + {"topology_hash": "0x" + target_hash.hex()}, + ) + if await _default_topology_hash(client) != target_hash: + raise SystemExit( + "set_default_topology did not take effect — inner call " + "failed (TopologyNotRegistered or TopologyNotMineable?). " + "DefaultTopology unchanged." + ) + logger.info("DefaultTopology repointed") + + # --- 6. Verify via the miner's own binding path -------------------- + binding = await client.resolve_topology_binding( + topo, miner_account_bytes=PROBE_ACCOUNT + ) + print("\n=== result ===") + print(f"h_spec : {h_spec}") + print(f"chain topology hash : 0x{binding.chain_hash.hex()}") + print(f"local sampler hash : 0x{binding.expected_hash.hex()}") + print(f"hashes match : {binding.matches}") + if not binding.matches or binding.chain_hash != target_hash: + raise SystemExit( + "MISMATCH — chain default != target topology. A miner started " + "with --topology Advantage2_system1 would fail fast." + ) + print("\nOK. Restart miners (--topology Advantage2_system1) to rebind.") + print( + "NOTE: whitelisting (add_mineable_topology) is mandatory before " + "set_default_topology on this runtime version. " + "Re-baseline set_difficulty for the new energy curve if " + "the GSE landscape changed materially (e.g. ternary -> zero)." + ) + finally: + await client.close() + + +if __name__ == "__main__": + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--validator", default="ws://localhost:9944", help="WS RPC URL") + ap.add_argument( + "--sudo-mnemonic", + default=None, + help="path to a BIP39 mnemonic file for the real sudo key (e.g. 
" + "../quip-protocol-rs/quip-testnet-keys/operator-1/mnemonic). When set, " + "overrides --sudo-key.", + ) + ap.add_argument( + "--sudo-key", + default="//Alice", + help="dev sudo URI (used only when --sudo-mnemonic is absent)", + ) + ap.add_argument( + "--force-difficulty", + action="store_true", + help="overwrite an already-set difficulty with --max-energy-milli " + "(min_solutions=1, diversity=0)", + ) + ap.add_argument( + "--max-energy-milli", + type=int, + default=_DEFAULT_MAX_ENERGY_MILLI, + help="energy target (milli) to seed when difficulty is unset or " + f"--force-difficulty (default {_DEFAULT_MAX_ENERGY_MILLI} = -2500.0). " + "For h=0 use ~ -14_000_000 (just easier than the zero-field floor).", + ) + ap.add_argument( + "--h-spec", + choices=sorted(ALLOWED_H_SPECS), + default="ternary", + help="linear-field spec to register/repoint to: 'ternary' (legacy " + "h in {-1,0,1}) or 'zero' (J-only h=0). 'zero' requires rc11+ miners.", + ) + args = ap.parse_args() + asyncio.run( + main( + args.validator, + args.sudo_key, + args.sudo_mnemonic, + args.force_difficulty, + args.h_spec, + args.max_energy_milli, + ) + ) From 6bdda321ec1e2b346dce34778019b623ba7c517f Mon Sep 17 00:00:00 2001 From: "Richard T. Carback III" Date: Wed, 24 Jun 2026 09:56:28 -0400 Subject: [PATCH 3/5] feat(miner): drop participation System.remark for pallet-native tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The per-solution participation remark was a stopgap from before the pallet had a miner registry: a free-form System.remark JSON blob per solution to record that a node was mining. The pallet now tracks this natively — register_miner reserves a deposit and records identity, and submit_proof increments proofs_submitted/proofs_won in the Miners map — so the remark is redundant transport for data the chain already owns. 
Remove the whole producer->consumer chain: - base_miner: participating_cb param + gate emit + _participation_extra - dwave_miner: _participation_extra override (budget_seconds) - miner_worker: _emit_participating and its wiring - miner_controller: _mark_participating, _submit_participation_remark, the op=="participating" drain branch, dedup state, retry constants, and the now-unused submit_remark import The descriptor/identify hardware-specs remarks keep using System.remark (register_miner takes no payload, so there is no pallet equivalent). --- QPU/dwave_miner.py | 11 -- quip_cli.py | 10 +- shared/base_miner.py | 29 ---- shared/miner_worker.py | 22 --- substrate/miner_controller.py | 139 --------------- substrate/remark.py | 6 +- tests/test_base_miner_pump.py | 54 ------ tests/test_dwave_stream_stop.py | 13 +- tests/test_substrate_miner_controller.py | 206 ----------------------- 9 files changed, 8 insertions(+), 482 deletions(-) diff --git a/QPU/dwave_miner.py b/QPU/dwave_miner.py index c119f404..df099d0d 100644 --- a/QPU/dwave_miner.py +++ b/QPU/dwave_miner.py @@ -411,17 +411,6 @@ def _midstream_budget_ok( self._inloop_pacing_rl.reset() return MidstreamBudget(should_mine=should_mine, stats=stats) - def _participation_extra(self) -> Dict[str, Any]: - """QPU adds the reservoir pool at dispatch to the participation marker. - - The snapshot is the QPU runway on hand when mining began for this - solution # (>= ``min_block_budget``). Returns ``{}`` when no budget is - configured. - """ - if self.time_manager is None: - return {} - return {"budget_seconds": self.time_manager.get_stats()["pool_seconds"]} - def _adapt_mining_params( self, current_requirements: BlockRequirements, diff --git a/quip_cli.py b/quip_cli.py index e849b1c1..5d1d4ea1 100644 --- a/quip_cli.py +++ b/quip_cli.py @@ -84,8 +84,7 @@ # 5,10,20,40,45,45,45s ≈ 3.5 minutes) before failing the startup hard. 
The # node descriptor is filed right after register_miner, so its remark can race # the registration extrinsic's nonce (stale `accountNextIndex` for ~a block); -# the retry rides that out. The per-round "participating" remark is separate -# and stays best-effort — it is not gated here. +# the retry rides that out. _STARTUP_RETRY_ATTEMPTS = 8 _STARTUP_RETRY_BASE_DELAY_SECONDS = 5.0 _STARTUP_RETRY_MAX_DELAY_SECONDS = 45.0 @@ -227,10 +226,9 @@ async def _auto_identify( fatal startup requirement: a descriptor that can't be built/validated fails immediately (operator misconfiguration), and submission is retried over several minutes before failing hard via the ``descriptor-failed`` CLI - code. The per-round "participating" remark is a separate, best-effort - signal and is not gated here. The descriptor's `miners` block is built - from the same TOML-shaped dict that `MinerCore` used to spawn worker - handles, so the descriptor always reflects the actual launched topology. + code. The descriptor's `miners` block is built from the same TOML-shaped + dict that `MinerCore` used to spawn worker handles, so the descriptor + always reflects the actual launched topology. """ effective_name = (node_name or _default_node_name())[:64] # When the operator did not configure public_host, query diff --git a/shared/base_miner.py b/shared/base_miner.py index 8638ef4e..cb2755a6 100644 --- a/shared/base_miner.py +++ b/shared/base_miner.py @@ -670,7 +670,6 @@ def mine_work_item( stop_event: multiprocessing.synchronize.Event, preview_cb: Optional[Any] = None, budget_cb: Optional[Any] = None, - participating_cb: Optional[Any] = None, **kwargs, ) -> Optional[MiningResult]: """Protocol-neutral mining loop. @@ -716,13 +715,6 @@ def mine_work_item( ``QPUTimeManager.get_stats`` shape) so the controller can surface live usage on telemetry. Default ``None`` = no-op. A failing callback never breaks mining. 
- participating_cb: Optional callable invoked exactly once per - accepted dispatch (after ``_pre_mine_setup`` passes its gate, - so a budget-starved QPU dispatch that aborts never fires it). - Receives ``(solution_number, extra_dict)`` where ``extra_dict`` - is ``self._participation_extra()`` (QPU adds ``budget_seconds``). - Drives the controller's write-once participation remark. - Default ``None`` = no-op; a failing callback never breaks mining. **kwargs: Forwarded to ``_pre_mine_setup``. Returns: @@ -736,18 +728,6 @@ def mine_work_item( is_substrate = setup.is_substrate desc_q = setup.desc_q - # Dispatch accepted (gate passed): emit the write-once participation - # signal. For QPU this only runs once the reservoir buffer is reached, - # since a budget-starved dispatch aborts in _setup_dispatch above. - if participating_cb is not None: - try: - participating_cb( - loop_state.solution_number_for_log, - self._participation_extra(), - ) - except Exception as exc: # noqa: BLE001 — observability path - self.logger.debug("participating_cb failed (ignored): %s", exc) - progress = 0 try: while self.mining and not stop_event.is_set(): @@ -1557,15 +1537,6 @@ def _midstream_budget_ok( """ return None - def _participation_extra(self) -> Dict[str, Any]: - """Extra fields for the per-dispatch participation marker. - - Base returns ``{}`` (CPU/GPU have no reservoir). The QPU subclass adds - ``{"budget_seconds": }`` so the controller's remark - records the QPU runway committed to this solution #. - """ - return {} - def _pause_driver(self, generation: int) -> None: """Tell the persistent driver to stop submitting NEW work (drain-idle). diff --git a/shared/miner_worker.py b/shared/miner_worker.py index f95041cd..198cb398 100644 --- a/shared/miner_worker.py +++ b/shared/miner_worker.py @@ -313,32 +313,10 @@ def _emit_preview(payload: Dict[str, Any]) -> None: def _emit_budget(stats: Dict[str, Any]) -> None: _emit("budget", stats) - # Write-once participation channel. 
The miner calls this exactly - # once per accepted dispatch (after its budget gate passes) with the - # solution number + backend-specific extras (QPU: budget_seconds). - # The controller dedups and submits the participation remark. - # Best-effort: a put failure must not break mining. - def _emit_participating( - solution_number: int, extra: Dict[str, Any], - ) -> None: - try: - resp_q.put( - { - "op": "participating", - "id": spec.get("id"), - "solution_number": solution_number, - "kind": spec.get("kind"), - **(extra or {}), - } - ) - except Exception as exc: # noqa: BLE001 — best-effort - logger.debug("participating put failed (ignored): %s", exc) - try: result = miner.mine_work_item( context, stop_event, preview_cb=_emit_preview, budget_cb=_emit_budget, - participating_cb=_emit_participating, ) except Exception as exc: logger.error( diff --git a/substrate/miner_controller.py b/substrate/miner_controller.py index 4abaf072..1964b7c7 100644 --- a/substrate/miner_controller.py +++ b/substrate/miner_controller.py @@ -64,7 +64,6 @@ from substrate.client import SubstrateClient from substrate.pool import ValidatorPool from substrate.pool_client import PoolClient -from substrate.remark import submit_remark from substrate.decay_timing import TimingTracker from substrate.difficulty_decay import EnergyCurve, build_decay_schedule from substrate.submitter import ( @@ -229,19 +228,6 @@ class _OperatorFailLoud(RuntimeError): # losing the immutable mapping for an in-flight one. _DISPATCH_CONTEXT_RETENTION = 4 -# Upper bound on the write-once participation dedup map. Generous because -# solution numbers are monotonic and eviction is oldest-first, so a still-active -# solution is never dropped (which would let its remark re-fire). -_PARTICIPATION_RETENTION = 2048 - -# Participation remark retry budget. 
The node submits one System.remark per -# solution # from the same signer account the win submission uses, so a remark -# can lose a nonce race and be rejected with ``1010 Transaction is outdated``; -# each retry re-composes (reading a fresh nonce) before giving up. ``RETRIES`` -# is the number of *additional* attempts after the first (total ≤ RETRIES + 1). -_PARTICIPATION_REMARK_RETRIES = 3 -_PARTICIPATION_REMARK_BACKOFF_S = 0.25 - # How far ahead (in blocks) the anticipatory predictor looks for the # decay block at which a previewed candidate clears. The energy threshold @@ -530,12 +516,6 @@ def __init__( # ``{"op": "budget"}`` pushes). Surfaced in the telemetry snapshot so # operators can see live daily-budget usage; never drives submission. self._latest_budget: dict[str, Any] = {} - # Write-once participation dedup: solution #s the node has already - # published a System.remark for. Node-level (one remark per solution#), - # keyed by solution# alone — not by per-instance miner id. Insertion- - # ordered so the retention bound evicts the OLDEST entry, never an - # arbitrary still-active solution (which would re-fire its remark). - self._participated: "OrderedDict[int, None]" = OrderedDict() # Anticipatory-submission state (Task 6b). # ``_pow_constants`` caches the four decay constants # (epoch_length + curve c-triple) for the session — they only @@ -1803,120 +1783,6 @@ def _sum_qpu_access_us(self, solution_number: Optional[int]) -> Optional[int]: logger.debug("qpu spend sum failed (ignored): %s", exc) return None - def _mark_participating(self, msg: dict) -> None: - """Submit a write-once participation remark for this node + solution #. - - Node-level: dedups on ``solution_number`` alone, so the node publishes - at most one marker per solution # however many miner instances report - it. 
The marker identifies the node (our on-chain signer account), not - the per-instance worker — one remark per solution avoids N miner - instances racing the signer nonce (the ``1010 Transaction is outdated`` - rejections). Spawns a best-effort, supervised task to submit the remark - (never blocks the drain loop; participation is observability, not - consensus). - """ - try: - solution_number = int(msg.get("solution_number", 0)) - except (TypeError, ValueError): - return - if solution_number in self._participated: - return - # Resolve the node identity BEFORE recording dedup or spawning the task. - # This runs unguarded on the drain loop, so a signer failure must not - # propagate (the loop's broad except would shut the controller down — - # an observability failure crashing mining); and pre-marking the - # solution done before a transient failure would permanently suppress - # its remark. Resolve first, bail cleanly, mark done only on success. - try: - miner = self.signer.ss58_address() - except Exception as exc: # noqa: BLE001 — observability path - logger.warning( - "participation remark skipped for solution %s: signer address " - "unavailable (%s: %s); mining continues", - solution_number, type(exc).__name__, exc, - ) - return - self._participated[solution_number] = None - while len(self._participated) > _PARTICIPATION_RETENTION: - self._participated.popitem(last=False) # evict oldest - - payload: dict[str, Any] = { - "schema": "quip-participation", - "solution": solution_number, - "miner": miner, - "kind": msg.get("kind"), - } - if "budget_seconds" in msg: - payload["budget_seconds"] = msg["budget_seconds"] - asyncio.create_task( - supervise( - self._submit_participation_remark(payload), - name=f"participate-{solution_number}", - on_failure=lambda: None, - ), - name=f"participate-{solution_number}", - ) - - async def _submit_participation_remark( - self, - payload: dict, - *, - sleeper: Optional[Callable[[float], Awaitable[None]]] = None, - ) -> None: - 
"""Submit one participation remark (best-effort, retried, never raises). - - Prefers ``System.remark_with_event`` (observable in block events), - falling back to plain ``System.remark`` — the same pattern as the - auto-identify flow. Submission goes through the parent ``build_client``, - the same signer account as the win submission, so a remark can lose a - nonce race and be rejected with ``1010 Transaction is outdated``. Each - attempt re-composes via :func:`submit_remark`, reading a fresh nonce, so - a stale-nonce rejection clears on retry. Retries transient submit - exceptions up to ``_PARTICIPATION_REMARK_RETRIES`` times with linear - backoff, then logs and swallows the final failure so mining continues. - ``sleeper`` is injected so tests run with zero real delay (defaults to - :func:`asyncio.sleep`). - """ - if self.build_client is None: - return - sleep = sleeper if sleeper is not None else asyncio.sleep - body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode() - total_attempts = _PARTICIPATION_REMARK_RETRIES + 1 - for attempt in range(1, total_attempts + 1): - try: - receipt, _call_function = await submit_remark( - self.build_client, self.signer, body, - ) - except Exception as exc: # noqa: BLE001 — observability path; retry then swallow - if attempt < total_attempts: - logger.debug( - "participation remark transient failure for %s on " - "attempt %d/%d (%s: %s); retrying", - payload.get("miner"), attempt, total_attempts, - type(exc).__name__, exc, - ) - await sleep(_PARTICIPATION_REMARK_BACKOFF_S * attempt) - continue - logger.warning( - "participation remark failed for %s after %d attempts " - "(%s: %s); mining continues", - payload.get("miner"), total_attempts, - type(exc).__name__, exc, - ) - return - if receipt.error: - logger.warning( - "participation remark rejected for %s (%s); mining continues", - payload.get("miner"), receipt.error, - ) - return - logger.info( - "participation remark submitted: miner=%s solution=%s budget=%s", - 
payload.get("miner"), payload.get("solution"), - payload.get("budget_seconds"), - ) - return - def _store_preview(self, handle: MinerHandle, msg: dict) -> None: """Stash a worker best-candidate preview keyed by work key. @@ -2606,11 +2472,6 @@ async def _drain_handle_loop( # latest per-miner stats so the telemetry snapshot can surface # live usage; never blocks, never submits. self._store_budget(handle, msg) - elif op == "participating": - # Write-once participation marker for a solution #. Node-level - # (deduped per solution#, identified by our signer account, not - # this worker) + best-effort System.remark; never blocks drain. - self._mark_participating(msg) elif op == "stats": # Stats responses are pulled directly by callers of # handle.get_stats(); if one lands here it just means diff --git a/substrate/remark.py b/substrate/remark.py index 23d68601..1747a50c 100644 --- a/substrate/remark.py +++ b/substrate/remark.py @@ -9,9 +9,9 @@ time; when that happens the eventful variant's failure is reported via ``on_fallback`` and a plain ``System.remark`` is submitted instead. -Three call sites share this dance — the descriptor remark and the ``identify`` -command in ``quip_cli``, and the controller's participation remark — so the -chain semantics live here rather than being copy-pasted into each. +Two call sites share this dance — the descriptor remark and the ``identify`` +command in ``quip_cli`` — so the chain semantics live here rather than being +copy-pasted into each. 
""" from __future__ import annotations diff --git a/tests/test_base_miner_pump.py b/tests/test_base_miner_pump.py index 2016207c..a988dc0b 100644 --- a/tests/test_base_miner_pump.py +++ b/tests/test_base_miner_pump.py @@ -381,60 +381,6 @@ def _run(): miner._close_driver() -# ---------------------------------------------------------------------- -# Participation marker: write-once per accepted dispatch -# ---------------------------------------------------------------------- - - -class _AbortMiner(_DriverMiner): - """Driver miner whose budget gate aborts the dispatch (no mining).""" - - def _pre_mine_setup(self, *a, **k) -> bool: - return False - - -def test_participating_cb_fires_once_on_accepted_dispatch(): - import threading - import time as _t - - ctx = _streaming_context() - miner = _DriverMiner(factory=_FAKE_CTX) # infinite stream - calls: list = [] - stop = mp.Event() - try: - t = threading.Thread( - target=lambda: miner.mine_work_item( - ctx, stop, participating_cb=lambda n, e: calls.append((n, e)), - ) - ) - t.start() - _t.sleep(0.4) - stop.set() - t.join(timeout=15.0) - assert not t.is_alive() - assert len(calls) == 1, f"expected one participation emit, got {calls}" - solution_number, extra = calls[0] - assert isinstance(solution_number, int) - # _DriverMiner has no time_manager -> base _participation_extra -> {}. - assert extra == {} - finally: - miner._close_driver() - - -def test_participating_cb_not_fired_when_setup_aborts(): - ctx = _streaming_context() - miner = _AbortMiner(factory=_FAKE_CTX) - calls: list = [] - stop = mp.Event() - # _pre_mine_setup returns False => _setup_dispatch returns None => the - # participation emit is never reached. Returns promptly (no driver loop). 
- result = miner.mine_work_item( - ctx, stop, participating_cb=lambda n, e: calls.append((n, e)), - ) - assert result is None - assert calls == [] - - # ---------------------------------------------------------------------- # Headline regression: lookahead -> decay -> aggressive submit (end-to-end) # ---------------------------------------------------------------------- diff --git a/tests/test_dwave_stream_stop.py b/tests/test_dwave_stream_stop.py index 08f2df5e..9f2e338b 100644 --- a/tests/test_dwave_stream_stop.py +++ b/tests/test_dwave_stream_stop.py @@ -1,4 +1,4 @@ -"""Unit tests for DWaveMiner budget-gate and participation helpers. +"""Unit tests for DWaveMiner budget-gate helpers. The legacy DWaveMiner.sample_ising_streaming (diagnostic path) was removed when QPU switched to the generic StreamContext + DWaveSamplerWrapper. @@ -53,17 +53,6 @@ def test_midstream_budget_ok_returns_none_without_time_manager(): assert miner._midstream_budget_ok(solution_number=1) is None -def test_participation_extra_carries_pool_budget(): - miner = _dwave_with_time_manager(_FakeTimeManager(should_mine=True)) - extra = miner._participation_extra() - assert extra == {"budget_seconds": 5.0} # pool_seconds from the stub - - -def test_participation_extra_empty_without_time_manager(): - miner = _dwave_with_time_manager(None) - assert miner._participation_extra() == {} - - def test_midstream_budget_ok_passes_when_budget_available(): miner = _dwave_with_time_manager(_FakeTimeManager(should_mine=True)) status = miner._midstream_budget_ok(solution_number=1) diff --git a/tests/test_substrate_miner_controller.py b/tests/test_substrate_miner_controller.py index 30565c88..4efc4125 100644 --- a/tests/test_substrate_miner_controller.py +++ b/tests/test_substrate_miner_controller.py @@ -170,7 +170,6 @@ def _bare_controller() -> SubstrateMinerController: # Anticipatory-submission state (Task 6b). 
c._latest_preview = {} c._latest_budget = {} - c._participated = OrderedDict() c._pow_constants = None c._base_difficulty_by_key = {} c._decay_schedule_by_key = {} # WorkKey -> (schedule, last_proof_block, epoch_length) @@ -1969,211 +1968,6 @@ def test_evict_resets_fire_status_key(): assert controller._last_fire_status_key is None -# ---------------------------------------------------------------------- -# Participation marker (write-once System.remark per solution#, node-level) -# ---------------------------------------------------------------------- - - -async def test_participation_remark_submits_with_payload(): - controller = _bare_controller() - controller.build_client.has_call = AsyncMock(return_value=True) - controller.build_client.submit_extrinsic = AsyncMock( - return_value=MagicMock(error=None) - ) - await controller._submit_participation_remark( - {"schema": "quip-participation", "solution": 7, "miner": "qpu-0", - "kind": "qpu", "budget_seconds": 90.0} - ) - controller.build_client.submit_extrinsic.assert_awaited_once() - args, kwargs = controller.build_client.submit_extrinsic.call_args - assert args[0] == "System" - assert args[1] == "remark_with_event" - body = args[2]["remark"] - payload = json.loads(body) - assert payload["schema"] == "quip-participation" - assert payload["solution"] == 7 - assert payload["miner"] == "qpu-0" - assert payload["kind"] == "qpu" - assert payload["budget_seconds"] == 90.0 - - -async def test_participation_remark_falls_back_to_plain_remark(): - controller = _bare_controller() - controller.build_client.has_call = AsyncMock(return_value=True) - # remark_with_event raises; plain remark succeeds. 
- controller.build_client.submit_extrinsic = AsyncMock( - side_effect=[RuntimeError("no event variant"), MagicMock(error=None)] - ) - await controller._submit_participation_remark( - {"schema": "quip-participation", "solution": 1, "miner": "cpu-0", - "kind": "cpu"} - ) - assert controller.build_client.submit_extrinsic.await_count == 2 - second_call = controller.build_client.submit_extrinsic.call_args_list[1] - assert second_call.args[1] == "remark" - - -async def test_participation_remark_retries_transient_then_succeeds(): - controller = _bare_controller() - controller.build_client.has_call = AsyncMock(return_value=False) - # First attempt fails with a stale-nonce "outdated" error (the reported - # 1010 Invalid Transaction); the retry re-composes a fresh nonce and lands. - controller.build_client.submit_extrinsic = AsyncMock( - side_effect=[ - RuntimeError("1010 Invalid Transaction: Transaction is outdated"), - MagicMock(error=None), - ] - ) - slept: list[float] = [] - - async def _record_sleep(seconds: float) -> None: - slept.append(seconds) - - await controller._submit_participation_remark( - {"schema": "quip-participation", "solution": 3, "miner": "5Test", - "kind": "cpu"}, - sleeper=_record_sleep, - ) - assert controller.build_client.submit_extrinsic.await_count == 2 - assert len(slept) == 1 # one backoff between the two attempts - - -async def test_participation_remark_swallows_persistent_failure(): - from substrate.miner_controller import _PARTICIPATION_REMARK_RETRIES - - controller = _bare_controller() - controller.build_client.has_call = AsyncMock(return_value=False) - controller.build_client.submit_extrinsic = AsyncMock( - side_effect=RuntimeError("rpc down") - ) - - async def _no_sleep(_seconds: float) -> None: - return None - - # Must not raise — retries the bounded number of times, then gives up. 
- await controller._submit_participation_remark( - {"schema": "quip-participation", "solution": 2, "miner": "5Test", - "kind": "qpu"}, - sleeper=_no_sleep, - ) - assert ( - controller.build_client.submit_extrinsic.await_count - == _PARTICIPATION_REMARK_RETRIES + 1 - ) - - -async def test_mark_participating_dedups_per_solution_across_instances(): - controller = _bare_controller() - controller._submit_participation_remark = AsyncMock(return_value=None) - # Two different miner instances report the SAME solution # → exactly one - # node-level remark (deduped on solution#, not on the per-instance worker). - controller._mark_participating( - {"solution_number": 5, "kind": "qpu", "budget_seconds": 90.0} - ) - controller._mark_participating({"solution_number": 5, "kind": "cpu"}) - await asyncio.sleep(0) # let the spawned task run - controller._submit_participation_remark.assert_awaited_once() - payload = controller._submit_participation_remark.call_args.args[0] - assert payload == { - "schema": "quip-participation", "solution": 5, "miner": "5Test", - "kind": "qpu", "budget_seconds": 90.0, - } - # A different solution # fires again. - controller._mark_participating({"solution_number": 6, "kind": "qpu"}) - await asyncio.sleep(0) - assert controller._submit_participation_remark.await_count == 2 - - -async def test_mark_participating_uses_node_id_and_omits_budget(): - controller = _bare_controller() - controller._submit_participation_remark = AsyncMock(return_value=None) - controller._mark_participating({"solution_number": 9, "kind": "cpu"}) - await asyncio.sleep(0) - payload = controller._submit_participation_remark.call_args.args[0] - assert "budget_seconds" not in payload - assert payload["kind"] == "cpu" - # Node identity (signer ss58), not the per-instance worker id. 
- assert payload["miner"] == "5Test" - - -async def test_mark_participating_skips_when_signer_address_unavailable(): - controller = _bare_controller() - controller._submit_participation_remark = AsyncMock(return_value=None) - # _mark_participating runs unguarded on the drain loop; a signer failure - # must NOT propagate (the drain loop's broad except would shut the - # controller down — an observability failure crashing mining). - controller.signer.ss58_address.side_effect = RuntimeError("keystore locked") - controller._mark_participating({"solution_number": 11, "kind": "cpu"}) - await asyncio.sleep(0) - controller._submit_participation_remark.assert_not_awaited() - # The solution must NOT be pre-marked done, so a recovered later report - # still fires (no permanent suppression from a transient signer failure). - controller.signer.ss58_address.side_effect = None - controller.signer.ss58_address.return_value = "5Test" - controller._mark_participating({"solution_number": 11, "kind": "cpu"}) - await asyncio.sleep(0) - controller._submit_participation_remark.assert_awaited_once() - - -async def test_mark_participating_evicts_oldest_deterministically(monkeypatch): - import substrate.miner_controller as mc - monkeypatch.setattr(mc, "_PARTICIPATION_RETENTION", 3) - controller = _bare_controller() - controller._submit_participation_remark = AsyncMock(return_value=None) - # Add 4 with retention 3 → the OLDEST (1) is evicted, never an arbitrary - # (possibly still-active) entry that would re-fire its remark. 
- for sol in (1, 2, 3, 4): - controller._mark_participating({"solution_number": sol, "kind": "cpu"}) - await asyncio.sleep(0) - assert list(controller._participated.keys()) == [2, 3, 4] - - -async def test_participation_remark_receipt_error_is_terminal(): - controller = _bare_controller() - controller.build_client.has_call = AsyncMock(return_value=False) - controller.build_client.submit_extrinsic = AsyncMock( - return_value=MagicMock(error="System.ExtrinsicFailed") - ) - - async def _no_sleep(_seconds: float) -> None: - return None - - # An included-but-rejected remark won't be fixed by resubmitting the same - # body — it's terminal, not retried (no pointless resubmission storm). - await controller._submit_participation_remark( - {"schema": "quip-participation", "solution": 4, "miner": "5Test", - "kind": "cpu"}, - sleeper=_no_sleep, - ) - assert controller.build_client.submit_extrinsic.await_count == 1 - - -async def test_participation_remark_retries_when_fallback_path_also_fails(): - controller = _bare_controller() - controller.build_client.has_call = AsyncMock(return_value=True) - # Attempt 1: remark_with_event raises (caught inside submit_remark → - # fallback), then plain remark ALSO raises and ESCAPES submit_remark — this - # is the literal 1010-outdated nonce race the retry loop exists for. Attempt - # 2: the recomposed (fresh-nonce) plain remark lands. 
- controller.build_client.submit_extrinsic = AsyncMock(side_effect=[ - RuntimeError("no event variant"), # a1 remark_with_event - RuntimeError("1010 Transaction is outdated"), # a1 plain (escapes) - RuntimeError("no event variant"), # a2 remark_with_event - MagicMock(error=None), # a2 plain — lands - ]) - - async def _no_sleep(_seconds: float) -> None: - return None - - await controller._submit_participation_remark( - {"schema": "quip-participation", "solution": 8, "miner": "5Test", - "kind": "qpu"}, - sleeper=_no_sleep, - ) - assert controller.build_client.submit_extrinsic.await_count == 4 - - - # ---------------------------------------------------------------------- # Precise per-solution QPU spend at win (summed from the attempt log) # ---------------------------------------------------------------------- From b79f83b53d4fa2d0da86a327d24d9d91e808e4ca Mon Sep 17 00:00:00 2001 From: "Richard T. Carback III" Date: Wed, 24 Jun 2026 11:19:17 -0400 Subject: [PATCH 4/5] test: skip live controller tests on pre-MR!42 runtime test_controller_submits_proof_end_to_end and test_controller_long_haul_multi_block drive _maybe_seed_chain, which reads/writes difficulty through the per-topology QuantumPow.Difficulties map (quip-protocol-rs MR !42). Against an older runtime that path raised StorageFunctionNotFound deep in bootstrap, hard-failing instead of skipping. Add a metadata probe (_chain_has_per_topology_difficulty) and gate both tests on it, mirroring the existing _chain_requires_hybrid_signer pattern, so they skip cleanly until the runtime is deployed. 
--- tests/test_substrate_miner_controller.py | 37 ++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/test_substrate_miner_controller.py b/tests/test_substrate_miner_controller.py index 4efc4125..f2dea534 100644 --- a/tests/test_substrate_miner_controller.py +++ b/tests/test_substrate_miner_controller.py @@ -639,6 +639,35 @@ def _chain_requires_hybrid_signer(url: str) -> bool: return False +def _chain_has_per_topology_difficulty(url: str) -> bool: + """True if the chain's runtime exposes per-topology difficulty. + + `quip-protocol-rs` MR !42 replaced the global `QuantumPow.Difficulty` + value with a per-topology `QuantumPow.Difficulties` storage map. The live + controller tests drive `_maybe_seed_chain`, which queries/sets difficulty + through that map; against an older runtime the seed path raises + `StorageFunctionNotFound` deep in bootstrap. Probe the metadata so those + tests skip cleanly (rather than hard-fail) until the runtime is deployed. + """ + if not _chain_reachable(url): + return False + try: + from substrateinterface import SubstrateInterface + si = SubstrateInterface(url=url) + pallet = si.get_metadata().get_metadata_pallet("QuantumPow") + return ( + pallet is not None + and pallet.get_storage_function("Difficulties") is not None + ) + except Exception: + return False + + +# Evaluated once at collection time; the live controller tests below gate on +# it so they skip cleanly against a pre-MR!42 runtime. 
+_CHAIN_HAS_PER_TOPOLOGY_DIFFICULTY = _chain_has_per_topology_difficulty(DEFAULT_URL) + + @asynccontextmanager async def _live_controller( tmp_path: Path, @@ -794,6 +823,10 @@ async def _live_controller( not _chain_reachable(DEFAULT_URL), reason=f"substrate chain not reachable at {DEFAULT_URL}", ) +@pytest.mark.skipif( + not _CHAIN_HAS_PER_TOPOLOGY_DIFFICULTY, + reason="chain runtime lacks per-topology QuantumPow.Difficulties (pre-MR!42)", +) @pytest.mark.timeout(180) async def test_controller_submits_proof_end_to_end(tmp_path): """Smoke test: spin up a controller against the live chain, mine one proof. @@ -826,6 +859,10 @@ async def on_proof(receipt, ctx): not _chain_reachable(DEFAULT_URL), reason=f"substrate chain not reachable at {DEFAULT_URL}", ) +@pytest.mark.skipif( + not _CHAIN_HAS_PER_TOPOLOGY_DIFFICULTY, + reason="chain runtime lacks per-topology QuantumPow.Difficulties (pre-MR!42)", +) @pytest.mark.timeout(360) async def test_controller_long_haul_multi_block(tmp_path): """Phase 6 verification: sustain mining across multiple head changes. From d4df4b5f5393db1d74572cabf90bdb7979325af0 Mon Sep 17 00:00:00 2001 From: "Richard T. Carback III" Date: Wed, 24 Jun 2026 13:06:22 -0400 Subject: [PATCH 5/5] test: gate live client difficulty/snapshot tests on per-topology runtime test_mining_snapshot_either_returns_or_none and test_query_difficulty_either_returns_or_none read difficulty/snapshot through the per-topology QuantumPow.Difficulties map (rc4 / MR !42). Against a pre-MR!42 runtime they raised StorageFunctionNotFound instead of skipping. Add the same _chain_has_per_topology_difficulty metadata probe used by the controller tests and gate both on it. 
--- tests/test_substrate_client.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/test_substrate_client.py b/tests/test_substrate_client.py index f15bfeaf..0d23cbde 100644 --- a/tests/test_substrate_client.py +++ b/tests/test_substrate_client.py @@ -37,11 +37,41 @@ def _chain_reachable(url: str) -> bool: return False +def _chain_has_per_topology_difficulty(url: str) -> bool: + """True if the chain's runtime exposes per-topology difficulty. + + `quip-protocol-rs` MR !42 (rc4) replaced the global `QuantumPow.Difficulty` + value with a per-topology `QuantumPow.Difficulties` storage map. Tests that + read difficulty/snapshot through that map raise `StorageFunctionNotFound` + against an older runtime, so probe the metadata and skip cleanly until the + new runtime is deployed. + """ + if not _chain_reachable(url): + return False + try: + from substrateinterface import SubstrateInterface + si = SubstrateInterface(url=url) + pallet = si.get_metadata().get_metadata_pallet("QuantumPow") + return ( + pallet is not None + and pallet.get_storage_function("Difficulties") is not None + ) + except Exception: + return False + + pytestmark = pytest.mark.skipif( not _chain_reachable(DEFAULT_URL), reason=f"substrate chain not reachable at {DEFAULT_URL}", ) +# Evaluated once; tests that need the per-topology difficulty runtime gate on it. 
+_CHAIN_HAS_PER_TOPOLOGY_DIFFICULTY = _chain_has_per_topology_difficulty(DEFAULT_URL) +_skip_pre_mr42 = pytest.mark.skipif( + not _CHAIN_HAS_PER_TOPOLOGY_DIFFICULTY, + reason="chain runtime lacks per-topology QuantumPow.Difficulties (pre-MR!42)", +) + @pytest.fixture async def client(): @@ -78,6 +108,7 @@ async def test_get_finalized_head(client): assert nf <= nh, "finalized head should not exceed best head" +@_skip_pre_mr42 async def test_mining_snapshot_either_returns_or_none(client): head = await client.get_head() alice = Sr25519Signer.from_uri("//Alice") @@ -116,6 +147,7 @@ async def test_query_miner_unregistered_account(client): assert miner_info is None +@_skip_pre_mr42 async def test_query_difficulty_either_returns_or_none(client): """`query_difficulty` reads ``QuantumPow.Difficulties[DefaultTopology]``.