From d7aecb074f9cf878f7dfd42ef5136380427f26bb Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Tue, 2 Sep 2025 09:59:24 -0500 Subject: [PATCH 01/23] First version of working websockets --- src/pqnstack/app/api/main.py | 4 ++++ src/pqnstack/app/api/routes/handshake.py | 15 +++++++++++++++ src/pqnstack/app/api/routes/websocket.py | 24 ++++++++++++++++++++++++ src/pqnstack/app/core/config.py | 3 +++ 4 files changed, 46 insertions(+) create mode 100644 src/pqnstack/app/api/routes/handshake.py create mode 100644 src/pqnstack/app/api/routes/websocket.py diff --git a/src/pqnstack/app/api/main.py b/src/pqnstack/app/api/main.py index d50d7c12..a36b74de 100644 --- a/src/pqnstack/app/api/main.py +++ b/src/pqnstack/app/api/main.py @@ -5,6 +5,8 @@ from pqnstack.app.api.routes import rng from pqnstack.app.api.routes import serial from pqnstack.app.api.routes import timetagger +from pqnstack.app.api.routes import websocket +from pqnstack.app.api.routes import handshake api_router = APIRouter() api_router.include_router(chsh.router) @@ -12,3 +14,5 @@ api_router.include_router(timetagger.router) api_router.include_router(rng.router) api_router.include_router(serial.router) +api_router.include_router(websocket.router) +api_router.include_router(handshake.router) \ No newline at end of file diff --git a/src/pqnstack/app/api/routes/handshake.py b/src/pqnstack/app/api/routes/handshake.py new file mode 100644 index 00000000..72fbac6b --- /dev/null +++ b/src/pqnstack/app/api/routes/handshake.py @@ -0,0 +1,15 @@ +from fastapi import APIRouter + +from pqnstack.app.core.config import state, state_change_event + +router = APIRouter() + + + +@router.get("/handshake") +async def handshake() -> bool: + state.incoming = True + + state_change_event.set() + return True + diff --git a/src/pqnstack/app/api/routes/websocket.py b/src/pqnstack/app/api/routes/websocket.py new file mode 100644 index 00000000..e8e8df5b --- /dev/null +++ b/src/pqnstack/app/api/routes/websocket.py @@ -0,0 +1,24 @@ +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + +router = APIRouter() + +@router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """ + A simple websocket endpoint that accepts a connection, receives messages, + and sends a response back. + """ + from pqnstack.app.core.config import state, state_change_event + await websocket.accept() + await websocket.send_text("HELLO HELLO HELLO") + try: + while True: + await state_change_event.wait() # Wait for a state change event + if state.incoming: + await websocket.send_text("PING PING PING") + state.incoming = False + # To prevent sending this message in a loop, you might want to reset the state: + + state_change_event.clear() # Reset the event for the next change + except WebSocketDisconnect: + print("Client disconnected") diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 1999eac7..6b341e0a 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -1,5 +1,6 @@ import logging from functools import lru_cache +import asyncio from pydantic import BaseModel from pydantic import Field @@ -73,6 +74,7 @@ def get_settings() -> Settings: class NodeState(BaseModel): + incoming: bool = False chsh_request_basis: list[float] = [22.5, 67.5] # FIXME: Use enums for this qkd_basis_list: list[QKDEncodingBasis] = [ @@ -95,3 +97,4 @@ class NodeState(BaseModel): state = NodeState() +state_change_event = asyncio.Event() From 2525062add26dde4103e0a50e41ba0efcd19c098 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Thu, 4 Sep 2025 15:30:54 -0500 Subject: [PATCH 02/23] Implements basic state handling, as well as client disconnects detection --- configs/config_app_example.toml | 2 + src/pqnstack/app/api/main.py | 8 +- src/pqnstack/app/api/routes/coordination.py | 109 ++++++++++++++++++++ src/pqnstack/app/api/routes/debug.py | 11 ++ src/pqnstack/app/api/routes/handshake.py | 15 --- src/pqnstack/app/api/routes/websocket.py | 24 ----- src/pqnstack/app/core/config.py | 21 +++- 7 files changed, 146 insertions(+), 44 deletions(-) create mode 100644 src/pqnstack/app/api/routes/coordination.py create mode 100644 src/pqnstack/app/api/routes/debug.py delete mode 100644 src/pqnstack/app/api/routes/handshake.py delete mode 100644 src/pqnstack/app/api/routes/websocket.py diff --git a/configs/config_app_example.toml b/configs/config_app_example.toml index 81c4a476..84c8a813 100644 --- a/configs/config_app_example.toml +++ b/configs/config_app_example.toml @@ -1,5 +1,7 @@ # MAKE SURE TO RENAME THIS FILE TO config.toml AND PLACE IT IN THE ROOT OF THE PROJECT +node_name = "example_node" + # Router configuration router_name = "router1" router_address = "xx.xx.xx.xx" # Replace with actual IP address diff --git a/src/pqnstack/app/api/main.py b/src/pqnstack/app/api/main.py index a36b74de..fe65fa93 100644 --- a/src/pqnstack/app/api/main.py +++ b/src/pqnstack/app/api/main.py @@ -5,8 +5,8 @@ from pqnstack.app.api.routes import rng from pqnstack.app.api.routes import serial from pqnstack.app.api.routes import timetagger -from pqnstack.app.api.routes import websocket -from pqnstack.app.api.routes import handshake +from pqnstack.app.api.routes import coordination +from pqnstack.app.api.routes import debug api_router = APIRouter() api_router.include_router(chsh.router) @@ -14,5 +14,5 @@ api_router.include_router(timetagger.router) api_router.include_router(rng.router) api_router.include_router(serial.router) -api_router.include_router(websocket.router) -api_router.include_router(handshake.router) \ No newline at end of file +api_router.include_router(coordination.router) +api_router.include_router(debug.router) \ No newline at end of file diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py new file mode 100644 index 00000000..cb040382 --- /dev/null +++ b/src/pqnstack/app/api/routes/coordination.py @@ -0,0 +1,109 @@ +import logging +import asyncio + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Request, HTTPException, status + +from pqnstack.app.api.deps import ClientDep +from pqnstack.app.core.config import state, state_change_event, settings, user_replied_event + +logger = logging.getLogger(__name__) + + +router = APIRouter(prefix="/coordination", tags=["coordination"]) + + +@router.post("/collect_follower") +async def collect_follower(address: str, http_client: ClientDep) -> bool: + + logger.info("Requesting client at %s to follow", address) + + ret = await http_client.post(f"http://{address}/coordination/follow_requested?leaders_name={settings.node_name}") + if ret.status_code != 200: + raise HTTPException(status_code=ret.status_code, detail=ret.text) + + + if ret.json() is True: + state.leading = True + state.followers_address = address + logger.info("Successfully collected follower") + return True + if ret.json() is False: + logger.info("Follower rejected follow request") + return False + + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR , detail="Could not collect follower for unknown reasons") + + +@router.post('/follow_requested') +async def follow_requested(request: Request, leaders_name: str) -> bool: + + logger.info("Requesting client at %s to follow", leaders_name) + print("state.client_listening", state.client_listening_for_follower_requests) + print("state.following", state.following) + # Check if the client is ready to accept a follower request and that node is not already following someone. + if state.client_listening_for_follower_requests and not state.following: + + # Load the state with the incoming info of the request + state.leaders_address = request.client.host + state.leaders_name = leaders_name + state.following_requested = True + # Trigger the state change to get the websocket to send question to user + state_change_event.set() + + logger.info("Asking user to accept follow request from %s (%s)", leaders_name, request.client.host) + await user_replied_event.wait() # Wait for a state change event to see if user accepted + user_replied_event.clear() # Reset the event for the next change + if state.following_requested_user_response: + logger.info(f"Follow request from {request.client.host} accepted.") + state.client_listening_for_follower_requests = True + state.following = True + state.leaders_address = request.client.host + state_change_event.set() + return True + + logger.info(f"Follow request from {request.client.host} rejected.") + # Clean up the state if user rejected + state.leaders_address = None + state.leaders_name = None + state.following_requested = False + + else: + logger.info("Request rejected because %s", ("client is not listening for requests" if state.client_listening_for_follower_requests else "this node is already following someone")) + + return False + + +@router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """ + A simple websocket endpoint that accepts a connection, receives messages, + and sends a response back. + """ + from pqnstack.app.core.config import state, state_change_event + await websocket.accept() + logger.info("Client connected to websocket for multiplayer coordination.") + state.client_listening_for_follower_requests = True + + async def state_change_handler(): + while True: + await state_change_event.wait() # Wait for a state change event + if state.following_requested: + logger.debug("Websocket detected a follow request, asking user for response.") + await websocket.send_text(f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?") + state_change_event.clear() # Reset the event for the next change + + state_change_task = asyncio.create_task(state_change_handler()) + + try: + while True: + response = await websocket.receive_text() + state.following_requested_user_response = response.lower() in ["true", "yes", "y"] + state.following_requested = False + logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) + user_replied_event.set() + + except WebSocketDisconnect: + logger.info("Client disconnected from websocket for multiplayer coordination.") + state.client_listening_for_follower_requests = False + finally: + state_change_task.cancel() \ No newline at end of file diff --git a/src/pqnstack/app/api/routes/debug.py b/src/pqnstack/app/api/routes/debug.py new file mode 100644 index 00000000..0250aa90 --- /dev/null +++ b/src/pqnstack/app/api/routes/debug.py @@ -0,0 +1,11 @@ +from fastapi import APIRouter + +from pqnstack.app.core.config import state + +router = APIRouter(prefix='/debug', tags=['debug']) + + +@router.get('/state') +async def get_state(): + return state + diff --git a/src/pqnstack/app/api/routes/handshake.py b/src/pqnstack/app/api/routes/handshake.py deleted file mode 100644 index 72fbac6b..00000000 --- a/src/pqnstack/app/api/routes/handshake.py +++ /dev/null @@ -1,15 +0,0 @@ -from fastapi import APIRouter - -from pqnstack.app.core.config import state, state_change_event - -router = APIRouter() - - - -@router.get("/handshake") -async def handshake() -> bool: - state.incoming = True - - state_change_event.set() - return True - diff --git a/src/pqnstack/app/api/routes/websocket.py b/src/pqnstack/app/api/routes/websocket.py deleted file mode 100644 index e8e8df5b..00000000 --- a/src/pqnstack/app/api/routes/websocket.py +++ /dev/null @@ -1,24 +0,0 @@ -from fastapi import APIRouter, WebSocket, WebSocketDisconnect - -router = APIRouter() - -@router.websocket("/ws") -async def websocket_endpoint(websocket: WebSocket): - """ - A simple websocket endpoint that accepts a connection, receives messages, - and sends a response back. - """ - from pqnstack.app.core.config import state, state_change_event - await websocket.accept() - await websocket.send_text("HELLO HELLO HELLO") - try: - while True: - await state_change_event.wait() # Wait for a state change event - if state.incoming: - await websocket.send_text("PING PING PING") - state.incoming = False - # To prevent sending this message in a loop, you might want to reset the state: - - state_change_event.clear() # Reset the event for the next change - except WebSocketDisconnect: - print("Client disconnected") diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 6b341e0a..d5913f99 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -33,6 +33,7 @@ class QKDSettings(BaseModel): class Settings(BaseSettings): + node_name: str = "node1" router_name: str = "router1" router_address: str = "localhost" router_port: int = 5555 @@ -74,7 +75,24 @@ def get_settings() -> Settings: class NodeState(BaseModel): - incoming: bool = False + + # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. + client_listening_for_follower_requests: bool = False + + # Leader's state + leading: bool = False + followers_address: str | None = None + + + # Follower's state + following: bool = False + following_requested: bool = False + # User's response to the follow request. None if no response yet, True if accepted, False if rejected. + following_requested_user_response: bool | None = None + # The address of the leader this node is following. None if not following anyone. + leaders_address: str | None = None + leaders_name: str | None = None + chsh_request_basis: list[float] = [22.5, 67.5] # FIXME: Use enums for this qkd_basis_list: list[QKDEncodingBasis] = [ @@ -98,3 +116,4 @@ class NodeState(BaseModel): state = NodeState() state_change_event = asyncio.Event() +user_replied_event = asyncio.Event() From 9c53aafc0a86c114356ead3cbf36727a07638171 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Thu, 4 Sep 2025 18:00:31 -0500 Subject: [PATCH 03/23] cleans up code and deals with ruff and mypy --- src/pqnstack/app/api/routes/coordination.py | 137 +++++++++++++------- src/pqnstack/app/api/routes/debug.py | 8 +- src/pqnstack/app/core/config.py | 7 +- 3 files changed, 98 insertions(+), 54 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index cb040382..93a533e5 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -1,10 +1,18 @@ -import logging import asyncio +import logging -from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Request, HTTPException, status +from fastapi import APIRouter +from fastapi import HTTPException +from fastapi import Request +from fastapi import WebSocket +from fastapi import WebSocketDisconnect +from fastapi import status from pqnstack.app.api.deps import ClientDep -from pqnstack.app.core.config import state, state_change_event, settings, user_replied_event +from pqnstack.app.core.config import ask_user_for_follow_event +from pqnstack.app.core.config import settings +from pqnstack.app.core.config import state +from pqnstack.app.core.config import user_replied_event logger = logging.getLogger(__name__) @@ -12,16 +20,34 @@ router = APIRouter(prefix="/coordination", tags=["coordination"]) +# TODO: Send a disconnection message if I was following someone. +@router.post("/reset_coordination_state") +async def reset_coordination_state() -> None: + """Reset the coordination state of the node.""" + state.leading = False + state.followers_address = None + state.following = False + state.following_requested = False + state.following_requested_user_response = None + state.leaders_address = None + state.leaders_name = None + + @router.post("/collect_follower") async def collect_follower(address: str, http_client: ClientDep) -> bool: + """ + Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. + Returns + ------- + True if the follower accepted the request, False otherwise. + """ logger.info("Requesting client at %s to follow", address) ret = await http_client.post(f"http://{address}/coordination/follow_requested?leaders_name={settings.node_name}") - if ret.status_code != 200: + if ret.status_code != status.HTTP_200_OK: raise HTTPException(status_code=ret.status_code, detail=ret.text) - if ret.json() is True: state.leading = True state.followers_address = address @@ -31,79 +57,98 @@ async def collect_follower(address: str, http_client: ClientDep) -> bool: logger.info("Follower rejected follow request") return False - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR , detail="Could not collect follower for unknown reasons") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not collect follower for unknown reasons" + ) -@router.post('/follow_requested') +@router.post("/follow_requested") async def follow_requested(request: Request, leaders_name: str) -> bool: + """ + Endpoint is called by a leader node (other node) to request this node to follow it. + + Returns + ------- + True if the follow request is accepted, False otherwise. + """ + logger.debug("Requesting client at %s to follow", leaders_name) + + if request.client is None: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Request lacks the clients host") + leaders_address = request.client.host - logger.info("Requesting client at %s to follow", leaders_name) - print("state.client_listening", state.client_listening_for_follower_requests) - print("state.following", state.following) # Check if the client is ready to accept a follower request and that node is not already following someone. if state.client_listening_for_follower_requests and not state.following: - - # Load the state with the incoming info of the request - state.leaders_address = request.client.host - state.leaders_name = leaders_name state.following_requested = True # Trigger the state change to get the websocket to send question to user - state_change_event.set() + ask_user_for_follow_event.set() - logger.info("Asking user to accept follow request from %s (%s)", leaders_name, request.client.host) + logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) await user_replied_event.wait() # Wait for a state change event to see if user accepted user_replied_event.clear() # Reset the event for the next change if state.following_requested_user_response: - logger.info(f"Follow request from {request.client.host} accepted.") - state.client_listening_for_follower_requests = True + logger.debug("Follow request from %s accepted.", leaders_address) state.following = True - state.leaders_address = request.client.host - state_change_event.set() + state.leaders_name = leaders_name + state.leaders_address = leaders_address + ask_user_for_follow_event.set() return True - logger.info(f"Follow request from {request.client.host} rejected.") + logger.debug("Follow request from %s rejected.", leaders_address) # Clean up the state if user rejected state.leaders_address = None state.leaders_name = None state.following_requested = False else: - logger.info("Request rejected because %s", ("client is not listening for requests" if state.client_listening_for_follower_requests else "this node is already following someone")) + logger.info( + "Request rejected because %s", + ( + "client is not listening for requests" + if state.client_listening_for_follower_requests + else "this node is already following someone" + ), + ) return False -@router.websocket("/ws") -async def websocket_endpoint(websocket: WebSocket): - """ - A simple websocket endpoint that accepts a connection, receives messages, - and sends a response back. - """ - from pqnstack.app.core.config import state, state_change_event +@router.websocket("/follow_requested_alerts") +async def follow_requested_alert(websocket: WebSocket) -> None: + """Websocket endpoint is used to alert the client when a follow request is received. It also handles the response from the client.""" await websocket.accept() logger.info("Client connected to websocket for multiplayer coordination.") state.client_listening_for_follower_requests = True - async def state_change_handler(): + async def ask_user_for_follow_handler() -> None: + """Task that waits for the ask_user_for_follow_event event and sends a message to the client if a follow request is detected.""" while True: - await state_change_event.wait() # Wait for a state change event + await ask_user_for_follow_event.wait() # Wait for a state change event if state.following_requested: logger.debug("Websocket detected a follow request, asking user for response.") - await websocket.send_text(f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?") - state_change_event.clear() # Reset the event for the next change - - state_change_task = asyncio.create_task(state_change_handler()) + await websocket.send_text( + f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" + ) + ask_user_for_follow_event.clear() # Reset the event for the next change + + async def client_message_handler() -> None: + """Task that waits for a message from the client and handles the response. It also handles the case where the client disconnects.""" + try: + while True: + response = await websocket.receive_text() + state.following_requested_user_response = response.lower() in ["true", "yes", "y"] + state.following_requested = False + logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) + user_replied_event.set() + except WebSocketDisconnect: + logger.info("Client disconnected from websocket for multiplayer coordination.") + state.client_listening_for_follower_requests = False + + state_change_task = asyncio.create_task(ask_user_for_follow_handler()) + client_message_task = asyncio.create_task(client_message_handler()) try: - while True: - response = await websocket.receive_text() - state.following_requested_user_response = response.lower() in ["true", "yes", "y"] - state.following_requested = False - logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) - user_replied_event.set() - - except WebSocketDisconnect: - logger.info("Client disconnected from websocket for multiplayer coordination.") - state.client_listening_for_follower_requests = False + await asyncio.gather(state_change_task, client_message_task) finally: - state_change_task.cancel() \ No newline at end of file + state_change_task.cancel() + client_message_task.cancel() diff --git a/src/pqnstack/app/api/routes/debug.py b/src/pqnstack/app/api/routes/debug.py index 0250aa90..1e2af119 100644 --- a/src/pqnstack/app/api/routes/debug.py +++ b/src/pqnstack/app/api/routes/debug.py @@ -1,11 +1,11 @@ from fastapi import APIRouter +from pqnstack.app.core.config import NodeState from pqnstack.app.core.config import state -router = APIRouter(prefix='/debug', tags=['debug']) +router = APIRouter(prefix="/debug", tags=["debug"]) -@router.get('/state') -async def get_state(): +@router.get("/state") +async def get_state() -> NodeState: return state - diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index d5913f99..7c6061d2 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -1,6 +1,6 @@ +import asyncio import logging from functools import lru_cache -import asyncio from pydantic import BaseModel from pydantic import Field @@ -75,7 +75,6 @@ def get_settings() -> Settings: class NodeState(BaseModel): - # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. client_listening_for_follower_requests: bool = False @@ -83,9 +82,9 @@ class NodeState(BaseModel): leading: bool = False followers_address: str | None = None - # Follower's state following: bool = False + # Other node requested this node to follow it. following_requested: bool = False # User's response to the follow request. None if no response yet, True if accepted, False if rejected. following_requested_user_response: bool | None = None @@ -115,5 +114,5 @@ class NodeState(BaseModel): state = NodeState() -state_change_event = asyncio.Event() +ask_user_for_follow_event = asyncio.Event() user_replied_event = asyncio.Event() From 63c668a3be0e7c1f2a9dd83abd7501d5a4671ffb Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Thu, 11 Sep 2025 13:03:00 -0500 Subject: [PATCH 04/23] Moved all coordination state into its own model and dep --- src/pqnstack/app/api/deps.py | 10 ++++ src/pqnstack/app/api/routes/coordination.py | 64 +++++++++++---------- src/pqnstack/app/core/config.py | 11 ++-- 3 files changed, 50 insertions(+), 35 deletions(-) diff --git a/src/pqnstack/app/api/deps.py b/src/pqnstack/app/api/deps.py index e3c163b5..234a31b7 100644 --- a/src/pqnstack/app/api/deps.py +++ b/src/pqnstack/app/api/deps.py @@ -5,7 +5,9 @@ from fastapi import Depends from pqnstack.app.core.config import settings +from pqnstack.app.core.config import CoordinationState from pqnstack.network.client import Client +from pqnstack.app.core.config import state async def get_http_client() -> AsyncGenerator[httpx.AsyncClient, None]: @@ -22,3 +24,11 @@ async def get_instrument_client() -> AsyncGenerator[Client, None]: InstrumentClientDep = Annotated[httpx.AsyncClient, Depends(get_instrument_client)] + + +async def get_coordination_state() -> AsyncGenerator[CoordinationState, None]: + yield state.coordination_state + + +CoordinationStateDep = Annotated[CoordinationState, Depends(get_coordination_state)] + diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 93a533e5..b9096996 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -9,9 +9,9 @@ from fastapi import status from pqnstack.app.api.deps import ClientDep +from pqnstack.app.api.deps import CoordinationStateDep from pqnstack.app.core.config import ask_user_for_follow_event from pqnstack.app.core.config import settings -from pqnstack.app.core.config import state from pqnstack.app.core.config import user_replied_event logger = logging.getLogger(__name__) @@ -22,19 +22,19 @@ # TODO: Send a disconnection message if I was following someone. @router.post("/reset_coordination_state") -async def reset_coordination_state() -> None: +async def reset_coordination_state(coord: CoordinationStateDep) -> None: """Reset the coordination state of the node.""" - state.leading = False - state.followers_address = None - state.following = False - state.following_requested = False - state.following_requested_user_response = None - state.leaders_address = None - state.leaders_name = None + coord.leading = False + coord.followers_address = "" + coord.following = False + coord.following_requested = False + coord.following_requested_user_response = None + coord.leaders_address = "" + coord.leaders_name = "" @router.post("/collect_follower") -async def collect_follower(address: str, http_client: ClientDep) -> bool: +async def collect_follower(address: str, coord: CoordinationStateDep, http_client: ClientDep) -> bool: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. @@ -49,8 +49,8 @@ async def collect_follower(address: str, http_client: ClientDep) -> bool: raise HTTPException(status_code=ret.status_code, detail=ret.text) if ret.json() is True: - state.leading = True - state.followers_address = address + coord.leading = True + coord.followers_address = address logger.info("Successfully collected follower") return True if ret.json() is False: @@ -63,7 +63,7 @@ async def collect_follower(address: str, http_client: ClientDep) -> bool: @router.post("/follow_requested") -async def follow_requested(request: Request, leaders_name: str) -> bool: +async def follow_requested(request: Request, leaders_name: str, coord: CoordinationStateDep) -> bool: """ Endpoint is called by a leader node (other node) to request this node to follow it. @@ -78,34 +78,36 @@ async def follow_requested(request: Request, leaders_name: str) -> bool: leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if state.client_listening_for_follower_requests and not state.following: - state.following_requested = True + if coord.client_listening_for_follower_requests and not coord.following: + coord.following_requested = True + coord.leaders_name = leaders_name + coord.leaders_address = leaders_address # Trigger the state change to get the websocket to send question to user ask_user_for_follow_event.set() logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) await user_replied_event.wait() # Wait for a state change event to see if user accepted user_replied_event.clear() # Reset the event for the next change - if state.following_requested_user_response: + if coord.following_requested_user_response: logger.debug("Follow request from %s accepted.", leaders_address) - state.following = True - state.leaders_name = leaders_name - state.leaders_address = leaders_address + coord.following = True + coord.leaders_name = leaders_name + coord.leaders_address = leaders_address ask_user_for_follow_event.set() return True logger.debug("Follow request from %s rejected.", leaders_address) # Clean up the state if user rejected - state.leaders_address = None - state.leaders_name = None - state.following_requested = False + coord.leaders_address = "" + coord.leaders_name = "" + coord.following_requested = False else: logger.info( "Request rejected because %s", ( "client is not listening for requests" - if state.client_listening_for_follower_requests + if coord.client_listening_for_follower_requests else "this node is already following someone" ), ) @@ -114,20 +116,20 @@ async def follow_requested(request: Request, leaders_name: str) -> bool: @router.websocket("/follow_requested_alerts") -async def follow_requested_alert(websocket: WebSocket) -> None: +async def follow_requested_alert(websocket: WebSocket, coord: CoordinationStateDep) -> None: """Websocket endpoint is used to alert the client when a follow request is received. It also handles the response from the client.""" await websocket.accept() logger.info("Client connected to websocket for multiplayer coordination.") - state.client_listening_for_follower_requests = True + coord.client_listening_for_follower_requests = True async def ask_user_for_follow_handler() -> None: """Task that waits for the ask_user_for_follow_event event and sends a message to the client if a follow request is detected.""" while True: await ask_user_for_follow_event.wait() # Wait for a state change event - if state.following_requested: + if coord.following_requested: logger.debug("Websocket detected a follow request, asking user for response.") await websocket.send_text( - f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" + f"Do you want to accept a connection from {coord.leaders_name} ({coord.leaders_address})?" ) ask_user_for_follow_event.clear() # Reset the event for the next change @@ -136,13 +138,13 @@ async def client_message_handler() -> None: try: while True: response = await websocket.receive_text() - state.following_requested_user_response = response.lower() in ["true", "yes", "y"] - state.following_requested = False - logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) + coord.following_requested_user_response = response.lower() in ["true", "yes", "y"] + coord.following_requested = False + logger.debug("Websocket received a response from user: %s", coord.following_requested_user_response) user_replied_event.set() except WebSocketDisconnect: logger.info("Client disconnected from websocket for multiplayer coordination.") - state.client_listening_for_follower_requests = False + coord.client_listening_for_follower_requests = False state_change_task = asyncio.create_task(ask_user_for_follow_handler()) client_message_task = asyncio.create_task(client_message_handler()) diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 7c6061d2..b3272562 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -74,13 +74,13 @@ def get_settings() -> Settings: settings = get_settings() -class NodeState(BaseModel): +class CoordinationState(BaseModel): # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. client_listening_for_follower_requests: bool = False # Leader's state leading: bool = False - followers_address: str | None = None + followers_address: str = "" # Follower's state following: bool = False @@ -89,9 +89,12 @@ class NodeState(BaseModel): # User's response to the follow request. None if no response yet, True if accepted, False if rejected. following_requested_user_response: bool | None = None # The address of the leader this node is following. None if not following anyone. - leaders_address: str | None = None - leaders_name: str | None = None + leaders_address: str = "" + leaders_name: str = "" + +class NodeState(BaseModel): + coordination_state: CoordinationState = CoordinationState() chsh_request_basis: list[float] = [22.5, 67.5] # FIXME: Use enums for this qkd_basis_list: list[QKDEncodingBasis] = [ From a8ba0b4b88f395efc4a6e36646c38689b4d01963 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 17 Sep 2025 11:14:12 -0500 Subject: [PATCH 05/23] Implemented feedback to coordination.py --- src/pqnstack/app/api/routes/coordination.py | 92 ++++++++++++--------- 1 file changed, 53 insertions(+), 39 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index b9096996..7479531f 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -7,6 +7,7 @@ from fastapi import WebSocket from fastapi import WebSocketDisconnect from fastapi import status +from pydantic import BaseModel from pqnstack.app.api.deps import ClientDep from pqnstack.app.api.deps import CoordinationStateDep @@ -17,12 +18,24 @@ logger = logging.getLogger(__name__) +class FollowRequestResponse(BaseModel): + accepted: bool + + +class CollectFollowerResponse(BaseModel): + success: bool + + +class ResetCoordinationStateResponse(BaseModel): + message: str = "Coordination state reset successfully" + + router = APIRouter(prefix="/coordination", tags=["coordination"]) # TODO: Send a disconnection message if I was following someone. -@router.post("/reset_coordination_state") -async def reset_coordination_state(coord: CoordinationStateDep) -> None: +@router.post("/reset_coordination_state", response_model=ResetCoordinationStateResponse) +async def reset_coordination_state(coord: CoordinationStateDep) -> ResetCoordinationStateResponse: """Reset the coordination state of the node.""" coord.leading = False coord.followers_address = "" @@ -31,16 +44,17 @@ async def reset_coordination_state(coord: CoordinationStateDep) -> None: coord.following_requested_user_response = None coord.leaders_address = "" coord.leaders_name = "" + return ResetCoordinationStateResponse() -@router.post("/collect_follower") -async def collect_follower(address: str, coord: CoordinationStateDep, http_client: ClientDep) -> bool: +@router.post("/collect_follower", response_model=CollectFollowerResponse) +async def collect_follower(address: str, coord: CoordinationStateDep, http_client: ClientDep) -> CollectFollowerResponse: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. Returns ------- - True if the follower accepted the request, False otherwise. + CollectFollowerResponse indicating if the follower accepted the request. """ logger.info("Requesting client at %s to follow", address) @@ -48,28 +62,29 @@ async def collect_follower(address: str, coord: CoordinationStateDep, http_clien if ret.status_code != status.HTTP_200_OK: raise HTTPException(status_code=ret.status_code, detail=ret.text) - if ret.json() is True: + response_data = ret.json() + if response_data.get("accepted") is True: coord.leading = True coord.followers_address = address logger.info("Successfully collected follower") - return True - if ret.json() is False: + return CollectFollowerResponse(success=True) + if response_data.get("accepted") is False: logger.info("Follower rejected follow request") - return False + return CollectFollowerResponse(success=False) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not collect follower for unknown reasons" ) -@router.post("/follow_requested") -async def follow_requested(request: Request, leaders_name: str, coord: CoordinationStateDep) -> bool: +@router.post("/follow_requested", response_model=FollowRequestResponse) +async def follow_requested(request: Request, leaders_name: str, coord: CoordinationStateDep) -> FollowRequestResponse: """ Endpoint is called by a leader node (other node) to request this node to follow it. Returns ------- - True if the follow request is accepted, False otherwise. + FollowRequestResponse indicating if the follow request is accepted. """ logger.debug("Requesting client at %s to follow", leaders_name) @@ -78,41 +93,40 @@ async def follow_requested(request: Request, leaders_name: str, coord: Coordinat leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if coord.client_listening_for_follower_requests and not coord.following: - coord.following_requested = True - coord.leaders_name = leaders_name - coord.leaders_address = leaders_address - # Trigger the state change to get the websocket to send question to user - ask_user_for_follow_event.set() - - logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) - await user_replied_event.wait() # Wait for a state change event to see if user accepted - user_replied_event.clear() # Reset the event for the next change - if coord.following_requested_user_response: - logger.debug("Follow request from %s accepted.", leaders_address) - coord.following = True - coord.leaders_name = leaders_name - coord.leaders_address = leaders_address - ask_user_for_follow_event.set() - return True - - logger.debug("Follow request from %s rejected.", leaders_address) - # Clean up the state if user rejected - coord.leaders_address = "" - coord.leaders_name = "" - coord.following_requested = False - - else: + if not coord.client_listening_for_follower_requests or coord.following: logger.info( "Request rejected because %s", ( "client is not listening for requests" - if coord.client_listening_for_follower_requests + if not coord.client_listening_for_follower_requests else "this node is already following someone" ), ) + return FollowRequestResponse(accepted=False) + + coord.following_requested = True + coord.leaders_name = leaders_name + coord.leaders_address = leaders_address + # Trigger the state change to get the websocket to send question to user + ask_user_for_follow_event.set() + + logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) + await user_replied_event.wait() # Wait for a state change event to see if user accepted + user_replied_event.clear() # Reset the event for the next change + if coord.following_requested_user_response: + logger.debug("Follow request from %s accepted.", leaders_address) + coord.following = True + coord.leaders_name = leaders_name + coord.leaders_address = leaders_address + ask_user_for_follow_event.set() + return FollowRequestResponse(accepted=True) - return False + logger.debug("Follow request from %s rejected.", leaders_address) + # Clean up the state if user rejected + coord.leaders_address = "" + coord.leaders_name = "" + coord.following_requested = False + return FollowRequestResponse(accepted=False) @router.websocket("/follow_requested_alerts") From df1d790c9290d8ce4b34bcec2e0bea5a8927adc0 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 17 Sep 2025 11:34:33 -0500 Subject: [PATCH 06/23] Removed CoordinationState object --- src/pqnstack/app/api/deps.py | 12 +--- src/pqnstack/app/api/routes/coordination.py | 66 ++++++++++----------- src/pqnstack/app/core/config.py | 14 +++-- 3 files changed, 46 insertions(+), 46 deletions(-) diff --git a/src/pqnstack/app/api/deps.py b/src/pqnstack/app/api/deps.py index 234a31b7..079886a0 100644 --- a/src/pqnstack/app/api/deps.py +++ b/src/pqnstack/app/api/deps.py @@ -5,10 +5,9 @@ from fastapi import Depends from pqnstack.app.core.config import settings -from pqnstack.app.core.config import CoordinationState from pqnstack.network.client import Client -from pqnstack.app.core.config import state - +from pqnstack.app.core.config import NodeState +from pqnstack.app.core.config import get_state async def get_http_client() -> AsyncGenerator[httpx.AsyncClient, None]: async with httpx.AsyncClient(timeout=60) as client: @@ -26,9 +25,4 @@ async def get_instrument_client() -> AsyncGenerator[Client, None]: InstrumentClientDep = Annotated[httpx.AsyncClient, Depends(get_instrument_client)] -async def get_coordination_state() -> AsyncGenerator[CoordinationState, None]: - yield state.coordination_state - - -CoordinationStateDep = Annotated[CoordinationState, Depends(get_coordination_state)] - +StateDep = Annotated[NodeState, Depends(get_state)] diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 7479531f..86b61d25 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -10,7 +10,7 @@ from pydantic import BaseModel from pqnstack.app.api.deps import ClientDep -from pqnstack.app.api.deps import CoordinationStateDep +from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config import ask_user_for_follow_event from pqnstack.app.core.config import settings from pqnstack.app.core.config import user_replied_event @@ -35,20 +35,20 @@ class ResetCoordinationStateResponse(BaseModel): # TODO: Send a disconnection message if I was following someone. @router.post("/reset_coordination_state", response_model=ResetCoordinationStateResponse) -async def reset_coordination_state(coord: CoordinationStateDep) -> ResetCoordinationStateResponse: +async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateResponse: """Reset the coordination state of the node.""" - coord.leading = False - coord.followers_address = "" - coord.following = False - coord.following_requested = False - coord.following_requested_user_response = None - coord.leaders_address = "" - coord.leaders_name = "" + state.leading = False + state.followers_address = "" + state.following = False + state.following_requested = False + state.following_requested_user_response = None + state.leaders_address = "" + state.leaders_name = "" return ResetCoordinationStateResponse() @router.post("/collect_follower", response_model=CollectFollowerResponse) -async def collect_follower(address: str, coord: CoordinationStateDep, http_client: ClientDep) -> CollectFollowerResponse: +async def collect_follower(address: str, state: StateDep, http_client: ClientDep) -> CollectFollowerResponse: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. @@ -64,8 +64,8 @@ async def collect_follower(address: str, coord: CoordinationStateDep, http_clien response_data = ret.json() if response_data.get("accepted") is True: - coord.leading = True - coord.followers_address = address + state.leading = True + state.followers_address = address logger.info("Successfully collected follower") return CollectFollowerResponse(success=True) if response_data.get("accepted") is False: @@ -78,7 +78,7 @@ async def collect_follower(address: str, coord: CoordinationStateDep, http_clien @router.post("/follow_requested", response_model=FollowRequestResponse) -async def follow_requested(request: Request, leaders_name: str, coord: CoordinationStateDep) -> FollowRequestResponse: +async def follow_requested(request: Request, leaders_name: str, state: StateDep) -> FollowRequestResponse: """ Endpoint is called by a leader node (other node) to request this node to follow it. @@ -93,57 +93,57 @@ async def follow_requested(request: Request, leaders_name: str, coord: Coordinat leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if not coord.client_listening_for_follower_requests or coord.following: + if not state.client_listening_for_follower_requests or state.following: logger.info( "Request rejected because %s", ( "client is not listening for requests" - if not coord.client_listening_for_follower_requests + if not state.client_listening_for_follower_requests else "this node is already following someone" ), ) return FollowRequestResponse(accepted=False) - coord.following_requested = True - coord.leaders_name = leaders_name - coord.leaders_address = leaders_address + state.following_requested = True + state.leaders_name = leaders_name + state.leaders_address = leaders_address # Trigger the state change to get the websocket to send question to user ask_user_for_follow_event.set() logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) await user_replied_event.wait() # Wait for a state change event to see if user accepted user_replied_event.clear() # Reset the event for the next change - if coord.following_requested_user_response: + if state.following_requested_user_response: logger.debug("Follow request from %s accepted.", leaders_address) - coord.following = True - coord.leaders_name = leaders_name - coord.leaders_address = leaders_address + state.following = True + state.leaders_name = leaders_name + state.leaders_address = leaders_address ask_user_for_follow_event.set() return FollowRequestResponse(accepted=True) logger.debug("Follow request from %s rejected.", leaders_address) # Clean up the state if user rejected - coord.leaders_address = "" - coord.leaders_name = "" - coord.following_requested = False + state.leaders_address = "" + state.leaders_name = "" + state.following_requested = False return FollowRequestResponse(accepted=False) @router.websocket("/follow_requested_alerts") -async def follow_requested_alert(websocket: WebSocket, coord: CoordinationStateDep) -> None: +async def follow_requested_alert(websocket: WebSocket, state: StateDep) -> None: """Websocket endpoint is used to alert the client when a follow request is received. It also handles the response from the client.""" await websocket.accept() logger.info("Client connected to websocket for multiplayer coordination.") - coord.client_listening_for_follower_requests = True + state.client_listening_for_follower_requests = True async def ask_user_for_follow_handler() -> None: """Task that waits for the ask_user_for_follow_event event and sends a message to the client if a follow request is detected.""" while True: await ask_user_for_follow_event.wait() # Wait for a state change event - if coord.following_requested: + if state.following_requested: logger.debug("Websocket detected a follow request, asking user for response.") await websocket.send_text( - f"Do you want to accept a connection from {coord.leaders_name} ({coord.leaders_address})?" + f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" ) ask_user_for_follow_event.clear() # Reset the event for the next change @@ -152,13 +152,13 @@ async def client_message_handler() -> None: try: while True: response = await websocket.receive_text() - coord.following_requested_user_response = response.lower() in ["true", "yes", "y"] - coord.following_requested = False - logger.debug("Websocket received a response from user: %s", coord.following_requested_user_response) + state.following_requested_user_response = response.lower() in ["true", "yes", "y"] + state.following_requested = False + logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) user_replied_event.set() except WebSocketDisconnect: logger.info("Client disconnected from websocket for multiplayer coordination.") - coord.client_listening_for_follower_requests = False + state.client_listening_for_follower_requests = False state_change_task = asyncio.create_task(ask_user_for_follow_handler()) client_message_task = asyncio.create_task(client_message_handler()) diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index b3272562..2d31f33c 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -74,7 +74,9 @@ def get_settings() -> Settings: settings = get_settings() -class CoordinationState(BaseModel): + +class NodeState(BaseModel): + # Coordination state # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. client_listening_for_follower_requests: bool = False @@ -92,10 +94,10 @@ class CoordinationState(BaseModel): leaders_address: str = "" leaders_name: str = "" - -class NodeState(BaseModel): - coordination_state: CoordinationState = CoordinationState() + # CHSH state chsh_request_basis: list[float] = [22.5, 67.5] + + # QKD state # FIXME: Use enums for this qkd_basis_list: list[QKDEncodingBasis] = [ QKDEncodingBasis.DA, @@ -119,3 +121,7 @@ class NodeState(BaseModel): state = NodeState() ask_user_for_follow_event = asyncio.Event() user_replied_event = asyncio.Event() + + +def get_state() -> NodeState: + return state From dc47b5d97213eca9b2675a268d9170a88c7301f5 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 17 Sep 2025 11:41:05 -0500 Subject: [PATCH 07/23] State is now used through a dep --- src/pqnstack/app/api/routes/chsh.py | 4 ++-- src/pqnstack/app/api/routes/debug.py | 4 ++-- src/pqnstack/app/api/routes/qkd.py | 8 +++++--- src/pqnstack/app/core/config.py | 1 - 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/pqnstack/app/api/routes/chsh.py b/src/pqnstack/app/api/routes/chsh.py index c1bd7d7c..ebd85dfb 100644 --- a/src/pqnstack/app/api/routes/chsh.py +++ b/src/pqnstack/app/api/routes/chsh.py @@ -7,8 +7,8 @@ from fastapi import status from pqnstack.app.api.deps import ClientDep +from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config import settings -from pqnstack.app.core.config import state from pqnstack.app.core.models import calculate_chsh_expectation_error from pqnstack.network.client import Client @@ -129,7 +129,7 @@ async def chsh( @router.post("/request-angle-by-basis") -async def request_angle_by_basis(index: int, *, perp: bool = False) -> bool: +async def request_angle_by_basis(index: int, state: StateDep, *, perp: bool = False) -> bool: client = Client(host=settings.router_address, port=settings.router_port, timeout=600_000) hwp = cast( "RotatorInstrument", diff --git a/src/pqnstack/app/api/routes/debug.py b/src/pqnstack/app/api/routes/debug.py index 1e2af119..12768c32 100644 --- a/src/pqnstack/app/api/routes/debug.py +++ b/src/pqnstack/app/api/routes/debug.py @@ -1,11 +1,11 @@ from fastapi import APIRouter +from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config import NodeState -from pqnstack.app.core.config import state router = APIRouter(prefix="/debug", tags=["debug"]) @router.get("/state") -async def get_state() -> NodeState: +async def get_state(state: StateDep) -> NodeState: return state diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index ab32c1e5..1cdc75c6 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -8,8 +8,8 @@ from fastapi import status from pqnstack.app.api.deps import ClientDep +from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config import settings -from pqnstack.app.core.config import state from pqnstack.constants import BasisBool from pqnstack.constants import QKDEncodingBasis from pqnstack.network.client import Client @@ -25,6 +25,7 @@ async def _qkd( follower_node_address: str, http_client: ClientDep, + state: StateDep, timetagger_address: str | None = None, ) -> list[int]: logger.debug("Starting QKD") @@ -109,6 +110,7 @@ def get_outcome(state: int, basis: int, choice: int, counts: int) -> int: async def qkd( follower_node_address: str, http_client: ClientDep, + state: StateDep, timetagger_address: str | None = None, ) -> list[int]: """Perform a QKD protocol with the given follower node.""" @@ -123,7 +125,7 @@ async def qkd( @router.post("/single_bit") -async def request_qkd_single_pass() -> bool: +async def request_qkd_single_pass(state: StateDep) -> bool: client = Client(host=settings.router_address, port=settings.router_port, timeout=600_000) hwp = cast( "RotatorInstrument", @@ -153,7 +155,7 @@ async def request_qkd_single_pass() -> bool: @router.post("/request_basis_list") -def request_qkd_basis_list(leader_basis_list: list[str]) -> list[str]: +def request_qkd_basis_list(leader_basis_list: list[str], state: StateDep) -> list[str]: """Return the list of basis angles for QKD.""" # Check that lengths match if len(leader_basis_list) != len(state.qkd_request_basis_list): diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 2d31f33c..15024c13 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -74,7 +74,6 @@ def get_settings() -> Settings: settings = get_settings() - class NodeState(BaseModel): # Coordination state # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. From 05db5dee8b175f8424a89a786acac257364274c4 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 17 Sep 2025 13:08:57 -0500 Subject: [PATCH 08/23] mypy and ruff fixes --- src/pqnstack/app/api/routes/coordination.py | 6 +++--- src/pqnstack/app/api/routes/qkd.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 86b61d25..1e618873 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -34,7 +34,7 @@ class ResetCoordinationStateResponse(BaseModel): # TODO: Send a disconnection message if I was following someone. -@router.post("/reset_coordination_state", response_model=ResetCoordinationStateResponse) +@router.post("/reset_coordination_state") async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateResponse: """Reset the coordination state of the node.""" state.leading = False @@ -47,7 +47,7 @@ async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateRes return ResetCoordinationStateResponse() -@router.post("/collect_follower", response_model=CollectFollowerResponse) +@router.post("/collect_follower") async def collect_follower(address: str, state: StateDep, http_client: ClientDep) -> CollectFollowerResponse: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. @@ -77,7 +77,7 @@ async def collect_follower(address: str, state: StateDep, http_client: ClientDep ) -@router.post("/follow_requested", response_model=FollowRequestResponse) +@router.post("/follow_requested") async def follow_requested(request: Request, leaders_name: str, state: StateDep) -> FollowRequestResponse: """ Endpoint is called by a leader node (other node) to request this node to follow it. diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index 1cdc75c6..b79b0e33 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -121,7 +121,7 @@ async def qkd( detail="QKD basis list is empty", ) - return await _qkd(follower_node_address, http_client, timetagger_address) + return await _qkd(follower_node_address, http_client, state, timetagger_address) @router.post("/single_bit") From 405716e17a5e08899efa05f4d4afb69e530079e0 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 24 Sep 2025 16:50:47 -0500 Subject: [PATCH 09/23] Minor bugfix --- src/pqnstack/app/api/routes/coordination.py | 27 ++++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 1e618873..ded798b9 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -93,7 +93,7 @@ async def follow_requested(request: Request, leaders_name: str, state: StateDep) leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if not state.client_listening_for_follower_requests or state.following: + if not state.client_listening_for_follower_requests and state.following: logger.info( "Request rejected because %s", ( @@ -118,7 +118,6 @@ async def follow_requested(request: Request, leaders_name: str, state: StateDep) state.following = True state.leaders_name = leaders_name state.leaders_address = leaders_address - ask_user_for_follow_event.set() return FollowRequestResponse(accepted=True) logger.debug("Follow request from %s rejected.", leaders_address) @@ -139,13 +138,23 @@ async def follow_requested_alert(websocket: WebSocket, state: StateDep) -> None: async def ask_user_for_follow_handler() -> None: """Task that waits for the ask_user_for_follow_event event and sends a message to the client if a follow request is detected.""" while True: - await ask_user_for_follow_event.wait() # Wait for a state change event - if state.following_requested: - logger.debug("Websocket detected a follow request, asking user for response.") - await websocket.send_text( - f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" - ) - ask_user_for_follow_event.clear() # Reset the event for the next change + try: + await ask_user_for_follow_event.wait() # Wait for a state change event + if state.following_requested: + logger.debug("Websocket detected a follow request, asking user for response.") + try: + await websocket.send_text( + f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" + ) + except RuntimeError as e: + if "websocket.close" in str(e) or "response already completed" in str(e): + logger.debug("WebSocket already closed, cannot send message") + break + raise + ask_user_for_follow_event.clear() # Reset the event for the next change + except Exception as e: + logger.error("Error in ask_user_for_follow_handler: %s", e) + break async def client_message_handler() -> None: """Task that waits for a message from the client and handles the response. It also handles the case where the client disconnects.""" From e8f78aaf67dbd1fba81805c120ded58a46273e10 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 15 Oct 2025 18:31:35 -0500 Subject: [PATCH 10/23] ruff mypy checks --- src/pqnstack/app/api/main.py | 6 +++--- src/pqnstack/app/api/routes/coordination.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/pqnstack/app/api/main.py b/src/pqnstack/app/api/main.py index fe65fa93..175e0899 100644 --- a/src/pqnstack/app/api/main.py +++ b/src/pqnstack/app/api/main.py @@ -1,12 +1,12 @@ from fastapi import APIRouter from pqnstack.app.api.routes import chsh +from pqnstack.app.api.routes import coordination +from pqnstack.app.api.routes import debug from pqnstack.app.api.routes import qkd from pqnstack.app.api.routes import rng from pqnstack.app.api.routes import serial from pqnstack.app.api.routes import timetagger -from pqnstack.app.api.routes import coordination -from pqnstack.app.api.routes import debug api_router = APIRouter() api_router.include_router(chsh.router) @@ -15,4 +15,4 @@ api_router.include_router(rng.router) api_router.include_router(serial.router) api_router.include_router(coordination.router) -api_router.include_router(debug.router) \ No newline at end of file +api_router.include_router(debug.router) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index ded798b9..fb7ac6a1 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -152,8 +152,8 @@ async def ask_user_for_follow_handler() -> None: break raise ask_user_for_follow_event.clear() # Reset the event for the next change - except Exception as e: - logger.error("Error in ask_user_for_follow_handler: %s", e) + except Exception: + logger.exception("Error in ask_user_for_follow_handler") break async def client_message_handler() -> None: From 5a664ad58246a88729e4a6acde65d666ff09740c Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Fri, 17 Oct 2025 16:36:23 -0500 Subject: [PATCH 11/23] Checking for websocket connection before sending message --- src/pqnstack/app/api/routes/coordination.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index fb7ac6a1..35d77850 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -93,7 +93,7 @@ async def follow_requested(request: Request, leaders_name: str, state: StateDep) leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if not state.client_listening_for_follower_requests and state.following: + if not state.client_listening_for_follower_requests or state.following: logger.info( "Request rejected because %s", ( @@ -142,15 +142,13 @@ async def ask_user_for_follow_handler() -> None: await ask_user_for_follow_event.wait() # Wait for a state change event if state.following_requested: logger.debug("Websocket detected a follow request, asking user for response.") - try: + if websocket.client_state.name == "CONNECTED": await websocket.send_text( f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" ) - except RuntimeError as e: - if "websocket.close" in str(e) or "response already completed" in str(e): - logger.debug("WebSocket already closed, cannot send message") - break - raise + else: + logger.debug("WebSocket not connected, cannot send message") + break ask_user_for_follow_event.clear() # Reset the event for the next change except Exception: logger.exception("Error in ask_user_for_follow_handler") From ee81de71e6b7c18eefdff266ef12b772c37dacf0 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Sat, 18 Oct 2025 09:28:39 -0500 Subject: [PATCH 12/23] Adds virtual rotary encoder --- src/pqnstack/app/api/routes/debug.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/pqnstack/app/api/routes/debug.py b/src/pqnstack/app/api/routes/debug.py index 12768c32..e94a1f4a 100644 --- a/src/pqnstack/app/api/routes/debug.py +++ b/src/pqnstack/app/api/routes/debug.py @@ -1,7 +1,7 @@ from fastapi import APIRouter from pqnstack.app.api.deps import StateDep -from pqnstack.app.core.config import NodeState +from pqnstack.app.core.config import NodeState, Settings, settings router = APIRouter(prefix="/debug", tags=["debug"]) @@ -9,3 +9,8 @@ @router.get("/state") async def get_state(state: StateDep) -> NodeState: return state + + +@router.get("/settings") +async def get_settings() -> Settings: + return settings \ No newline at end of file From 5ae12f652a2f4450cab247a8036254fb2189b909 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Sun, 19 Oct 2025 10:41:57 -0500 Subject: [PATCH 13/23] Refactor CollectFollowerResponse to use 'accepted' instead of 'success' and ws logic fix --- src/pqnstack/app/api/routes/coordination.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 35d77850..7f4ee3c3 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -23,7 +23,7 @@ class FollowRequestResponse(BaseModel): class CollectFollowerResponse(BaseModel): - success: bool + accepted: bool class ResetCoordinationStateResponse(BaseModel): @@ -67,10 +67,10 @@ async def collect_follower(address: str, state: StateDep, http_client: ClientDep state.leading = True state.followers_address = address logger.info("Successfully collected follower") - return CollectFollowerResponse(success=True) + return CollectFollowerResponse(accepted=True) if response_data.get("accepted") is False: logger.info("Follower rejected follow request") - return CollectFollowerResponse(success=False) + return CollectFollowerResponse(accepted=False) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not collect follower for unknown reasons" @@ -125,6 +125,7 @@ async def follow_requested(request: Request, leaders_name: str, state: StateDep) state.leaders_address = "" state.leaders_name = "" state.following_requested = False + state.following_requested_user_response = None return FollowRequestResponse(accepted=False) @@ -150,9 +151,12 @@ async def ask_user_for_follow_handler() -> None: logger.debug("WebSocket not connected, cannot send message") break ask_user_for_follow_event.clear() # Reset the event for the next change - except Exception: - logger.exception("Error in ask_user_for_follow_handler") + except WebSocketDisconnect: + logger.info("WebSocket disconnected in ask_user_for_follow_handler") break + except Exception: + logger.exception("Error in ask_user_for_follow_handler, continuing to listen") + ask_user_for_follow_event.clear() # Reset the event to continue async def client_message_handler() -> None: """Task that waits for a message from the client and handles the response. It also handles the case where the client disconnects.""" From b992fc89adb156cc6bc507f3fe2405c8d2fb2d33 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Mon, 27 Oct 2025 10:07:01 -0500 Subject: [PATCH 14/23] Question order is synchronized --- src/pqnstack/app/api/routes/coordination.py | 15 ++++-- src/pqnstack/app/api/routes/debug.py | 6 ++- src/pqnstack/app/api/routes/qkd.py | 54 +++++++++++++++++++++ src/pqnstack/app/core/config.py | 8 ++- 4 files changed, 74 insertions(+), 9 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 7f4ee3c3..6a399a36 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -33,7 +33,8 @@ class ResetCoordinationStateResponse(BaseModel): router = APIRouter(prefix="/coordination", tags=["coordination"]) -# TODO: Send a disconnection message if I was following someone. +# TODO: Send a disconnection message if I was following/leading someone. +# FIXME: This is techincally resetting more than just coordination state. including qkd. @router.post("/reset_coordination_state") async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateResponse: """Reset the coordination state of the node.""" @@ -44,11 +45,12 @@ async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateRes state.following_requested_user_response = None state.leaders_address = "" state.leaders_name = "" + state.qkd_question_order = [] return ResetCoordinationStateResponse() @router.post("/collect_follower") -async def collect_follower(address: str, state: StateDep, http_client: ClientDep) -> CollectFollowerResponse: +async def collect_follower(request: Request, address: str, state: StateDep, http_client: ClientDep) -> CollectFollowerResponse: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. @@ -58,7 +60,10 @@ async def collect_follower(address: str, state: StateDep, http_client: ClientDep """ logger.info("Requesting client at %s to follow", address) - ret = await http_client.post(f"http://{address}/coordination/follow_requested?leaders_name={settings.node_name}") + # Get the port this server is listening on + server_port = request.scope["server"][1] + + ret = await http_client.post(f"http://{address}/coordination/follow_requested?leaders_name={settings.node_name}&leaders_port={server_port}") if ret.status_code != status.HTTP_200_OK: raise HTTPException(status_code=ret.status_code, detail=ret.text) @@ -78,7 +83,7 @@ async def collect_follower(address: str, state: StateDep, http_client: ClientDep @router.post("/follow_requested") -async def follow_requested(request: Request, leaders_name: str, state: StateDep) -> FollowRequestResponse: +async def follow_requested(request: Request, leaders_name: str, leaders_port: int, state: StateDep) -> FollowRequestResponse: """ Endpoint is called by a leader node (other node) to request this node to follow it. @@ -90,7 +95,7 @@ async def follow_requested(request: Request, leaders_name: str, state: StateDep) if request.client is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Request lacks the clients host") - leaders_address = request.client.host + leaders_address = f"{request.client.host}:{leaders_port}" # Check if the client is ready to accept a follower request and that node is not already following someone. if not state.client_listening_for_follower_requests or state.following: diff --git a/src/pqnstack/app/api/routes/debug.py b/src/pqnstack/app/api/routes/debug.py index e94a1f4a..3e43d74b 100644 --- a/src/pqnstack/app/api/routes/debug.py +++ b/src/pqnstack/app/api/routes/debug.py @@ -1,7 +1,9 @@ from fastapi import APIRouter from pqnstack.app.api.deps import StateDep -from pqnstack.app.core.config import NodeState, Settings, settings +from pqnstack.app.core.config import NodeState +from pqnstack.app.core.config import Settings +from pqnstack.app.core.config import settings router = APIRouter(prefix="/debug", tags=["debug"]) @@ -13,4 +15,4 @@ async def get_state(state: StateDep) -> NodeState: @router.get("/settings") async def get_settings() -> Settings: - return settings \ No newline at end of file + return settings diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index b79b0e33..198a1d84 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -174,3 +174,57 @@ def request_qkd_basis_list(leader_basis_list: list[str], state: StateDep) -> lis state.qkd_request_bit_list.clear() return ret + + +@router.get("/question_order") +async def request_qkd_question_order(state: StateDep, http_client: ClientDep,) -> list[int]: + """ + Return the question order for QKD. + + If this node is a leader, it generates a random question order and stores it in the state. + If this node is a follower, it requests the question order from the leader node. + Returns the question order as a list of integers. + """ + + if state.leading and state.following: + logger.error("Node cannot be both leader and follower") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Node cannot be both leader and follower", + ) + + if len(state.qkd_question_order) == 0: + if state.leading and state.followers_address != "": + question_range = range(settings.qkd_settings.minimum_question_index, settings.qkd_settings.maximum_question_index + 1) + question_order = random.sample(list(question_range), settings.qkd_settings.bitstring_length) + state.qkd_question_order = question_order + elif state.leading and state.followers_address == "": + logger.error("Leader node has no follower address set") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Leader node has no follower address set", + ) + + elif state.following and state.leaders_address != "": + try: + r = await http_client.get(f"http://{state.leaders_address}/qkd/question_order") + if r.status_code != status.HTTP_200_OK: + logger.error("Failed to get question order from leader: %s", r.text) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to get question order from leader", + ) + state.qkd_question_order = r.json() + except Exception as e: + logger.error("Error requesting question order from leader: %s", e) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error requesting question order from leader: {e}", + ) from e + elif state.following and state.leaders_address == "": + logger.error("Follower node has no leader address set") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Follower node has no leader address set", + ) + return state.qkd_question_order diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 15024c13..80338e0c 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -28,6 +28,8 @@ class QKDSettings(BaseModel): hwp: tuple[str, str] = ("", "") request_hwp: tuple[str, str] = ("", "") bitstring_length: int = 4 + minimum_question_index: int = 1 + maximum_question_index: int = 20 discriminating_threshold: int = 10 measurement_config: MeasurementConfig = Field(default_factory=lambda: MeasurementConfig(integration_time_s=5)) @@ -97,8 +99,9 @@ class NodeState(BaseModel): chsh_request_basis: list[float] = [22.5, 67.5] # QKD state - # FIXME: Use enums for this - qkd_basis_list: list[QKDEncodingBasis] = [ + # FIXME: At the moment the reset_coordination_state resets this, probably want to refactor that function out. + qkd_question_order: list[int] = [] # Order of questions for QKD + qkd_leader_basis_list: list[QKDEncodingBasis] = [ QKDEncodingBasis.DA, QKDEncodingBasis.DA, QKDEncodingBasis.DA, @@ -111,6 +114,7 @@ class NodeState(BaseModel): QKDEncodingBasis.HV, QKDEncodingBasis.HV, ] + qkd_follower_basis_list: list[QKDEncodingBasis] = [] qkd_bit_list: list[int] = [] qkd_resulting_bit_list: list[int] = [] # Resulting bits after QKD qkd_request_basis_list: list[QKDEncodingBasis] = [] # Basis angles for QKD From ba5d6cd5f34b317224940bfce479a35421debbc6 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Mon, 27 Oct 2025 11:05:59 -0500 Subject: [PATCH 15/23] Basis submitted for leader and follower --- src/pqnstack/app/api/routes/coordination.py | 2 + src/pqnstack/app/api/routes/qkd.py | 58 ++++++++++++++++++++- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 6a399a36..9676db23 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -46,6 +46,8 @@ async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateRes state.leaders_address = "" state.leaders_name = "" state.qkd_question_order = [] + state.qkd_leader_basis_list = [] + state.qkd_follower_basis_list = [] return ResetCoordinationStateResponse() diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index 198a1d84..b7317134 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -3,13 +3,14 @@ from typing import TYPE_CHECKING from typing import cast +import httpx from fastapi import APIRouter from fastapi import HTTPException from fastapi import status from pqnstack.app.api.deps import ClientDep from pqnstack.app.api.deps import StateDep -from pqnstack.app.core.config import settings +from pqnstack.app.core.config import settings, NodeState from pqnstack.constants import BasisBool from pqnstack.constants import QKDEncodingBasis from pqnstack.network.client import Client @@ -228,3 +229,58 @@ async def request_qkd_question_order(state: StateDep, http_client: ClientDep,) - detail="Follower node has no leader address set", ) return state.qkd_question_order + + +def _submit_qkd_basis_list_leader(state: NodeState, http_client: httpx.Client, basis_list: list[QKDEncodingBasis]): + state.qkd_leader_basis_list = basis_list + + + +def _submit_qkd_basis_list_follower(state: NodeState, http_client: httpx.Client, basis_list: list[QKDEncodingBasis]): + state.qkd_follower_basis_list = basis_list + + +@router.post("/submit_selection_and_start_qkd") +def submit_qkd_selection_and_start_qkd(state: StateDep, http_client: ClientDep, basis_list: list[str]): + """GUI calls this function to submit the QKD basis selection and start the QKD protocol. + This call is called by both leader and follower, depending on the node role, different actions are taken.""" + + if state.leading and state.following: + logger.error("Node cannot be both leader and follower") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Node cannot be both leader and follower", + ) + + if not state.leading and not state.following: + logger.error("Node must be either leader or follower to start QKD") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Node must be either leader or follower to start QKD", + ) + + # Convert 'a' or 'b' strings to QKDEncodingBasis enum values + qkd_basis_list = [] + for basis_str in basis_list: + if basis_str.lower() == 'a': + qkd_basis_list.append(QKDEncodingBasis.HV) + elif basis_str.lower() == 'b': + qkd_basis_list.append(QKDEncodingBasis.DA) + else: + logger.error(f"Invalid basis string: {basis_str}. Expected 'a' or 'b'") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid basis string: {basis_str}. Expected 'a' or 'b'", + ) + + if state.leading: + ret = _submit_qkd_basis_list_leader(state, http_client, qkd_basis_list) + return ret + + if state.following: + ret = _submit_qkd_basis_list_follower(state, http_client, qkd_basis_list) + return ret + + + + From 85bf6e62a3c6375d10edf512226fa2f5fa6d72dc Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Tue, 28 Oct 2025 10:51:06 -0500 Subject: [PATCH 16/23] QKD now runs from the GUI --- src/pqnstack/app/api/routes/coordination.py | 12 ++- src/pqnstack/app/api/routes/qkd.py | 108 ++++++++++++++++---- src/pqnstack/app/core/config.py | 1 + 3 files changed, 97 insertions(+), 24 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 9676db23..ab4d4078 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -52,7 +52,9 @@ async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateRes @router.post("/collect_follower") -async def collect_follower(request: Request, address: str, state: StateDep, http_client: ClientDep) -> CollectFollowerResponse: +async def collect_follower( + request: Request, address: str, state: StateDep, http_client: ClientDep +) -> CollectFollowerResponse: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. @@ -65,7 +67,9 @@ async def collect_follower(request: Request, address: str, state: StateDep, http # Get the port this server is listening on server_port = request.scope["server"][1] - ret = await http_client.post(f"http://{address}/coordination/follow_requested?leaders_name={settings.node_name}&leaders_port={server_port}") + ret = await http_client.post( + f"http://{address}/coordination/follow_requested?leaders_name={settings.node_name}&leaders_port={server_port}" + ) if ret.status_code != status.HTTP_200_OK: raise HTTPException(status_code=ret.status_code, detail=ret.text) @@ -85,7 +89,9 @@ async def collect_follower(request: Request, address: str, state: StateDep, http @router.post("/follow_requested") -async def follow_requested(request: Request, leaders_name: str, leaders_port: int, state: StateDep) -> FollowRequestResponse: +async def follow_requested( + request: Request, leaders_name: str, leaders_port: int, state: StateDep +) -> FollowRequestResponse: """ Endpoint is called by a leader node (other node) to request this node to follow it. diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index b7317134..684f0c90 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -1,5 +1,7 @@ +import asyncio import logging import secrets +import random from typing import TYPE_CHECKING from typing import cast @@ -41,7 +43,7 @@ async def _qkd( ) counts = [] - for basis in state.qkd_basis_list: + for basis in state.qkd_leader_basis_list: r = await http_client.post(f"http://{follower_node_address}/qkd/single_bit") if r.status_code != status.HTTP_200_OK: @@ -78,16 +80,19 @@ def get_outcome(state: int, basis: int, choice: int, counts: int) -> int: outcome = [] logger.debug( - "Going for qkd_basis_list: %s, qkd_bit_list: %s, counts: %s", state.qkd_basis_list, state.qkd_bit_list, counts + "Going for qkd_leader_basis_list: %s, qkd_bit_list: %s, counts: %s", + state.qkd_leader_basis_list, + state.qkd_bit_list, + counts, ) - for basis, choice, count in zip(state.qkd_basis_list, state.qkd_bit_list, counts, strict=False): + for basis, choice, count in zip(state.qkd_leader_basis_list, state.qkd_bit_list, counts, strict=False): out = get_outcome(settings.bell_state.value, BasisBool[basis.name].value, choice, count) logger.debug( "Calculating outcome for basis: %s, choice: %s, count: %s, outcome: %s", basis.name, choice, count, out ) outcome.append(out) - basis_list = [basis.name for basis in state.qkd_basis_list] + basis_list = [basis.name for basis in state.qkd_leader_basis_list] # FIXME: Send already binary basis instead of HV/AD. r = await http_client.post(f"http://{follower_node_address}/qkd/request_basis_list", json=basis_list) @@ -115,7 +120,7 @@ async def qkd( timetagger_address: str | None = None, ) -> list[int]: """Perform a QKD protocol with the given follower node.""" - if not state.qkd_basis_list: + if not state.qkd_leader_basis_list: logger.error("QKD basis list is empty") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, @@ -142,8 +147,18 @@ async def request_qkd_single_pass(state: StateDep) -> bool: logger.debug("Halfwaveplate device found: %s", hwp) - _bases = (QKDEncodingBasis.HV, QKDEncodingBasis.DA) - basis_choice = _bases[secrets.randbits(1)] # FIXME: Make this real quantum random. + # Check if we have basis choices available + if state.qkd_single_bit_current_index >= len(state.qkd_follower_basis_list): + logger.error("No more basis choices available in follower basis list") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="No more basis choices available in follower basis list", + ) + + # Get the basis choice from the follower basis list + basis_choice = state.qkd_follower_basis_list[state.qkd_single_bit_current_index] + state.qkd_single_bit_current_index += 1 + int_choice = secrets.randbits(1) # FIXME: Make this real quantum random. state.qkd_request_basis_list.append(basis_choice) @@ -178,7 +193,10 @@ def request_qkd_basis_list(leader_basis_list: list[str], state: StateDep) -> lis @router.get("/question_order") -async def request_qkd_question_order(state: StateDep, http_client: ClientDep,) -> list[int]: +async def request_qkd_question_order( + state: StateDep, + http_client: ClientDep, +) -> list[int]: """ Return the question order for QKD. @@ -196,8 +214,10 @@ async def request_qkd_question_order(state: StateDep, http_client: ClientDep,) - if len(state.qkd_question_order) == 0: if state.leading and state.followers_address != "": - question_range = range(settings.qkd_settings.minimum_question_index, settings.qkd_settings.maximum_question_index + 1) - question_order = random.sample(list(question_range), settings.qkd_settings.bitstring_length) + question_range = range( + settings.qkd_settings.minimum_question_index, settings.qkd_settings.maximum_question_index + 1 + ) + question_order = random.sample(list(question_range), settings.qkd_settings.bitstring_length) # just choosing question order, no need for secure secrets package. state.qkd_question_order = question_order elif state.leading and state.followers_address == "": logger.error("Leader node has no follower address set") @@ -231,17 +251,61 @@ async def request_qkd_question_order(state: StateDep, http_client: ClientDep,) - return state.qkd_question_order -def _submit_qkd_basis_list_leader(state: NodeState, http_client: httpx.Client, basis_list: list[QKDEncodingBasis]): +@router.get("/is_follower_ready") +async def is_follower_ready(state: StateDep) -> bool: + """Check if the follower node is ready for QKD. + Follower is ready when the state has the basis list with as many choices as the bitstring length. + """ + if not state.following: + logger.error("Node is not a follower") + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Node is not a follower", + ) + + return len(state.qkd_follower_basis_list) == settings.qkd_settings.bitstring_length + + +async def _wait_for_follower_ready(state: NodeState, http_client: httpx.AsyncClient): + """Poll the follower until it's ready, checking every 0.5 seconds.""" + while True: + try: + r = await http_client.get(f"http://{state.followers_address}/qkd/is_follower_ready") + if r.status_code == status.HTTP_200_OK: + is_ready = r.json() + if is_ready: + logger.info("YAY FOLLOWER READY") + break + else: + logger.info("Tried checking if follower is ready, but it wasn't ready") + else: + logger.info("Tried checking if follower is ready, but received non-200 status code") + except Exception as e: + logger.info(f"Tried checking if follower is ready, but encountered error: {e}") + + await asyncio.sleep(0.5) + + +async def _submit_qkd_basis_list_leader( + state: NodeState, http_client: httpx.AsyncClient, basis_list: list[QKDEncodingBasis], timetagger_address: str +): state.qkd_leader_basis_list = basis_list + await _wait_for_follower_ready(state, http_client) + ret = await _qkd(state.followers_address, http_client, state, timetagger_address) + print("Final QKD bits:", ret) -def _submit_qkd_basis_list_follower(state: NodeState, http_client: httpx.Client, basis_list: list[QKDEncodingBasis]): +async def _submit_qkd_basis_list_follower( + state: NodeState, http_client: httpx.Client, basis_list: list[QKDEncodingBasis] +): state.qkd_follower_basis_list = basis_list @router.post("/submit_selection_and_start_qkd") -def submit_qkd_selection_and_start_qkd(state: StateDep, http_client: ClientDep, basis_list: list[str]): +async def submit_qkd_selection_and_start_qkd( + state: StateDep, http_client: ClientDep, basis_list: list[str], timetagger_address: str = "" +): """GUI calls this function to submit the QKD basis selection and start the QKD protocol. This call is called by both leader and follower, depending on the node role, different actions are taken.""" @@ -262,9 +326,9 @@ def submit_qkd_selection_and_start_qkd(state: StateDep, http_client: ClientDep, # Convert 'a' or 'b' strings to QKDEncodingBasis enum values qkd_basis_list = [] for basis_str in basis_list: - if basis_str.lower() == 'a': + if basis_str.lower() == "a": qkd_basis_list.append(QKDEncodingBasis.HV) - elif basis_str.lower() == 'b': + elif basis_str.lower() == "b": qkd_basis_list.append(QKDEncodingBasis.DA) else: logger.error(f"Invalid basis string: {basis_str}. Expected 'a' or 'b'") @@ -274,13 +338,15 @@ def submit_qkd_selection_and_start_qkd(state: StateDep, http_client: ClientDep, ) if state.leading: - ret = _submit_qkd_basis_list_leader(state, http_client, qkd_basis_list) + if timetagger_address == "": + logger.error("Leader must provide timetagger address to start QKD") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Leader must provide timetagger address to start QKD", + ) + ret = await _submit_qkd_basis_list_leader(state, http_client, qkd_basis_list, timetagger_address) return ret if state.following: - ret = _submit_qkd_basis_list_follower(state, http_client, qkd_basis_list) + ret = await _submit_qkd_basis_list_follower(state, http_client, qkd_basis_list) return ret - - - - diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 80338e0c..5e9cbeab 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -115,6 +115,7 @@ class NodeState(BaseModel): QKDEncodingBasis.HV, ] qkd_follower_basis_list: list[QKDEncodingBasis] = [] + qkd_single_bit_current_index: int = 0 # Current index in follower basis list for single_bit endpoint qkd_bit_list: list[int] = [] qkd_resulting_bit_list: list[int] = [] # Resulting bits after QKD qkd_request_basis_list: list[QKDEncodingBasis] = [] # Basis angles for QKD From 029bb24593eff76aff1d91c1e95d23f5ffeffd89 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Thu, 30 Oct 2025 10:49:02 -0500 Subject: [PATCH 17/23] adds qkd response to gui --- src/pqnstack/app/api/routes/coordination.py | 3 + src/pqnstack/app/api/routes/qkd.py | 84 ++++++++++++++++++++- src/pqnstack/app/core/config.py | 4 +- 3 files changed, 86 insertions(+), 5 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index ab4d4078..563dfb65 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -45,9 +45,12 @@ async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateRes state.following_requested_user_response = None state.leaders_address = "" state.leaders_name = "" + state.qkd_emoji_pick = "" state.qkd_question_order = [] state.qkd_leader_basis_list = [] state.qkd_follower_basis_list = [] + state.qkd_single_bit_current_index = 0 + state.qkd_n_matching_bits = -1 return ResetCoordinationStateResponse() diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index 684f0c90..c84ad183 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -9,6 +9,7 @@ from fastapi import APIRouter from fastapi import HTTPException from fastapi import status +from pydantic import BaseModel from pqnstack.app.api.deps import ClientDep from pqnstack.app.api.deps import StateDep @@ -25,6 +26,13 @@ router = APIRouter(prefix="/qkd", tags=["qkd"]) +class QKDResult(BaseModel): + n_matching_bits: int + n_total_bits: int + emoji: str + role: str + + async def _qkd( follower_node_address: str, http_client: ClientDep, @@ -192,6 +200,13 @@ def request_qkd_basis_list(leader_basis_list: list[str], state: StateDep) -> lis return ret +@router.post("/set_qkd_emoji") +def set_qkd_emoji(emoji: str, state: StateDep): + """Set the emoji pick for QKD.""" + state.qkd_emoji_pick = emoji + return {"message": "Emoji set successfully"} + + @router.get("/question_order") async def request_qkd_question_order( state: StateDep, @@ -266,6 +281,16 @@ async def is_follower_ready(state: StateDep) -> bool: return len(state.qkd_follower_basis_list) == settings.qkd_settings.bitstring_length +@router.post("/submit_qkd_result") +async def submit_qkd_result(result: QKDResult, state: StateDep) -> None: + """ + QKD leader calls this endpoint of the follower to submit the QKD result as well as the emoji chosen. + """ + state.qkd_emoji_pick = result.emoji + state.qkd_n_matching_bits = result.n_matching_bits + logger.info("Received QKD result from follower: %s", result) + + async def _wait_for_follower_ready(state: NodeState, http_client: httpx.AsyncClient): """Poll the follower until it's ready, checking every 0.5 seconds.""" while True: @@ -281,26 +306,77 @@ async def _wait_for_follower_ready(state: NodeState, http_client: httpx.AsyncCli else: logger.info("Tried checking if follower is ready, but received non-200 status code") except Exception as e: - logger.info(f"Tried checking if follower is ready, but encountered error: {e}") + logger.info("Tried checking if follower is ready, but encountered error: %s", e) await asyncio.sleep(0.5) +async def _submit_qkd_result_to_follower( + state: NodeState, http_client: httpx.AsyncClient, qkd_result: QKDResult +): + """Submit the QKD result to the follower node.""" + try: + r = await http_client.post( + f"http://{state.followers_address}/qkd/submit_qkd_result", + json=qkd_result.model_dump() + ) + if r.status_code != status.HTTP_200_OK: + logger.error("Failed to submit QKD result to follower: %s", r.text) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to submit QKD result to follower", + ) + logger.info("Successfully submitted QKD result to follower") + except Exception as e: + logger.error("Error submitting QKD result to follower: %s", e) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error submitting QKD result to follower: {e}", + ) from e + + async def _submit_qkd_basis_list_leader( state: NodeState, http_client: httpx.AsyncClient, basis_list: list[QKDEncodingBasis], timetagger_address: str -): +) -> QKDResult: state.qkd_leader_basis_list = basis_list await _wait_for_follower_ready(state, http_client) ret = await _qkd(state.followers_address, http_client, state, timetagger_address) - print("Final QKD bits:", ret) + logger.info("Final QKD bits: %s", str(ret)) + + # Assemble QKDResult object + qkd_result = QKDResult( + n_matching_bits=len(ret), + n_total_bits=settings.qkd_settings.bitstring_length, + emoji=state.qkd_emoji_pick, + role="leader" + ) + + # Submit result to follower + await _submit_qkd_result_to_follower(state, http_client, qkd_result) + return qkd_result async def _submit_qkd_basis_list_follower( state: NodeState, http_client: httpx.Client, basis_list: list[QKDEncodingBasis] -): +) -> QKDResult: state.qkd_follower_basis_list = basis_list + # Wait until the leader submits the QKD result (state.qkd_n_matching_bits != -1) + while state.qkd_n_matching_bits == -1: + await asyncio.sleep(0.5) + + # Reassemble the QKDResult object from the state + qkd_result = QKDResult( + n_matching_bits=state.qkd_n_matching_bits, + n_total_bits=settings.qkd_settings.bitstring_length, + emoji=state.qkd_emoji_pick, + role="follower" + ) + + logger.info("Follower received QKD result: %s", state.qkd_n_matching_bits) + return qkd_result + @router.post("/submit_selection_and_start_qkd") async def submit_qkd_selection_and_start_qkd( diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 5e9cbeab..3939065b 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -27,7 +27,7 @@ class CHSHSettings(BaseModel): class QKDSettings(BaseModel): hwp: tuple[str, str] = ("", "") request_hwp: tuple[str, str] = ("", "") - bitstring_length: int = 4 + bitstring_length: int = 6 minimum_question_index: int = 1 maximum_question_index: int = 20 discriminating_threshold: int = 10 @@ -101,6 +101,7 @@ class NodeState(BaseModel): # QKD state # FIXME: At the moment the reset_coordination_state resets this, probably want to refactor that function out. qkd_question_order: list[int] = [] # Order of questions for QKD + qkd_emoji_pick: str = "" # Emoji chosen for QKD qkd_leader_basis_list: list[QKDEncodingBasis] = [ QKDEncodingBasis.DA, QKDEncodingBasis.DA, @@ -120,6 +121,7 @@ class NodeState(BaseModel): qkd_resulting_bit_list: list[int] = [] # Resulting bits after QKD qkd_request_basis_list: list[QKDEncodingBasis] = [] # Basis angles for QKD qkd_request_bit_list: list[int] = [] + qkd_n_matching_bits: int = -1 # Leaders populate this value after qkd is done. Same with the emoji state = NodeState() From 9f4de2e5688d01fd886ad59e3921b2a87ae97fd1 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Thu, 30 Oct 2025 11:28:31 -0500 Subject: [PATCH 18/23] Ruff and mypy checks --- src/pqnstack/app/api/routes/qkd.py | 76 +++++++++++++++--------------- src/pqnstack/app/core/config.py | 3 +- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index c84ad183..33547d60 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -13,7 +13,9 @@ from pqnstack.app.api.deps import ClientDep from pqnstack.app.api.deps import StateDep -from pqnstack.app.core.config import settings, NodeState +from pqnstack.app.core.config import NodeState +from pqnstack.app.core.config import qkd_result_received_event +from pqnstack.app.core.config import settings from pqnstack.constants import BasisBool from pqnstack.constants import QKDEncodingBasis from pqnstack.network.client import Client @@ -201,10 +203,9 @@ def request_qkd_basis_list(leader_basis_list: list[str], state: StateDep) -> lis @router.post("/set_qkd_emoji") -def set_qkd_emoji(emoji: str, state: StateDep): +def set_qkd_emoji(emoji: str, state: StateDep) -> None: """Set the emoji pick for QKD.""" state.qkd_emoji_pick = emoji - return {"message": "Emoji set successfully"} @router.get("/question_order") @@ -219,7 +220,6 @@ async def request_qkd_question_order( If this node is a follower, it requests the question order from the leader node. Returns the question order as a list of integers. """ - if state.leading and state.following: logger.error("Node cannot be both leader and follower") raise HTTPException( @@ -251,8 +251,8 @@ async def request_qkd_question_order( detail="Failed to get question order from leader", ) state.qkd_question_order = r.json() - except Exception as e: - logger.error("Error requesting question order from leader: %s", e) + except (httpx.HTTPError, httpx.RequestError, httpx.TimeoutException) as e: + logger.exception("Error requesting question order from leader: %s") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error requesting question order from leader: {e}", @@ -263,12 +263,15 @@ async def request_qkd_question_order( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Follower node has no leader address set", ) + return state.qkd_question_order @router.get("/is_follower_ready") async def is_follower_ready(state: StateDep) -> bool: - """Check if the follower node is ready for QKD. + """ + Check if the follower node is ready for QKD. + Follower is ready when the state has the basis list with as many choices as the bitstring length. """ if not state.following: @@ -283,15 +286,14 @@ async def is_follower_ready(state: StateDep) -> bool: @router.post("/submit_qkd_result") async def submit_qkd_result(result: QKDResult, state: StateDep) -> None: - """ - QKD leader calls this endpoint of the follower to submit the QKD result as well as the emoji chosen. - """ + """QKD leader calls this endpoint of the follower to submit the QKD result as well as the emoji chosen.""" state.qkd_emoji_pick = result.emoji state.qkd_n_matching_bits = result.n_matching_bits + qkd_result_received_event.set() # Signal that the result has been received logger.info("Received QKD result from follower: %s", result) -async def _wait_for_follower_ready(state: NodeState, http_client: httpx.AsyncClient): +async def _wait_for_follower_ready(state: NodeState, http_client: httpx.AsyncClient) -> None: """Poll the follower until it's ready, checking every 0.5 seconds.""" while True: try: @@ -299,13 +301,12 @@ async def _wait_for_follower_ready(state: NodeState, http_client: httpx.AsyncCli if r.status_code == status.HTTP_200_OK: is_ready = r.json() if is_ready: - logger.info("YAY FOLLOWER READY") + logger.info("Follower has all basis choices. Ready to start QKD") break - else: - logger.info("Tried checking if follower is ready, but it wasn't ready") + logger.info("Tried checking if follower is ready, but it wasn't ready") else: logger.info("Tried checking if follower is ready, but received non-200 status code") - except Exception as e: + except (httpx.HTTPError, httpx.RequestError, httpx.TimeoutException) as e: logger.info("Tried checking if follower is ready, but encountered error: %s", e) await asyncio.sleep(0.5) @@ -313,12 +314,11 @@ async def _wait_for_follower_ready(state: NodeState, http_client: httpx.AsyncCli async def _submit_qkd_result_to_follower( state: NodeState, http_client: httpx.AsyncClient, qkd_result: QKDResult -): +) -> None: """Submit the QKD result to the follower node.""" try: r = await http_client.post( - f"http://{state.followers_address}/qkd/submit_qkd_result", - json=qkd_result.model_dump() + f"http://{state.followers_address}/qkd/submit_qkd_result", json=qkd_result.model_dump() ) if r.status_code != status.HTTP_200_OK: logger.error("Failed to submit QKD result to follower: %s", r.text) @@ -327,8 +327,8 @@ async def _submit_qkd_result_to_follower( detail="Failed to submit QKD result to follower", ) logger.info("Successfully submitted QKD result to follower") - except Exception as e: - logger.error("Error submitting QKD result to follower: %s", e) + except (httpx.HTTPError, httpx.RequestError, httpx.TimeoutException) as e: + logger.exception("Error submitting QKD result to follower") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error submitting QKD result to follower: {e}", @@ -349,7 +349,7 @@ async def _submit_qkd_basis_list_leader( n_matching_bits=len(ret), n_total_bits=settings.qkd_settings.bitstring_length, emoji=state.qkd_emoji_pick, - role="leader" + role="leader", ) # Submit result to follower @@ -357,23 +357,25 @@ async def _submit_qkd_basis_list_leader( return qkd_result -async def _submit_qkd_basis_list_follower( - state: NodeState, http_client: httpx.Client, basis_list: list[QKDEncodingBasis] -) -> QKDResult: +async def _submit_qkd_basis_list_follower(state: NodeState, basis_list: list[QKDEncodingBasis]) -> QKDResult: state.qkd_follower_basis_list = basis_list - # Wait until the leader submits the QKD result (state.qkd_n_matching_bits != -1) - while state.qkd_n_matching_bits == -1: - await asyncio.sleep(0.5) + # don't wait for the event if the result is already set. This avoids deadlocks in case the result was set before this function is called. + if state.qkd_n_matching_bits == -1: + # Wait until the leader submits the QKD result + await qkd_result_received_event.wait() # Reassemble the QKDResult object from the state qkd_result = QKDResult( n_matching_bits=state.qkd_n_matching_bits, n_total_bits=settings.qkd_settings.bitstring_length, emoji=state.qkd_emoji_pick, - role="follower" + role="follower", ) + # Clear the event for the next QKD run + qkd_result_received_event.clear() + logger.info("Follower received QKD result: %s", state.qkd_n_matching_bits) return qkd_result @@ -381,10 +383,12 @@ async def _submit_qkd_basis_list_follower( @router.post("/submit_selection_and_start_qkd") async def submit_qkd_selection_and_start_qkd( state: StateDep, http_client: ClientDep, basis_list: list[str], timetagger_address: str = "" -): - """GUI calls this function to submit the QKD basis selection and start the QKD protocol. - This call is called by both leader and follower, depending on the node role, different actions are taken.""" +) -> QKDResult: + """ + GUI calls this function to submit the QKD basis selection and start the QKD protocol. + This call is called by both leader and follower, depending on the node role, different actions are taken. + """ if state.leading and state.following: logger.error("Node cannot be both leader and follower") raise HTTPException( @@ -407,7 +411,7 @@ async def submit_qkd_selection_and_start_qkd( elif basis_str.lower() == "b": qkd_basis_list.append(QKDEncodingBasis.DA) else: - logger.error(f"Invalid basis string: {basis_str}. Expected 'a' or 'b'") + logger.exception("Invalid basis string: %s. Expected 'a' or 'b'", basis_str) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid basis string: {basis_str}. Expected 'a' or 'b'", @@ -420,9 +424,7 @@ async def submit_qkd_selection_and_start_qkd( status_code=status.HTTP_400_BAD_REQUEST, detail="Leader must provide timetagger address to start QKD", ) - ret = await _submit_qkd_basis_list_leader(state, http_client, qkd_basis_list, timetagger_address) - return ret + return await _submit_qkd_basis_list_leader(state, http_client, qkd_basis_list, timetagger_address) - if state.following: - ret = await _submit_qkd_basis_list_follower(state, http_client, qkd_basis_list) - return ret + # If the node is not leading, it is assumed it is a follower due to previous check + return await _submit_qkd_basis_list_follower(state, qkd_basis_list) diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 3939065b..e572a5a3 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -101,7 +101,7 @@ class NodeState(BaseModel): # QKD state # FIXME: At the moment the reset_coordination_state resets this, probably want to refactor that function out. qkd_question_order: list[int] = [] # Order of questions for QKD - qkd_emoji_pick: str = "" # Emoji chosen for QKD + qkd_emoji_pick: str = "" # Emoji chosen for QKD qkd_leader_basis_list: list[QKDEncodingBasis] = [ QKDEncodingBasis.DA, QKDEncodingBasis.DA, @@ -127,6 +127,7 @@ class NodeState(BaseModel): state = NodeState() ask_user_for_follow_event = asyncio.Event() user_replied_event = asyncio.Event() +qkd_result_received_event = asyncio.Event() def get_state() -> NodeState: From 842082cf1d68195569931824b82bd45c159bc1d4 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Mon, 3 Nov 2025 17:18:32 -0600 Subject: [PATCH 19/23] reduce maximum question index to 8 and remove node address from connection request message --- src/pqnstack/app/api/routes/coordination.py | 4 +--- src/pqnstack/app/core/config.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 563dfb65..001d680d 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -160,9 +160,7 @@ async def ask_user_for_follow_handler() -> None: if state.following_requested: logger.debug("Websocket detected a follow request, asking user for response.") if websocket.client_state.name == "CONNECTED": - await websocket.send_text( - f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" - ) + await websocket.send_text(f"Do you want to accept a connection from {state.leaders_name}?") else: logger.debug("WebSocket not connected, cannot send message") break diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index e572a5a3..3e243348 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -29,7 +29,7 @@ class QKDSettings(BaseModel): request_hwp: tuple[str, str] = ("", "") bitstring_length: int = 6 minimum_question_index: int = 1 - maximum_question_index: int = 20 + maximum_question_index: int = 8 discriminating_threshold: int = 10 measurement_config: MeasurementConfig = Field(default_factory=lambda: MeasurementConfig(integration_time_s=5)) From 7d922f4cce5eba9b55031cade8ed1f46781f16bd Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Tue, 4 Nov 2025 15:22:46 -0600 Subject: [PATCH 20/23] refactor CHSH calculation and add expectation signs to settings --- src/pqnstack/app/api/routes/chsh.py | 14 +++----------- src/pqnstack/app/api/routes/coordination.py | 1 + src/pqnstack/app/core/config.py | 1 + 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/pqnstack/app/api/routes/chsh.py b/src/pqnstack/app/api/routes/chsh.py index ebd85dfb..26b821c2 100644 --- a/src/pqnstack/app/api/routes/chsh.py +++ b/src/pqnstack/app/api/routes/chsh.py @@ -94,18 +94,10 @@ async def _chsh( # Complexity is high due to the nature of the CHSH experiment. logger.info("Expectation values: %s", expectation_values) logger.info("Expectation errors: %s", expectation_errors) - negative_count = sum(1 for v in expectation_values if v < 0) - negative_indices = [i for i, v in enumerate(expectation_values) if v < 0] - impossible_counts = [0, 2, 4] + # FIXME: This is a temporary fix for handling impossible expectation values. We should not have to rely on the settings for this. + expectation_values = [x*y for x,y in zip(expectation_values, settings.chsh_settings.expectation_signs)] - if negative_count in impossible_counts: - msg = f"Impossible negative expectation values found: {negative_indices}, expectation_values = {expectation_values}, expectation_errors = {expectation_errors}" - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg) - - if len(negative_indices) > 1 or negative_indices[0] != 0: - logger.warning("Expectation values have unexpected negative indices: %s", negative_indices) - - chsh_value = sum(abs(x) for x in expectation_values) + chsh_value = sum(x for x in expectation_values) chsh_error = sum(x**2 for x in expectation_errors) ** 0.5 return chsh_value, chsh_error diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 001d680d..e2cc9688 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -46,6 +46,7 @@ async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateRes state.leaders_address = "" state.leaders_name = "" state.qkd_emoji_pick = "" + state.qkd_bit_list = [] state.qkd_question_order = [] state.qkd_leader_basis_list = [] state.qkd_follower_basis_list = [] diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 3e243348..a6659792 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -22,6 +22,7 @@ class CHSHSettings(BaseModel): hwp: tuple[str, str] = ("", "") request_hwp: tuple[str, str] = ("", "") measurement_config: MeasurementConfig = Field(default_factory=lambda: MeasurementConfig(integration_time_s=5)) + expectation_signs: tuple[int, int, int, int] = (1, 1, 1, -1) class QKDSettings(BaseModel): From b6b8e471095141f517b72d249f2108c86aee432c Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Tue, 4 Nov 2025 16:00:42 -0600 Subject: [PATCH 21/23] Adding logging calls --- src/pqnstack/app/api/routes/chsh.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/pqnstack/app/api/routes/chsh.py b/src/pqnstack/app/api/routes/chsh.py index 26b821c2..559afbcf 100644 --- a/src/pqnstack/app/api/routes/chsh.py +++ b/src/pqnstack/app/api/routes/chsh.py @@ -96,7 +96,9 @@ async def _chsh( # Complexity is high due to the nature of the CHSH experiment. # FIXME: This is a temporary fix for handling impossible expectation values. We should not have to rely on the settings for this. expectation_values = [x*y for x,y in zip(expectation_values, settings.chsh_settings.expectation_signs)] + logger.info("What are you settings? %s", settings.chsh_settings.expectation_signs) + logger.info("After passing signed calculation: %s", expectation_values) chsh_value = sum(x for x in expectation_values) chsh_error = sum(x**2 for x in expectation_errors) ** 0.5 From 56aa7deba359edcbd1708891c95873d861b680dc Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Tue, 4 Nov 2025 16:30:54 -0600 Subject: [PATCH 22/23] Adds modulo for basis. --- src/pqnstack/app/api/routes/chsh.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pqnstack/app/api/routes/chsh.py b/src/pqnstack/app/api/routes/chsh.py index 559afbcf..6a66989e 100644 --- a/src/pqnstack/app/api/routes/chsh.py +++ b/src/pqnstack/app/api/routes/chsh.py @@ -44,6 +44,7 @@ async def _chsh( # Complexity is high due to the nature of the CHSH experiment. expectation_values = [] expectation_errors = [] + basis = [0, abs(basis[0] - basis[1]) % 90] for angle in basis: # Going through my basis angles for i in range(2): # Going through follower basis angles counts = [] From 770ec5e2d838c5b6a8c4dc1d8650732b6d606d34 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Tue, 4 Nov 2025 18:10:28 -0600 Subject: [PATCH 23/23] Added resetting to missing qkd state variables --- src/pqnstack/app/api/routes/coordination.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index e2cc9688..e7e6dc64 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -51,6 +51,9 @@ async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateRes state.qkd_leader_basis_list = [] state.qkd_follower_basis_list = [] state.qkd_single_bit_current_index = 0 + state.qkd_resulting_bit_list = [] + state.qkd_request_basis_list = [] + state.qkd_request_bit_list = [] state.qkd_n_matching_bits = -1 return ResetCoordinationStateResponse()