Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,27 @@
<version>4.12.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
<version>0.12.6</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-impl</artifactId>
<version>0.12.6</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-jackson</artifactId>
<version>0.12.6</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package de.privateaim.node_message_broker.common;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.privateaim.node_message_broker.ConfigurationUtil;
import de.privateaim.node_message_broker.common.hub.HttpHubClient;
import de.privateaim.node_message_broker.common.hub.HttpHubClientConfig;
import de.privateaim.node_message_broker.common.hub.HubClient;
import de.privateaim.node_message_broker.common.hub.auth.HttpHubAuthClient;
import de.privateaim.node_message_broker.common.hub.auth.HttpHubAuthClientConfig;
import de.privateaim.node_message_broker.common.hub.auth.HubAuthClient;
import de.privateaim.node_message_broker.common.hub.auth.RenewAuthTokenFilter;
import de.privateaim.node_message_broker.common.hub.auth.HubOIDCAuthenticator;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
Expand All @@ -24,6 +22,9 @@
@Configuration
public class CommonSpringConfig {

private static final int EXCHANGE__MAX_RETRIES = 5;
private static final int EXCHANGE__MAX_RETRY_DELAY_MS = 1000;

@Value("${app.hub.baseUrl}")
private String hubCoreBaseUrl;

Expand All @@ -48,10 +49,16 @@ public String hubAuthRobotId() {
return hubAuthRobotId;
}

@Qualifier("HUB_EXCHANGE_RETRY_CONFIG")
@Bean
HttpRetryConfig exchangeRetryConfig() {
return new HttpRetryConfig(EXCHANGE__MAX_RETRIES, EXCHANGE__MAX_RETRY_DELAY_MS);
}

@Qualifier("HUB_CORE_WEB_CLIENT")
@Bean
public WebClient alwaysReAuthenticatedWebClient(
@Qualifier("HUB_AUTH_RENEW_TOKEN") ExchangeFilterFunction renewTokenFilter,
@Qualifier("HUB_AUTHENTICATION_MIDDLEWARE") ExchangeFilterFunction authenticationMiddleware,
@Qualifier("BASE_SSL_HTTP_CLIENT_CONNECTOR") ReactorClientHttpConnector baseSslHttpClientConnector) {
// We can't use Spring's default security mechanisms out-of-the-box here since HUB uses a non-standard grant
// type which is not supported. There's a way by using a custom grant type accompanied by a client manager.
Expand All @@ -65,19 +72,17 @@ public WebClient alwaysReAuthenticatedWebClient(
return WebClient.builder()
.uriBuilderFactory(factory)
.defaultHeaders(httpHeaders -> httpHeaders.setAccept(List.of(MediaType.APPLICATION_JSON)))
.filter(renewTokenFilter)
.filter(authenticationMiddleware)
.clientConnector(baseSslHttpClientConnector)
.build();
}

@Bean
public HubClient hubClient(@Qualifier("HUB_CORE_WEB_CLIENT") WebClient alwaysReAuthenticatedWebClient) {
var clientConfig = new HttpHubClientConfig.Builder()
.withMaxRetries(5)
.withRetryDelayMs(1000)
.build();

return new HttpHubClient(alwaysReAuthenticatedWebClient, clientConfig);
public HubClient hubClient(
@Qualifier("HUB_CORE_WEB_CLIENT") WebClient webClient,
@Qualifier("HUB_EXCHANGE_RETRY_CONFIG") HttpRetryConfig retryConfig
) {
return new HttpHubClient(webClient, retryConfig);
}

@Qualifier("HUB_AUTH_WEB_CLIENT")
Expand All @@ -91,21 +96,37 @@ WebClient hubAuthWebClient(
.build();
}

@Qualifier("HUB_AUTH_CLIENT")
@Qualifier("HUB_JSON_MAPPER")
@Bean
ObjectMapper simpleJsonMapper() {
return new ObjectMapper()
.findAndRegisterModules()
.disable(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
}

@Qualifier("HUB_AUTHENTICATOR")
@Bean
public HubAuthClient hubAuthClient(@Qualifier("HUB_AUTH_WEB_CLIENT") WebClient webClient) {
var clientConfig = new HttpHubAuthClientConfig.Builder()
.withMaxRetries(5)
.withRetryDelayMs(1000)
OIDCAuthenticator hubAuthenticator(
@Qualifier("HUB_AUTH_WEB_CLIENT") WebClient webClient,
@Qualifier("HUB_EXCHANGE_RETRY_CONFIG") HttpRetryConfig retryConfig,
@Qualifier("HUB_AUTH_ROBOT_ID") String hubAuthRobotId,
@Qualifier("HUB_AUTH_ROBOT_SECRET") String hubAuthRobotSecret,
@Qualifier("HUB_JSON_MAPPER") ObjectMapper jsonMapper
) {
return HubOIDCAuthenticator.builder()
.usingWebClient(webClient)
.withRetryConfig(retryConfig)
.withAuthCredentials(hubAuthRobotId, hubAuthRobotSecret)
.withJsonDecoder(jsonMapper)
.build();
return new HttpHubAuthClient(webClient, clientConfig);
}

@Qualifier("HUB_AUTH_RENEW_TOKEN")
@Qualifier("HUB_AUTHENTICATION_MIDDLEWARE")
@Bean
ExchangeFilterFunction renewAuthTokenFilter(
@Qualifier("HUB_AUTH_CLIENT") HubAuthClient hubAuthClient,
@Qualifier("HUB_AUTH_ROBOT_SECRET") String hubAuthRobotSecret) {
return new RenewAuthTokenFilter(hubAuthClient, hubAuthRobotId, hubAuthRobotSecret);
ExchangeFilterFunction hubAuthenticationMiddleware(
@Qualifier("HUB_AUTHENTICATOR") OIDCAuthenticator authenticator
) {
return new OIDCAuthenticatorMiddleware(authenticator);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package de.privateaim.node_message_broker.common.hub;
package de.privateaim.node_message_broker.common;

/**
* Additional behavioural configuration for the {@link HttpHubClient}.
* Configuration options for retrying failed HTTP requests.
*
* @param maxRetries number of maximum retries carried out by the client in case of a retryable error
* @param retryDelayMs time between retries in ms
*/
public record HttpHubClientConfig(int maxRetries, int retryDelayMs) {
public record HttpRetryConfig(int maxRetries, int retryDelayMs) {
public static final class Builder {
private int maxRetries = 5;
private int retryDelayMs = 2000;
Expand All @@ -21,7 +21,7 @@ public Builder withRetryDelayMs(int retryDelayMs) {
return this;
}

public HttpHubClientConfig build() {
public HttpRetryConfig build() {
if (maxRetries < 0) {
throw new IllegalArgumentException("maxRetries must be greater than 0");
}
Expand All @@ -30,7 +30,7 @@ public HttpHubClientConfig build() {
throw new IllegalArgumentException("retryDelayMs must be greater than 0");
}

return new HttpHubClientConfig(maxRetries, retryDelayMs);
return new HttpRetryConfig(maxRetries, retryDelayMs);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package de.privateaim.node_message_broker.common;

import org.springframework.security.oauth2.core.OAuth2Token;
import reactor.core.publisher.Mono;

/**
* Describes an OIDC compliant authenticator.
*/
public interface OIDCAuthenticator {
/**
* Authenticates against an external system.
*
* @return A pair of access and refresh token.
*/
Mono<OIDCTokenPair> authenticate();

/**
* Refreshes the authentication against an external system.
*
* @param refreshToken The refresh token to be used.
* @return A pair of access and refresh token.
*/
Mono<OIDCTokenPair> refresh(OAuth2Token refreshToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package de.privateaim.node_message_broker.common;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.security.oauth2.core.OAuth2AccessToken;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import reactor.core.publisher.Mono;

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static java.util.Objects.requireNonNull;

/**
* A middleware for authenticating against several different services of a single external provider.
*/
@Slf4j
public final class OIDCAuthenticatorMiddleware implements ExchangeFilterFunction {

private final Map<String, OIDCTokenPair> tokenPairByHost;
private final OIDCAuthenticator oidcAuthenticator;

public OIDCAuthenticatorMiddleware(OIDCAuthenticator oidcAuthenticator) {
this.tokenPairByHost = new ConcurrentHashMap<>();
this.oidcAuthenticator = requireNonNull(oidcAuthenticator, "OIDC authenticator must not be null");
}

@Override
public @NonNull Mono<ClientResponse> filter(@NonNull ClientRequest request, ExchangeFunction next) {
var host = request.url().getHost();
if (host == null) {
return Mono.just(ClientResponse.create(HttpStatus.BAD_REQUEST).build());
}

var authToken = computeAccessTokenForHost(host);
var authenticatedRequest = ClientRequest.from(request)
.headers(headers -> headers.setBearerAuth(authToken.getTokenValue()))
.build();

return next.exchange(authenticatedRequest).flatMap(response -> {
// Handling for unauthorized events in case of time overlaps regarding token expiration.
// Can happen if time skew is not properly handled by the authentication server.
if (response.statusCode().value() == HttpStatus.UNAUTHORIZED.value()) {
return response.releaseBody()
.then(Mono.just(computeAccessTokenForHost(host)))
.flatMap(token -> {
var newRequest = ClientRequest.from(request)
.headers(headers -> headers.setBearerAuth(token.getTokenValue()))
.build();
log.warn("retrying request to '{}' with new bearer token after receiving status code 401 " +
"(unauthorized)", request.url());
return next.exchange(newRequest);
});
} else {
return Mono.just(response);
}
});
}

private OAuth2AccessToken computeAccessTokenForHost(@NonNull String host) {
// TODO: revise - find another way to circumvent using block()
// The following is an atomic operation!
return tokenPairByHost.compute(host, (unused, tokenPair) -> {
if (tokenPair == null) {
log.info("acquiring access token for host '{}' as there is none yet", host);
return oidcAuthenticator.authenticate().block();
}

if (tokenPair.accessToken().getExpiresAt().isBefore(Instant.now())) {
return tokenPair.refreshToken()
.map(refreshToken -> {
if (refreshToken.getExpiresAt().isBefore(Instant.now())) {
log.warn("refresh token expired - acquiring new pair of access token and refresh token for " +
"host '{}'", host);
return oidcAuthenticator.authenticate().block();
} else {
log.info("refreshing access token for host '{}'", host);
return oidcAuthenticator.refresh(refreshToken).block();
}
})
.orElseGet(() -> oidcAuthenticator.authenticate().block());
}
return tokenPair;
}).accessToken();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package de.privateaim.node_message_broker.common;

import org.springframework.security.oauth2.core.OAuth2AccessToken;
import org.springframework.security.oauth2.core.OAuth2RefreshToken;

import java.util.Optional;

/**
* An OIDC compliant pair of access token & refresh token.
*
* @param accessToken JWT acting as an access token.
* @param refreshToken JWT acting as a refresh token for acquiring a new access token.
*/
public record OIDCTokenPair(
OAuth2AccessToken accessToken,
Optional<OAuth2RefreshToken> refreshToken
) {
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.privateaim.node_message_broker.common.hub;

import de.privateaim.node_message_broker.common.HttpRetryConfig;
import de.privateaim.node_message_broker.common.hub.api.AnalysisNode;
import de.privateaim.node_message_broker.common.hub.api.HubResponseContainer;
import de.privateaim.node_message_broker.common.hub.api.Node;
Expand Down Expand Up @@ -30,11 +31,11 @@
public final class HttpHubClient implements HubClient {

private final WebClient authenticatedWebClient;
private final HttpHubClientConfig config;
private final HttpRetryConfig retryConfig;

public HttpHubClient(WebClient authenticatedWebClient, HttpHubClientConfig config) {
public HttpHubClient(WebClient authenticatedWebClient, HttpRetryConfig retryConfig) {
this.authenticatedWebClient = requireNonNull(authenticatedWebClient, "authenticated web client must not be null");
this.config = requireNonNull(config, "config must not be null");
this.retryConfig = requireNonNull(retryConfig, "retry config must not be null");
}

// TODO: this might use a cache to cut corners and improve performance by avoiding unnecessary round-trips
Expand All @@ -59,12 +60,12 @@ public Mono<List<AnalysisNode>> fetchAnalysisNodes(String analysisId) {
.bodyToMono(new ParameterizedTypeReference<HubResponseContainer<List<AnalysisNode>>>() {
})
.map(resp -> resp.data)
.retryWhen(Retry.backoff(config.maxRetries(), Duration.ofMillis(config.retryDelayMs()))
.retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs()))
.jitter(0.75)
.filter(err -> err instanceof HubCoreServerException)
.onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) ->
new HubAnalysisNodesNotObtainable("exhausted maximum number of retries of '%d'"
.formatted(config.maxRetries())))));
.formatted(retryConfig.maxRetries())))));
}

// TODO: add cache here! - see spring annotations
Expand Down Expand Up @@ -110,11 +111,11 @@ public Mono<ECPublicKey> fetchPublicKey(String robotId) {
"robot id `%s`".formatted(robotId), e));
}
})
.retryWhen(Retry.backoff(config.maxRetries(), Duration.ofMillis(config.retryDelayMs()))
.retryWhen(Retry.backoff(retryConfig.maxRetries(), Duration.ofMillis(retryConfig.retryDelayMs()))
.jitter(0.75)
.filter(err -> err instanceof HubCoreServerException)
.onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) ->
new HubNodePublicKeyNotObtainable("exhausted maximum number of retries of '%d'"
.formatted(config.maxRetries())))));
.formatted(retryConfig.maxRetries())))));
}
}
Loading
Loading