From 9159a6b96202128d731079a2310881186cb0889a Mon Sep 17 00:00:00 2001 From: Alexander Twrdik <6052859+DiCanio@users.noreply.github.com> Date: Tue, 5 Aug 2025 11:40:23 +0200 Subject: [PATCH 1/2] Make Discovery Endpoints Return NodeID Ensures that discovery endpoints actually return the real node ID instead of shadowing the underlying ID of the robot account. --- .../discovery/DiscoveryController.java | 4 ++-- .../discovery/DiscoveryService.java | 5 +++-- .../discovery/Participant.java | 8 +++++--- .../discovery/DiscoveryControllerIT.java | 16 ++++++++-------- .../discovery/DiscoveryServiceIT.java | 10 +++++----- 5 files changed, 23 insertions(+), 20 deletions(-) diff --git a/src/main/java/de/privateaim/node_message_broker/discovery/DiscoveryController.java b/src/main/java/de/privateaim/node_message_broker/discovery/DiscoveryController.java index 88a50c0..a5711c0 100644 --- a/src/main/java/de/privateaim/node_message_broker/discovery/DiscoveryController.java +++ b/src/main/java/de/privateaim/node_message_broker/discovery/DiscoveryController.java @@ -37,7 +37,7 @@ Mono>> discoverAllParticipants(@PathVar return discoveryService.discoverAllParticipantsOfAnalysis(analysisId) .collectList() .map(participants -> participants.stream() - .map(p -> new ParticipantResponse(p.nodeRobotId(), p.nodeType())) + .map(p -> new ParticipantResponse(p.nodeId(), p.nodeType())) .toList()) .map(ResponseEntity::ok) .onErrorMap(AnalysisNodesLookupException.class, err -> @@ -51,7 +51,7 @@ Mono> discoverSelf(@PathVariable String anal } return discoveryService.discoverSelfInAnalysis(analysisId) - .map(p -> new ParticipantResponse(p.nodeRobotId(), p.nodeType())) + .map(p -> new ParticipantResponse(p.nodeId(), p.nodeType())) .map(ResponseEntity::ok) .onErrorMap(AnalysisNodesLookupException.class, err -> new ResponseStatusException(HttpStatus.BAD_GATEWAY, err.getMessage(), err)) diff --git a/src/main/java/de/privateaim/node_message_broker/discovery/DiscoveryService.java b/src/main/java/de/privateaim/node_message_broker/discovery/DiscoveryService.java index d99169f..6aaf783 100644 --- a/src/main/java/de/privateaim/node_message_broker/discovery/DiscoveryService.java +++ b/src/main/java/de/privateaim/node_message_broker/discovery/DiscoveryService.java @@ -35,7 +35,7 @@ public DiscoveryService(@NotNull HubClient hubClient, * identifier. * * @param analysisId unique identifier of the analysis whose participating nodes shall get discovered - * @return All participants of the analysis if there are any.s + * @return All participants of the analysis if there are any. */ Flux discoverAllParticipantsOfAnalysis(@NotNull String analysisId) { if (analysisId == null) { @@ -50,6 +50,7 @@ Flux discoverAllParticipantsOfAnalysis(@NotNull String analysisId) "analysis nodes", err)) .flatMapIterable(analysisNodes -> analysisNodes.stream() .map(analysisNode -> new Participant( + analysisNode.node.id, analysisNode.node.robotId, ParticipantType.fromRepresentation(analysisNode.node.type) )).toList()) @@ -77,7 +78,7 @@ Mono discoverSelfInAnalysis(@NotNull String analysisId) { .collectList() .flatMap(participants -> { var selfParticipants = participants.stream() - .filter(p -> p.nodeRobotId().equals(selfRobotId)) + .filter(p -> p.robotId().equals(selfRobotId)) .toList(); if (selfParticipants.isEmpty()) { diff --git a/src/main/java/de/privateaim/node_message_broker/discovery/Participant.java b/src/main/java/de/privateaim/node_message_broker/discovery/Participant.java index 1a3d3bd..ebbfceb 100644 --- a/src/main/java/de/privateaim/node_message_broker/discovery/Participant.java +++ b/src/main/java/de/privateaim/node_message_broker/discovery/Participant.java @@ -5,11 +5,13 @@ /** * Represents a participant in its internal form. * - * @param nodeRobotId unique identifier of a node's robot account associated with it - * @param nodeType type of the node (default | aggregator) + * @param nodeId unique identifier of a node + * @param robotId unique identifier of a node's robot account (used for internal communications & authentication) + * @param nodeType type of the node (default | aggregator) */ public record Participant( - String nodeRobotId, + String nodeId, + String robotId, ParticipantType nodeType ) { } diff --git a/src/test/java/de/privateaim/node_message_broker/discovery/DiscoveryControllerIT.java b/src/test/java/de/privateaim/node_message_broker/discovery/DiscoveryControllerIT.java index c067b3b..1426ea6 100644 --- a/src/test/java/de/privateaim/node_message_broker/discovery/DiscoveryControllerIT.java +++ b/src/test/java/de/privateaim/node_message_broker/discovery/DiscoveryControllerIT.java @@ -9,9 +9,9 @@ import org.mockito.Mockito; import org.springframework.boot.autoconfigure.security.reactive.ReactiveSecurityAutoConfiguration; import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest; -import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.ApplicationContext; import org.springframework.http.HttpStatus; +import org.springframework.test.context.bean.override.mockito.MockitoBean; import org.springframework.test.web.reactive.server.WebTestClient; import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; import reactor.core.publisher.Flux; @@ -30,7 +30,7 @@ public final class DiscoveryControllerIT { private static final String ANALYSIS_ID = "ana-123"; private static final ObjectMapper JSON = new ObjectMapper(); - @MockBean + @MockitoBean private DiscoveryService mockedDiscoveryService; private WebTestClient client; @@ -96,8 +96,8 @@ void returns500OnUnknownError() { @Test void returns200WithAllDiscoveredParticipants() throws IOException { var participants = List.of( - new Participant("123", ParticipantType.AGGREGATOR), - new Participant("456", ParticipantType.DEFAULT) + new Participant("123", "abc", ParticipantType.AGGREGATOR), + new Participant("456", "def", ParticipantType.DEFAULT) ); Mockito.doReturn(Flux.fromIterable(participants)) .when(mockedDiscoveryService) @@ -113,9 +113,9 @@ void returns200WithAllDiscoveredParticipants() throws IOException { assertEquals(participants.size(), clientReceivedParticipants.size()); assertEquals(participants.getFirst().nodeType(), clientReceivedParticipants.getFirst().getNodeType()); - assertEquals(participants.getFirst().nodeRobotId(), clientReceivedParticipants.getFirst().nodeId); + assertEquals(participants.getFirst().nodeId(), clientReceivedParticipants.getFirst().nodeId); assertEquals(participants.getLast().nodeType(), clientReceivedParticipants.getLast().getNodeType()); - assertEquals(participants.getLast().nodeRobotId(), clientReceivedParticipants.getLast().nodeId); + assertEquals(participants.getLast().nodeId(), clientReceivedParticipants.getLast().nodeId); } } @@ -164,7 +164,7 @@ void returns404IfNoParticipantResemblesTheSelfNode() { @Test void returns200WithTheDiscoveredSelfNode() throws IOException { - var self = new Participant("robot-123", ParticipantType.AGGREGATOR); + var self = new Participant("node-123", "robot-123", ParticipantType.AGGREGATOR); Mockito.doReturn(Mono.just(self)) .when(mockedDiscoveryService) .discoverSelfInAnalysis(ANALYSIS_ID); @@ -178,7 +178,7 @@ void returns200WithTheDiscoveredSelfNode() throws IOException { assertNotNull(clientReceivedSelf); assertEquals(self.nodeType(), clientReceivedSelf.getNodeType()); - assertEquals(self.nodeRobotId(), clientReceivedSelf.getNodeId()); + assertEquals(self.nodeId(), clientReceivedSelf.getNodeId()); } } } diff --git a/src/test/java/de/privateaim/node_message_broker/discovery/DiscoveryServiceIT.java b/src/test/java/de/privateaim/node_message_broker/discovery/DiscoveryServiceIT.java index 0290ad2..2bc572d 100644 --- a/src/test/java/de/privateaim/node_message_broker/discovery/DiscoveryServiceIT.java +++ b/src/test/java/de/privateaim/node_message_broker/discovery/DiscoveryServiceIT.java @@ -77,12 +77,12 @@ void returnsParticipatingAnalysisNodes() throws JsonProcessingException { assertEquals(participatingAnalysisNodes.size(), discoveredParticipants.size()); assertEquals(participatingAnalysisNodes.getFirst().node.type, discoveredParticipants.getFirst().nodeType().getRepresentation()); - assertEquals(participatingAnalysisNodes.getFirst().node.robotId, - discoveredParticipants.getFirst().nodeRobotId()); + assertEquals(participatingAnalysisNodes.getFirst().node.id, + discoveredParticipants.getFirst().nodeId()); assertEquals(participatingAnalysisNodes.getLast().node.type, discoveredParticipants.getLast().nodeType().getRepresentation()); - assertEquals(participatingAnalysisNodes.getLast().node.robotId, - discoveredParticipants.getLast().nodeRobotId()); + assertEquals(participatingAnalysisNodes.getLast().node.id, + discoveredParticipants.getLast().nodeId()); } @Test @@ -154,7 +154,7 @@ void returnsSelfDiscoveredParticipatingAnalysisNode() throws JsonProcessingExcep .setBody(JSON.writeValueAsString(mockedHubResponse))); StepVerifier.create(discoveryService.discoverSelfInAnalysis(ANALYSIS_ID)) - .expectNext(new Participant(SELF_ROBOT_ID, ParticipantType.AGGREGATOR)) + .expectNext(new Participant("node-1", SELF_ROBOT_ID, ParticipantType.AGGREGATOR)) .verifyComplete(); } } From b580115ab3ba3d3fd45aa5a900280a2d3cf1b84d Mon Sep 17 00:00:00 2001 From: Alexander Twrdik <6052859+DiCanio@users.noreply.github.com> Date: Tue, 5 Aug 2025 13:10:18 +0200 Subject: [PATCH 2/2] Map NodeId to RobotId When Emitting Messages Ensures that node IDs are used for specifying message receivers and maps them internally to a robot ID. Mapping this information is crucial since the robot ID is used for the underlying communication via a socket and is not known to the client. The node ID however, is well known to the client as it can get fetched using discovery endpoints. --- .../message/MessageService.java | 40 ++++++++++++++----- .../message/MessageServiceIT.java | 8 ++-- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/main/java/de/privateaim/node_message_broker/message/MessageService.java b/src/main/java/de/privateaim/node_message_broker/message/MessageService.java index b3edb8c..76f9aa3 100644 --- a/src/main/java/de/privateaim/node_message_broker/message/MessageService.java +++ b/src/main/java/de/privateaim/node_message_broker/message/MessageService.java @@ -16,6 +16,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -65,11 +66,12 @@ Mono sendBroadcastMessage(@NotNull String analysisId, @NotNull MessageBroa return Mono.error(new NullPointerException("message request must not be null")); } - return getRobotIdsOffAllOtherParticipatingAnalysisNodes(analysisId) + return getParticipantsOffAllOtherParticipatingAnalysisNodes(analysisId) .onErrorMap(err -> new AnalysisNodesLookupException("could not look up analysis nodes for analysis `%s`" .formatted(analysisId), err)) - .flatMap(robotIds -> { - var messages = buildIndividualMessages(analysisId, messageReq.message, robotIds.stream().toList()); + .flatMap(participants -> { + var participantsRobotIds = participants.stream().map(p -> p.robotId).toList(); + var messages = buildIndividualMessages(analysisId, messageReq.message, participantsRobotIds); return sendIndividualMessages(messages); }); } @@ -99,13 +101,13 @@ Mono sendMessageToSelectedRecipients(@NotNull String analysisId, @NotNull return Mono.error(new IllegalArgumentException("recipients must not be empty")); } - return getRobotIdsOffAllOtherParticipatingAnalysisNodes(analysisId) + return getParticipantsOffAllOtherParticipatingAnalysisNodes(analysisId) .onErrorMap(err -> new AnalysisNodesLookupException("could not look up analysis nodes", err)) - .flatMap(robotIds -> { - if (robotIds.containsAll(messageReq.recipients)) { - + .flatMap(participants -> { + var participantsNodeIds = participants.stream().map(p -> p.nodeId).toList(); + if (participantsNodeIds.containsAll(messageReq.recipients)) { return sendIndividualMessages(buildIndividualMessages(analysisId, messageReq.message, - messageReq.recipients)); + mapNodeIdsToRobotIds(participants, messageReq.recipients))); } else { return Mono.error(new InvalidMessageRecipientsException("list of recipients contains at least " + "one recipients that is not part of the analysis")); @@ -122,11 +124,11 @@ private Mono sendIndividualMessages(Flux messages) { .then(Mono.empty()); } - private Mono> getRobotIdsOffAllOtherParticipatingAnalysisNodes(String analysisId) { + private Mono> getParticipantsOffAllOtherParticipatingAnalysisNodes(String analysisId) { return hubClient.fetchAnalysisNodes(analysisId) .map(nodes -> nodes.stream() - .map(n -> n.node.robotId) - .filter(id -> !id.equals(selfRobotId)) + .map(n -> new AnalysisParticipant(n.node.id, n.node.robotId)) + .filter(participant -> !participant.robotId.equals(selfRobotId)) .collect(Collectors.toSet())); } @@ -143,4 +145,20 @@ private Flux buildIndividualMessages(String analysisId, JsonNode me .toList()) .onErrorMap(err -> new RuntimeException("could not prepare individual messages", err)); } + + private List mapNodeIdsToRobotIds(Set participants, List nodeIds) { + var nodeIdRobotIdMapping = participants.stream() + .map(p -> Map.entry(p.nodeId(), p.robotId())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return nodeIds.stream() + .map(nodeIdRobotIdMapping::get) + .toList(); + } + + private record AnalysisParticipant( + String nodeId, + String robotId // for internal communication usage + ) { + } } diff --git a/src/test/java/de/privateaim/node_message_broker/message/MessageServiceIT.java b/src/test/java/de/privateaim/node_message_broker/message/MessageServiceIT.java index caedb6a..2914b82 100644 --- a/src/test/java/de/privateaim/node_message_broker/message/MessageServiceIT.java +++ b/src/test/java/de/privateaim/node_message_broker/message/MessageServiceIT.java @@ -98,7 +98,7 @@ public void failsIfMessageRequestsRecipientsThatAreNotPartOfTheAnalysis() throws .setBody(JSON.writeValueAsString(mockedHubResponse))); var messageRequest = new MessageRequest(); - messageRequest.recipients = List.of("robot-not-in-list"); + messageRequest.recipients = List.of("node-not-in-list"); messageRequest.message = JsonNodeFactory.instance.objectNode(); StepVerifier.create(messageService.sendMessageToSelectedRecipients("123", messageRequest)) @@ -115,7 +115,7 @@ public void stillSucceedsIfSingleMessageCannotGetEmitted() throws JsonProcessing var mockedHubResponse = new HubResponseContainer<>(testAnalysisNodes); var messageRequest = new MessageRequest(); - messageRequest.recipients = List.of("robot-1", "robot-2"); + messageRequest.recipients = List.of("node-1", "node-2"); messageRequest.message = JsonNodeFactory.instance.objectNode(); mockWebServer.enqueue(new MockResponse().setResponseCode(HttpStatus.SC_OK) @@ -143,7 +143,7 @@ public void remainingMessagesAreTriedToGetEmittedAfterPreviousOneFails() throws var mockedHubResponse = new HubResponseContainer<>(testAnalysisNodes); var messageRequest = new MessageRequest(); - messageRequest.recipients = List.of("robot-1", "robot-2", "robot-3"); + messageRequest.recipients = List.of("node-1", "node-2", "node-3"); messageRequest.message = JsonNodeFactory.instance.objectNode(); mockWebServer.enqueue(new MockResponse().setResponseCode(HttpStatus.SC_OK) @@ -172,7 +172,7 @@ public void unableToSendMessageToSelf() throws JsonProcessingException { var mockedHubResponse = new HubResponseContainer<>(testAnalysisNodes); var messageRequest = new MessageRequest(); - messageRequest.recipients = List.of("robot-1", "robot-2", SELF_ROBOT_ID); + messageRequest.recipients = List.of("node-1", "node-2", "node-3"); messageRequest.message = JsonNodeFactory.instance.objectNode(); mockWebServer.enqueue(new MockResponse().setResponseCode(HttpStatus.SC_OK)