Skip to content
Draft
17 changes: 11 additions & 6 deletions src/Runner.Common/ActionsRunServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,17 @@ private void CheckConnection()
public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken cancellationToken)
{
CheckConnection();
var jobMessage = RetryRequest<AgentJobRequestMessage>(async () =>
{
return await _actionsRunServerClient.GetJobMessageAsync(id, cancellationToken);
}, cancellationToken);

return jobMessage;
return RetryRequest<AgentJobRequestMessage>(
async () =>
{
try
{
return OperationOutcome.Success(
await _actionsRunServerClient.GetJobMessageAsync(id, cancellationToken));
}
catch (OperationCanceledException) { throw; }
catch (Exception ex) { return OperationOutcome.TransientFailure<AgentJobRequestMessage>(ex.Message); }
}, cancellationToken);
}
}
}
28 changes: 15 additions & 13 deletions src/Runner.Common/BrokerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,21 @@ public async Task<TaskAgentSession> CreateSessionAsync(TaskAgentSession session,
public Task<TaskAgentMessage> GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken)
{
CheckConnection();
var brokerSession = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken, shouldRetry: ShouldRetryException);

return brokerSession;
return RetryRequest<TaskAgentMessage>(
async () =>
{
try
{
return OperationOutcome.Success(
await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken));
}
catch (AccessDeniedException) { throw; }
catch (VssUnauthorizedException) { throw; }
catch (RunnerNotFoundException) { throw; }
catch (HostedRunnerDeprovisionedException) { throw; }
catch (OperationCanceledException) { throw; }
catch (Exception ex) { return OperationOutcome.TransientFailure<TaskAgentMessage>(ex.Message); }
}, cancellationToken);
}

public async Task AcknowledgeRunnerRequestAsync(string runnerRequestId, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, CancellationToken cancellationToken)
Expand Down Expand Up @@ -106,14 +117,5 @@ public Task ForceRefreshConnection(VssCredentials credentials)
return Task.CompletedTask;
}

/// <summary>
/// Tells the retry logic whether a failed request is worth retrying.
/// </summary>
/// <param name="ex">The exception raised by the failed request.</param>
/// <returns>
/// <c>false</c> when <paramref name="ex"/> is an <c>AccessDeniedException</c>,
/// <c>VssUnauthorizedException</c>, <c>RunnerNotFoundException</c> or
/// <c>HostedRunnerDeprovisionedException</c>; <c>true</c> for anything else.
/// </returns>
public bool ShouldRetryException(Exception ex)
{
    switch (ex)
    {
        // These exception types are treated as non-retryable.
        case AccessDeniedException _:
        case VssUnauthorizedException _:
        case RunnerNotFoundException _:
        case HostedRunnerDeprovisionedException _:
            return false;

        default:
            return true;
    }
}
}
}
47 changes: 27 additions & 20 deletions src/Runner.Common/JobServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,33 +61,40 @@ public sealed class JobServer : RunnerService, IJobServer
public async Task ConnectAsync(VssConnection jobConnection)
{
_connection = jobConnection;
int totalAttempts = 5;
int attemptCount = totalAttempts;
var configurationStore = HostContext.GetService<IConfigurationStore>();
var runnerSettings = configurationStore.GetSettings();

while (!_connection.HasAuthenticated && attemptCount-- > 0)
if (!_connection.HasAuthenticated)
{
try
{
await _connection.ConnectAsync();
break;
}
catch (Exception ex) when (attemptCount > 0)
var retryHelper = new RetryHelper(Trace, new RetryStrategy
{
Trace.Info($"Catch exception during connect. {attemptCount} attempts left.");
Trace.Error(ex);

if (runnerSettings.IsHostedServer)
MaxAttempts = 5,
GetBackoff = (attempt, _, _) => BackoffTimerHelper.GetExponentialBackoff(attempt, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(3.2), TimeSpan.FromMilliseconds(100)),
OnRetry = (context, ex, _) =>
{
await CheckNetworkEndpointsAsync(attemptCount);
}
}

int attempt = totalAttempts - attemptCount;
TimeSpan backoff = BackoffTimerHelper.GetExponentialBackoff(attempt, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(3.2), TimeSpan.FromMilliseconds(100));
Trace.Info($"Catch exception during connect. {context.MaxAttempts - context.AttemptNumber} attempts left.");
Trace.Error(ex);
},
});

await retryHelper.ExecuteAsync(
operationName: nameof(ConnectAsync),
operation: async context =>
{
try
{
await _connection.ConnectAsync();
}
catch
{
if (runnerSettings.IsHostedServer && context.AttemptNumber < context.MaxAttempts)
{
await CheckNetworkEndpointsAsync(context.MaxAttempts - context.AttemptNumber);
}

await Task.Delay(backoff);
throw;
}
});
}

