diff --git a/SSMP/Api/Client/Networking/ClientAddonNetworkReceiver.cs b/SSMP/Api/Client/Networking/ClientAddonNetworkReceiver.cs index 97d0ca4c..37c5bb63 100644 --- a/SSMP/Api/Client/Networking/ClientAddonNetworkReceiver.cs +++ b/SSMP/Api/Client/Networking/ClientAddonNetworkReceiver.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; +using SSMP.Api.Networking; using SSMP.Collection; using SSMP.Networking.Packet; +using SSMP.Networking.Packet.Connection; using SSMP.Networking.Packet.Update; namespace SSMP.Api.Client.Networking; @@ -55,10 +57,12 @@ public void CommitPacketHandlers() { } // Assign the addon packet info in the dictionary of the client update packet - ClientUpdatePacket.AddonPacketInfoDict[ClientAddon.Id.Value] = new AddonPacketInfo( + var addonPacketInfo = new AddonPacketInfo( PacketInstantiator!, PacketIdSize ); + ClientUpdatePacket.AddonPacketInfoDict[ClientAddon.Id.Value] = addonPacketInfo; + ClientConnectionPacket.AddonPacketInfoDict[ClientAddon.Id.Value] = addonPacketInfo; foreach (var idHandlerPair in PacketHandlers) { PacketManager.RegisterClientAddonUpdatePacketHandler( diff --git a/SSMP/Api/Client/Networking/ClientAddonNetworkSender.cs b/SSMP/Api/Client/Networking/ClientAddonNetworkSender.cs index 1b74cd95..ea19d1b5 100644 --- a/SSMP/Api/Client/Networking/ClientAddonNetworkSender.cs +++ b/SSMP/Api/Client/Networking/ClientAddonNetworkSender.cs @@ -1,4 +1,5 @@ using System; +using SSMP.Api.Networking; using SSMP.Networking.Client; using SSMP.Networking.Packet; @@ -61,7 +62,8 @@ public void SendSingleData(TPacketId packetId, IPacketData packetData) { if (!PacketIdLookup.TryGetValue(packetId, out var idValue)) { throw new InvalidOperationException( - InvalidPacketIdMsg); + InvalidPacketIdMsg + ); } if (!_clientAddon.Id.HasValue) { @@ -87,7 +89,8 @@ TPacketData packetData if (!PacketIdLookup.TryGetValue(packetId, out var idValue)) { throw new InvalidOperationException( - InvalidPacketIdMsg); + InvalidPacketIdMsg + ); } if (!_clientAddon.Id.HasValue) { @@ -101,4 +104,32 @@ TPacketData packetData packetData ); } + + /// + public void SendChunkData(TPacketId packetId, IPacketData packetData) { + var (idValue, addonId) = ValidateCommon(packetId); + _netClient.UpdateManager.SendChunkPacket( + ChunkAddonPacketBuilder.BuildServerBound(idValue, addonId, packetData) + ); + } + + /// + /// Validates the common client-side preconditions required before sending chunk data. + /// + /// The addon packet identifier to validate and resolve. + /// + /// The resolved packet ID byte value and the current addon ID. + /// + /// + /// Thrown if the client is not connected, the addon has no assigned ID, or the packet ID is invalid. + /// + private (byte idValue, byte addonId) ValidateCommon(TPacketId packetId) { + if (!_netClient.IsConnected) { + throw new InvalidOperationException(NotConnectedMsg); + } + + return !_clientAddon.Id.HasValue + ? throw new InvalidOperationException(NoClientAddonId) + : (ResolvePacketId(packetId, InvalidPacketIdMsg), _clientAddon.Id.Value); + } } diff --git a/SSMP/Api/Client/Networking/IClientAddonNetworkSender.cs b/SSMP/Api/Client/Networking/IClientAddonNetworkSender.cs index 6705438c..91b066ce 100644 --- a/SSMP/Api/Client/Networking/IClientAddonNetworkSender.cs +++ b/SSMP/Api/Client/Networking/IClientAddonNetworkSender.cs @@ -29,4 +29,12 @@ void SendCollectionData( TPacketId packetId, TPacketData packetData ) where TPacketData : IPacketData, new(); + + /// + /// Send a single instance of IPacketData over the network through the chunk system with the given packet ID. + /// This should be used for large packets (exceeding 64 KiB). + /// + /// The packet ID. + /// An instance of IPacketData to send. + void SendChunkData(TPacketId packetId, IPacketData packetData); } diff --git a/SSMP/Api/Client/Networking/INetClient.cs b/SSMP/Api/Client/Networking/INetClient.cs index 725c003d..5ca7d9f6 100644 --- a/SSMP/Api/Client/Networking/INetClient.cs +++ b/SSMP/Api/Client/Networking/INetClient.cs @@ -6,7 +6,7 @@ namespace SSMP.Api.Client.Networking; /// /// The net client for all network-related interaction. /// -public interface INetClient { +public interface INetClient : IDisposable { /// /// Whether the client is currently connected to a server. /// diff --git a/SSMP/Api/Client/Networking/AddonNetworkTransmitter.cs b/SSMP/Api/Networking/AddonNetworkTransmitter.cs similarity index 60% rename from SSMP/Api/Client/Networking/AddonNetworkTransmitter.cs rename to SSMP/Api/Networking/AddonNetworkTransmitter.cs index 892d57bf..0c69fd2e 100644 --- a/SSMP/Api/Client/Networking/AddonNetworkTransmitter.cs +++ b/SSMP/Api/Networking/AddonNetworkTransmitter.cs @@ -1,7 +1,7 @@ using System; using SSMP.Collection; -namespace SSMP.Api.Client.Networking; +namespace SSMP.Api.Networking; /// /// Static class for addon network transmitters. @@ -37,9 +37,19 @@ internal abstract class AddonNetworkTransmitter where TPacketId : Enu /// /// A lookup for packet IDs and corresponding raw byte values. /// - protected readonly BiLookup PacketIdLookup; + protected readonly BiLookup PacketIdLookup = + AddonNetworkTransmitter.ConstructPacketIdLookup(); - protected AddonNetworkTransmitter() { - PacketIdLookup = AddonNetworkTransmitter.ConstructPacketIdLookup(); + /// + /// Resolve the given addon packet ID to its wire-format byte value. + /// + /// The enum packet ID to resolve. + /// Exception message used when the packet ID is unknown. + /// The byte value for the packet ID. + /// Thrown when the packet ID is not part of the lookup. + protected byte ResolvePacketId(TPacketId packetId, string invalidPacketIdMessage) { + return !PacketIdLookup.TryGetValue(packetId, out var idValue) + ? throw new InvalidOperationException(invalidPacketIdMessage) + : idValue; } } diff --git a/SSMP/Api/Server/Networking/IServerAddonNetworkSender.cs b/SSMP/Api/Server/Networking/IServerAddonNetworkSender.cs index a8b28157..45d0d8e1 100644 --- a/SSMP/Api/Server/Networking/IServerAddonNetworkSender.cs +++ b/SSMP/Api/Server/Networking/IServerAddonNetworkSender.cs @@ -78,4 +78,33 @@ void BroadcastCollectionData( TPacketId packetId, TPacketData packetData ) where TPacketData : IPacketData, new(); + + /// + /// Send a single instance of IPacketData with the given packet ID over the network to the player + /// with the given ID through the chunk system. + /// This should be used for large packets (exceeding 64 KiB). + /// + /// The packet ID. + /// An instance of IPacketData to send. + /// The ID of the player. + void SendChunkData(TPacketId packetId, IPacketData packetData, ushort playerId); + + /// + /// Send a single instance of IPacketData with the given packet ID over the network to the players + /// with the given IDs through the chunk system. + /// This should be used for large packets (exceeding 64 KiB). + /// + /// The packet ID. + /// An instance of IPacketData to send. + /// The IDs of the players. + void SendChunkData(TPacketId packetId, IPacketData packetData, params ushort[] playerIds); + + /// + /// Send a single instance of IPacketData with the given packet ID over the network to all connected + /// players through the chunk system. + /// This should be used for large packets (exceeding 64 KiB). + /// + /// The packet ID. + /// An instance of IPacketData to send. + void BroadcastChunkData(TPacketId packetId, IPacketData packetData); } diff --git a/SSMP/Api/Server/Networking/ServerAddonNetworkReceiver.cs b/SSMP/Api/Server/Networking/ServerAddonNetworkReceiver.cs index dd7a5c3c..41d7edce 100644 --- a/SSMP/Api/Server/Networking/ServerAddonNetworkReceiver.cs +++ b/SSMP/Api/Server/Networking/ServerAddonNetworkReceiver.cs @@ -1,5 +1,5 @@ using System; -using SSMP.Api.Client.Networking; +using SSMP.Api.Networking; using SSMP.Networking.Packet; namespace SSMP.Api.Server.Networking; @@ -42,10 +42,7 @@ PacketManager packetManager /// public void RegisterPacketHandler(TPacketId packetId, Action handler) { - if (!PacketIdLookup.TryGetValue(packetId, out var idValue)) { - throw new InvalidOperationException( - InvalidPacketIdMsg); - } + var idValue = ResolvePacketId(packetId, InvalidPacketIdMsg); if (!_serverAddon.Id.HasValue) { throw new InvalidOperationException(NoAddonIdMsg); @@ -61,10 +58,7 @@ public void RegisterPacketHandler(TPacketId packetId, Action handler) { /// public void RegisterPacketHandler(TPacketId packetId, GenericServerPacketHandler handler) where TPacketData : IPacketData { - if (!PacketIdLookup.TryGetValue(packetId, out var idValue)) { - throw new InvalidOperationException( - InvalidPacketIdMsg); - } + var idValue = ResolvePacketId(packetId, InvalidPacketIdMsg); if (!_serverAddon.Id.HasValue) { throw new InvalidOperationException(NoAddonIdMsg); @@ -79,10 +73,10 @@ public void RegisterPacketHandler(TPacketId packetId, /// public void DeregisterPacketHandler(TPacketId packetId) { - if (!PacketIdLookup.TryGetValue(packetId, out var idValue)) { - throw new InvalidOperationException( - "Given packet ID was not part of enum when creating this network receiver"); - } + var idValue = ResolvePacketId( + packetId, + "Given packet ID was not part of enum when creating this network receiver" + ); if (!_serverAddon.Id.HasValue) { throw new InvalidOperationException(NoAddonIdMsg); diff --git a/SSMP/Api/Server/Networking/ServerAddonNetworkSender.cs b/SSMP/Api/Server/Networking/ServerAddonNetworkSender.cs index 50e1831d..6ef21d5b 100644 --- a/SSMP/Api/Server/Networking/ServerAddonNetworkSender.cs +++ b/SSMP/Api/Server/Networking/ServerAddonNetworkSender.cs @@ -1,5 +1,5 @@ using System; -using SSMP.Api.Client.Networking; +using SSMP.Api.Networking; using SSMP.Networking.Packet; using SSMP.Networking.Server; @@ -62,7 +62,8 @@ public void SendSingleData(TPacketId packetId, IPacketData packetData, ushort pl if (!PacketIdLookup.TryGetValue(packetId, out var idValue)) { throw new InvalidOperationException( - PacketIdInvalidExceptionMsg); + PacketIdInvalidExceptionMsg + ); } var updateManager = _netServer.GetUpdateManagerForClient(playerId); @@ -97,7 +98,8 @@ public void BroadcastSingleData(TPacketId packetId, IPacketData packetData) { if (!PacketIdLookup.TryGetValue(packetId, out var idValue)) { throw new InvalidOperationException( - PacketIdInvalidExceptionMsg); + PacketIdInvalidExceptionMsg + ); } if (!_serverAddon.Id.HasValue) { @@ -105,13 +107,14 @@ public void BroadcastSingleData(TPacketId packetId, IPacketData packetData) { } _netServer.SetDataForAllClients(updateManager => { - updateManager?.SetAddonData( - _serverAddon.Id.Value, - idValue, - _packetIdSize, - packetData - ); - }); + updateManager?.SetAddonData( + _serverAddon.Id.Value, + idValue, + _packetIdSize, + packetData + ); + } + ); } /// @@ -126,7 +129,8 @@ ushort playerId if (!PacketIdLookup.TryGetValue(packetId, out var idValue)) { throw new InvalidOperationException( - PacketIdInvalidExceptionMsg); + PacketIdInvalidExceptionMsg + ); } var updateManager = _netServer.GetUpdateManagerForClient(playerId); @@ -168,7 +172,8 @@ TPacketData packetData if (!PacketIdLookup.TryGetValue(packetId, out var idValue)) { throw new InvalidOperationException( - PacketIdInvalidExceptionMsg); + PacketIdInvalidExceptionMsg + ); } if (!_serverAddon.Id.HasValue) { @@ -176,12 +181,69 @@ TPacketData packetData } _netServer.SetDataForAllClients(updateManager => { - updateManager?.SetAddonDataAsCollection( - _serverAddon.Id.Value, - idValue, - _packetIdSize, - packetData - ); - }); + updateManager?.SetAddonDataAsCollection( + _serverAddon.Id.Value, + idValue, + _packetIdSize, + packetData + ); + } + ); + } + + /// + public void SendChunkData(TPacketId packetId, IPacketData packetData, ushort playerId) { + var (idValue, addonId) = ValidateCommon(packetId); + + var updateManager = _netServer.GetUpdateManagerForClient(playerId); + if (updateManager == null) { + throw new InvalidOperationException($"Player with ID '{playerId}' is not connected"); + } + + updateManager.SendChunkPacket( + ChunkAddonPacketBuilder.BuildClientBound(idValue, addonId, packetData) + ); + } + + /// + public void SendChunkData(TPacketId packetId, IPacketData packetData, params ushort[] playerIds) { + var (idValue, addonId) = ValidateCommon(packetId); + var packet = ChunkAddonPacketBuilder.BuildClientBound(idValue, addonId, packetData); + + foreach (var playerId in playerIds) { + var updateManager = _netServer.GetUpdateManagerForClient(playerId); + if (updateManager == null) { + throw new InvalidOperationException($"Player with ID '{playerId}' is not connected"); + } + + updateManager.SendChunkPacket(packet); + } + } + + /// + public void BroadcastChunkData(TPacketId packetId, IPacketData packetData) { + var (idValue, addonId) = ValidateCommon(packetId); + var packet = ChunkAddonPacketBuilder.BuildClientBound(idValue, addonId, packetData); + _netServer.SetDataForAllClients(updateManager => updateManager?.SendChunkPacket(packet)); + } + + /// + /// Validates the common server-side preconditions required before sending chunk data. + /// + /// The addon packet identifier to validate and resolve. + /// + /// The resolved packet ID byte value and the current addon ID. + /// + /// + /// Thrown if the server is not started, the addon has no assigned ID, or the packet ID is invalid. + /// + private (byte idValue, byte addonId) ValidateCommon(TPacketId packetId) { + if (!_netServer.IsStarted) { + throw new InvalidOperationException(ServerNotStartedExceptionMsg); + } + + return !_serverAddon.Id.HasValue + ? throw new InvalidOperationException(NoAddonIdMsg) + : (ResolvePacketId(packetId, PacketIdInvalidExceptionMsg), _serverAddon.Id.Value); } } diff --git a/SSMP/Networking/Chunk/ChunkReceiver.cs b/SSMP/Networking/Chunk/ChunkReceiver.cs index 22d7c125..94745934 100644 --- a/SSMP/Networking/Chunk/ChunkReceiver.cs +++ b/SSMP/Networking/Chunk/ChunkReceiver.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using SSMP.Networking.Packet.Data; namespace SSMP.Networking.Chunk; @@ -16,39 +17,67 @@ namespace SSMP.Networking.Chunk; /// slices. Uses delegate injection instead of inheritance for flexibility. /// internal sealed class ChunkReceiver { + /// + /// The number of milliseconds after which an incomplete chunk is considered stale and reset. + /// + private const int ReceiveTimeoutMillis = 5000; + + /// + /// Lock object for synchronizing access to shared receiver state. + /// + private readonly object _stateLock = new(); + /// /// Boolean array where each value indicates whether the slice of the same index was received. + /// Allocated dynamically for the current chunk. /// - private readonly bool[] _received; + private bool[]? _received; + /// - /// Byte array that contains (parts of) the chunk data that is received. + /// Array of byte array segments representing the received slices of the current chunk. + /// Allocated dynamically for the current chunk. /// - private readonly byte[] _chunkData; + private byte[][]? _sliceSegments; /// /// Whether we are currently receiving a chunk. If not, receiving a slice containing a chunk ID that is one higher /// that the last received chunk will start the reception process again. /// private bool _isReceiving; + /// /// The currently (if receiving) or last received (when not receiving) chunk ID. + /// Null if no chunk has been received yet. /// - private byte _chunkId = 255; + private byte? _chunkId; + /// /// The size of the chunk that we are currently receiving. Only calculated when the last slice is received, since /// that is the only slice with a different slice size. /// private int _chunkSize; + /// /// The number of slices that the chunk we are currently receiving contains. Set whenever we receive the first /// slice in a chunk. /// private int _numSlices; + /// /// The number of slices we have received so far. Used to keep track when all slices are received. /// private int _numReceivedSlices; + /// + /// Timestamp of when the last slice for the current chunk was received. + /// + private long _lastReceiveTimestamp; + + /// + /// The maximum chunk size currently accepted by this receiver. + /// + public int MaxAllowedChunkSize { get; set; } = ConnectionManager.MaxChunkSize; + /// /// Event that is called when the entirety of a chunk is received. /// @@ -65,9 +94,6 @@ internal sealed class ChunkReceiver { /// Delegate to call when sending acknowledgement data. public ChunkReceiver(SetSliceAckDataDelegate setSliceAckData) { _setSliceAckData = setSliceAckData ?? throw new ArgumentNullException(nameof(setSliceAckData)); - - _received = new bool[ConnectionManager.MaxSlicesPerChunk]; - _chunkData = new byte[ConnectionManager.MaxChunkSize]; } /// @@ -78,78 +104,147 @@ public ChunkReceiver(SetSliceAckDataDelegate setSliceAckData) { /// /// The received slice data. public void ProcessReceivedData(SliceData sliceData) { - //Logger.Debug($"Received slice packet: {sliceData.ChunkId}, {sliceData.SliceId}, {sliceData.NumSlices}"); + var shouldSendAck = false; + var shouldTriggerEvent = false; + var isStaleDuplicate = false; + Packet.Packet? packetToTrigger = null; - // We check if the received chunk ID is smaller than the current chunk ID accounting for wrapping IDs - if (ConnectionManager.IsWrappingIdSmaller(sliceData.ChunkId, _chunkId)) { - //Logger.Debug("Chunk ID of received slice packet is smaller than currently receiving chunk"); - return; - } + lock (_stateLock) { + //Logger.Debug($"Received slice packet: {sliceData.ChunkId}, {sliceData.SliceId}, {sliceData.NumSlices}"); - if (!_isReceiving) { - if (sliceData.ChunkId == (byte) (_chunkId + 1)) { - //Logger.Debug($"Received new chunk with ID: {sliceData.ChunkId}"); + if (_isReceiving && IsTimedOut()) { SoftReset(); - - _chunkId += 1; - _isReceiving = true; - _numSlices = sliceData.NumSlices; - } else if (sliceData.ChunkId == _chunkId) { - //Logger.Debug("Already received all slices, resending ack packet"); - SendAckData(); + _isReceiving = false; + _chunkId = null; + } + + // We check if the received chunk ID is smaller than the current chunk ID accounting for wrapping IDs + if (_chunkId.HasValue && ConnectionManager.IsWrappingIdSmaller(sliceData.ChunkId, _chunkId.Value)) { + //Logger.Debug("Chunk ID of received slice packet is smaller than currently receiving chunk"); return; - } else { - //Logger.Debug($"Received old chunk: {_chunkId}, ignoring"); + } + + // Validate slice count and ID bounds + if (sliceData.NumSlices is < 1 or > ConnectionManager.MaxSlicesPerChunk) { + Logging.Logger.Error($"Invalid slice count received: {sliceData.NumSlices}"); return; } - } else { - // If the received number of slices does not match the number slices we are keeping track of, we discard - // the slice altogether as it is likely not correct - if (_numSlices != sliceData.NumSlices) { - //Logger.Debug("Number of slices in slice packet does not correspond with local number of slices"); + + if (sliceData.SliceId >= sliceData.NumSlices) { + Logging.Logger.Error($"Invalid SliceId {sliceData.SliceId} for NumSlices {sliceData.NumSlices}"); return; } - } - if (_received[sliceData.SliceId]) { - //Logger.Debug($"Received duplicate slice: {sliceData.SliceId}, ignoring"); - return; - } + if (sliceData.Data.Length > ConnectionManager.MaxSliceSize) { + Logging.Logger.Error($"Invalid slice data length: {sliceData.Data.Length}"); + return; + } - _numReceivedSlices += 1; - _received[sliceData.SliceId] = true; - - // Copy over the data from the received slice into the chunk data array at the correct position - Array.Copy( - sliceData.Data, - 0, - _chunkData, - sliceData.SliceId * ConnectionManager.MaxSliceSize, - sliceData.Data.Length - ); - - SendAckData(); - - // If this is the last slice in the chunk, we can calculate the chunk size - if (sliceData.SliceId == _numSlices - 1) { - _chunkSize = (_numSlices - 1) * ConnectionManager.MaxSliceSize + sliceData.Data.Length; - //Logger.Debug($"Received last slice in chunk, chunk size: {_chunkSize}"); + var maxAllowedSlices = (MaxAllowedChunkSize + ConnectionManager.MaxSliceSize - 1) / + ConnectionManager.MaxSliceSize; + if (sliceData.NumSlices > maxAllowedSlices) { + Logging.Logger.Error( + $"Rejected chunk with {sliceData.NumSlices} slices because it exceeds the allowed limit of {maxAllowedSlices}." + ); + return; + } + + switch (_isReceiving) { + // Ignore slices from newer chunks while the current chunk is still incomplete. + case true when _chunkId.HasValue && sliceData.ChunkId != _chunkId.Value: + return; + case false when !_chunkId.HasValue || sliceData.ChunkId != _chunkId.Value: + //Logger.Debug($"Received new chunk with ID: {sliceData.ChunkId}"); + SoftReset(); + + _chunkId = sliceData.ChunkId; + _isReceiving = true; + _numSlices = sliceData.NumSlices; + _received = new bool[_numSlices]; + _sliceSegments = new byte[_numSlices][]; + _lastReceiveTimestamp = Stopwatch.GetTimestamp(); + break; + case false: { + if (sliceData.ChunkId == _chunkId.Value) { + //Logger.Debug("Already received all slices, resending ack packet"); + shouldSendAck = true; + isStaleDuplicate = true; + } + + break; + } + default: { + // If the received number of slices does not match the number slices we are keeping track of, we + // discard + // the slice altogether as it is likely not correct + if (_numSlices != sliceData.NumSlices) { + //Logger.Debug("Number of slices in slice packet does not correspond with local number of + // slices"); + return; + } + + break; + } + } + + if (!isStaleDuplicate) { + if (_received == null || _sliceSegments == null) return; + if (sliceData.SliceId >= _received.Length) return; + + if (_received[sliceData.SliceId]) { + //Logger.Debug($"Received duplicate slice: {sliceData.SliceId}, ignoring"); + return; + } + + _numReceivedSlices += 1; + _received[sliceData.SliceId] = true; + _lastReceiveTimestamp = Stopwatch.GetTimestamp(); + + // Store a reference to the received slice data segment directly to avoid GC LOH allocation pressure + _sliceSegments[sliceData.SliceId] = sliceData.Data; + + // Whenever the last-ID slice arrives, correct the chunk size to account for its (potentially partial) + // length. + // This must happen before the assembly check below so that out-of-order delivery is handled correctly. + if (sliceData.SliceId == _numSlices - 1) { + _chunkSize = (_numSlices - 1) * ConnectionManager.MaxSliceSize + sliceData.Data.Length; + if (_chunkSize > MaxAllowedChunkSize) { + Logging.Logger.Error($"Rejected chunk larger than allowed max size: {_chunkSize}"); + SoftReset(); + _isReceiving = false; + _chunkId = null; + return; + } + //Logger.Debug($"Corrected chunk size after receiving last-ID slice: {_chunkSize}"); + } + + shouldSendAck = true; + + if (_numReceivedSlices == _numSlices) { + var byteArray = new byte[_chunkSize]; + var offset = 0; + for (var i = 0; i < _numSlices; i++) { + var segment = _sliceSegments[i]; + Array.Copy(segment, 0, byteArray, offset, segment.Length); + offset += segment.Length; + } + + packetToTrigger = new Packet.Packet(byteArray); + + _sliceSegments = null; + shouldTriggerEvent = true; + _isReceiving = false; + } + } } - if (_numReceivedSlices == _numSlices) { - var byteArray = new byte[_chunkSize]; - Array.Copy( - _chunkData, - 0, - byteArray, - 0, - _chunkSize - ); - var packet = new Packet.Packet(byteArray); - - ChunkReceivedEvent?.Invoke(packet); + // Perform delegate invocation and event triggering outside the state lock to prevent deadlocks + if (shouldSendAck) { + SendAckData(); + } - _isReceiving = false; + if (shouldTriggerEvent && packetToTrigger != null) { + ChunkReceivedEvent?.Invoke(packetToTrigger); } } @@ -158,20 +253,36 @@ public void ProcessReceivedData(SliceData sliceData) { /// default values. /// public void Reset() { - SoftReset(); - - _isReceiving = false; - _chunkId = 255; + lock (_stateLock) { + SoftReset(); + + _isReceiving = false; + _chunkId = null; + } } /// /// Send acknowledgement data containing the boolean array of all slices that have been acknowledged thus far. /// private void SendAckData() { - var acked = new bool[_numSlices]; - Array.Copy(_received, acked, _numSlices); + var shouldSendAck = false; + byte ackChunkId = 0; + ushort ackNumSlices = 0; + bool[]? ackedSlices = null; - SetSliceAckData(_chunkId, (ushort) _numSlices, acked); + lock (_stateLock) { + if (_received != null && _chunkId.HasValue) { + shouldSendAck = true; + ackChunkId = _chunkId.Value; + ackNumSlices = (ushort) _numSlices; + ackedSlices = new bool[_numSlices]; + Array.Copy(_received, ackedSlices, _numSlices); + } + } + + if (shouldSendAck && ackedSlices != null) { + _setSliceAckData(ackChunkId, ackNumSlices, ackedSlices); + } } /// @@ -179,20 +290,24 @@ private void SendAckData() { /// slices, and number of received slices to 0. /// private void SoftReset() { - Array.Clear(_received, 0, _received.Length); + _received = null; + _sliceSegments = null; _chunkSize = 0; _numSlices = 0; _numReceivedSlices = 0; + _lastReceiveTimestamp = 0; } /// - /// Set the slice ack data using the injected delegate. + /// Checks whether the active partial chunk has timed out. /// - /// The ID of the chunk for this acknowledgement. - /// The number of slices in this chunk. - /// The boolean array containing acknowledgements of all slices. - private void SetSliceAckData(byte chunkId, ushort numSlices, bool[] acked) { - _setSliceAckData(chunkId, numSlices, acked); + private bool IsTimedOut() { + if (_lastReceiveTimestamp == 0) { + return false; + } + + var elapsedMillis = (Stopwatch.GetTimestamp() - _lastReceiveTimestamp) * 1000 / Stopwatch.Frequency; + return elapsedMillis > ReceiveTimeoutMillis; } } diff --git a/SSMP/Networking/Chunk/ChunkSender.cs b/SSMP/Networking/Chunk/ChunkSender.cs index b533146b..bb152012 100644 --- a/SSMP/Networking/Chunk/ChunkSender.cs +++ b/SSMP/Networking/Chunk/ChunkSender.cs @@ -1,4 +1,7 @@ +// ReSharper disable InconsistentlySynchronizedField + using System; +using System.Buffers; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; @@ -14,35 +17,53 @@ namespace SSMP.Networking.Chunk; /// The ID of the slice. /// The number of slices in the chunk. /// The slice data. -internal delegate void SetSliceDataDelegate(byte chunkId, byte sliceId, byte numSlices, byte[] data); +internal delegate void SetSliceDataDelegate(byte chunkId, ushort sliceId, ushort numSlices, byte[] data); /// /// Class that processes and manages chunks by sending slices of those chunks and receiving acknowledgements for those /// slices. Uses delegate injection instead of inheritance for flexibility. /// -internal sealed class ChunkSender { - /// - /// The number of milliseconds to wait between sending slices. - /// - private const int WaitMillisBetweenSlices = 20; +internal sealed class ChunkSender : IDisposable { /// /// The number of milliseconds to wait before re-sending a slice. /// private const int WaitMillisResendSlice = 100; - + + /// + /// The maximum number of slices that may be in-flight for a chunk at once. + /// + private const int MaxInFlightSlices = 64; + + /// + /// The maximum number of chunk bytes that may be in-flight at once. + /// + private const int MaxInFlightBytes = 64 * 1024; + /// /// Blocking collection of packets that need to be sent as chunks. /// private readonly BlockingCollection _toSendPackets; + /// + /// Lock object for synchronizing access to shared sender state. + /// + private readonly object _stateLock = new(); + + /// + /// Lock object for synchronizing thread and cancellation token lifecycle (Start/Stop). + /// + private readonly object _lifecycleLock = new(); + /// /// Boolean array where each value indicates whether the slice of the same index was acknowledged. + /// Allocated dynamically for the current chunk. /// - private readonly bool[] _acked; + private bool[]? _acked; + /// - /// Byte array that contains the chunk data that needs to be sent. + /// Reference to the active chunk's bytes. Stored only during active sending. /// - private readonly byte[] _chunkData; + private byte[]? _currentChunkData; /// /// Manual reset event that is used for its wait handle to time when to send the next slice. @@ -54,38 +75,79 @@ internal sealed class ChunkSender { /// acknowledgements. /// private bool _isSending; + /// /// The ID of the chunk we are currently sending. /// private byte _chunkId; + /// /// The size of the chunk we are currently sending. /// private int _chunkSize; + /// /// The number of slices of the chunk we are currently sending. /// private int _numSlices; + /// /// The number of acknowledged slices in the currently sending chunk. /// private int _numAckedSlices; + + /// + /// The number of packets enqueued for sending. Synchronized under stateLock to prevent unsynchronized concurrent collection warning. + /// + private int _queuedPacketsCount; + + /// + /// Flag indicating whether this chunk sender has been disposed. + /// + private bool _isDisposed; + /// /// The ID of the slice we are currently sending. /// private int _currentSliceId; /// - /// Array of stopwatches that keep track of the elapsed time since we have last sent the slice with the same ID. - /// If this time is smaller than a certain threshold, we do not send the slice again yet. + /// Array of millisecond timestamps (using Environment.TickCount64) representing when the slice of the same index + /// was last sent. Used to enforce resend pacing. + /// Allocated dynamically for the current chunk. + /// + private long[]? _sliceLastSentTicks; + + /// + /// Boolean array indicating whether a slice is currently in-flight. + /// + private bool[]? _sliceInFlight; + + /// + /// Lengths of all slices in the current chunk. + /// + private int[]? _sliceLengths; + + /// + /// Number of slices currently in-flight. + /// + private int _numInFlightSlices; + + /// + /// Number of bytes currently in-flight. /// - private readonly Stopwatch?[] _sliceStopwatches; + private int _numInFlightBytes; /// /// Cancellation token source for cancelling the send task. /// private CancellationTokenSource? _sendTaskTokenSource; + /// + /// Reference to the active sender thread. + /// + private Thread? _senderThread; + /// /// Event that is called when we finish sending data. This is registered internally when the /// method is called and we are waiting for the current chunk to finish sending. @@ -103,59 +165,98 @@ internal sealed class ChunkSender { /// Delegate to call when sending slice data. public ChunkSender(SetSliceDataDelegate setSliceData) { _setSliceData = setSliceData ?? throw new ArgumentNullException(nameof(setSliceData)); - - _toSendPackets = new BlockingCollection(); - - _acked = new bool[ConnectionManager.MaxSlicesPerChunk]; - _chunkData = new byte[ConnectionManager.MaxChunkSize]; - _sliceStopwatches = new Stopwatch[ConnectionManager.MaxSlicesPerChunk]; + _toSendPackets = new BlockingCollection(); _sliceWaitHandle = new ManualResetEventSlim(); } /// /// Start the chunk sender by starting the thread that manages the chunk sending. /// + /// Thrown if the ChunkSender has been disposed. public void Start() { - _sendTaskTokenSource?.Cancel(); - _sendTaskTokenSource?.Dispose(); - _sendTaskTokenSource = new CancellationTokenSource(); - - Reset(); - - new Thread(() => StartSends(_sendTaskTokenSource.Token)).Start(); + lock (_lifecycleLock) { + lock (_stateLock) { + if (_isDisposed) { + throw new ObjectDisposedException( + nameof(ChunkSender), "Cannot start a disposed ChunkSender." + ); + } + } + + _sendTaskTokenSource?.Cancel(); + if (_senderThread is { IsAlive: true } && Thread.CurrentThread != _senderThread) { + _senderThread.Join(); + } + + _sendTaskTokenSource?.Dispose(); + _sendTaskTokenSource = new CancellationTokenSource(); + + Reset(); + + var token = _sendTaskTokenSource.Token; + _senderThread = new Thread(() => StartSends(token)) { + IsBackground = true, + Name = "SSMP Chunk Sender Thread" + }; + _senderThread.Start(); + } } /// /// Stop the chunk sender by cancelling the send task. /// public void Stop() { - _sendTaskTokenSource?.Cancel(); - _sendTaskTokenSource?.Dispose(); - _sendTaskTokenSource = null; - - // Reset state to ensure clean slate on disconnect/stop - Reset(); + lock (_lifecycleLock) { + _sendTaskTokenSource?.Cancel(); + if (_senderThread is { IsAlive: true } && Thread.CurrentThread != _senderThread) { + _senderThread.Join(); + } + + _sendTaskTokenSource?.Dispose(); + _sendTaskTokenSource = null; + + // Reset state to ensure clean slate on disconnect/stop + Reset(); + } } - + + /// + /// Dispose of the chunk sender and its owned disposable resources. + /// + public void Dispose() { + Stop(); + lock (_stateLock) { + _isDisposed = true; + } + + _toSendPackets.Dispose(); + _sliceWaitHandle.Dispose(); + } + /// /// Reset the chunk sender variables to their default values. /// private void Reset() { - _isSending = false; - _chunkId = 0; - _chunkSize = 0; - _numSlices = 0; - _numAckedSlices = 0; - _currentSliceId = 0; - - for (var i = 0; i < _sliceStopwatches.Length; i++) { - _sliceStopwatches[i]?.Stop(); - _sliceStopwatches[i] = null; + lock (_stateLock) { + _isSending = false; + _chunkId = 0; + _chunkSize = 0; + _numSlices = 0; + _numAckedSlices = 0; + _numInFlightSlices = 0; + _numInFlightBytes = 0; + _queuedPacketsCount = 0; + _currentSliceId = 0; + + ReleaseArrays(); + _currentChunkData = null; + FinishSendingDataEvent = null; + + // Clear the blocking collection under stateLock to prevent concurrent enqueue desynchronization + while (_toSendPackets.TryTake(out _)) { + } } - - // Clear the blocking collection - while (_toSendPackets.TryTake(out _)) { } } /// @@ -165,28 +266,51 @@ private void Reset() { public void FinishSendingData(Action callback) { // If we aren't currently sending and the queue does not contain any packets to send, we immediately invoke // the callback and return - if (!_isSending && _toSendPackets.Count == 0) { - callback.Invoke(); - return; + var executeImmediately = false; + lock (_stateLock) { + if (!_isSending && _queuedPacketsCount == 0) { + executeImmediately = true; + } else { + // Otherwise, we register the event + // We do it like this so we can deregister the event immediately after it is called, so it doesn't + // trigger + // more than once + Action? lambda = null; + lambda = () => { + callback.Invoke(); + lock (_stateLock) { + FinishSendingDataEvent -= lambda; + } + }; + FinishSendingDataEvent += lambda; + } } - // Otherwise, we register the event - // We do it like this so we can deregister the event immediately after it is called, so it doesn't trigger - // more than once - Action? lambda = null; - lambda = () => { + if (executeImmediately) { callback.Invoke(); - FinishSendingDataEvent -= lambda; - }; - FinishSendingDataEvent += lambda; + } } /// /// Enqueue a packet to be sent as a chunk. /// /// The packet to send. + /// Thrown if the packet is null. public void EnqueuePacket(Packet.Packet packet) { - _toSendPackets.Add(packet); + if (packet == null) { + throw new ArgumentNullException(nameof(packet), "Cannot enqueue a null packet."); + } + + lock (_stateLock) { + if (_isDisposed) { + throw new ObjectDisposedException( + nameof(ChunkSender), "Cannot enqueue packets to a disposed ChunkSender." + ); + } + + _queuedPacketsCount++; + _toSendPackets.Add(packet); + } } /// @@ -199,28 +323,46 @@ public void EnqueuePacket(Packet.Packet packet) { public void ProcessReceivedData(SliceAckData sliceAckData) { //Logger.Debug($"Received slice ack packet: {sliceAckData.ChunkId}, {sliceAckData.NumSlices}"); - if (!_isSending) { - //Logger.Debug("Not sending a chunk, ignoring ack packet"); - return; - } + lock (_stateLock) { + if (!_isSending || _acked == null) { + //Logger.Debug("Not sending a chunk, ignoring ack packet"); + return; + } - if (_chunkId != sliceAckData.ChunkId) { - //Logger.Debug("Chunk ID of received ack packet does not correspond with currently sending chunk"); - return; - } + if (_chunkId != sliceAckData.ChunkId) { + //Logger.Debug("Chunk ID of received ack packet does not correspond with currently sending chunk"); + return; + } - if (_numSlices != sliceAckData.NumSlices) { - //Logger.Debug("Number of slices in ack packet does not correspond with local number of slices"); - return; - } + if (_numSlices != sliceAckData.NumSlices) { + //Logger.Debug("Number of slices in ack packet does not correspond with local number of slices"); + return; + } + + var newAckProcessed = false; + for (var i = 0; i < _numSlices; i++) { + if (!sliceAckData.Acked[i] || _acked[i]) { + continue; + } - for (var i = 0; i < _numSlices; i++) { - if (sliceAckData.Acked[i] && !_acked[i]) { _acked[i] = true; _numAckedSlices += 1; - + if (_sliceInFlight != null && _sliceInFlight[i]) { + _sliceInFlight[i] = false; + _numInFlightSlices -= 1; + if (_sliceLengths != null) { + _numInFlightBytes -= _sliceLengths[i]; + } + } + + newAckProcessed = true; + //Logger.Debug($"Received acknowledgement for slice {i}, total acked: {_numAckedSlices}"); } + + if (newAckProcessed) { + _sliceWaitHandle.Set(); + } } } @@ -235,89 +377,137 @@ public void ProcessReceivedData(SliceAckData sliceAckData) { /// /// The cancellation token for cancelling the sending process. private void StartSends(CancellationToken cancellationToken) { - while (!cancellationToken.IsCancellationRequested) { - if (_toSendPackets.Count == 0) { - FinishSendingDataEvent?.Invoke(); - } - - Packet.Packet packet; - try { - packet = _toSendPackets.Take(cancellationToken); - } catch (OperationCanceledException) { - continue; - } - - _isSending = true; - - //Logger.Debug("Successfully taken new packet from blocking collection, starting networking chunk"); - - Array.Clear(_sliceStopwatches, 0, _sliceStopwatches.Length); - Array.Clear(_acked, 0, _acked.Length); - _numAckedSlices = 0; + try { + while (!cancellationToken.IsCancellationRequested) { + Action? eventToInvoke = null; + lock (_stateLock) { + if (_queuedPacketsCount == 0) { + eventToInvoke = FinishSendingDataEvent; + } + } - var packetBytes = packet.ToArray(); + eventToInvoke?.Invoke(); - _chunkSize = packetBytes.Length; - _numSlices = _chunkSize / ConnectionManager.MaxSliceSize; - if (_chunkSize % ConnectionManager.MaxSliceSize != 0) { - _numSlices += 1; - } - - //Logger.Debug($"ChunkSize: {_chunkSize}, NumSlices: {_numSlices}"); + Packet.Packet packet; + try { + packet = _toSendPackets.Take(cancellationToken); + } catch (OperationCanceledException) { + continue; + } - // Skip over chunks that exceed the maximum size that our system can handle - if (_chunkSize > ConnectionManager.MaxChunkSize) { - Logger.Error($"Could not send packet that exceeds max chunk size: {_chunkSize}"); - continue; - } + //Logger.Debug("Successfully taken new packet from blocking collection, starting networking chunk"); - // Copy the raw bytes from the packet into the chunk data array - Array.Copy(packetBytes, _chunkData, _chunkSize); + var decremented = false; + try { + var packetBytes = packet.ToArray(); + + switch (packetBytes.Length) { + // Skip over chunks that exceed the maximum size that our system can handle + case > ConnectionManager.MaxChunkSize: { + Logger.Error($"Could not send packet that exceeds max chunk size: {packetBytes.Length}"); + lock (_stateLock) { + _queuedPacketsCount--; + decremented = true; + } + + continue; + } + case 0: { + Logger.Error("Cannot send an empty chunk packet."); + lock (_stateLock) { + _queuedPacketsCount--; + decremented = true; + } + + continue; + } + } - do { - //Logger.Debug($"Sending next slice: {_currentSliceId}"); - SendNextSlice(); + lock (_stateLock) { + _queuedPacketsCount--; + decremented = true; + _chunkSize = packetBytes.Length; + _numSlices = _chunkSize / ConnectionManager.MaxSliceSize; + if (_chunkSize % ConnectionManager.MaxSliceSize != 0) { + _numSlices += 1; + } + + _isSending = true; + _acked = ArrayPool.Shared.Rent(_numSlices); + _sliceLastSentTicks = ArrayPool.Shared.Rent(_numSlices); + _sliceInFlight = ArrayPool.Shared.Rent(_numSlices); + _sliceLengths = ArrayPool.Shared.Rent(_numSlices); + Array.Clear(_acked, 0, _numSlices); + Array.Clear(_sliceLastSentTicks, 0, _numSlices); + Array.Clear(_sliceInFlight, 0, _numSlices); + _numAckedSlices = 0; + _numInFlightSlices = 0; + _numInFlightBytes = 0; + _currentSliceId = 0; + + for (var sliceId = 0; sliceId < _numSlices; sliceId++) { + var startIndex = sliceId * ConnectionManager.MaxSliceSize; + _sliceLengths[sliceId] = System.Math.Min( + ConnectionManager.MaxSliceSize, + _chunkSize - startIndex + ); + } + + // Reference the raw bytes from the packet dynamically + _currentChunkData = packetBytes; + } + } catch (Exception ex) { + Logger.Error($"Error serializing packet for chunk sending: {ex.Message}"); + lock (_stateLock) { + if (!decremented) { + _queuedPacketsCount--; + } + + ReleaseArrays(); + _currentChunkData = null; + _isSending = false; + } - // Obtain (or create) the stopwatch for the slice and start it - var sliceStopwatch = _sliceStopwatches[_currentSliceId]; - if (sliceStopwatch == null) { - sliceStopwatch = new Stopwatch(); - _sliceStopwatches[_currentSliceId] = sliceStopwatch; + continue; } - sliceStopwatch.Restart(); + do { + _sliceWaitHandle.Reset(); - if (!TryGetNextSliceToSend()) { - //Logger.Debug($"All slices have been acked ({_numAckedSlices}), stopping sending slices"); - break; - } + if (TryGetNextSliceToSend(out var waitMillisNextSlice, out var completed)) { + SendNextSlice(); + continue; + } - long waitMillisNextSlice; - - // Get the stopwatch for this slice, and check whether we have already sent this slice not too long ago - // If so, we wait longer before resending the slice. Otherwise, we default to the normal send rate. - sliceStopwatch = _sliceStopwatches[_currentSliceId]; - if (sliceStopwatch == null) { - waitMillisNextSlice = WaitMillisBetweenSlices; - } else { - waitMillisNextSlice = WaitMillisResendSlice - sliceStopwatch.ElapsedMilliseconds; - if (waitMillisNextSlice < 0) { - waitMillisNextSlice = WaitMillisBetweenSlices; + if (completed) { + break; } - } - //Logger.Debug($"Waiting on handle for next slice: {waitMillisNextSlice}"); - try { - _sliceWaitHandle.Wait((int) waitMillisNextSlice, cancellationToken); - } catch (OperationCanceledException) { - //Logger.Debug("Wait operation was cancelled, breaking"); + try { + _sliceWaitHandle.Wait(waitMillisNextSlice, cancellationToken); + } catch (OperationCanceledException) { + break; + } + } while (!cancellationToken.IsCancellationRequested); + + if (cancellationToken.IsCancellationRequested) { break; } - } while (!cancellationToken.IsCancellationRequested); - //Logger.Debug($"Incrementing chunk ID to: {_chunkId + 1}"); - _chunkId += 1; - _isSending = false; + //Logger.Debug($"Incrementing chunk ID to: {_chunkId + 1}"); + lock (_stateLock) { + _chunkId += 1; + ReleaseArrays(); + _currentChunkData = null; + _isSending = false; + } + } + } catch (Exception ex) when (ex is not OperationCanceledException) { + Logger.Error($"Fatal error in SSMP Chunk Sender background thread: {ex.Message}\n{ex.StackTrace}"); + } finally { + lock (_stateLock) { + ReleaseArrays(); + } } } @@ -326,23 +516,46 @@ private void StartSends(CancellationToken cancellationToken) { /// data in the array and copy the data into a new array for adding to the update packet. /// private void SendNextSlice() { - var startIndex = _currentSliceId * ConnectionManager.MaxSliceSize; - - byte[] sliceBytes; + byte[]? currentChunkData; + int currentSliceId; + int numSlices; + byte chunkId; + int chunkSize; + + lock (_stateLock) { + if (_currentChunkData == null) return; + currentChunkData = _currentChunkData; + currentSliceId = _currentSliceId; + numSlices = _numSlices; + chunkId = _chunkId; + chunkSize = _chunkSize; + } + + var startIndex = currentSliceId * ConnectionManager.MaxSliceSize; + + int sliceLength; // Figure out if the start index for the next slice would exceed the chunk size. If so, the length of the slice // is less than the maximum slice size, which we need to calculate - if ((_currentSliceId + 1) * ConnectionManager.MaxSliceSize > _chunkSize) { - var length = _chunkSize - startIndex; - sliceBytes = new byte[length]; - - Array.Copy(_chunkData, startIndex, sliceBytes, 0, length); + if ((currentSliceId + 1) * ConnectionManager.MaxSliceSize > chunkSize) { + sliceLength = chunkSize - startIndex; } else { - sliceBytes = new byte[ConnectionManager.MaxSliceSize]; - - Array.Copy(_chunkData, startIndex, sliceBytes, 0, sliceBytes.Length); + sliceLength = ConnectionManager.MaxSliceSize; + } + + var sliceBytes = new byte[sliceLength]; + Array.Copy(currentChunkData, startIndex, sliceBytes, 0, sliceLength); + + lock (_stateLock) { + if (_sliceInFlight != null && !_sliceInFlight[currentSliceId]) { + _sliceInFlight[currentSliceId] = true; + _numInFlightSlices += 1; + _numInFlightBytes += sliceLength; + } + + _sliceLastSentTicks?[currentSliceId] = Stopwatch.GetTimestamp(); } - SetSliceData(_chunkId, (byte) _currentSliceId, (byte) _numSlices, sliceBytes); + _setSliceData(chunkId, (ushort) currentSliceId, (ushort) numSlices, sliceBytes); } /// @@ -351,31 +564,93 @@ private void SendNextSlice() { /// equals the number of slices in the chunk, so we don't end up in an infinite loop. /// /// True if a next slice could be found, false if all slices are acknowledged. - private bool TryGetNextSliceToSend() { - do { - // We do the check inside the loop to prevent multi-thread issues where another ack is received and - // a non-acked slice cannot be found anywhere, resulting in an infinite while loop + private bool TryGetNextSliceToSend(out int waitMillis, out bool completed) { + lock (_stateLock) { + waitMillis = Timeout.Infinite; + completed = false; + + if (_acked == null || _sliceLengths == null || _sliceInFlight == null) { + return false; + } + if (_numAckedSlices == _numSlices) { + completed = true; return false; } - _currentSliceId += 1; - if (_currentSliceId >= _numSlices) { - _currentSliceId = 0; + for (var offset = 1; offset <= _numSlices; offset++) { + var candidateSliceId = (_currentSliceId + offset) % _numSlices; + if (_acked[candidateSliceId]) { + continue; + } + + var sliceLength = _sliceLengths[candidateSliceId]; + if (_sliceInFlight[candidateSliceId]) { + if (_sliceLastSentTicks == null) { + continue; + } + + var lastSent = _sliceLastSentTicks[candidateSliceId]; + var elapsedMillis = lastSent == 0 + ? WaitMillisResendSlice + : (Stopwatch.GetTimestamp() - lastSent) * 1000 / Stopwatch.Frequency; + var remainingMillis = WaitMillisResendSlice - (int) elapsedMillis; + if (remainingMillis <= 0) { + _currentSliceId = candidateSliceId; + return true; + } + + waitMillis = waitMillis == Timeout.Infinite + ? remainingMillis + : System.Math.Min(waitMillis, remainingMillis); + continue; + } + + if (!HasWindowCapacity(sliceLength)) { + continue; + } + + _currentSliceId = candidateSliceId; + return true; } - } while (_acked[_currentSliceId]); - return true; + return false; + } + } + + /// + /// Return the rented ack and pacing arrays to the Shared ArrayPool. + /// + private void ReleaseArrays() { + if (_acked != null) { + ArrayPool.Shared.Return(_acked); + _acked = null; + } + + if (_sliceLastSentTicks != null) { + ArrayPool.Shared.Return(_sliceLastSentTicks); + _sliceLastSentTicks = null; + } + + if (_sliceInFlight != null) { + ArrayPool.Shared.Return(_sliceInFlight); + _sliceInFlight = null; + } + + if (_sliceLengths != null) { + ArrayPool.Shared.Return(_sliceLengths); + _sliceLengths = null; + } + + _numInFlightSlices = 0; + _numInFlightBytes = 0; } /// - /// Send the given slice data using the injected delegate. + /// Checks whether the sender can emit a new slice without exceeding the in-flight window. /// - /// The ID of the chunk for this slice. - /// The ID of the slice. - /// The number of slices in this chunk. - /// The byte array containing the data of the slice. - private void SetSliceData(byte chunkId, byte sliceId, byte numSlices, byte[] data) { - _setSliceData(chunkId, sliceId, numSlices, data); + private bool HasWindowCapacity(int sliceLength) { + return _numInFlightSlices < MaxInFlightSlices && + _numInFlightBytes + sliceLength <= MaxInFlightBytes; } } diff --git a/SSMP/Networking/Client/ClientConnectionManager.cs b/SSMP/Networking/Client/ClientConnectionManager.cs index 542f4170..87e6cf97 100644 --- a/SSMP/Networking/Client/ClientConnectionManager.cs +++ b/SSMP/Networking/Client/ClientConnectionManager.cs @@ -27,6 +27,11 @@ internal class ClientConnectionManager : ConnectionManager { /// public event Action? ServerInfoReceivedEvent; + /// + /// Whether chunked addon payloads are allowed to be dispatched yet. + /// + public bool AllowAddonChunks { get; set; } + /// /// Construct the connection manager with the given packet manager and chunk sender, and receiver instances. /// Will register handlers in the packet manager that relate to the connection. @@ -102,7 +107,32 @@ private void OnChunkReceived(Packet.Packet packet) { return; } - // Let the packet manager handle the connection packet, which will invoke the relevant data handlers - PacketManager.HandleClientConnectionPacket(connectionPacket); + var packetData = connectionPacket.GetPacketData(); + if (packetData.ContainsKey(ClientConnectionPacketId.ServerInfo)) { + PacketManager.HandleClientConnectionPacket(connectionPacket); + return; + } + + if (!packetData.TryGetValue(ClientConnectionPacketId.ChunkAddonData, out var chunkAddonDataRaw)) { + Logger.Debug("Received unexpected connection chunk packet with no supported payload"); + return; + } + + if (!AllowAddonChunks) { + Logger.Warn("Discarded chunked addon payload before the client completed connection setup"); + return; + } + + var chunkAddonData = (ChunkAddonData) chunkAddonDataRaw; + if (!ClientConnectionPacket.AddonPacketInfoDict.TryGetValue(chunkAddonData.AddonId, out var addonPacketInfo)) { + Logger.Warn($"Received chunked addon payload for unknown addon ID {chunkAddonData.AddonId}"); + return; + } + + var packetDataInstance = addonPacketInfo.PacketDataInstantiator.Invoke(chunkAddonData.PacketId); + packetDataInstance.ReadData(new Packet.Packet(chunkAddonData.Payload)); + PacketManager.HandleClientAddonPacketSingle( + chunkAddonData.AddonId, chunkAddonData.PacketId, packetDataInstance + ); } } diff --git a/SSMP/Networking/Client/ClientUpdateManager.cs b/SSMP/Networking/Client/ClientUpdateManager.cs index 8d2c878b..7c09e7d6 100644 --- a/SSMP/Networking/Client/ClientUpdateManager.cs +++ b/SSMP/Networking/Client/ClientUpdateManager.cs @@ -58,23 +58,21 @@ private PacketDataCollection GetOrCreateCollection(ServerUpdatePacketId pa } /// - /// Set slice data in the current packet. + /// Send a slice packet immediately, bypassing the gameplay tick loop. /// /// The ID of the chunk the slice belongs to. /// The ID of the slice within the chunk. /// The number of slices in the chunk. /// The raw data in the slice as a byte array. - public void SetSliceData(byte chunkId, byte sliceId, byte numSlices, byte[] data) { - var sliceData = new SliceData { + public void SetSliceData(byte chunkId, ushort sliceId, ushort numSlices, byte[] data) { + var slicePacket = new ServerUpdatePacket(); + slicePacket.SetSendingPacketData(ServerUpdatePacketId.Slice, new SliceData { ChunkId = chunkId, SliceId = sliceId, NumSlices = numSlices, Data = data - }; - - lock (Lock) { - CurrentUpdatePacket.SetSendingPacketData(ServerUpdatePacketId.Slice, sliceData); - } + }); + SendSlicePacket(slicePacket); } /// diff --git a/SSMP/Networking/Client/NetClient.cs b/SSMP/Networking/Client/NetClient.cs index 1e7d2826..2b002d32 100644 --- a/SSMP/Networking/Client/NetClient.cs +++ b/SSMP/Networking/Client/NetClient.cs @@ -102,7 +102,9 @@ public NetClient(PacketManager packetManager) { // Create chunk sender/receiver with delegates to the update manager _chunkSender = new ChunkSender(UpdateManager.SetSliceData); _chunkReceiver = new ChunkReceiver(UpdateManager.SetSliceAckData); + UpdateManager.EnqueueChunkPacketAction = _chunkSender.EnqueuePacket; _connectionManager = new ClientConnectionManager(_packetManager, _chunkSender, _chunkReceiver); + EnterPreAuthChunkMode(); _connectionManager.ServerInfoReceivedEvent += OnServerInfoReceived; } @@ -124,20 +126,36 @@ public void Connect( List addonData, IEncryptedTransport transport ) { + var disconnectExisting = false; + // Prevent multiple simultaneous connection attempts lock (_connectionLock) { - if (ConnectionStatus == ClientConnectionStatus.Connecting) { - Logger.Warn("Connection attempt already in progress, ignoring duplicate request"); - return; + switch (ConnectionStatus) { + case ClientConnectionStatus.Connecting: + Logger.Warn("Connection attempt already in progress, ignoring duplicate request"); + return; + case ClientConnectionStatus.Connected: + disconnectExisting = true; + break; + case ClientConnectionStatus.NotConnected: + default: + ConnectionStatus = ClientConnectionStatus.Connecting; + break; } + } - if (ConnectionStatus == ClientConnectionStatus.Connected) { - Logger.Warn("Already connected, disconnecting first"); - // Don't fire DisconnectEvent when transitioning to a new connection - InternalDisconnect(shouldFireEvent: false); - } + if (disconnectExisting) { + Logger.Warn("Already connected, disconnecting first"); + InternalDisconnect(shouldFireEvent: false); - ConnectionStatus = ClientConnectionStatus.Connecting; + lock (_connectionLock) { + if (ConnectionStatus == ClientConnectionStatus.Connecting) { + Logger.Warn("Connection attempt already in progress, ignoring duplicate request"); + return; + } + + ConnectionStatus = ClientConnectionStatus.Connecting; + } } // Start a new thread for establishing the connection, otherwise Unity will hang @@ -147,15 +165,25 @@ IEncryptedTransport transport _transport.DataReceivedEvent += OnReceiveData; _transport.Connect(address, port); - UpdateManager.Transport = _transport; - UpdateManager.Reset(); - UpdateManager.StartUpdates(); - _chunkSender.Start(); + lock (_connectionLock) { + if (ConnectionStatus != ClientConnectionStatus.Connecting) { + // Connection attempt was aborted (disconnected or disposed) + _transport.DataReceivedEvent -= OnReceiveData; + _transport.Disconnect(); + return; + } + + UpdateManager.Transport = _transport; + UpdateManager.Reset(); + UpdateManager.StartUpdates(); + _chunkSender.Start(); + EnterPreAuthChunkMode(); - // Subscribes all connection methods to timeout events - UpdateManager.TimeoutEvent += OnConnectTimedOut; + // Subscribes all connection methods to timeout events + UpdateManager.TimeoutEvent += OnConnectTimedOut; - _connectionManager.StartConnection(username, authKey, addonData); + _connectionManager.StartConnection(username, authKey, addonData); + } } catch (TlsTimeoutException) { Logger.Info("DTLS connection timed out"); HandleConnectFailed(new ConnectionFailedResult { Reason = ConnectionFailedReason.TimedOut }); @@ -177,23 +205,37 @@ IEncryptedTransport transport /// /// Disconnect from the current server. /// - public void Disconnect() { - lock (_connectionLock) { - InternalDisconnect(); - } + public void Disconnect() => InternalDisconnect(); + + /// + /// Dispose of the network client and its owned disposable resources. + /// + public void Dispose() { + InternalDisconnect(shouldFireEvent: false); + _chunkSender.Dispose(); } /// - /// Internal disconnect implementation without locking (assumes caller holds lock). + /// Internal disconnect implementation that transitions the connection state first and performs blocking teardown + /// outside the connection lock. /// /// Whether to fire DisconnectEvent. Set to false when cleaning up an old connection /// before immediately starting a new one. private void InternalDisconnect(bool shouldFireEvent = true) { - if (ConnectionStatus == ClientConnectionStatus.NotConnected) { - return; - } + IEncryptedTransport? transportToDisconnect; + bool wasConnectedOrConnecting; + + lock (_connectionLock) { + if (ConnectionStatus == ClientConnectionStatus.NotConnected) { + return; + } - var wasConnectedOrConnecting = ConnectionStatus != ClientConnectionStatus.NotConnected; + wasConnectedOrConnecting = true; + ConnectionStatus = ClientConnectionStatus.NotConnected; + transportToDisconnect = _transport; + _transport = null; + _leftoverData = null; + } try { UpdateManager.StopUpdates(); @@ -201,23 +243,19 @@ private void InternalDisconnect(bool shouldFireEvent = true) { UpdateManager.TimeoutEvent -= OnUpdateTimedOut; _chunkSender.Stop(); _chunkReceiver.Reset(); + EnterPreAuthChunkMode(); - if (_transport != null) { - _transport.DataReceivedEvent -= OnReceiveData; - _transport.Disconnect(); + if (transportToDisconnect != null) { + transportToDisconnect.DataReceivedEvent -= OnReceiveData; + transportToDisconnect.Disconnect(); } } catch (Exception e) { Logger.Error($"Error in NetClient.InternalDisconnect: {e}"); } - ConnectionStatus = ClientConnectionStatus.NotConnected; - // Clear all client addon packet handlers, because their IDs become invalid _packetManager.ClearClientAddonUpdatePacketHandlers(); - // Clear leftover data - _leftoverData = null; - // Fire DisconnectEvent on main thread for all disconnects (internal or explicit) // This provides a consistent notification for observers to clean up resources if (shouldFireEvent && wasConnectedOrConnecting) { @@ -258,7 +296,7 @@ private void OnReceiveData(byte[] buffer, int length) { // Route all transports through UpdateManager for sequence/ACK tracking // UpdateManager will skip UDP-specific logic for Steam transports UpdateManager.OnReceivePacket(clientUpdatePacket); - + // First check for slice or slice ack data and handle it separately by passing it onto either the chunk // sender or chunk receiver var packetData = clientUpdatePacket.GetPacketData(); @@ -294,6 +332,8 @@ private void OnServerInfoReceived(ServerInfo serverInfo) { ConnectionStatus = ClientConnectionStatus.Connected; } + EnterPostAuthChunkMode(); + ThreadUtil.RunActionOnMainThread(() => { try { ConnectEvent?.Invoke(serverInfo); @@ -306,12 +346,12 @@ private void OnServerInfoReceived(ServerInfo serverInfo) { } // Connection rejected - var result = serverInfo.ConnectionResult == ServerConnectionResult.InvalidAddons + ConnectionFailedResult result = serverInfo.ConnectionResult == ServerConnectionResult.InvalidAddons ? new ConnectionInvalidAddonsResult { Reason = ConnectionFailedReason.InvalidAddons, AddonData = serverInfo.AddonData } - : (ConnectionFailedResult) new ConnectionFailedMessageResult { + : new ConnectionFailedMessageResult { Reason = ConnectionFailedReason.Other, Message = serverInfo.ConnectionRejectedMessage }; @@ -341,12 +381,25 @@ private void OnUpdateTimedOut() { /// The connection failed result containing failure details. private void HandleConnectFailed(ConnectionFailedResult result) { lock (_connectionLock) { - InternalDisconnect(); + if (ConnectionStatus != ClientConnectionStatus.Connecting) { + return; + } } + InternalDisconnect(); ThreadUtil.RunActionOnMainThread(() => { ConnectFailedEvent?.Invoke(result); }); } + private void EnterPreAuthChunkMode() { + _chunkReceiver.MaxAllowedChunkSize = ConnectionManager.MaxPreAuthChunkSize; + _connectionManager.AllowAddonChunks = false; + } + + private void EnterPostAuthChunkMode() { + _chunkReceiver.MaxAllowedChunkSize = ConnectionManager.MaxChunkSize; + _connectionManager.AllowAddonChunks = true; + } + /// public IClientAddonNetworkSender GetNetworkSender( ClientAddon addon @@ -355,7 +408,7 @@ ClientAddon addon // Check whether there already is a network sender for the given addon if (addon.NetworkSender != null) { - if (!(addon.NetworkSender is IClientAddonNetworkSender addonNetworkSender)) { + if (addon.NetworkSender is not IClientAddonNetworkSender addonNetworkSender) { throw new InvalidOperationException( "Cannot request network senders with differing generic parameters" ); diff --git a/SSMP/Networking/ConnectionManager.cs b/SSMP/Networking/ConnectionManager.cs index 61515159..796a19cf 100644 --- a/SSMP/Networking/ConnectionManager.cs +++ b/SSMP/Networking/ConnectionManager.cs @@ -19,13 +19,19 @@ internal abstract class ConnectionManager(PacketManager packetManager) { /// /// The maximum number of slices in a chunk. /// - public const int MaxSlicesPerChunk = 256; + public const int MaxSlicesPerChunk = 32768; /// /// The maximum size of a chunk in bytes. /// public const int MaxChunkSize = MaxSliceSize * MaxSlicesPerChunk; + /// + /// The maximum chunk size accepted before a connection is fully established. + /// Keeps handshake traffic well below the full post-auth chunk budget. + /// + public const int MaxPreAuthChunkSize = 256 * 1024; + /// /// The number of milliseconds a connection attempt can maximally take before being timed out. /// diff --git a/SSMP/Networking/Packet/ChunkAddonPacketBuilder.cs b/SSMP/Networking/Packet/ChunkAddonPacketBuilder.cs new file mode 100644 index 00000000..c05b0091 --- /dev/null +++ b/SSMP/Networking/Packet/ChunkAddonPacketBuilder.cs @@ -0,0 +1,94 @@ +using System; +using SSMP.Networking.Packet.Connection; +using SSMP.Networking.Packet.Data; + +namespace SSMP.Networking.Packet; + +/// +/// Builds the connection-packet envelopes used for large chunked addon payloads. +/// +internal static class ChunkAddonPacketBuilder { + /// + /// Build a chunked addon payload destined for a client. + /// + public static Packet BuildClientBound(byte packetId, byte addonId, IPacketData packetData) { + return BuildPacket( + new ClientConnectionPacket(), + ClientConnectionPacketId.ChunkAddonData, + packetId, + addonId, + packetData + ); + } + + /// + /// Build a chunked addon payload destined for the server. + /// + public static Packet BuildServerBound(byte packetId, byte addonId, IPacketData packetData) { + return BuildPacket( + new ServerConnectionPacket(), + ServerConnectionPacketId.ChunkAddonData, + packetId, + addonId, + packetData + ); + } + + /// + /// Build a chunk-transport connection packet that wraps addon payload data in a + /// envelope. + /// + /// + /// The outer connection packet instance used to serialize the chunk addon envelope. + /// + /// + /// The connection-packet ID that identifies the payload as . + /// + /// The addon-local packet ID to embed in the envelope. + /// The addon ID that owns the packet payload. + /// The addon payload to serialize and wrap. + /// + /// A serialized packet ready for chunk transport, guaranteed to be larger than + /// and no larger than . + /// + /// + /// Thrown when the serialized packet exceeds or when + /// the payload is small enough to fit in the standard non-chunk addon transport. + /// + private static Packet BuildPacket( + BasePacket connectionPacket, + TPacketId chunkAddonPacketId, + byte packetId, + byte addonId, + IPacketData packetData + ) where TPacketId : Enum { + // TODO: Revisit this path in a dedicated feature branch and remove the intermediate payload array copy. + var payloadPacket = new Packet(); + packetData.WriteData(payloadPacket); + + connectionPacket.SetSendingPacketData( + chunkAddonPacketId, + new ChunkAddonData { + AddonId = addonId, + PacketId = packetId, + Payload = payloadPacket.ToArray() + } + ); + + var packet = new Packet(); + connectionPacket.CreatePacket(packet); + + return packet.Length switch { + > ConnectionManager.MaxChunkSize => throw new ArgumentException( + $"Addon packet data size ({packet.Length} bytes) exceeds the maximum chunk size ({ConnectionManager.MaxChunkSize} bytes).", + nameof(packetData) + ), + <= ushort.MaxValue => throw new ArgumentException( + $"Addon packet data size ({packet.Length} bytes) is not larger than ushort.MaxValue ({ushort.MaxValue}). " + + "For payloads smaller than or equal to ushort.MaxValue, please use standard updates instead of chunk transport.", + nameof(packetData) + ), + _ => packet + }; + } +} diff --git a/SSMP/Networking/Packet/Connection/ClientConnectionPacket.cs b/SSMP/Networking/Packet/Connection/ClientConnectionPacket.cs index 90f3db09..c317c6da 100644 --- a/SSMP/Networking/Packet/Connection/ClientConnectionPacket.cs +++ b/SSMP/Networking/Packet/Connection/ClientConnectionPacket.cs @@ -8,11 +8,10 @@ namespace SSMP.Networking.Packet.Connection; internal class ClientConnectionPacket : BasePacket { /// protected override IPacketData InstantiatePacketDataFromId(ClientConnectionPacketId packetId) { - switch (packetId) { - case ClientConnectionPacketId.ServerInfo: - return new ServerInfo(); - default: - return new EmptyData(); - } + return packetId switch { + ClientConnectionPacketId.ServerInfo => new ServerInfo(), + ClientConnectionPacketId.ChunkAddonData => new ChunkAddonData(), + _ => new EmptyData() + }; } } diff --git a/SSMP/Networking/Packet/Connection/ClientConnectionPacketId.cs b/SSMP/Networking/Packet/Connection/ClientConnectionPacketId.cs index f085328f..55aab8d4 100644 --- a/SSMP/Networking/Packet/Connection/ClientConnectionPacketId.cs +++ b/SSMP/Networking/Packet/Connection/ClientConnectionPacketId.cs @@ -8,4 +8,9 @@ internal enum ClientConnectionPacketId { /// Information about the server meant for the client detailing whether the connection was accepted. /// ServerInfo = 0, + + /// + /// Chunk-only large addon payload sent over the connection chunk path. + /// + ChunkAddonData = 1, } diff --git a/SSMP/Networking/Packet/Connection/ServerConnectionPacket.cs b/SSMP/Networking/Packet/Connection/ServerConnectionPacket.cs index 175e0dbe..d6c17aba 100644 --- a/SSMP/Networking/Packet/Connection/ServerConnectionPacket.cs +++ b/SSMP/Networking/Packet/Connection/ServerConnectionPacket.cs @@ -8,11 +8,10 @@ namespace SSMP.Networking.Packet.Connection; internal class ServerConnectionPacket : BasePacket { /// protected override IPacketData InstantiatePacketDataFromId(ServerConnectionPacketId packetId) { - switch (packetId) { - case ServerConnectionPacketId.ClientInfo: - return new ClientInfo(); - default: - return new EmptyData(); - } + return packetId switch { + ServerConnectionPacketId.ClientInfo => new ClientInfo(), + ServerConnectionPacketId.ChunkAddonData => new ChunkAddonData(), + _ => new EmptyData() + }; } } diff --git a/SSMP/Networking/Packet/Connection/ServerConnectionPacketId.cs b/SSMP/Networking/Packet/Connection/ServerConnectionPacketId.cs index 76f42fc6..e1da4ad5 100644 --- a/SSMP/Networking/Packet/Connection/ServerConnectionPacketId.cs +++ b/SSMP/Networking/Packet/Connection/ServerConnectionPacketId.cs @@ -8,4 +8,9 @@ internal enum ServerConnectionPacketId { /// Information about the client that the server can use to determine whether to accept the connection. /// ClientInfo = 0, + + /// + /// Chunk-only large addon payload sent over the connection chunk path. + /// + ChunkAddonData = 1, } diff --git a/SSMP/Networking/Packet/Data/ChunkAddonData.cs b/SSMP/Networking/Packet/Data/ChunkAddonData.cs new file mode 100644 index 00000000..fd0d1806 --- /dev/null +++ b/SSMP/Networking/Packet/Data/ChunkAddonData.cs @@ -0,0 +1,51 @@ +using System; + +namespace SSMP.Networking.Packet.Data; + +/// +/// Dedicated chunk-only envelope for large addon payloads. +/// This bypasses the normal BasePacket addon framing so it can carry payloads above ushort.MaxValue. +/// +internal sealed class ChunkAddonData : IPacketData { + /// + public bool IsReliable => true; + + /// + public bool DropReliableDataIfNewerExists => false; + + /// + /// The addon ID that owns the payload. + /// + public byte AddonId { get; set; } + + /// + /// The addon-local packet ID of the payload. + /// + public byte PacketId { get; set; } + + /// + /// The serialized addon payload bytes. + /// + public byte[] Payload { get; set; } = null!; + + /// + public void WriteData(IPacket packet) { + packet.Write(AddonId); + packet.Write(PacketId); + packet.Write(Payload.Length); + packet.Write(Payload); + } + + /// + public void ReadData(IPacket packet) { + AddonId = packet.ReadByte(); + PacketId = packet.ReadByte(); + + var payloadLength = packet.ReadInt(); + if (payloadLength < 0) { + throw new Exception($"Chunk addon payload length cannot be negative: {payloadLength}"); + } + + Payload = packet.ReadBytes(payloadLength); + } +} diff --git a/SSMP/Networking/Packet/Data/SliceAckData.cs b/SSMP/Networking/Packet/Data/SliceAckData.cs index bd59f97d..9e5cefc3 100644 --- a/SSMP/Networking/Packet/Data/SliceAckData.cs +++ b/SSMP/Networking/Packet/Data/SliceAckData.cs @@ -1,3 +1,5 @@ +using System; + namespace SSMP.Networking.Packet.Data; /// @@ -9,15 +11,14 @@ internal class SliceAckData : IPacketData { /// public bool DropReliableDataIfNewerExists => false; - + /// /// The ID of the chunk that is being networked. /// public byte ChunkId { get; set; } /// - /// The total number of slices in this chunk. Encoded as a byte where all values are shifted by -1 to ensure we - /// can encode 256 as a value, since we don't use 0. + /// The total number of slices in this chunk. /// public ushort NumSlices { get; set; } @@ -31,38 +32,39 @@ internal class SliceAckData : IPacketData { /// public void WriteData(IPacket packet) { packet.Write(ChunkId); - - var encodedNumSlices = (byte) (NumSlices - 1); - packet.Write(encodedNumSlices); + packet.Write(NumSlices); // Keep track of current index for writing ack array var currentIndex = 0; - // Do while loop, since we will always be writing at least a single byte bit flag - do { + while (currentIndex < NumSlices) { packet.Write(CreateAckFlag(currentIndex, currentIndex + 8, Acked)); - // Continue while loop if we need to write another flag, namely when the new starting index is smaller - // than the number of slices - } while ((currentIndex += 8) <= NumSlices); + currentIndex += 8; + } } /// public void ReadData(IPacket packet) { ChunkId = packet.ReadByte(); + NumSlices = packet.ReadUShort(); + + switch (NumSlices) { + case < 1: + throw new Exception("Invalid slice count: NumSlices must be at least 1"); + case > ConnectionManager.MaxSlicesPerChunk: + throw new Exception( + $"Invalid slice count: {NumSlices} exceeds maximum of {ConnectionManager.MaxSlicesPerChunk}" + ); + } - var encodedNumSlices = packet.ReadByte(); - NumSlices = (ushort) (encodedNumSlices + 1); - - var acked = new bool[ConnectionManager.MaxSlicesPerChunk]; + var acked = new bool[NumSlices]; // Keep track of current index for writing to ack array var currentIndex = 0; - // Do while loop, since we will always be reading at least one byte for the bit flag - do { + while (currentIndex < NumSlices) { var flag = packet.ReadByte(); ReadAckFlag(flag, currentIndex, currentIndex + 8, ref acked); - // Continue while loop if we need to read another flag, namely when the new starting index is smaller - // than the number of slices - } while ((currentIndex += 8) <= NumSlices); + currentIndex += 8; + } Acked = acked; } @@ -82,7 +84,7 @@ private static byte CreateAckFlag(int startIndex, int endIndex, bool[] acked) { if (acked.Length <= i) { break; } - + if (acked[i]) { flag |= currentValue; } @@ -104,6 +106,10 @@ private static void ReadAckFlag(byte flag, int startIndex, int endIndex, ref boo byte currentValue = 1; for (var i = startIndex; i < endIndex; i++) { + if (i >= acked.Length) { + break; + } + if ((flag & currentValue) != 0) { acked[i] = true; } diff --git a/SSMP/Networking/Packet/Data/SliceData.cs b/SSMP/Networking/Packet/Data/SliceData.cs index dfa99991..99394f84 100644 --- a/SSMP/Networking/Packet/Data/SliceData.cs +++ b/SSMP/Networking/Packet/Data/SliceData.cs @@ -11,7 +11,7 @@ internal class SliceData : IPacketData { /// public bool DropReliableDataIfNewerExists => false; - + /// /// The ID of the chunk that is being networked. /// @@ -20,11 +20,10 @@ internal class SliceData : IPacketData { /// /// The ID of this slice. /// - public byte SliceId { get; set; } + public ushort SliceId { get; set; } /// - /// The total number of slices in this chunk. It is an unsigned short because we can have 256 slices in a chunk. - /// It is encoded as a byte, where all values are shifted by one since 0 is not used. + /// The total number of slices in this chunk. /// public ushort NumSlices { get; set; } @@ -37,44 +36,48 @@ internal class SliceData : IPacketData { public void WriteData(IPacket packet) { packet.Write(ChunkId); packet.Write(SliceId); - - // Shift all values by -1 so that we can encode 256 as a number of slices - var encodedNumSlices = (byte) (NumSlices - 1); - packet.Write(encodedNumSlices); + packet.Write(NumSlices); var length = Data.Length; if (length > ConnectionManager.MaxSliceSize) { - throw new ArgumentOutOfRangeException(nameof(Data), "Length of data for slice cannot exceed 1024"); + throw new ArgumentOutOfRangeException( + nameof(Data), $"Length of data for slice cannot exceed {ConnectionManager.MaxSliceSize}" + ); } if (SliceId == NumSlices - 1) { packet.Write((ushort) length); } - for (var i = 0; i < length; i++) { - packet.Write(Data[i]); - } + packet.Write(Data); } /// public void ReadData(IPacket packet) { ChunkId = packet.ReadByte(); - SliceId = packet.ReadByte(); + SliceId = packet.ReadUShort(); + NumSlices = packet.ReadUShort(); - // Read the encoded byte and shift it by 1 again - var encodedNumSlices = packet.ReadByte(); - NumSlices = (ushort) (encodedNumSlices + 1); + if (NumSlices is < 1 or > ConnectionManager.MaxSlicesPerChunk) { + throw new Exception($"Invalid slice count: {NumSlices}"); + } + + if (SliceId >= NumSlices) { + throw new Exception($"SliceId {SliceId} exceeds total slices {NumSlices}"); + } ushort length; if (SliceId == NumSlices - 1) { length = packet.ReadUShort(); + if (length is < 1 or > ConnectionManager.MaxSliceSize) { + throw new Exception( + $"Invalid slice data length: {length} must be between 1 and {ConnectionManager.MaxSliceSize}" + ); + } } else { length = ConnectionManager.MaxSliceSize; } - Data = new byte[length]; - for (var i = 0; i < length; i++) { - Data[i] = packet.ReadByte(); - } + Data = packet.ReadBytes(length); } } diff --git a/SSMP/Networking/Packet/IPacket.cs b/SSMP/Networking/Packet/IPacket.cs index 4f8ac4d6..442c407d 100644 --- a/SSMP/Networking/Packet/IPacket.cs +++ b/SSMP/Networking/Packet/IPacket.cs @@ -113,6 +113,12 @@ public interface IPacket { /// The enum type that the set also uses. void WriteBitFlag(ISet set) where TEnum : Enum; + /// + /// Write an array of bytes to the packet. + /// + /// A byte array of values to write. + void Write(byte[] values); + #endregion #region Reading integral numeric types @@ -218,5 +224,12 @@ public interface IPacket { /// The enum type that the set also uses. ISet ReadBitFlag() where TEnum : Enum; + /// + /// Read an array of bytes of the given length from the packet. + /// + /// The length to read. + /// A byte array of the given length containing the content at the current position in the packet. + byte[] ReadBytes(int length); + #endregion } diff --git a/SSMP/Networking/Packet/LengthCountingPacket.cs b/SSMP/Networking/Packet/LengthCountingPacket.cs new file mode 100644 index 00000000..0405aaac --- /dev/null +++ b/SSMP/Networking/Packet/LengthCountingPacket.cs @@ -0,0 +1,133 @@ +using System; +using System.Collections.Generic; +using SSMP.Math; + +namespace SSMP.Networking.Packet; + +/// +/// A lightweight implementation of IPacket that only counts the number of bytes written, +/// completely avoiding any heap allocations or byte copies during size validation. +/// +public sealed class LengthCountingPacket : IPacket { + /// + /// Gets the number of bytes counted. + /// + public int Length { get; private set; } + + /// + /// Resets the byte count back to zero. + /// + public void Reset() => Length = 0; + + /// + public void Write(byte value) => Length += 1; + + /// + public void Write(ushort value) => Length += 2; + + /// + public void Write(uint value) => Length += 4; + + /// + public void Write(ulong value) => Length += 8; + + /// + public void Write(sbyte value) => Length += 1; + + /// + public void Write(short value) => Length += 2; + + /// + public void Write(int value) => Length += 4; + + /// + public void Write(long value) => Length += 8; + + /// + public void Write(float value) => Length += 4; + + /// + public void Write(double value) => Length += 8; + + /// + public void Write(bool value) => Length += 1; + + /// + public void Write(string value) => Length += 2 + System.Text.Encoding.UTF8.GetByteCount(value); + + /// + public void Write(Vector2 value) => Length += 8; + + /// + public void Write(Vector3 value) => Length += 12; + + /// + public void Write(byte[] values) => Length += values.Length; + + /// + public void WriteBitFlag(ISet set) where TEnum : Enum { + var enumLength = Enum.GetValues(typeof(TEnum)).Length; + switch (enumLength) { + case <= 8: + Length += 1; + break; + case <= 16: + Length += 2; + break; + case <= 32: + Length += 4; + break; + case <= 64: + Length += 8; + break; + } + } + + /// + public byte ReadByte() => throw new NotSupportedException(); + + /// + public ushort ReadUShort() => throw new NotSupportedException(); + + /// + public uint ReadUInt() => throw new NotSupportedException(); + + /// + public ulong ReadULong() => throw new NotSupportedException(); + + /// + public sbyte ReadSByte() => throw new NotSupportedException(); + + /// + public short ReadShort() => throw new NotSupportedException(); + + /// + public int ReadInt() => throw new NotSupportedException(); + + /// + public long ReadLong() => throw new NotSupportedException(); + + /// + public float ReadFloat() => throw new NotSupportedException(); + + /// + public double ReadDouble() => throw new NotSupportedException(); + + /// + public bool ReadBool() => throw new NotSupportedException(); + + /// + public string ReadString() => throw new NotSupportedException(); + + /// + public Vector2 ReadVector2() => throw new NotSupportedException(); + + /// + public Vector3 ReadVector3() => throw new NotSupportedException(); + + /// + public ISet ReadBitFlag() where TEnum : Enum => throw new NotSupportedException(); + + /// + public byte[] ReadBytes(int length) => throw new NotSupportedException(); +} diff --git a/SSMP/Networking/Packet/Packet.cs b/SSMP/Networking/Packet/Packet.cs index 537c4d50..e9247478 100644 --- a/SSMP/Networking/Packet/Packet.cs +++ b/SSMP/Networking/Packet/Packet.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Text; +using System.Threading; using SSMP.Math; namespace SSMP.Networking.Packet; @@ -38,6 +39,12 @@ internal class Packet : IPacket { /// public int Length { get; private set; } + /// + /// Thread-local length counting packet for zero-allocation size validation. + /// + private static readonly ThreadLocal LengthCountingPacket = + new(() => new LengthCountingPacket()); + /// /// Creates a packet with the given byte array of data. /// Used when receiving packets to read data from. @@ -83,6 +90,12 @@ public Packet() { /// public void WriteLength() { if (_buffer == null) throw new InvalidOperationException("Cannot write to Read-Only Packet"); + if (_buffer.Count > ushort.MaxValue) { + throw new InvalidOperationException( + $"Packet size ({_buffer.Count} bytes) exceeds the {ushort.MaxValue} bytes limit for normal updates, causing truncation. Please use the SendChunkData API instead." + ); + } + var length = (ushort) _buffer.Count; _buffer.Insert(0, (byte) length); _buffer.Insert(1, (byte) (length >> 8)); @@ -101,6 +114,22 @@ public byte[] ToArray() { return copy; } + /// + /// Validates that the serialized size of the given packet data does not exceed the 64 KiB (65535 bytes) limit, + /// throwing an InvalidOperationException if it does. + /// + /// The packet data to validate. + public static void ValidateSize(IPacketData packetData) { + var counter = LengthCountingPacket.Value!; + counter.Reset(); + packetData.WriteData(counter); + if (counter.Length > ushort.MaxValue) { + throw new InvalidOperationException( + $"Addon packet data size ({counter.Length} bytes) exceeds the {ushort.MaxValue} bytes limit for normal updates. Please use the SendChunkData API instead." + ); + } + } + /// /// Clears the packet buffer, allowing reuse for write-mode packets. /// Resets length and read position to 0. @@ -113,9 +142,12 @@ public void Clear() { if (_buffer == null) throw new InvalidOperationException("Cannot clear Read-Only Packet"); // In write-mode, the default constructor initializes _readableBuffer to an empty array. // If _readableBuffer is non-empty, this packet was created from existing data and should not be cleared. - if (_readableBuffer.Length != 0) - throw new InvalidOperationException("Clear() can only be used on write-mode packets created with the default constructor."); - + if (_readableBuffer.Length != 0) { + throw new InvalidOperationException( + "Clear() can only be used on write-mode packets created with the default constructor." + ); + } + _buffer.Clear(); // Readable buffer assumes it mirrors _buffer in write mode, but usually _readableBuffer is a copy or view. // In Write Mode (constructor Packet()), _readableBuffer is initialized to empty array. diff --git a/SSMP/Networking/Packet/PacketManager.cs b/SSMP/Networking/Packet/PacketManager.cs index f590702c..10d29294 100644 --- a/SSMP/Networking/Packet/PacketManager.cs +++ b/SSMP/Networking/Packet/PacketManager.cs @@ -39,22 +39,26 @@ public delegate void GenericServerPacketHandler(ushort id, TPack /// Manages packets that are received by the given NetClient. /// internal class PacketManager { - #region Standard Packet Registries private readonly PacketHandlerRegistry _clientUpdateRegistry = new( "client update", ClientPacketHandlerRegistryDispatcher.Instance ); - private readonly PacketHandlerRegistry _clientConnectionRegistry = new( - "client connection", ClientPacketHandlerRegistryDispatcher.Instance - ); + + private readonly PacketHandlerRegistry _clientConnectionRegistry = + new( + "client connection", ClientPacketHandlerRegistryDispatcher.Instance + ); + private readonly PacketHandlerRegistry _serverUpdateRegistry = new( "server update", ServerPacketHandlerRegistryDispatcher.Instance ); - private readonly PacketHandlerRegistry _serverConnectionRegistry = new( - "server connection", ServerPacketHandlerRegistryDispatcher.Instance - ); - + + private readonly PacketHandlerRegistry _serverConnectionRegistry = + new( + "server connection", ServerPacketHandlerRegistryDispatcher.Instance + ); + #endregion #region Addon Packet Registries (Nested Dictionaries) @@ -66,18 +70,20 @@ internal class PacketManager { // This preserves the (addonId, packetId) addressing model while reusing PacketHandlerRegistry // for handler management within each addon. - private readonly Dictionary> _clientAddonUpdateRegistries = new(); + private readonly Dictionary> _clientAddonUpdateRegistries = + new(); private readonly Dictionary> _clientAddonConnectionRegistries = new(); - private readonly Dictionary> _serverAddonUpdateRegistries = new(); + private readonly Dictionary> _serverAddonUpdateRegistries = + new(); private readonly Dictionary> _serverAddonConnectionRegistries = new(); - + #endregion - + #region Packet Unpacking Helper @@ -155,7 +161,10 @@ public void HandleClientConnectionPacket(ClientConnectionPacket packet) { } } - private void RegisterClientConnectionPacketHandler(ClientConnectionPacketId packetId, ClientPacketHandler handler) => + private void RegisterClientConnectionPacketHandler( + ClientConnectionPacketId packetId, + ClientPacketHandler handler + ) => _clientConnectionRegistry.Register(packetId, handler); public void RegisterClientConnectionPacketHandler(ClientConnectionPacketId packetId, Action handler) => @@ -219,7 +228,10 @@ public void HandleServerConnectionPacket(ushort id, ServerConnectionPacket packe } } - private void RegisterServerConnectionPacketHandler(ServerConnectionPacketId packetId, ServerPacketHandler handler) => + private void RegisterServerConnectionPacketHandler( + ServerConnectionPacketId packetId, + ServerPacketHandler handler + ) => _serverConnectionRegistry.Register(packetId, handler); public void RegisterServerConnectionPacketHandler( @@ -259,7 +271,22 @@ string registryName ); } - private void RegisterClientAddonHandler( + private static void HandleClientAddonPacketSingle( + byte addonId, + byte packetId, + IPacketData packetData, + Dictionary> registryDict, + string registryName + ) { + if (!registryDict.TryGetValue(addonId, out var registry)) { + Logger.Warn($"There is no {registryName} handler registry for addon ID {addonId}"); + return; + } + + registry.Execute(packetId, handler => handler(packetData)); + } + + private static void RegisterClientAddonHandler( byte addonId, byte packetId, ClientPacketHandler handler, @@ -276,7 +303,7 @@ string nameType registry.Register(packetId, handler); } - private void DeregisterClientAddonHandler( + private static void DeregisterClientAddonHandler( byte addonId, byte packetId, Dictionary> registryDict @@ -296,7 +323,7 @@ Dictionary> registryDict #region Server Addon Helpers - private void HandleServerAddonPacket( + private static void HandleServerAddonPacket( ushort clientId, byte addonId, Dictionary packetDataDict, @@ -314,7 +341,23 @@ string registryName ); } - private void RegisterServerAddonHandler( + private static void HandleServerAddonPacketSingle( + ushort clientId, + byte addonId, + byte packetId, + IPacketData packetData, + Dictionary> registryDict, + string registryName + ) { + if (!registryDict.TryGetValue(addonId, out var registry)) { + Logger.Warn($"There is no {registryName} handler registry for addon ID {addonId}"); + return; + } + + registry.Execute(packetId, handler => handler(clientId, packetData)); + } + + private static void RegisterServerAddonHandler( byte addonId, byte packetId, ServerPacketHandler handler, @@ -331,7 +374,7 @@ string nameType registry.Register(packetId, handler); } - private void DeregisterServerAddonHandler( + private static void DeregisterServerAddonHandler( byte addonId, byte packetId, Dictionary> registryDict @@ -359,6 +402,10 @@ public void DeregisterClientAddonUpdatePacketHandler(byte addonId, byte packetId public void ClearClientAddonUpdatePacketHandlers() => _clientAddonUpdateRegistries.Clear(); + public void HandleClientAddonPacketSingle(byte addonId, byte packetId, IPacketData packetData) => + HandleClientAddonPacketSingle( + addonId, packetId, packetData, _clientAddonUpdateRegistries, "client addon update" + ); public void RegisterClientAddonConnectionPacketHandler(byte addonId, byte packetId, ClientPacketHandler handler) => RegisterClientAddonHandler(addonId, packetId, handler, _clientAddonConnectionRegistries, "connection"); @@ -378,6 +425,11 @@ public void RegisterServerAddonUpdatePacketHandler(byte addonId, byte packetId, public void DeregisterServerAddonUpdatePacketHandler(byte addonId, byte packetId) => DeregisterServerAddonHandler(addonId, packetId, _serverAddonUpdateRegistries); + public void HandleServerAddonPacketSingle(ushort clientId, byte addonId, byte packetId, IPacketData packetData) => + HandleServerAddonPacketSingle( + clientId, addonId, packetId, packetData, _serverAddonUpdateRegistries, "server addon update" + ); + public void RegisterServerAddonConnectionPacketHandler(byte addonId, byte packetId, ServerPacketHandler handler) => RegisterServerAddonHandler(addonId, packetId, handler, _serverAddonConnectionRegistries, "connection"); @@ -434,10 +486,14 @@ private static List ByteArrayToPackets(byte[] data, int length, ref byte var packetLength = (int) packetLengthValue; // Sanity check against allocation attacks or corruption. - // If the length reads as invalid, we imply that protocol framing is lost (e.g. we are reading garbage as length). - // In this case, we cannot safely find the next packet in the stream, so we must discard the rest of the buffer. + // If the length reads as invalid, we imply that protocol framing is lost (e.g. we are reading garbage as + // length). + // In this case, we cannot safely find the next packet in the stream, so we must discard the rest of the + // buffer. if (packetLength == 0) { - Logger.Warn($"Invalid packet length read: {packetLength}. Discarding buffer to prevent processing garbage."); + Logger.Warn( + $"Invalid packet length read: {packetLength}. Discarding buffer to prevent processing garbage." + ); break; } diff --git a/SSMP/Networking/Server/NetServer.cs b/SSMP/Networking/Server/NetServer.cs index 955b560d..ee29ca70 100644 --- a/SSMP/Networking/Server/NetServer.cs +++ b/SSMP/Networking/Server/NetServer.cs @@ -246,7 +246,7 @@ private void HandleClientTimeout(NetServerClient client) { ClientTimeoutEvent?.Invoke(id); } - client.Disconnect(); + client.Dispose(); _transportServer?.DisconnectClient(client.TransportClient); _clientsById.TryRemove(id, out _); @@ -285,8 +285,8 @@ private void HandleClientPackets(NetServerClient client, List pac client.UpdateManager.OnReceivePacket(serverUpdatePacket); var packetData = serverUpdatePacket.GetPacketData(); - if (packetData.Remove(ServerUpdatePacketId.Slice, out var sliceData)) { - client.ChunkReceiver.ProcessReceivedData((SliceData) sliceData); + if (packetData.Remove(ServerUpdatePacketId.Slice, out var sliceDataRaw)) { + client.ChunkReceiver.ProcessReceivedData((SliceData) sliceDataRaw); } if (packetData.Remove(ServerUpdatePacketId.SliceAck, out var sliceAckData)) { @@ -327,6 +327,8 @@ private void OnConnectionRequest(ushort clientId, ClientInfo clientInfo, ServerI Logger.Debug("Connection has finished sending data, registering client"); client.IsRegistered = true; + client.ChunkReceiver.MaxAllowedChunkSize = ConnectionManager.MaxChunkSize; + client.ConnectionManager.AllowAddonChunks = true; client.ConnectionManager.StopAcceptingConnection(); } ); @@ -387,6 +389,8 @@ public void Stop() { // Wait for processing thread to exit gracefully (with timeout) if (_processingThread is { IsAlive: true }) { if (!_processingThread.Join(1000)) { + // TODO: Stop currently falls back to best-effort teardown after the join timeout. + // TODO: Revisit shutdown sequencing so processing cannot observe partially torn-down state. Logger.Warn("Processing thread did not exit within timeout"); } @@ -405,7 +409,7 @@ public void Stop() { // Clean up existing clients foreach (var client in _clientsById.Values) { - client.Disconnect(); + client.Dispose(); } _clientsById.Clear(); @@ -440,7 +444,7 @@ public void OnClientDisconnect(ushort id) { return; } - client.Disconnect(); + client.Dispose(); _transportServer?.DisconnectClient(client.TransportClient); _clientsById.TryRemove(id, out _); @@ -539,11 +543,13 @@ Func packetInstantiator } // After we know that this call did not use a different generic, we can update packet info - ServerUpdatePacket.AddonPacketInfoDict[addon.Id.Value] = new AddonPacketInfo( + var addonPacketInfo = new AddonPacketInfo( // Transform the packet instantiator function from a TPacketId as parameter to byte networkReceiver?.TransformPacketInstantiator(packetInstantiator)!, (byte) Enum.GetValues(typeof(TPacketId)).Length ); + ServerUpdatePacket.AddonPacketInfoDict[addon.Id.Value] = addonPacketInfo; + ServerConnectionPacket.AddonPacketInfoDict[addon.Id.Value] = addonPacketInfo; return (addon.NetworkReceiver as IServerAddonNetworkReceiver)!; } diff --git a/SSMP/Networking/Server/NetServerClient.cs b/SSMP/Networking/Server/NetServerClient.cs index 3743c7c7..6cf3309e 100644 --- a/SSMP/Networking/Server/NetServerClient.cs +++ b/SSMP/Networking/Server/NetServerClient.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Concurrent; using SSMP.Networking.Chunk; using SSMP.Networking.Packet; @@ -8,7 +9,22 @@ namespace SSMP.Networking.Server; /// /// A client managed by the server. This is only used for communication from server to client. /// -internal class NetServerClient { +internal class NetServerClient : IDisposable { + /// + /// Lock object for atomic static ID generation. + /// + private static readonly object IdLock = new(); + + /// + /// Lock object for thread-safe disposal. + /// + private readonly object _disposeLock = new(); + + /// + /// Flag indicating whether this client has been disposed. + /// + private bool _isDisposed; + /// /// Concurrent dictionary for the set of IDs that are used. We use a dictionary because there is no /// standard implementation for a concurrent set. @@ -70,14 +86,17 @@ public NetServerClient(IEncryptedTransportClient transportClient, PacketManager }; // Create chunk sender/receiver with delegates to the update manager ChunkSender = new ChunkSender(UpdateManager.SetSliceData); - ChunkReceiver = new ChunkReceiver(UpdateManager.SetSliceAckData); + ChunkReceiver = new ChunkReceiver(UpdateManager.SetSliceAckData) { + MaxAllowedChunkSize = Networking.ConnectionManager.MaxPreAuthChunkSize + }; + UpdateManager.EnqueueChunkPacketAction = ChunkSender.EnqueuePacket; ConnectionManager = new ServerConnectionManager(packetManager, ChunkSender, ChunkReceiver, Id); } /// /// Disconnect the client from the server. /// - public void Disconnect() { + private void Disconnect() { UsedIds.TryRemove(Id, out _); UpdateManager.StopUpdates(); @@ -87,13 +106,31 @@ public void Disconnect() { ConnectionManager.StopAcceptingConnection(); } + /// + /// Dispose of the client and its owned disposable resources. + /// + public void Dispose() { + lock (_disposeLock) { + if (_isDisposed) { + return; + } + + _isDisposed = true; + } + + Disconnect(); + ChunkSender.Dispose(); + } + /// /// Resets the static ID counter and used IDs. /// Should be called when the server is stopped to ensure the next server session starts with ID 0. /// public static void ResetIds() { - UsedIds.Clear(); - _lastId = 0; + lock (IdLock) { + UsedIds.Clear(); + _lastId = 0; + } } /// @@ -101,12 +138,14 @@ public static void ResetIds() { /// /// An unused ID. private static ushort GetId() { - ushort newId; - do { - newId = _lastId++; - } while (UsedIds.ContainsKey(newId)); - - UsedIds[newId] = 0; - return newId; + lock (IdLock) { + ushort newId; + do { + newId = _lastId++; + } while (UsedIds.ContainsKey(newId)); + + UsedIds[newId] = 0; + return newId; + } } } diff --git a/SSMP/Networking/Server/ServerConnectionManager.cs b/SSMP/Networking/Server/ServerConnectionManager.cs index 8fcc2c79..eb91e317 100644 --- a/SSMP/Networking/Server/ServerConnectionManager.cs +++ b/SSMP/Networking/Server/ServerConnectionManager.cs @@ -16,6 +16,7 @@ internal class ServerConnectionManager : ConnectionManager { /// Server-side chunk sender used to handle sending chunks. /// private readonly ChunkSender _chunkSender; + /// /// Server-side chunk received used to receive chunks. /// @@ -35,11 +36,17 @@ internal class ServerConnectionManager : ConnectionManager { /// Event that is called when the client has sent the client info, and thus we can check the connection request. /// public event Action? ConnectionRequestEvent; + /// /// Event that is called when the connection times out. /// public event Action? ConnectionTimeoutEvent; + /// + /// Whether chunked addon payloads are allowed to be dispatched. + /// + public bool AllowAddonChunks { get; set; } + public ServerConnectionManager( PacketManager packetManager, ChunkSender chunkSender, @@ -65,7 +72,7 @@ ushort clientId /// public void StartAcceptingConnection() { Logger.Debug("StartAcceptingConnection"); - + _timeoutTimer.Start(); } @@ -74,7 +81,7 @@ public void StartAcceptingConnection() { /// public void StopAcceptingConnection() { Logger.Debug("StopAcceptingConnection"); - + _timeoutTimer.Stop(); } @@ -108,7 +115,7 @@ public ServerInfo ProcessClientInfo(ClientInfo clientInfo) { } SendServerInfo(serverInfo); - + return serverInfo; } @@ -137,6 +144,32 @@ private void OnChunkReceived(Packet.Packet packet) { return; } - PacketManager.HandleServerConnectionPacket(_clientId, connectionPacket); + var packetData = connectionPacket.GetPacketData(); + if (packetData.ContainsKey(ServerConnectionPacketId.ClientInfo)) { + PacketManager.HandleServerConnectionPacket(_clientId, connectionPacket); + return; + } + + if (!packetData.TryGetValue(ServerConnectionPacketId.ChunkAddonData, out var chunkAddonDataRaw)) { + Logger.Debug($"Received unexpected connection chunk packet from client: {_clientId}"); + return; + } + + if (!AllowAddonChunks) { + Logger.Warn($"Rejected pre-registration chunked addon payload from client: {_clientId}"); + return; + } + + var chunkAddonData = (ChunkAddonData) chunkAddonDataRaw; + if (!ServerConnectionPacket.AddonPacketInfoDict.TryGetValue(chunkAddonData.AddonId, out var addonPacketInfo)) { + Logger.Warn($"Received chunked addon payload for unknown addon ID {chunkAddonData.AddonId}"); + return; + } + + var packetDataInstance = addonPacketInfo.PacketDataInstantiator.Invoke(chunkAddonData.PacketId); + packetDataInstance.ReadData(new Packet.Packet(chunkAddonData.Payload)); + PacketManager.HandleServerAddonPacketSingle( + _clientId, chunkAddonData.AddonId, chunkAddonData.PacketId, packetDataInstance + ); } } diff --git a/SSMP/Networking/Server/ServerUpdateManager.cs b/SSMP/Networking/Server/ServerUpdateManager.cs index 38389ad3..47dd6e91 100644 --- a/SSMP/Networking/Server/ServerUpdateManager.cs +++ b/SSMP/Networking/Server/ServerUpdateManager.cs @@ -95,23 +95,21 @@ private PacketDataCollection GetOrCreateCollection(ClientUpdatePacketId pa } /// - /// Set slice data in the current packet. + /// Send a slice packet immediately, bypassing the gameplay tick loop. /// /// The ID of the chunk the slice belongs to. /// The ID of the slice within the chunk. /// The number of slices in the chunk. /// The raw data in the slice as a byte array. - public void SetSliceData(byte chunkId, byte sliceId, byte numSlices, byte[] data) { - var sliceData = new SliceData { + public void SetSliceData(byte chunkId, ushort sliceId, ushort numSlices, byte[] data) { + var slicePacket = new ClientUpdatePacket(); + slicePacket.SetSendingPacketData(ClientUpdatePacketId.Slice, new SliceData { ChunkId = chunkId, SliceId = sliceId, NumSlices = numSlices, Data = data - }; - - lock (Lock) { - CurrentUpdatePacket.SetSendingPacketData(ClientUpdatePacketId.Slice, sliceData); - } + }); + SendSlicePacket(slicePacket); } /// diff --git a/SSMP/Networking/UpdateManager.cs b/SSMP/Networking/UpdateManager.cs index 2571170b..e1dd5730 100644 --- a/SSMP/Networking/UpdateManager.cs +++ b/SSMP/Networking/UpdateManager.cs @@ -113,6 +113,11 @@ internal abstract class UpdateManager /// protected object Lock { get; } = new(); + /// + /// Lock object for serializing all transport writes to prevent concurrent/parallel socket writes. + /// + private readonly object _transportSendLock = new(); + /// /// Whether the transport requires application-level reliability. Protected for subclass access. /// @@ -182,6 +187,17 @@ private void InitializeManagersIfNeeded() { /// public event Action? TimeoutEvent; + /// + /// Action to enqueue a packet for chunked sending. + /// + public Action? EnqueueChunkPacketAction { get; set; } + + /// + /// Enqueues a packet to be sent reliably as a chunk to the destination. + /// + /// The packet to send. + public void SendChunkPacket(Packet.Packet packet) => EnqueueChunkPacketAction?.Invoke(packet); + /// /// Construct the update manager with a UDP socket. /// @@ -216,10 +232,19 @@ public void StopUpdates() { _isUpdating = false; Logger.Debug("Stopping UDP updates, sending last packet"); + // TODO: Split "flush final packet" from teardown so disconnect/shutdown does not depend on a synchronous + // TODO: transport write on the caller thread. CreateAndSendPacket(); _cancellationTokenSource?.Cancel(); + + lock (Lock) { + Monitor.PulseAll(Lock); + } + // Wait for thread to finish before disposing shared resources - _sendThread?.Join(); + if (Thread.CurrentThread != _sendThread) + _sendThread?.Join(); + _sendThread = null; _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; @@ -287,6 +312,8 @@ private void CreateAndSendPacket() { if (_requiresSequencing) { CurrentUpdatePacket.Sequence = _localSequence; CurrentUpdatePacket.Ack = _remoteSequence; + // TODO: PopulateAckField is called while Lock is already held and still re-locks internally. + // TODO: Flatten this re-entrant lock boundary in a dedicated concurrency cleanup pass. PopulateAckField(); } @@ -301,6 +328,9 @@ private void CreateAndSendPacket() { // but keep the original instance for reliability data re-sending packetToSend = CurrentUpdatePacket; CurrentUpdatePacket = new TOutgoing(); + + // Pulse to wake up any threads waiting to set new slice data + Monitor.PulseAll(Lock); } // Track send time for RTT measurement (all transports) @@ -433,6 +463,7 @@ private void SendLoop() { if (cts == null) { return; } + var token = cts.Token; // Safety constant: how many ms can we fall behind before giving up? @@ -509,22 +540,24 @@ private void SendPacket(Packet.Packet packet, bool isReliable) { var buffer = packet.ToArray(); var length = buffer.Length; - switch (_transportSender) { - case IReliableTransport reliableTransport when isReliable: - reliableTransport.SendReliable(buffer, 0, length); - break; + lock (_transportSendLock) { + switch (_transportSender) { + case IReliableTransport reliableTransport when isReliable: + reliableTransport.SendReliable(buffer, 0, length); + break; - case IEncryptedTransport transport: - transport.Send(buffer, 0, length); - break; + case IEncryptedTransport transport: + transport.Send(buffer, 0, length); + break; - case IReliableTransportClient reliableTransportClient when isReliable: - reliableTransportClient.SendReliable(buffer, 0, length); - break; + case IReliableTransportClient reliableTransportClient when isReliable: + reliableTransportClient.SendReliable(buffer, 0, length); + break; - case IEncryptedTransportClient transportClient: - transportClient.Send(buffer, 0, length); - break; + case IEncryptedTransportClient transportClient: + transportClient.Send(buffer, 0, length); + break; + } } } @@ -542,6 +575,7 @@ public void SetAddonData( IPacketData packetData ) { lock (Lock) { + Packet.Packet.ValidateSize(packetData); var addonPacketData = GetOrCreateAddonPacketData(addonId, packetIdSize); addonPacketData.PacketData[packetId] = packetData; } @@ -564,21 +598,39 @@ TPacketData packetData ) where TPacketData : IPacketData, new() { lock (Lock) { var addonPacketData = GetOrCreateAddonPacketData(addonId, packetIdSize); + var isNew = false; if (!addonPacketData.PacketData.TryGetValue(packetId, out var existingPacketData)) { existingPacketData = new PacketDataCollection(); addonPacketData.PacketData[packetId] = existingPacketData; + isNew = true; } if (existingPacketData is not RawPacketDataCollection existingDataCollection) { throw new InvalidOperationException("Could not add addon data with existing non-collection data"); } + var previousCount = existingDataCollection.DataInstances.Count; if (packetData is RawPacketDataCollection packetDataAsCollection) { existingDataCollection.DataInstances.AddRange(packetDataAsCollection.DataInstances); } else { existingDataCollection.DataInstances.Add(packetData); } + + try { + Packet.Packet.ValidateSize(existingPacketData); + } catch { + var addedCount = existingDataCollection.DataInstances.Count - previousCount; + if (addedCount > 0) { + existingDataCollection.DataInstances.RemoveRange(previousCount, addedCount); + } + + if (isNew) { + addonPacketData.PacketData.Remove(packetId); + } + + throw; + } } } @@ -597,4 +649,35 @@ private AddonPacketData GetOrCreateAddonPacketData(byte addonId, byte packetIdSi CurrentUpdatePacket.SetSendingAddonPacketData(addonId, addonPacketData); return addonPacketData; } + + /// + /// Sends a slice packet immediately, bypassing the gameplay tick send loop. + /// + protected void SendSlicePacket(TOutgoing slicePacket) { + var rawPacket = new Packet.Packet(); + lock (Lock) { + if (_requiresSequencing) { + slicePacket.Sequence = _localSequence; + slicePacket.Ack = _remoteSequence; + + var ackField = slicePacket.AckField; + for (ushort i = 0; i < ConnectionManager.AckSize; i++) { + var pastSequence = (ushort) (_remoteSequence - i - 1); + ackField[i] = _receivedSequences!.Contains(pastSequence); + } + } + + try { + slicePacket.CreatePacket(rawPacket); + } catch (Exception e) { + Logger.Error($"Failed to create slice packet: {e}"); + return; + } + + if (_requiresSequencing) + _localSequence++; + } + + SendPacket(rawPacket, slicePacket.ContainsReliableData); + } }