Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Mono<ResponseEntity<List<ParticipantResponse>>> discoverAllParticipants(@PathVar
return discoveryService.discoverAllParticipantsOfAnalysis(analysisId)
.collectList()
.map(participants -> participants.stream()
.map(p -> new ParticipantResponse(p.nodeRobotId(), p.nodeType()))
.map(p -> new ParticipantResponse(p.nodeId(), p.nodeType()))
.toList())
.map(ResponseEntity::ok)
.onErrorMap(AnalysisNodesLookupException.class, err ->
Expand All @@ -51,7 +51,7 @@ Mono<ResponseEntity<ParticipantResponse>> discoverSelf(@PathVariable String anal
}

return discoveryService.discoverSelfInAnalysis(analysisId)
.map(p -> new ParticipantResponse(p.nodeRobotId(), p.nodeType()))
.map(p -> new ParticipantResponse(p.nodeId(), p.nodeType()))
.map(ResponseEntity::ok)
.onErrorMap(AnalysisNodesLookupException.class, err ->
new ResponseStatusException(HttpStatus.BAD_GATEWAY, err.getMessage(), err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public DiscoveryService(@NotNull HubClient hubClient,
* identifier.
*
* @param analysisId unique identifier of the analysis whose participating nodes shall get discovered
* @return All participants of the analysis if there are any.s
* @return All participants of the analysis if there are any.
*/
Flux<Participant> discoverAllParticipantsOfAnalysis(@NotNull String analysisId) {
if (analysisId == null) {
Expand All @@ -50,6 +50,7 @@ Flux<Participant> discoverAllParticipantsOfAnalysis(@NotNull String analysisId)
"analysis nodes", err))
.flatMapIterable(analysisNodes -> analysisNodes.stream()
.map(analysisNode -> new Participant(
analysisNode.node.id,
analysisNode.node.robotId,
ParticipantType.fromRepresentation(analysisNode.node.type)
)).toList())
Expand Down Expand Up @@ -77,7 +78,7 @@ Mono<Participant> discoverSelfInAnalysis(@NotNull String analysisId) {
.collectList()
.flatMap(participants -> {
var selfParticipants = participants.stream()
.filter(p -> p.nodeRobotId().equals(selfRobotId))
.filter(p -> p.robotId().equals(selfRobotId))
.toList();

if (selfParticipants.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
/**
* Represents a participant in its internal form.
*
* @param nodeRobotId unique identifier of a node's robot account associated with it
* @param nodeType type of the node (default | aggregator)
* @param nodeId unique identifier of a node
* @param robotId unique identifier of a node's robot account (used for internal communications & authentication)
* @param nodeType type of the node (default | aggregator)
*/
public record Participant(
String nodeRobotId,
String nodeId,
String robotId,
ParticipantType nodeType
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -65,11 +66,12 @@ Mono<Void> sendBroadcastMessage(@NotNull String analysisId, @NotNull MessageBroa
return Mono.error(new NullPointerException("message request must not be null"));
}

return getRobotIdsOffAllOtherParticipatingAnalysisNodes(analysisId)
return getParticipantsOffAllOtherParticipatingAnalysisNodes(analysisId)
.onErrorMap(err -> new AnalysisNodesLookupException("could not look up analysis nodes for analysis `%s`"
.formatted(analysisId), err))
.flatMap(robotIds -> {
var messages = buildIndividualMessages(analysisId, messageReq.message, robotIds.stream().toList());
.flatMap(participants -> {
var participantsRobotIds = participants.stream().map(p -> p.robotId).toList();
var messages = buildIndividualMessages(analysisId, messageReq.message, participantsRobotIds);
return sendIndividualMessages(messages);
});
}
Expand Down Expand Up @@ -99,13 +101,13 @@ Mono<Void> sendMessageToSelectedRecipients(@NotNull String analysisId, @NotNull
return Mono.error(new IllegalArgumentException("recipients must not be empty"));
}

return getRobotIdsOffAllOtherParticipatingAnalysisNodes(analysisId)
return getParticipantsOffAllOtherParticipatingAnalysisNodes(analysisId)
.onErrorMap(err -> new AnalysisNodesLookupException("could not look up analysis nodes", err))
.flatMap(robotIds -> {
if (robotIds.containsAll(messageReq.recipients)) {

.flatMap(participants -> {
var participantsNodeIds = participants.stream().map(p -> p.nodeId).toList();
if (participantsNodeIds.containsAll(messageReq.recipients)) {
return sendIndividualMessages(buildIndividualMessages(analysisId, messageReq.message,
messageReq.recipients));
mapNodeIdsToRobotIds(participants, messageReq.recipients)));
} else {
return Mono.error(new InvalidMessageRecipientsException("list of recipients contains at least " +
"one recipients that is not part of the analysis"));
Expand All @@ -122,11 +124,11 @@ private Mono<Void> sendIndividualMessages(Flux<EmitMessage> messages) {
.then(Mono.empty());
}

private Mono<Set<String>> getRobotIdsOffAllOtherParticipatingAnalysisNodes(String analysisId) {
private Mono<Set<AnalysisParticipant>> getParticipantsOffAllOtherParticipatingAnalysisNodes(String analysisId) {
return hubClient.fetchAnalysisNodes(analysisId)
.map(nodes -> nodes.stream()
.map(n -> n.node.robotId)
.filter(id -> !id.equals(selfRobotId))
.map(n -> new AnalysisParticipant(n.node.id, n.node.robotId))
.filter(participant -> !participant.robotId.equals(selfRobotId))
.collect(Collectors.toSet()));
}

Expand All @@ -143,4 +145,20 @@ private Flux<EmitMessage> buildIndividualMessages(String analysisId, JsonNode me
.toList())
.onErrorMap(err -> new RuntimeException("could not prepare individual messages", err));
}

private List<String> mapNodeIdsToRobotIds(Set<AnalysisParticipant> participants, List<String> nodeIds) {
var nodeIdRobotIdMapping = participants.stream()
.map(p -> Map.entry(p.nodeId(), p.robotId()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return nodeIds.stream()
.map(nodeIdRobotIdMapping::get)
.toList();
}

private record AnalysisParticipant(
String nodeId,
String robotId // for internal communication usage
) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import org.mockito.Mockito;
import org.springframework.boot.autoconfigure.security.reactive.ReactiveSecurityAutoConfiguration;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.ApplicationContext;
import org.springframework.http.HttpStatus;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Flux;
Expand All @@ -30,7 +30,7 @@ public final class DiscoveryControllerIT {
private static final String ANALYSIS_ID = "ana-123";
private static final ObjectMapper JSON = new ObjectMapper();

@MockBean
@MockitoBean
private DiscoveryService mockedDiscoveryService;

private WebTestClient client;
Expand Down Expand Up @@ -96,8 +96,8 @@ void returns500OnUnknownError() {
@Test
void returns200WithAllDiscoveredParticipants() throws IOException {
var participants = List.of(
new Participant("123", ParticipantType.AGGREGATOR),
new Participant("456", ParticipantType.DEFAULT)
new Participant("123", "abc", ParticipantType.AGGREGATOR),
new Participant("456", "def", ParticipantType.DEFAULT)
);
Mockito.doReturn(Flux.fromIterable(participants))
.when(mockedDiscoveryService)
Expand All @@ -113,9 +113,9 @@ void returns200WithAllDiscoveredParticipants() throws IOException {

assertEquals(participants.size(), clientReceivedParticipants.size());
assertEquals(participants.getFirst().nodeType(), clientReceivedParticipants.getFirst().getNodeType());
assertEquals(participants.getFirst().nodeRobotId(), clientReceivedParticipants.getFirst().nodeId);
assertEquals(participants.getFirst().nodeId(), clientReceivedParticipants.getFirst().nodeId);
assertEquals(participants.getLast().nodeType(), clientReceivedParticipants.getLast().getNodeType());
assertEquals(participants.getLast().nodeRobotId(), clientReceivedParticipants.getLast().nodeId);
assertEquals(participants.getLast().nodeId(), clientReceivedParticipants.getLast().nodeId);
}
}

Expand Down Expand Up @@ -164,7 +164,7 @@ void returns404IfNoParticipantResemblesTheSelfNode() {

@Test
void returns200WithTheDiscoveredSelfNode() throws IOException {
var self = new Participant("robot-123", ParticipantType.AGGREGATOR);
var self = new Participant("node-123", "robot-123", ParticipantType.AGGREGATOR);
Mockito.doReturn(Mono.just(self))
.when(mockedDiscoveryService)
.discoverSelfInAnalysis(ANALYSIS_ID);
Expand All @@ -178,7 +178,7 @@ void returns200WithTheDiscoveredSelfNode() throws IOException {

assertNotNull(clientReceivedSelf);
assertEquals(self.nodeType(), clientReceivedSelf.getNodeType());
assertEquals(self.nodeRobotId(), clientReceivedSelf.getNodeId());
assertEquals(self.nodeId(), clientReceivedSelf.getNodeId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ void returnsParticipatingAnalysisNodes() throws JsonProcessingException {
assertEquals(participatingAnalysisNodes.size(), discoveredParticipants.size());
assertEquals(participatingAnalysisNodes.getFirst().node.type,
discoveredParticipants.getFirst().nodeType().getRepresentation());
assertEquals(participatingAnalysisNodes.getFirst().node.robotId,
discoveredParticipants.getFirst().nodeRobotId());
assertEquals(participatingAnalysisNodes.getFirst().node.id,
discoveredParticipants.getFirst().nodeId());
assertEquals(participatingAnalysisNodes.getLast().node.type,
discoveredParticipants.getLast().nodeType().getRepresentation());
assertEquals(participatingAnalysisNodes.getLast().node.robotId,
discoveredParticipants.getLast().nodeRobotId());
assertEquals(participatingAnalysisNodes.getLast().node.id,
discoveredParticipants.getLast().nodeId());
}

@Test
Expand Down Expand Up @@ -154,7 +154,7 @@ void returnsSelfDiscoveredParticipatingAnalysisNode() throws JsonProcessingExcep
.setBody(JSON.writeValueAsString(mockedHubResponse)));

StepVerifier.create(discoveryService.discoverSelfInAnalysis(ANALYSIS_ID))
.expectNext(new Participant(SELF_ROBOT_ID, ParticipantType.AGGREGATOR))
.expectNext(new Participant("node-1", SELF_ROBOT_ID, ParticipantType.AGGREGATOR))
.verifyComplete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void failsIfMessageRequestsRecipientsThatAreNotPartOfTheAnalysis() throws
.setBody(JSON.writeValueAsString(mockedHubResponse)));

var messageRequest = new MessageRequest();
messageRequest.recipients = List.of("robot-not-in-list");
messageRequest.recipients = List.of("node-not-in-list");
messageRequest.message = JsonNodeFactory.instance.objectNode();

StepVerifier.create(messageService.sendMessageToSelectedRecipients("123", messageRequest))
Expand All @@ -115,7 +115,7 @@ public void stillSucceedsIfSingleMessageCannotGetEmitted() throws JsonProcessing
var mockedHubResponse = new HubResponseContainer<>(testAnalysisNodes);

var messageRequest = new MessageRequest();
messageRequest.recipients = List.of("robot-1", "robot-2");
messageRequest.recipients = List.of("node-1", "node-2");
messageRequest.message = JsonNodeFactory.instance.objectNode();

mockWebServer.enqueue(new MockResponse().setResponseCode(HttpStatus.SC_OK)
Expand Down Expand Up @@ -143,7 +143,7 @@ public void remainingMessagesAreTriedToGetEmittedAfterPreviousOneFails() throws
var mockedHubResponse = new HubResponseContainer<>(testAnalysisNodes);

var messageRequest = new MessageRequest();
messageRequest.recipients = List.of("robot-1", "robot-2", "robot-3");
messageRequest.recipients = List.of("node-1", "node-2", "node-3");
messageRequest.message = JsonNodeFactory.instance.objectNode();

mockWebServer.enqueue(new MockResponse().setResponseCode(HttpStatus.SC_OK)
Expand Down Expand Up @@ -172,7 +172,7 @@ public void unableToSendMessageToSelf() throws JsonProcessingException {
var mockedHubResponse = new HubResponseContainer<>(testAnalysisNodes);

var messageRequest = new MessageRequest();
messageRequest.recipients = List.of("robot-1", "robot-2", SELF_ROBOT_ID);
messageRequest.recipients = List.of("node-1", "node-2", "node-3");
messageRequest.message = JsonNodeFactory.instance.objectNode();

mockWebServer.enqueue(new MockResponse().setResponseCode(HttpStatus.SC_OK)
Expand Down
Loading