_taskClient = _connection.GetClient<TaskHttpClient>();
Expand Down
28 changes: 16 additions & 12 deletions src/Runner.Common/LocationServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,25 @@ public sealed class LocationServer : RunnerService, ILocationServer
public async Task ConnectAsync(VssConnection jobConnection)
{
_connection = jobConnection;
int attemptCount = 5;
while (!_connection.HasAuthenticated && attemptCount-- > 0)
if (!_connection.HasAuthenticated)
{
try
var retryHelper = new RetryHelper(Trace, new RetryStrategy
{
await _connection.ConnectAsync();
break;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);
}
MaxAttempts = 5,
GetBackoff = (_, _, _) => TimeSpan.FromMilliseconds(100),
OnRetry = (context, ex, _) =>
{
Trace.Info($"Catch exception during connect. {context.MaxAttempts - context.AttemptNumber} attempt left.");
Trace.Error(ex);
},
});

await Task.Delay(100);
await retryHelper.ExecuteAsync(
operationName: nameof(ConnectAsync),
operation: async () =>
{
await _connection.ConnectAsync();
});
}

_locationClient = _connection.GetClient<LocationHttpClient>();
Expand Down
55 changes: 33 additions & 22 deletions src/Runner.Common/ResultsServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ public sealed class ResultServer : RunnerService, IResultsServer
private String _liveConsoleFeedUrl;
private string _token;

/// <summary>
/// Creates a new <c>ClientWebSocket</c> whose handshake request carries a bearer
/// "Authorization" header and a "User-Agent" header.
/// </summary>
/// <param name="accessToken">Token placed after "Bearer " in the Authorization header.</param>
/// <returns>The configured socket; this method does not connect it.</returns>
private ClientWebSocket CreateWebSocketClient(string accessToken)
{
    var client = new ClientWebSocket();
    client.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");

    // User-Agent = default REST user agent entries followed by the host context's
    // entries, joined with single spaces.
    var products = new List<ProductInfoHeaderValue>();
    products.AddRange(UserAgentUtility.GetDefaultRestUserAgent());
    products.AddRange(HostContext.UserAgents);
    var userAgentHeader = string.Join(" ", products.Select(product => product.ToString()));
    client.Options.SetRequestHeader("User-Agent", userAgentHeader);

    return client;
}

public void InitializeResultsClient(Uri uri, string liveConsoleFeedUrl, string token, bool useSdk)
{
this._resultsClient = CreateHttpClient(uri, token, useSdk);
Expand Down Expand Up @@ -179,42 +190,42 @@ private void InitializeWebsocketClient(string liveConsoleFeedUrl, string accessT
}

Trace.Info($"Creating websocket client ..." + liveConsoleFeedUrl);
this._websocketClient = new ClientWebSocket();
this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");
var userAgentValues = new List<ProductInfoHeaderValue>();
userAgentValues.AddRange(UserAgentUtility.GetDefaultRestUserAgent());
userAgentValues.AddRange(HostContext.UserAgents);
this._websocketClient.Options.SetRequestHeader("User-Agent", string.Join(" ", userAgentValues.Select(x => x.ToString())));
this._websocketClient = CreateWebSocketClient(accessToken);

// during initialization, retry up to 3 times to set up the connection
this._websocketConnectTask = ConnectWebSocketClient(liveConsoleFeedUrl, delay, retryConnection);
}

private async Task ConnectWebSocketClient(string feedStreamUrl, TimeSpan delay, bool retryConnection = false)
{
bool connected = false;
int retries = 0;

do
var retryHelper = new RetryHelper(Trace, new RetryStrategy
{
try
MaxAttempts = retryConnection ? 3 : 1,
GetBackoff = (_, _, _) => TimeSpan.Zero,
OnRetry = (_, ex, _) =>
{
Trace.Info("Exception caught during websocket client connect, retry connection.");
Trace.Error(ex);
_lastConnectionFailure = DateTime.Now;
this._websocketClient = CreateWebSocketClient(_token);
},
OnFailure = (_, _, _) =>
{
this._websocketClient = null;
_lastConnectionFailure = DateTime.Now;
},
});

await retryHelper.ExecuteAsync(
operationName: nameof(ConnectWebSocketClient),
operation: async () =>
{
Trace.Info($"Attempting to start websocket client with delay {delay}.");
await Task.Delay(delay);
using var connectTimeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), connectTimeoutTokenSource.Token);
Trace.Info($"Successfully started websocket client.");
connected = true;
}
catch (Exception ex)
{
Trace.Info("Exception caught during websocket client connect, retry connection.");
Trace.Error(ex);
retries++;
this._websocketClient = null;
_lastConnectionFailure = DateTime.Now;
}
} while (retryConnection && !connected && retries < 3);
});
}

