diff --git a/.gitignore b/.gitignore index 103423a..914de3f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ HELP.md +project_overview.md + target/ !.mvn/wrapper/maven-wrapper.jar !**/src/main/**/target/ @@ -34,3 +36,13 @@ system-tests/environment/node/node-docker-compose.yml system-tests/environment/resources/secrets/ !system-tests/environment/resources/secrets/setup-secrets.sh system-tests/tests/mb-test-cli +system-tests/test-setup/* +system-tests/curl + +message-broker-sink-server + +# secrets +*.crt +.env* +privatekey*.pem +*secret*.txt diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index f737ddd..03abd28 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -4,7 +4,7 @@ services: image: mongo:7.0.5@sha256:fcde2d71bf00b592c9cabab1d7d01defde37d69b3d788c53c3bc7431b6b15de8 ports: - "17017:27017" - restart: always + restart: unless-stopped environment: MONGO_INITDB_DATABASE: "message-broker-db" TZ: "Europe/Berlin" diff --git a/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java b/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java index 5b97603..b4f3c5f 100644 --- a/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java +++ b/src/main/java/de/privateaim/node_message_broker/common/hub/auth/HubOIDCAuthenticator.java @@ -65,12 +65,7 @@ public Mono authenticate() { .with("secret", clientSecret)) .retrieve() .onStatus(HttpStatusCode::is5xxServerError, - response -> { - var err = new HubAuthException("could not fetch hub access token"); - - log.warn("retrying token request after failed attempt", err); - return Mono.error(err); - }) + response -> Mono.error(new HubAuthException("could not fetch hub access token"))) .bodyToMono(HubAuthTokenResponse.class) .flatMap(hubAuthTokenResponse -> { try { @@ -83,12 +78,15 @@ public Mono authenticate() { "trying to fetch access token", e)); } }) - .doOnError(err -> log.error(err.getMessage(), err)) + .doOnError(err -> { + log.error("authentication request failed: {}", err.getMessage()); + log.debug("authentication request error details", err); + }) .retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs())) .jitter(0.75) .filter(err -> err instanceof HubAuthException) - .onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) -> - new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'" + .onRetryExhaustedThrow(((retryBackoffSpec, + retrySignal) -> new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'" .formatted(retryConfig.maxRetries()))))); } @@ -102,12 +100,8 @@ public Mono refresh(OAuth2Token refreshToken) { .with("refresh_token", refreshToken.getTokenValue())) .retrieve() .onStatus(HttpStatusCode::is5xxServerError, - response -> { - var err = new HubAuthException("could not fetch new access token using refresh token"); - - log.warn("retrying token refresh request after failed attempt", err); - return Mono.error(err); - }) + response -> Mono + .error(new HubAuthException("could not fetch new access token using refresh token"))) .bodyToMono(HubAuthTokenResponse.class) .flatMap(hubAuthTokenResponse -> { try { @@ -120,12 +114,15 @@ public Mono refresh(OAuth2Token refreshToken) { "trying to refresh access token", e)); } }) - .doOnError(err -> log.error(err.getMessage(), err)) + .doOnError(err -> { + log.error("token refresh request failed: {}", err.getMessage()); + log.debug("token refresh error details", err); + }) .retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs())) .jitter(0.75) .filter(err -> err instanceof HubAuthException) - .onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) -> - new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'" + .onRetryExhaustedThrow(((retryBackoffSpec, + retrySignal) -> new HubAccessTokenNotObtainable("exhausted maximum retries of '%d'" .formatted(retryConfig.maxRetries()))))); } @@ -211,7 +208,6 @@ public Instant getExpiresAt() { } } - public static Builder builder() { return new Builder(); } diff --git a/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java b/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java index 6a62250..0205450 100644 --- a/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java +++ b/src/main/java/de/privateaim/node_message_broker/message/MessageSpringConfig.java @@ -90,7 +90,7 @@ OkHttpClient decoratedSocketBaseClient(@Qualifier("COMMON_JAVA_SSL_CONTEXT") SSL @Qualifier("COMMON_TRUST_MANAGER_FACTORY") TrustManagerFactory tmf) { var clientBuilder = new OkHttpClient.Builder() - .readTimeout(1, TimeUnit.MINUTES); + .pingInterval(4, TimeUnit.SECONDS); decorateClientWithSSLContext(clientBuilder, sslCtx, tmf); decorateClientWithProxySettings(clientBuilder); @@ -146,47 +146,103 @@ public Socket underlyingMessengerSocket( @Qualifier("HUB_AUTHENTICATOR") OIDCAuthenticator hubAuthenticator, @Qualifier("HUB_MESSAGE_RECEIVER") MessageReceiver messageReceiver, @Qualifier("HUB_MESSENGER_UNDERLYING_SOCKET_SECURE_CLIENT") OkHttpClient secureBaseClient) { + + URI messengerUri = URI.create(hubMessengerBaseUrl); + + String socketHost = messengerUri.getScheme() + "://" + messengerUri.getHost(); + if (messengerUri.getPort() != -1) { + socketHost += ":" + messengerUri.getPort(); + } + String socketPath = "/socket.io/"; + if (messengerUri.getPath() != null && !messengerUri.getPath().isEmpty() + && !messengerUri.getPath().equals("/")) { + socketPath = messengerUri.getPath() + "/socket.io/"; + } + IO.Options options = IO.Options.builder() - .setPath(null) + .setPath(socketPath) .setAuth(new HashMap<>()) + // Configure robust reconnection: infinite attempts with exponential backoff + .setReconnection(true) + .setReconnectionAttempts(Integer.MAX_VALUE) + .setReconnectionDelay(1000) // Start with 1 second delay + .setReconnectionDelayMax(30000) // Max 30 seconds between attempts + .setRandomizationFactor(0.5) // Add jitter to prevent thundering herd .build(); - // this is used for SSL backed connections that need to trust additional certificates + // this is used for SSL backed connections that need to trust additional + // certificates options.callFactory = secureBaseClient; options.webSocketFactory = secureBaseClient; - final Socket socket = IO.socket(URI.create(hubMessengerBaseUrl), options); + // socket.io expects the base URL to be without the path + final Socket socket = IO.socket(URI.create(socketHost), options); + log.info("created socket for hub messenger at `{}`", socketHost); socket.on(Socket.EVENT_CONNECT_ERROR, objects -> { - log.error("cannot connect to hub messenger at `{}`", hubMessengerBaseUrl); + String errorMsg = objects.length > 0 ? objects[0].toString() : "unknown error"; + log.error("cannot connect to hub messenger at `{}` - error: {}", hubMessengerBaseUrl, errorMsg); - // we block here since this is a crucial component - var oidcTokenPair = hubAuthenticator.authenticate().block(); - if (oidcTokenPair == null) { - throw new RuntimeException("authentication failed - cannot connect to hub messenger at `%s`" - .formatted(hubMessengerBaseUrl)); + // Try to refresh the authentication token and reconnect + // IMPORTANT: Do not throw exceptions here - it would kill the EventThread and stop reconnection + try { + var oidcTokenPair = hubAuthenticator.authenticate().block(); + if (oidcTokenPair != null) { + options.auth.put("token", oidcTokenPair.accessToken().getTokenValue()); + log.info("refreshed authentication token, reconnecting to hub messenger"); + socket.connect(); + } else { + log.warn("authentication returned null - will retry on next reconnection attempt"); + } + } catch (Exception e) { + log.warn("failed to refresh authentication token - will retry on next reconnection attempt: {}", + e.getMessage()); } - options.auth.put("token", oidcTokenPair.accessToken().getTokenValue()); - - log.info("reconnecting to hub messenger at `{}` with new authentication token", hubMessengerBaseUrl); - socket.connect(); }); socket.on(Socket.EVENT_CONNECT, objects -> log.info("connected to hub messenger at `{}`", hubMessengerBaseUrl)); - socket.io().on(Manager.EVENT_RECONNECT_ATTEMPT, - objects -> { - log.info("trying to reconnect to hub messenger via socket at `{}", hubMessengerBaseUrl); - // we block here since this is a crucial component - var oidcTokenPair = hubAuthenticator.authenticate().block(); - if (oidcTokenPair == null) { - throw new RuntimeException("authentication failed - cannot connect to hub messenger at `%s`" - .formatted(hubMessengerBaseUrl)); - } + socket.on(Socket.EVENT_DISCONNECT, objects -> { + String reason = objects.length > 0 ? objects[0].toString() : "unknown"; + log.warn("disconnected from hub messenger at `{}` - reason: {}", hubMessengerBaseUrl, reason); + }); + socket.io().on(Manager.EVENT_RECONNECT_ATTEMPT, objects -> { + int attemptNumber = objects.length > 0 ? (int) objects[0] : -1; + log.info("reconnection attempt #{} to hub messenger at `{}`", attemptNumber, hubMessengerBaseUrl); + + // Try to refresh the authentication token before reconnecting + // IMPORTANT: Do not throw exceptions here - it would kill the EventThread and stop reconnection + try { + var oidcTokenPair = hubAuthenticator.authenticate().block(); + if (oidcTokenPair != null) { options.auth.put("token", oidcTokenPair.accessToken().getTokenValue()); - }); + log.debug("refreshed authentication token for reconnection attempt #{}", attemptNumber); + } else { + log.warn("authentication returned null on attempt #{} - will use existing token", attemptNumber); + } + } catch (Exception e) { + log.warn("failed to refresh authentication token on attempt #{} - will use existing token: {}", + attemptNumber, e.getMessage()); + } + }); + + socket.io().on(Manager.EVENT_RECONNECT, objects -> { + int attemptNumber = objects.length > 0 ? (int) objects[0] : -1; + log.info("successfully reconnected to hub messenger at `{}` after {} attempts", + hubMessengerBaseUrl, attemptNumber); + }); + + socket.io().on(Manager.EVENT_RECONNECT_ERROR, objects -> { + String errorMsg = objects.length > 0 ? objects[0].toString() : "unknown error"; + log.warn("reconnection error to hub messenger at `{}`: {}", hubMessengerBaseUrl, errorMsg); + }); + + socket.io().on(Manager.EVENT_ERROR, objects -> { + String errorMsg = objects.length > 0 ? objects[0].toString() : "unknown error"; + log.error("socket manager error for hub messenger at `{}`: {}", hubMessengerBaseUrl, errorMsg); + }); socket.on(SOCKET_RECEIVE_HUB_MESSAGE_IDENTIFIER, objects -> { log.debug("processing incoming message"); @@ -196,6 +252,20 @@ public Socket underlyingMessengerSocket( } ); + // Get auth token before first connection attempt to avoid an expected initial failure + try { + var oidcTokenPair = hubAuthenticator.authenticate().block(); + if (oidcTokenPair != null) { + options.auth.put("token", oidcTokenPair.accessToken().getTokenValue()); + log.info("obtained initial authentication token for hub messenger"); + } else { + log.warn("initial authentication returned null - will authenticate on first connection error"); + } + } catch (Exception e) { + log.warn("failed to obtain initial authentication token - will authenticate on first connection error: {}", + e.getMessage()); + } + socket.connect(); return socket; }