public async Task<bool> AppendLiveConsoleFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken)
Expand Down
117 changes: 117 additions & 0 deletions src/Runner.Common/RetryBackoffs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System;

namespace GitHub.Runner.Common
{
/// <summary>
/// Factory methods that build backoff delegates of the shape
/// <c>Func&lt;int, int, Exception, TimeSpan&gt;</c>.
/// </summary>
/// <remarks>
/// Only the first delegate argument (the attempt number) is ever read, and only by the
/// exponential strategies. The second and third arguments are ignored by every strategy
/// in this class. All factories validate their arguments eagerly and throw
/// <see cref="ArgumentOutOfRangeException"/> before returning a delegate.
/// </remarks>
public static class RetryBackoffs
{
    /// <summary>Builds a backoff that always returns <paramref name="delay"/>.</summary>
    /// <param name="delay">The constant delay; must not be negative.</param>
    /// <returns>A delegate that returns <paramref name="delay"/> on every call.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="delay"/> is negative.</exception>
    public static Func<int, int, Exception, TimeSpan> Fixed(TimeSpan delay)
    {
        if (delay < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(delay));
        }

        return (_, _, _) => delay;
    }

    /// <summary>
    /// Builds a backoff that returns a random delay drawn from
    /// [<paramref name="minDelay"/>, <paramref name="maxDelay"/>) on every call
    /// (exactly <paramref name="minDelay"/> when both bounds are equal).
    /// </summary>
    /// <param name="minDelay">Lower bound; must not be negative.</param>
    /// <param name="maxDelay">Upper bound; must not be negative or below <paramref name="minDelay"/>.</param>
    /// <returns>A delegate producing a fresh random delay per call.</returns>
    /// <exception cref="ArgumentOutOfRangeException">A bound is negative or the range is inverted.</exception>
    public static Func<int, int, Exception, TimeSpan> Random(TimeSpan minDelay, TimeSpan maxDelay)
    {
        ValidateRange(minDelay, maxDelay);

        return (_, _, _) => NextDelay(minDelay, maxDelay);
    }

    /// <summary>
    /// Builds a deterministic exponential backoff:
    /// <c>min(minDelay + (2^attempt - 1) * deltaDelay, maxDelay)</c>.
    /// </summary>
    /// <param name="minDelay">Delay returned for attempt 0 (and the floor for all attempts).</param>
    /// <param name="maxDelay">Ceiling applied to the computed delay.</param>
    /// <param name="deltaDelay">Growth unit multiplied by <c>2^attempt - 1</c>; must not be negative.</param>
    /// <returns>A delegate mapping the attempt number to a delay. Negative attempts are treated as 0.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// A bound is negative, the range is inverted, or <paramref name="deltaDelay"/> is negative.
    /// </exception>
    public static Func<int, int, Exception, TimeSpan> Exponential(TimeSpan minDelay, TimeSpan maxDelay, TimeSpan deltaDelay)
    {
        ValidateRange(minDelay, maxDelay);
        ValidateDelta(deltaDelay);

        return (attempt, _, _) =>
        {
            var delayMs = ExponentialCapMs(attempt, minDelay, maxDelay, deltaDelay);
            return TimeSpan.FromMilliseconds(Math.Max(0, delayMs));
        };
    }

    /// <summary>
    /// Builds an exponential backoff with jitter: each call returns a random delay between
    /// <paramref name="minDelay"/> and the capped exponential value that
    /// <see cref="Exponential"/> would return for the same attempt.
    /// </summary>
    /// <param name="minDelay">Lower bound of every returned delay.</param>
    /// <param name="maxDelay">Ceiling applied to the exponential upper bound.</param>
    /// <param name="deltaDelay">Growth unit multiplied by <c>2^attempt - 1</c>; must not be negative.</param>
    /// <returns>
    /// A delegate mapping the attempt number to a randomized delay. When the exponential
    /// upper bound does not exceed <paramref name="minDelay"/>, the delegate returns
    /// <paramref name="minDelay"/> exactly.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// A bound is negative, the range is inverted, or <paramref name="deltaDelay"/> is negative.
    /// </exception>
    public static Func<int, int, Exception, TimeSpan> ExponentialFullJitter(TimeSpan minDelay, TimeSpan maxDelay, TimeSpan deltaDelay)
    {
        ValidateRange(minDelay, maxDelay);
        ValidateDelta(deltaDelay);

        return (attempt, _, _) =>
        {
            var floorMs = minDelay.TotalMilliseconds;
            var capMs = ExponentialCapMs(attempt, minDelay, maxDelay, deltaDelay);

            if (capMs <= floorMs)
            {
                return TimeSpan.FromMilliseconds(floorMs);
            }

            // "Random" inside this class binds to the factory method above, hence the
            // fully qualified reference to the System.Random type.
            var jitterMs = global::System.Random.Shared.NextDouble() * (capMs - floorMs);
            return TimeSpan.FromMilliseconds(floorMs + jitterMs);
        };
    }

    /// <summary>
    /// Builds a backoff where each delay is drawn at random between
    /// <paramref name="minDelay"/> and <c>min(maxDelay, 3 * previousDelay)</c>.
    /// </summary>
    /// <remarks>
    /// The previous delay is kept in a variable captured by the returned delegate, starting
    /// at <paramref name="minDelay"/>. That state persists across calls for as long as the
    /// same delegate instance is reused, and it is not synchronized.
    /// NOTE: when <paramref name="minDelay"/> is <see cref="TimeSpan.Zero"/> the upper bound
    /// is always zero, so the delegate returns zero on every call regardless of
    /// <paramref name="maxDelay"/>.
    /// </remarks>
    /// <param name="minDelay">Lower bound and initial "previous" delay; must not be negative.</param>
    /// <param name="maxDelay">Ceiling for every returned delay.</param>
    /// <returns>A stateful delegate producing the next delay per call.</returns>
    /// <exception cref="ArgumentOutOfRangeException">A bound is negative or the range is inverted.</exception>
    public static Func<int, int, Exception, TimeSpan> DecorrelatedJitter(TimeSpan minDelay, TimeSpan maxDelay)
    {
        ValidateRange(minDelay, maxDelay);

        var previous = minDelay;
        return (_, _, _) =>
        {
            var upperBoundMs = Math.Min(maxDelay.TotalMilliseconds, previous.TotalMilliseconds * 3);
            if (upperBoundMs <= minDelay.TotalMilliseconds)
            {
                previous = minDelay;
                return previous;
            }

            var nextMs = minDelay.TotalMilliseconds + (global::System.Random.Shared.NextDouble() * (upperBoundMs - minDelay.TotalMilliseconds));
            previous = TimeSpan.FromMilliseconds(nextMs);
            return previous;
        };
    }

    // Computes min(minDelay + (2^attempt - 1) * deltaDelay, maxDelay) in milliseconds.
    // Shared by Exponential and ExponentialFullJitter. Callers have already validated
    // 0 <= minDelay <= maxDelay and deltaDelay >= 0.
    private static double ExponentialCapMs(int attempt, TimeSpan minDelay, TimeSpan maxDelay, TimeSpan deltaDelay)
    {
        var deltaMs = deltaDelay.TotalMilliseconds;
        if (deltaMs <= 0)
        {
            // Nothing to add. Returning early also avoids Infinity * 0 = NaN when
            // Math.Pow overflows for very large attempts; NaN would make
            // TimeSpan.FromMilliseconds throw.
            return minDelay.TotalMilliseconds;
        }

        var exponent = Math.Max(0, attempt);

        // May be +Infinity for large exponents; Math.Min below clamps it to maxDelay.
        var additional = (Math.Pow(2.0, exponent) - 1.0) * deltaMs;
        return Math.Min(minDelay.TotalMilliseconds + additional, maxDelay.TotalMilliseconds);
    }

    // Rejects a negative growth unit for the exponential strategies.
    private static void ValidateDelta(TimeSpan deltaDelay)
    {
        if (deltaDelay < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(deltaDelay));
        }
    }

    // Ensures 0 <= minDelay <= maxDelay.
    private static void ValidateRange(TimeSpan minDelay, TimeSpan maxDelay)
    {
        if (minDelay < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(minDelay));
        }

        if (maxDelay < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDelay));
        }

        if (maxDelay < minDelay)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDelay));
        }
    }

    // Draws a delay from [minDelay, maxDelay); returns minDelay when the bounds are equal.
    private static TimeSpan NextDelay(TimeSpan minDelay, TimeSpan maxDelay)
    {
        if (minDelay == maxDelay)
        {
            return minDelay;
        }

        var minMs = minDelay.TotalMilliseconds;
        var rangeMs = maxDelay.TotalMilliseconds - minMs;
        var jitterMs = global::System.Random.Shared.NextDouble() * rangeMs;
        return TimeSpan.FromMilliseconds(minMs + jitterMs);
    }
}
}
Loading