diff --git a/Broca.ActivityPub.sln b/Broca.ActivityPub.sln index c888043..6ba775f 100644 --- a/Broca.ActivityPub.sln +++ b/Broca.ActivityPub.sln @@ -31,6 +31,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tools", "tools", "{07C2787E EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Broca.SampleData", "tools\Broca.SampleData\Broca.SampleData.csproj", "{EA3C5007-B9B1-431F-A44D-409FA8FBB11E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Broca.ActivityPub.Persistence.MySql", "src\Broca.ActivityPub.Persistence.MySql\Broca.ActivityPub.Persistence.MySql.csproj", "{8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -173,6 +175,18 @@ Global {EA3C5007-B9B1-431F-A44D-409FA8FBB11E}.Release|x64.Build.0 = Release|Any CPU {EA3C5007-B9B1-431F-A44D-409FA8FBB11E}.Release|x86.ActiveCfg = Release|Any CPU {EA3C5007-B9B1-431F-A44D-409FA8FBB11E}.Release|x86.Build.0 = Release|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Debug|x64.ActiveCfg = Debug|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Debug|x64.Build.0 = Debug|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Debug|x86.ActiveCfg = Debug|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Debug|x86.Build.0 = Debug|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Release|Any CPU.Build.0 = Release|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Release|x64.ActiveCfg = Release|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Release|x64.Build.0 = Release|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Release|x86.ActiveCfg = Release|Any CPU + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -189,5 +203,6 @@ Global {664ECEF2-8938-4BEA-9D1E-BC3D0C5EC4D0} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} {20644776-4EBD-4B61-8007-0A6430BF6CEB} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} {EA3C5007-B9B1-431F-A44D-409FA8FBB11E} = {07C2787E-EAC7-C090-1BA3-A61EC2A24D84} + {8370636B-1F27-4F3F-8DFC-3FC8ADB9125F} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} EndGlobalSection EndGlobal diff --git a/docker-compose.mysql.yml b/docker-compose.mysql.yml new file mode 100644 index 0000000..ef073de --- /dev/null +++ b/docker-compose.mysql.yml @@ -0,0 +1,12 @@ +services: + broca-api: + environment: + - Persistence__Driver=MySql + - Persistence__ConnectionString=Server=broca-mysql;Database=broca;User=broca;Password=broca_password; + depends_on: + broca-mysql: + condition: service_healthy + profiles: [] + + broca-mysql: + profiles: [] diff --git a/docker-compose.yml b/docker-compose.yml index 81907ef..58cd939 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,26 @@ services: + broca-mysql: + image: mysql:9.1 + container_name: broca-mysql + environment: + - MYSQL_ROOT_PASSWORD=broca_root + - MYSQL_DATABASE=broca + - MYSQL_USER=broca + - MYSQL_PASSWORD=broca_password + volumes: + - broca-mysql-data:/var/lib/mysql + networks: + - broca-network + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "broca", "-pbroca_password"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 30s + restart: unless-stopped + profiles: + - mysql + broca-api: build: context: . @@ -74,6 +96,8 @@ services: volumes: broca-data: driver: local + broca-mysql-data: + driver: local networks: broca-network: diff --git a/src/Broca.API/Broca.API.csproj b/src/Broca.API/Broca.API.csproj index 27d6892..1851367 100644 --- a/src/Broca.API/Broca.API.csproj +++ b/src/Broca.API/Broca.API.csproj @@ -13,6 +13,7 @@ contentFiles + diff --git a/src/Broca.API/Program.cs b/src/Broca.API/Program.cs index ed0ec56..37fc4cc 100644 --- a/src/Broca.API/Program.cs +++ b/src/Broca.API/Program.cs @@ -1,6 +1,7 @@ using Broca.ActivityPub.Client.Extensions; using Broca.ActivityPub.Server.Extensions; using Broca.ActivityPub.Persistence.Extensions; +using Broca.ActivityPub.Persistence.MySql.Extensions; using Broca.ActivityPub.Core.Interfaces; using Broca.ActivityPub.Server.Services; @@ -22,6 +23,8 @@ // Configure persistence layer if (persistenceDriver.Equals("FileSystem", StringComparison.OrdinalIgnoreCase)) builder.Services.AddFileSystemPersistence(dataPath); +else if (persistenceDriver.Equals("MySql", StringComparison.OrdinalIgnoreCase)) + builder.Services.AddMySqlPersistence(builder.Configuration); else builder.Services.AddInMemoryPersistence(); @@ -45,6 +48,12 @@ var app = builder.Build(); +// Apply MySQL migrations on startup +if (persistenceDriver.Equals("MySql", StringComparison.OrdinalIgnoreCase)) +{ + await app.Services.MigrateAsync(); +} + // Initialize system actor on startup using (var scope = app.Services.CreateScope()) { diff --git a/src/Broca.ActivityPub.Persistence.MySql/Broca.ActivityPub.Persistence.MySql.csproj b/src/Broca.ActivityPub.Persistence.MySql/Broca.ActivityPub.Persistence.MySql.csproj new file mode 100644 index 0000000..d709e3d --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Broca.ActivityPub.Persistence.MySql.csproj @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + net9.0 + enable + enable + + + diff --git a/src/Broca.ActivityPub.Persistence.MySql/BrocaDbContext.cs b/src/Broca.ActivityPub.Persistence.MySql/BrocaDbContext.cs new file mode 100644 index 0000000..661692b --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/BrocaDbContext.cs @@ -0,0 +1,152 @@ +using Broca.ActivityPub.Core.Models; +using Broca.ActivityPub.Persistence.MySql.Entities; +using Microsoft.EntityFrameworkCore; + +namespace Broca.ActivityPub.Persistence.MySql; + +public class BrocaDbContext : DbContext +{ + public BrocaDbContext(DbContextOptions options) : base(options) { } + + public DbSet Actors => Set(); + public DbSet Follows => Set(); + public DbSet CollectionDefinitions => Set(); + public DbSet CollectionItems => Set(); + public DbSet Activities => Set(); + public DbSet DeliveryQueue => Set(); + public DbSet Blobs => Set(); + public DbSet ActorSyncQueue => Set(); + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + modelBuilder.Entity(e => + { + e.ToTable("Actors"); + e.HasKey(a => a.Username); + e.Property(a => a.Username).HasMaxLength(255); + e.Property(a => a.ActorId).HasMaxLength(2048); + e.Property(a => a.IsLocal).IsRequired(); + e.Property(a => a.Domain).HasMaxLength(255); + e.Property(a => a.ActorJson).HasColumnType("json"); + e.HasIndex(a => a.ActorId); + e.HasIndex(a => a.IsLocal); + e.HasIndex(a => a.Domain); + }); + + modelBuilder.Entity(e => + { + e.ToTable("Follows"); + e.HasKey(f => f.Id); + e.Property(f => f.Id).ValueGeneratedOnAdd(); + e.Property(f => f.Username).HasMaxLength(255); + e.Property(f => f.ActorId).HasMaxLength(2048); + e.Property(f => f.FollowType).HasConversion(); + e.HasIndex(f => new { f.Username, f.FollowType }); + e.HasIndex(f => new { f.Username, f.ActorId, f.FollowType }).IsUnique(); + e.HasOne(f => f.Actor) + .WithMany(a => a.Follows) + .HasForeignKey(f => f.Username) + .OnDelete(DeleteBehavior.Cascade); + }); + + modelBuilder.Entity(e => + { + e.ToTable("CollectionDefinitions"); + e.HasKey(c => new { c.Username, c.CollectionId }); + e.Property(c => c.Username).HasMaxLength(255); + e.Property(c => c.CollectionId).HasMaxLength(255); + e.Property(c => c.DefinitionJson).HasColumnType("json"); + e.HasOne(c => c.Actor) + .WithMany(a => a.CollectionDefinitions) + .HasForeignKey(c => c.Username) + .OnDelete(DeleteBehavior.Cascade); + }); + + modelBuilder.Entity(e => + { + e.ToTable("CollectionItems"); + e.HasKey(c => new { c.Username, c.CollectionId, c.ItemId }); + e.Property(c => c.Username).HasMaxLength(255); + e.Property(c => c.CollectionId).HasMaxLength(255); + e.Property(c => c.ItemId).HasMaxLength(2048); + e.HasOne(c => c.Actor) + .WithMany(a => a.CollectionItems) + .HasForeignKey(c => c.Username) + .OnDelete(DeleteBehavior.Cascade); + e.HasOne(c => c.CollectionDefinition) + .WithMany(d => d.Items) + .HasForeignKey(c => new { c.Username, c.CollectionId }) + .OnDelete(DeleteBehavior.Cascade); + }); + + modelBuilder.Entity(e => + { + e.ToTable("Activities"); + e.HasKey(a => a.Id); + e.Property(a => a.Id).ValueGeneratedOnAdd(); + e.Property(a => a.ActivityId).HasMaxLength(2048); + e.Property(a => a.Username).HasMaxLength(255); + e.Property(a => a.Box).HasMaxLength(10); + e.Property(a => a.ActivityJson).HasColumnType("json"); + e.Property(a => a.ActivityType).HasMaxLength(255); + e.Property(a => a.ObjectId).HasMaxLength(2048); + e.Property(a => a.InReplyTo).HasMaxLength(2048); + e.HasIndex(a => a.ActivityId).IsUnique(); + e.HasIndex(a => new { a.Username, a.Box }); + e.HasIndex(a => a.CreatedAt); + e.HasIndex(a => new { a.ActivityType, a.ObjectId }); + e.HasIndex(a => a.InReplyTo); + e.HasOne(a => a.Actor) + .WithMany(ac => ac.Activities) + .HasForeignKey(a => a.Username) + .OnDelete(DeleteBehavior.Cascade); + }); + + modelBuilder.Entity(e => + { + e.ToTable("DeliveryQueue"); + e.HasKey(d => d.Id); + e.Property(d => d.Id).HasMaxLength(255); + e.Property(d => d.ActivityJson).HasColumnType("json"); + e.Property(d => d.InboxUrl).HasMaxLength(2048); + e.Property(d => d.TargetActorId).HasMaxLength(2048); + e.Property(d => d.SenderActorId).HasMaxLength(2048); + e.Property(d => d.SenderUsername).HasMaxLength(255); + e.Property(d => d.LastError).HasMaxLength(4096); + e.Property(d => d.Status).HasConversion(); + e.HasIndex(d => d.Status); + e.HasIndex(d => d.NextAttemptAt); + e.HasOne(d => d.SenderActor) + .WithMany(a => a.OutboundDeliveries) + .HasForeignKey(d => d.SenderUsername) + .OnDelete(DeleteBehavior.Cascade); + }); + + modelBuilder.Entity(e => + { + e.ToTable("Blobs"); + e.HasKey(b => new { b.Username, b.BlobId }); + e.Property(b => b.Username).HasMaxLength(255); + e.Property(b => b.BlobId).HasMaxLength(255); + e.Property(b => b.ContentType).HasMaxLength(255); + e.Property(b => b.StorageProvider).HasMaxLength(64); + e.Property(b => b.StorageKey).HasMaxLength(2048); + e.Property(b => b.Content).HasColumnType("longblob"); + e.HasIndex(b => b.StorageProvider); + e.HasOne(b => b.Actor) + .WithMany(a => a.Blobs) + .HasForeignKey(b => b.Username) + .OnDelete(DeleteBehavior.Cascade); + }); + + modelBuilder.Entity(e => + { + e.ToTable("ActorSyncQueue"); + e.HasKey(a => a.Id); + e.Property(a => a.Id).ValueGeneratedOnAdd(); + e.Property(a => a.ActorId).HasMaxLength(2048); + e.HasIndex(a => a.ActorId).IsUnique(); + e.HasIndex(a => a.EnqueuedAt); + }); + } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/Entities/ActivityEntity.cs b/src/Broca.ActivityPub.Persistence.MySql/Entities/ActivityEntity.cs new file mode 100644 index 0000000..91e463f --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Entities/ActivityEntity.cs @@ -0,0 +1,16 @@ +namespace Broca.ActivityPub.Persistence.MySql.Entities; + +public class ActivityEntity +{ + public long Id { get; set; } + public string ActivityId { get; set; } = string.Empty; + public string Username { get; set; } = string.Empty; + public string Box { get; set; } = string.Empty; + public string ActivityJson { get; set; } = string.Empty; + public string? ActivityType { get; set; } + public string? ObjectId { get; set; } + public string? InReplyTo { get; set; } + public DateTime CreatedAt { get; set; } = DateTime.UtcNow; + + public ActorEntity? Actor { get; set; } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/Entities/ActorEntity.cs b/src/Broca.ActivityPub.Persistence.MySql/Entities/ActorEntity.cs new file mode 100644 index 0000000..aba3460 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Entities/ActorEntity.cs @@ -0,0 +1,19 @@ +namespace Broca.ActivityPub.Persistence.MySql.Entities; + +public class ActorEntity +{ + public string Username { get; set; } = string.Empty; + public string? ActorId { get; set; } + public bool IsLocal { get; set; } + public string? Domain { get; set; } + public string ActorJson { get; set; } = string.Empty; + public DateTime CreatedAt { get; set; } = DateTime.UtcNow; + public DateTime UpdatedAt { get; set; } = DateTime.UtcNow; + + public ICollection Follows { get; set; } = new List(); + public ICollection Activities { get; set; } = new List(); + public ICollection CollectionDefinitions { get; set; } = new List(); + public ICollection CollectionItems { get; set; } = new List(); + public ICollection Blobs { get; set; } = new List(); + public ICollection OutboundDeliveries { get; set; } = new List(); +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/Entities/ActorSyncQueueEntity.cs b/src/Broca.ActivityPub.Persistence.MySql/Entities/ActorSyncQueueEntity.cs new file mode 100644 index 0000000..29eb8f8 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Entities/ActorSyncQueueEntity.cs @@ -0,0 +1,8 @@ +namespace Broca.ActivityPub.Persistence.MySql.Entities; + +public class ActorSyncQueueEntity +{ + public long Id { get; set; } + public string ActorId { get; set; } = string.Empty; + public DateTime EnqueuedAt { get; set; } = DateTime.UtcNow; +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/Entities/BlobEntity.cs b/src/Broca.ActivityPub.Persistence.MySql/Entities/BlobEntity.cs new file mode 100644 index 0000000..b6b9e84 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Entities/BlobEntity.cs @@ -0,0 +1,14 @@ +namespace Broca.ActivityPub.Persistence.MySql.Entities; + +public class BlobEntity +{ + public string Username { get; set; } = string.Empty; + public string BlobId { get; set; } = string.Empty; + public string ContentType { get; set; } = "application/octet-stream"; + public string StorageProvider { get; set; } = "mysql"; + public string? StorageKey { get; set; } + public byte[]? Content { get; set; } + public long? Size { get; set; } + + public ActorEntity? Actor { get; set; } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/Entities/CollectionDefinitionEntity.cs b/src/Broca.ActivityPub.Persistence.MySql/Entities/CollectionDefinitionEntity.cs new file mode 100644 index 0000000..f729be1 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Entities/CollectionDefinitionEntity.cs @@ -0,0 +1,11 @@ +namespace Broca.ActivityPub.Persistence.MySql.Entities; + +public class CollectionDefinitionEntity +{ + public string Username { get; set; } = string.Empty; + public string CollectionId { get; set; } = string.Empty; + public string DefinitionJson { get; set; } = string.Empty; + + public ActorEntity? Actor { get; set; } + public ICollection Items { get; set; } = new List(); +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/Entities/CollectionItemEntity.cs b/src/Broca.ActivityPub.Persistence.MySql/Entities/CollectionItemEntity.cs new file mode 100644 index 0000000..46a08af --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Entities/CollectionItemEntity.cs @@ -0,0 +1,11 @@ +namespace Broca.ActivityPub.Persistence.MySql.Entities; + +public class CollectionItemEntity +{ + public string Username { get; set; } = string.Empty; + public string CollectionId { get; set; } = string.Empty; + public string ItemId { get; set; } = string.Empty; + + public ActorEntity? Actor { get; set; } + public CollectionDefinitionEntity? CollectionDefinition { get; set; } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/Entities/DeliveryQueueEntity.cs b/src/Broca.ActivityPub.Persistence.MySql/Entities/DeliveryQueueEntity.cs new file mode 100644 index 0000000..d821f93 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Entities/DeliveryQueueEntity.cs @@ -0,0 +1,23 @@ +using Broca.ActivityPub.Core.Models; + +namespace Broca.ActivityPub.Persistence.MySql.Entities; + +public class DeliveryQueueEntity +{ + public string Id { get; set; } = string.Empty; + public string ActivityJson { get; set; } = string.Empty; + public string InboxUrl { get; set; } = string.Empty; + public string? TargetActorId { get; set; } + public string SenderActorId { get; set; } = string.Empty; + public string SenderUsername { get; set; } = string.Empty; + public DeliveryStatus Status { get; set; } = DeliveryStatus.Pending; + public int AttemptCount { get; set; } = 0; + public int MaxRetries { get; set; } = 5; + public DateTime CreatedAt { get; set; } = DateTime.UtcNow; + public DateTime? NextAttemptAt { get; set; } + public DateTime? LastAttemptAt { get; set; } + public DateTime? CompletedAt { get; set; } + public string? LastError { get; set; } + + public ActorEntity? SenderActor { get; set; } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/Entities/FollowEntity.cs b/src/Broca.ActivityPub.Persistence.MySql/Entities/FollowEntity.cs new file mode 100644 index 0000000..807593f --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Entities/FollowEntity.cs @@ -0,0 +1,19 @@ +namespace Broca.ActivityPub.Persistence.MySql.Entities; + +public enum FollowType +{ + Follower, + Following, + PendingFollower, + PendingFollowing +} + +public class FollowEntity +{ + public long Id { get; set; } + public string Username { get; set; } = string.Empty; + public string ActorId { get; set; } = string.Empty; + public FollowType FollowType { get; set; } + + public ActorEntity? Actor { get; set; } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/Extensions/MySqlPersistenceServiceCollectionExtensions.cs b/src/Broca.ActivityPub.Persistence.MySql/Extensions/MySqlPersistenceServiceCollectionExtensions.cs new file mode 100644 index 0000000..828ca72 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/Extensions/MySqlPersistenceServiceCollectionExtensions.cs @@ -0,0 +1,61 @@ +using Broca.ActivityPub.Core.Interfaces; +using Broca.ActivityPub.Persistence.MySql.MySql; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Broca.ActivityPub.Persistence.MySql.Extensions; + +public static class MySqlPersistenceServiceCollectionExtensions +{ + public static IServiceCollection AddMySqlPersistence( + this IServiceCollection services, + string connectionString, + string? baseUrl = null) + { + services.Configure(options => + { + options.ConnectionString = connectionString; + if (baseUrl is not null) + options.BaseUrl = baseUrl; + }); + + services.AddPooledDbContextFactory(options => + options.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString))); + + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + services.AddSingleton(); + + services.AddScoped(sp => sp.GetRequiredService()); + services.AddScoped(sp => sp.GetRequiredService()); + services.AddScoped(sp => sp.GetRequiredService()); + services.AddScoped(sp => sp.GetRequiredService()); + services.AddScoped(sp => sp.GetRequiredService()); + services.AddScoped(sp => sp.GetRequiredService()); + services.AddScoped(sp => sp.GetRequiredService()); + services.AddSingleton(sp => sp.GetRequiredService()); + + return services; + } + + public static IServiceCollection AddMySqlPersistence( + this IServiceCollection services, + IConfiguration configuration) + { + var connectionString = configuration["Persistence:ConnectionString"] + ?? throw new InvalidOperationException("Persistence:ConnectionString is required for MySQL driver."); + var baseUrl = configuration["ActivityPub:BaseUrl"]; + return services.AddMySqlPersistence(connectionString, baseUrl); + } + + public static async Task MigrateAsync(this IServiceProvider serviceProvider) + { + await using var db = await serviceProvider + .GetRequiredService>() + .CreateDbContextAsync(); + await db.Database.MigrateAsync(); + } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlActivityRepository.cs b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlActivityRepository.cs new file mode 100644 index 0000000..e421bf6 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlActivityRepository.cs @@ -0,0 +1,411 @@ +using System.Text.Json; +using Broca.ActivityPub.Core.Interfaces; +using Broca.ActivityPub.Core.Models; +using Broca.ActivityPub.Persistence.MySql.Entities; +using KristofferStrube.ActivityStreams; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; + +namespace Broca.ActivityPub.Persistence.MySql.MySql; + +public class MySqlActivityRepository : IActivityRepository, IActivityStatistics, ISearchableActivityRepository +{ + private readonly IDbContextFactory _contextFactory; + private readonly ILogger _logger; + private readonly JsonSerializerOptions _jsonOptions; + + public MySqlActivityRepository( + IDbContextFactory contextFactory, + ILogger logger) + { + _contextFactory = contextFactory; + _logger = logger; + _jsonOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull + }; + } + + public async Task SaveInboxActivityAsync(string username, string activityId, IObjectOrLink activity, CancellationToken cancellationToken = default) + => await SaveActivityAsync(username, activityId, activity, "inbox", cancellationToken); + + public async Task SaveOutboxActivityAsync(string username, string activityId, IObjectOrLink activity, CancellationToken cancellationToken = default) + => await SaveActivityAsync(username, activityId, activity, "outbox", cancellationToken); + + public async Task> GetInboxActivitiesAsync(string username, int limit = 20, int offset = 0, CancellationToken cancellationToken = default) + => await GetActivitiesAsync(username, "inbox", limit, offset, cancellationToken); + + public async Task> GetOutboxActivitiesAsync(string username, int limit = 20, int offset = 0, CancellationToken cancellationToken = default) + => await GetActivitiesAsync(username, "outbox", limit, offset, cancellationToken, activitiesOnly: true); + + public async Task GetActivityByIdAsync(string activityId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entity = await db.Activities.FirstOrDefaultAsync(a => a.ActivityId == activityId, cancellationToken); + return entity is null ? null : DeserializeActivity(entity.ActivityJson); + } + + public async Task DeleteActivityAsync(string activityId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + await db.Activities.Where(a => a.ActivityId == activityId).ExecuteDeleteAsync(cancellationToken); + } + + public async Task GetInboxCountAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Activities.CountAsync( + a => a.Username == username.ToLowerInvariant() && a.Box == "inbox", + cancellationToken); + } + + public async Task GetOutboxCountAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Activities.CountAsync( + a => a.Username == username.ToLowerInvariant() && a.Box == "outbox", + cancellationToken); + } + + public async Task> GetRepliesAsync(string objectId, int limit = 20, int offset = 0, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entities = await db.Activities + .Where(a => a.InReplyTo == objectId) + .OrderByDescending(a => a.CreatedAt) + .Skip(offset).Take(limit) + .ToListAsync(cancellationToken); + return DeserializeActivities(entities); + } + + public async Task GetRepliesCountAsync(string objectId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Activities.CountAsync(a => a.InReplyTo == objectId, cancellationToken); + } + + public async Task> GetLikesAsync(string objectId, int limit = 20, int offset = 0, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entities = await db.Activities + .Where(a => a.ActivityType == "Like" && a.ObjectId == objectId) + .OrderByDescending(a => a.CreatedAt) + .Skip(offset).Take(limit) + .ToListAsync(cancellationToken); + return DeserializeActivities(entities); + } + + public async Task GetLikesCountAsync(string objectId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Activities.CountAsync( + a => a.ActivityType == "Like" && a.ObjectId == objectId, + cancellationToken); + } + + public async Task> GetSharesAsync(string objectId, int limit = 20, int offset = 0, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entities = await db.Activities + .Where(a => a.ActivityType == "Announce" && a.ObjectId == objectId) + .OrderByDescending(a => a.CreatedAt) + .Skip(offset).Take(limit) + .ToListAsync(cancellationToken); + return DeserializeActivities(entities); + } + + public async Task GetSharesCountAsync(string objectId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Activities.CountAsync( + a => a.ActivityType == "Announce" && a.ObjectId == objectId, + cancellationToken); + } + + public async Task> GetLikedByActorAsync(string username, int limit = 20, int offset = 0, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entities = await db.Activities + .Where(a => a.Username == username.ToLowerInvariant() && a.ActivityType == "Like") + .OrderByDescending(a => a.CreatedAt) + .Skip(offset).Take(limit) + .ToListAsync(cancellationToken); + return DeserializeActivities(entities); + } + + public async Task GetLikedByActorCountAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Activities.CountAsync( + a => a.Username == username.ToLowerInvariant() && a.ActivityType == "Like", + cancellationToken); + } + + public async Task> GetSharedByActorAsync(string username, int limit = 20, int offset = 0, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entities = await db.Activities + .Where(a => a.Username == username.ToLowerInvariant() && a.ActivityType == "Announce") + .OrderByDescending(a => a.CreatedAt) + .Skip(offset).Take(limit) + .ToListAsync(cancellationToken); + return DeserializeActivities(entities); + } + + public async Task GetSharedByActorCountAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Activities.CountAsync( + a => a.Username == username.ToLowerInvariant() && a.ActivityType == "Announce", + cancellationToken); + } + + public async Task MarkObjectAsDeletedAsync(string objectId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entity = await db.Activities.FirstOrDefaultAsync(a => a.ActivityId == objectId, cancellationToken); + if (entity is not null) + { + var tombstone = new Tombstone { Id = objectId, FormerType = new List { "Note" } }; + entity.ActivityJson = JsonSerializer.Serialize(tombstone, _jsonOptions); + await db.SaveChangesAsync(cancellationToken); + } + } + + public async Task CountCreateActivitiesSinceAsync(DateTime since, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Activities.CountAsync( + a => a.ActivityType == "Create" && a.CreatedAt >= since, + cancellationToken); + } + + public async Task CountActiveActorsSinceAsync(DateTime since, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Activities + .Where(a => a.ActivityType == "Create" && a.CreatedAt >= since && a.Box == "outbox") + .Select(a => a.Username) + .Distinct() + .CountAsync(cancellationToken); + } + + // ISearchableActivityRepository + + public async Task> GetInboxActivitiesAsync( + string username, + CollectionSearchParameters search, + int limit = 20, + int offset = 0, + CancellationToken cancellationToken = default) + { + if (!search.HasSearchCriteria) + return await GetInboxActivitiesAsync(username, limit, offset, cancellationToken); + + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var query = BuildSearchQuery(db, username, "inbox", search); + var entities = await query.Skip(offset).Take(limit).ToListAsync(cancellationToken); + return DeserializeActivities(entities); + } + + public async Task GetInboxCountAsync( + string username, + CollectionSearchParameters search, + CancellationToken cancellationToken = default) + { + if (!search.HasSearchCriteria) + return await GetInboxCountAsync(username, cancellationToken); + + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await BuildSearchQuery(db, username, "inbox", search).CountAsync(cancellationToken); + } + + public async Task> GetOutboxActivitiesAsync( + string username, + CollectionSearchParameters search, + int limit = 20, + int offset = 0, + CancellationToken cancellationToken = default) + { + if (!search.HasSearchCriteria) + return await GetOutboxActivitiesAsync(username, limit, offset, cancellationToken); + + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var query = BuildSearchQuery(db, username, "outbox", search); + var entities = await query.Skip(offset).Take(limit).ToListAsync(cancellationToken); + return DeserializeActivities(entities); + } + + public async Task GetOutboxCountAsync( + string username, + CollectionSearchParameters search, + CancellationToken cancellationToken = default) + { + if (!search.HasSearchCriteria) + return await GetOutboxCountAsync(username, cancellationToken); + + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await BuildSearchQuery(db, username, "outbox", search).CountAsync(cancellationToken); + } + + public async Task> GetRepliesAsync( + string objectId, + CollectionSearchParameters search, + int limit = 20, + int offset = 0, + CancellationToken cancellationToken = default) + { + if (!search.HasSearchCriteria) + return await GetRepliesAsync(objectId, limit, offset, cancellationToken); + + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var query = db.Activities.Where(a => a.InReplyTo == objectId); + query = ApplySearch(query, search); + var entities = await query.Skip(offset).Take(limit).ToListAsync(cancellationToken); + return DeserializeActivities(entities); + } + + public async Task GetRepliesCountAsync( + string objectId, + CollectionSearchParameters search, + CancellationToken cancellationToken = default) + { + if (!search.HasSearchCriteria) + return await GetRepliesCountAsync(objectId, cancellationToken); + + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var query = db.Activities.Where(a => a.InReplyTo == objectId); + query = ApplySearch(query, search); + return await query.CountAsync(cancellationToken); + } + + private async Task SaveActivityAsync(string username, string activityId, IObjectOrLink activity, string box, CancellationToken cancellationToken) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var json = JsonSerializer.Serialize(activity, typeof(IObjectOrLink), _jsonOptions); + var (activityType, objectId, inReplyTo) = ExtractMetadata(activity); + + var existing = await db.Activities.FirstOrDefaultAsync(a => a.ActivityId == activityId, cancellationToken); + if (existing is null) + { + db.Activities.Add(new ActivityEntity + { + ActivityId = activityId, + Username = username.ToLowerInvariant(), + Box = box, + ActivityJson = json, + ActivityType = activityType, + ObjectId = objectId, + InReplyTo = inReplyTo, + CreatedAt = DateTime.UtcNow + }); + } + else + { + existing.ActivityJson = json; + existing.ActivityType = activityType; + existing.ObjectId = objectId; + existing.InReplyTo = inReplyTo; + } + await db.SaveChangesAsync(cancellationToken); + _logger.LogInformation("Saved {Box} activity {ActivityId} for {Username}", box, activityId, username); + } + + private async Task> GetActivitiesAsync( + string username, string box, int limit, int offset, CancellationToken cancellationToken, + bool activitiesOnly = false) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var query = db.Activities + .Where(a => a.Username == username.ToLowerInvariant() && a.Box == box); + if (activitiesOnly) + query = query.Where(a => a.ActivityType != null); + var entities = await query + .OrderByDescending(a => a.CreatedAt) + .Skip(offset).Take(limit) + .ToListAsync(cancellationToken); + return DeserializeActivities(entities); + } + + private IQueryable BuildSearchQuery(BrocaDbContext db, string username, string box, CollectionSearchParameters search) + { + var query = db.Activities.Where(a => a.Username == username.ToLowerInvariant() && a.Box == box); + return ApplySearch(query, search); + } + + private static IQueryable ApplySearch(IQueryable query, CollectionSearchParameters search) + { + if (!string.IsNullOrWhiteSpace(search.Search)) + query = query.Where(a => a.ActivityJson.Contains(search.Search)); + + if (!string.IsNullOrWhiteSpace(search.OrderBy) && + search.OrderBy.Equals("createdAt", StringComparison.OrdinalIgnoreCase)) + query = query.OrderBy(a => a.CreatedAt); + else + query = query.OrderByDescending(a => a.CreatedAt); + + return query; + } + + private IObjectOrLink? DeserializeActivity(string json) + { + try + { + return JsonSerializer.Deserialize(json, _jsonOptions); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to deserialize activity JSON"); + return null; + } + } + + private IEnumerable DeserializeActivities(IEnumerable entities) + { + var result = new List(); + foreach (var entity in entities) + { + var activity = DeserializeActivity(entity.ActivityJson); + if (activity is not null) + result.Add(activity); + } + return result; + } + + private static (string? activityType, string? objectId, string? inReplyTo) ExtractMetadata(IObjectOrLink activity) + { + if (activity is not IObject obj) + return (null, null, null); + + var activityType = obj.Type?.FirstOrDefault(); + string? objectId = null; + string? inReplyTo = null; + + var objectProp = obj.GetType().GetProperty("Object"); + if (objectProp?.GetValue(obj) is IEnumerable objects) + { + var first = objects.FirstOrDefault(); + objectId = first switch + { + IObject o when !string.IsNullOrEmpty(o.Id) => o.Id, + ILink l when l.Href != null => l.Href.ToString(), + _ => null + }; + } + + var inReplyToProp = obj.GetType().GetProperty("InReplyTo"); + if (inReplyToProp?.GetValue(obj) is IEnumerable replies) + { + var first = replies.FirstOrDefault(); + inReplyTo = first switch + { + IObject o when !string.IsNullOrEmpty(o.Id) => o.Id, + ILink l when l.Href != null => l.Href.ToString(), + _ => null + }; + } + + return (activityType, objectId, inReplyTo); + } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlActorRepository.cs b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlActorRepository.cs new file mode 100644 index 0000000..558e6a1 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlActorRepository.cs @@ -0,0 +1,324 @@ +using System.Text.Json; +using Broca.ActivityPub.Core.Interfaces; +using Broca.ActivityPub.Core.Models; +using Broca.ActivityPub.Persistence.MySql.Entities; +using KristofferStrube.ActivityStreams; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Broca.ActivityPub.Persistence.MySql.MySql; + +public class MySqlActorRepository : IActorRepository, IActorStatistics +{ + private readonly IDbContextFactory _contextFactory; + private readonly ILogger _logger; + private readonly JsonSerializerOptions _jsonOptions; + private readonly ActivityPubServerOptions _serverOptions; + + public MySqlActorRepository( + IDbContextFactory contextFactory, + IOptions serverOptions, + ILogger logger) + { + _contextFactory = contextFactory; + _serverOptions = serverOptions.Value; + _logger = logger; + _jsonOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull + }; + } + + public async Task GetActorByUsernameAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entity = await db.Actors.FindAsync([username.ToLowerInvariant()], cancellationToken); + return entity is null ? null : JsonSerializer.Deserialize(entity.ActorJson, _jsonOptions); + } + + public async Task GetActorByIdAsync(string actorId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entity = await db.Actors.FirstOrDefaultAsync(a => a.ActorId == actorId, cancellationToken); + return entity is null ? null : JsonSerializer.Deserialize(entity.ActorJson, _jsonOptions); + } + + public async Task SaveActorAsync(string username, Actor actor, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var key = username.ToLowerInvariant(); + var json = JsonSerializer.Serialize(actor, _jsonOptions); + var isLocal = IsLocalActor(actor.Id); + var domain = ExtractDomain(actor.Id); + var existing = await db.Actors.FindAsync([key], cancellationToken); + if (existing is null) + { + db.Actors.Add(new ActorEntity + { + Username = key, + ActorId = actor.Id, + IsLocal = isLocal, + Domain = domain, + ActorJson = json, + CreatedAt = DateTime.UtcNow, + UpdatedAt = DateTime.UtcNow + }); + } + else + { + existing.ActorId = actor.Id; + existing.IsLocal = isLocal; + existing.Domain = domain; + existing.ActorJson = json; + existing.UpdatedAt = DateTime.UtcNow; + } + await db.SaveChangesAsync(cancellationToken); + _logger.LogInformation("Saved actor {Username}", username); + } + + public async Task DeleteActorAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var key = username.ToLowerInvariant(); + await db.Actors.Where(a => a.Username == key).ExecuteDeleteAsync(cancellationToken); + } + + public async Task> GetFollowersAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Follows + .Where(f => f.Username == username.ToLowerInvariant() && f.FollowType == FollowType.Follower) + .Select(f => f.ActorId) + .ToListAsync(cancellationToken); + } + + public async Task> GetFollowersAsync(string username, int limit, int offset, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Follows + .Where(f => f.Username == username.ToLowerInvariant() && f.FollowType == FollowType.Follower) + .Select(f => f.ActorId) + .Skip(offset).Take(limit) + .ToListAsync(cancellationToken); + } + + public async Task GetFollowersCountAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Follows.CountAsync( + f => f.Username == username.ToLowerInvariant() && f.FollowType == FollowType.Follower, + cancellationToken); + } + + public async Task> GetFollowingAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Follows + .Where(f => f.Username == username.ToLowerInvariant() && f.FollowType == FollowType.Following) + .Select(f => f.ActorId) + .ToListAsync(cancellationToken); + } + + public async Task> GetFollowingAsync(string username, int limit, int offset, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Follows + .Where(f => f.Username == username.ToLowerInvariant() && f.FollowType == FollowType.Following) + .Select(f => f.ActorId) + .Skip(offset).Take(limit) + .ToListAsync(cancellationToken); + } + + public async Task GetFollowingCountAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Follows.CountAsync( + f => f.Username == username.ToLowerInvariant() && f.FollowType == FollowType.Following, + cancellationToken); + } + + public async Task AddFollowerAsync(string username, string followerActorId, CancellationToken cancellationToken = default) + => await AddFollowAsync(username, followerActorId, FollowType.Follower, cancellationToken); + + public async Task RemoveFollowerAsync(string username, string followerActorId, CancellationToken cancellationToken = default) + => await RemoveFollowAsync(username, followerActorId, FollowType.Follower, cancellationToken); + + public async Task AddFollowingAsync(string username, string followingActorId, CancellationToken cancellationToken = default) + => await AddFollowAsync(username, followingActorId, FollowType.Following, cancellationToken); + + public async Task RemoveFollowingAsync(string username, string followingActorId, CancellationToken cancellationToken = default) + => await RemoveFollowAsync(username, followingActorId, FollowType.Following, cancellationToken); + + public async Task> GetPendingFollowersAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Follows + .Where(f => f.Username == username.ToLowerInvariant() && f.FollowType == FollowType.PendingFollower) + .Select(f => f.ActorId) + .ToListAsync(cancellationToken); + } + + public async Task AddPendingFollowerAsync(string username, string followerActorId, CancellationToken cancellationToken = default) + => await AddFollowAsync(username, followerActorId, FollowType.PendingFollower, cancellationToken); + + public async Task RemovePendingFollowerAsync(string username, string followerActorId, CancellationToken cancellationToken = default) + => await RemoveFollowAsync(username, followerActorId, FollowType.PendingFollower, cancellationToken); + + public async Task> GetPendingFollowingAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Follows + .Where(f => f.Username == username.ToLowerInvariant() && f.FollowType == FollowType.PendingFollowing) + .Select(f => f.ActorId) + .ToListAsync(cancellationToken); + } + + public async Task AddPendingFollowingAsync(string username, string followingActorId, CancellationToken cancellationToken = default) + => await AddFollowAsync(username, followingActorId, FollowType.PendingFollowing, cancellationToken); + + public async Task RemovePendingFollowingAsync(string username, string followingActorId, CancellationToken cancellationToken = default) + => await RemoveFollowAsync(username, followingActorId, FollowType.PendingFollowing, cancellationToken); + + public async Task> GetAllLocalUsernamesAsync(CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Actors.Where(a => a.IsLocal).Select(a => a.Username).ToListAsync(cancellationToken); + } + + public async Task> GetCollectionDefinitionsAsync(string username, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entities = await db.CollectionDefinitions + .Where(c => c.Username == username.ToLowerInvariant()) + .ToListAsync(cancellationToken); + return entities + .Select(e => JsonSerializer.Deserialize(e.DefinitionJson, _jsonOptions)!) + .Where(d => d is not null) + .ToList(); + } + + public async Task GetCollectionDefinitionAsync(string username, string collectionId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entity = await db.CollectionDefinitions.FindAsync( + [username.ToLowerInvariant(), collectionId], cancellationToken); + return entity is null ? null : JsonSerializer.Deserialize(entity.DefinitionJson, _jsonOptions); + } + + public async Task SaveCollectionDefinitionAsync(string username, CustomCollectionDefinition definition, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var key = username.ToLowerInvariant(); + var json = JsonSerializer.Serialize(definition, _jsonOptions); + var existing = await db.CollectionDefinitions.FindAsync([key, definition.Id], cancellationToken); + if (existing is null) + db.CollectionDefinitions.Add(new CollectionDefinitionEntity { Username = key, CollectionId = definition.Id, DefinitionJson = json }); + else + existing.DefinitionJson = json; + await db.SaveChangesAsync(cancellationToken); + } + + public async Task DeleteCollectionDefinitionAsync(string username, string collectionId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var key = username.ToLowerInvariant(); + await db.CollectionDefinitions + .Where(c => c.Username == key && c.CollectionId == collectionId) + .ExecuteDeleteAsync(cancellationToken); + } + + public async Task AddToCollectionAsync(string username, string collectionId, string itemId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var key = username.ToLowerInvariant(); + + var definition = await db.CollectionDefinitions.FindAsync([key, collectionId], cancellationToken); + if (definition is not null) + { + var parsed = JsonSerializer.Deserialize(definition.DefinitionJson, _jsonOptions); + if (parsed?.Type == CollectionType.Query) + throw new InvalidOperationException($"Cannot manually add items to query collection {collectionId}"); + } + + var exists = await db.CollectionItems.AnyAsync( + c => c.Username == key && c.CollectionId == collectionId && c.ItemId == itemId, + cancellationToken); + if (!exists) + { + db.CollectionItems.Add(new CollectionItemEntity { Username = key, CollectionId = collectionId, ItemId = itemId }); + await db.SaveChangesAsync(cancellationToken); + } + } + + public async Task RemoveFromCollectionAsync(string username, string collectionId, string itemId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var key = username.ToLowerInvariant(); + + var definition = await db.CollectionDefinitions.FindAsync([key, collectionId], cancellationToken); + if (definition is not null) + { + var parsed = JsonSerializer.Deserialize(definition.DefinitionJson, _jsonOptions); + if (parsed?.Type == CollectionType.Query) + throw new InvalidOperationException($"Cannot manually remove items from query collection {collectionId}"); + } + + await db.CollectionItems + .Where(c => c.Username == key && c.CollectionId == collectionId && c.ItemId == itemId) + .ExecuteDeleteAsync(cancellationToken); + } + + public async Task CountLocalActorsAsync(CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Actors.CountAsync(a => a.IsLocal && a.Username != SystemActorUsername, cancellationToken); + } + + private const string SystemActorUsername = "sys"; + + private async Task AddFollowAsync(string username, string actorId, FollowType type, CancellationToken cancellationToken) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var key = username.ToLowerInvariant(); + var exists = await db.Follows.AnyAsync( + f => f.Username == key && f.ActorId == actorId && f.FollowType == type, + cancellationToken); + if (!exists) + { + db.Follows.Add(new FollowEntity { Username = key, ActorId = actorId, FollowType = type }); + await db.SaveChangesAsync(cancellationToken); + } + } + + private async Task RemoveFollowAsync(string username, string actorId, FollowType type, CancellationToken cancellationToken) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var key = username.ToLowerInvariant(); + await db.Follows + .Where(f => f.Username == key && f.ActorId == actorId && f.FollowType == type) + .ExecuteDeleteAsync(cancellationToken); + } + + private bool IsLocalActor(string? actorId) + { + if (string.IsNullOrWhiteSpace(actorId)) return false; + var baseUrl = _serverOptions.BaseUrl?.TrimEnd('/'); + if (string.IsNullOrWhiteSpace(baseUrl)) + { + _logger.LogWarning("ActivityPub BaseUrl is not configured; actor locality cannot be determined accurately"); + return false; + } + return actorId.StartsWith(baseUrl, StringComparison.OrdinalIgnoreCase); + } + + private string? ExtractDomain(string? actorId) + { + if (string.IsNullOrWhiteSpace(actorId)) return null; + if (Uri.TryCreate(actorId, UriKind.Absolute, out var uri)) + return uri.Host.ToLowerInvariant(); + _logger.LogWarning("Could not extract domain from actor ID {ActorId}", actorId); + return null; + } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlActorSyncQueue.cs b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlActorSyncQueue.cs new file mode 100644 index 0000000..18aab36 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlActorSyncQueue.cs @@ -0,0 +1,100 @@ +using Broca.ActivityPub.Core.Interfaces; +using Broca.ActivityPub.Persistence.MySql.Entities; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; + +namespace Broca.ActivityPub.Persistence.MySql.MySql; + +public class MySqlActorSyncQueue : IActorSyncQueue +{ + private const int PollIntervalMs = 2000; + + private readonly IDbContextFactory _contextFactory; + private readonly ILogger _logger; + + public MySqlActorSyncQueue( + IDbContextFactory contextFactory, + ILogger logger) + { + _contextFactory = contextFactory; + _logger = logger; + } + + public void Enqueue(string actorId) + { + if (string.IsNullOrWhiteSpace(actorId)) + return; + + // Fire-and-forget: the unique index on ActorId handles deduplication atomically. + _ = Task.Run(async () => + { + try + { + await using var db = await _contextFactory.CreateDbContextAsync(); + db.ActorSyncQueue.Add(new ActorSyncQueueEntity + { + ActorId = actorId, + EnqueuedAt = DateTime.UtcNow + }); + await db.SaveChangesAsync(); + } + catch (DbUpdateException) + { + // Unique constraint violation: actor already queued. This is expected and safe. + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to enqueue actor sync for {ActorId}", actorId); + } + }); + } + + public async Task ReadAsync(CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + var actorId = await TryDequeueAsync(cancellationToken); + if (actorId is not null) + return actorId; + + await Task.Delay(PollIntervalMs, cancellationToken); + } + + cancellationToken.ThrowIfCancellationRequested(); + return string.Empty; + } + + // NOTE: This method is safe to call only from background threads without an ASP.NET + // synchronization context, as it blocks synchronously on async code. + public bool TryRead(out string actorId) + { + var result = TryDequeueAsync(CancellationToken.None).GetAwaiter().GetResult(); + if (result is not null) + { + actorId = result; + return true; + } + actorId = string.Empty; + return false; + } + + private async Task TryDequeueAsync(CancellationToken cancellationToken) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + + var entity = await db.ActorSyncQueue + .OrderBy(a => a.EnqueuedAt) + .FirstOrDefaultAsync(cancellationToken); + + if (entity is null) + return null; + + // Optimistic delete: ExecuteDeleteAsync is atomic. If another consumer claimed this + // row first, deleted == 0 and we return null to retry on the next poll cycle. + var deleted = await db.ActorSyncQueue + .Where(a => a.Id == entity.Id) + .ExecuteDeleteAsync(cancellationToken); + + return deleted > 0 ? entity.ActorId : null; + } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlBlobStorageService.cs b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlBlobStorageService.cs new file mode 100644 index 0000000..367140d --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlBlobStorageService.cs @@ -0,0 +1,88 @@ +using Broca.ActivityPub.Core.Interfaces; +using Broca.ActivityPub.Persistence.MySql.Entities; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; + +namespace Broca.ActivityPub.Persistence.MySql.MySql; + +public class MySqlBlobStorageService : IBlobStorageService +{ + private const string ProviderName = "mysql"; + + private readonly IDbContextFactory _contextFactory; + private readonly string _baseUrl; + + public MySqlBlobStorageService( + IDbContextFactory contextFactory, + IOptions options) + { + _contextFactory = contextFactory; + _baseUrl = options.Value.BaseUrl?.TrimEnd('/') ?? "https://localhost"; + } + + public async Task StoreBlobAsync(string username, string blobId, Stream content, string? contentType = null, CancellationToken cancellationToken = default) + { + await using var ms = new MemoryStream(); + await content.CopyToAsync(ms, cancellationToken); + var data = ms.ToArray(); + + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var existing = await db.Blobs.FindAsync([username, blobId], cancellationToken); + if (existing is null) + { + db.Blobs.Add(new BlobEntity + { + Username = username, + BlobId = blobId, + ContentType = contentType ?? "application/octet-stream", + StorageProvider = ProviderName, + Content = data, + Size = data.LongLength + }); + } + else + { + existing.ContentType = contentType ?? existing.ContentType; + existing.StorageProvider = ProviderName; + existing.StorageKey = null; + existing.Content = data; + existing.Size = data.LongLength; + } + await db.SaveChangesAsync(cancellationToken); + + return BuildBlobUrl(username, blobId); + } + + public async Task<(Stream Content, string ContentType)?> GetBlobAsync(string username, string blobId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entity = await db.Blobs.FindAsync([username, blobId], cancellationToken); + if (entity is null) + return null; + + if (entity.StorageProvider != ProviderName || entity.Content is null) + return null; + + return (new MemoryStream(entity.Content), entity.ContentType); + } + + public async Task DeleteBlobAsync(string username, string blobId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + await db.Blobs + .Where(b => b.Username == username && b.BlobId == blobId) + .ExecuteDeleteAsync(cancellationToken); + } + + public async Task BlobExistsAsync(string username, string blobId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + return await db.Blobs.AnyAsync(b => b.Username == username && b.BlobId == blobId, cancellationToken); + } + + public string BuildBlobUrl(string username, string blobId) + { + var encodedBlobId = Uri.EscapeDataString(blobId); + return $"{_baseUrl}/users/{username}/media/{encodedBlobId}"; + } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlDeliveryQueueRepository.cs b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlDeliveryQueueRepository.cs new file mode 100644 index 0000000..d53e7b6 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/MySql/MySqlDeliveryQueueRepository.cs @@ -0,0 +1,193 @@ +using System.Text.Json; +using Broca.ActivityPub.Core.Interfaces; +using Broca.ActivityPub.Core.Models; +using Broca.ActivityPub.Persistence.MySql.Entities; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; + +namespace Broca.ActivityPub.Persistence.MySql.MySql; + +public class MySqlDeliveryQueueRepository : IDeliveryQueueRepository +{ + private readonly IDbContextFactory _contextFactory; + private readonly ILogger _logger; + private readonly JsonSerializerOptions _jsonOptions; + + public MySqlDeliveryQueueRepository( + IDbContextFactory contextFactory, + ILogger logger) + { + _contextFactory = contextFactory; + _logger = logger; + _jsonOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull + }; + } + + public async Task EnqueueAsync(DeliveryQueueItem item, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(item); + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + db.DeliveryQueue.Add(ToEntity(item)); + await db.SaveChangesAsync(cancellationToken); + } + + public async Task EnqueueBatchAsync(IEnumerable items, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(items); + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + db.DeliveryQueue.AddRange(items.Select(ToEntity)); + await db.SaveChangesAsync(cancellationToken); + } + + public async Task> GetPendingDeliveriesAsync(int batchSize = 100, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var now = DateTime.UtcNow; + var entities = await db.DeliveryQueue + .Where(d => d.Status == DeliveryStatus.Pending && + (d.NextAttemptAt == null || d.NextAttemptAt <= now)) + .OrderBy(d => d.CreatedAt) + .Take(batchSize) + .ToListAsync(cancellationToken); + + foreach (var entity in entities) + { + entity.Status = DeliveryStatus.Processing; + entity.LastAttemptAt = now; + entity.AttemptCount++; + } + await db.SaveChangesAsync(cancellationToken); + + return entities.Select(ToModel).ToList(); + } + + public async Task MarkAsDeliveredAsync(string deliveryId, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entity = await db.DeliveryQueue.FindAsync([deliveryId], cancellationToken); + if (entity is not null) + { + entity.Status = DeliveryStatus.Delivered; + entity.CompletedAt = DateTime.UtcNow; + entity.LastError = null; + await db.SaveChangesAsync(cancellationToken); + } + } + + public async Task MarkAsFailedAsync(string deliveryId, string errorMessage, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entity = await db.DeliveryQueue.FindAsync([deliveryId], cancellationToken); + if (entity is not null) + { + entity.LastError = errorMessage; + if (entity.AttemptCount >= entity.MaxRetries) + { + entity.Status = DeliveryStatus.Dead; + } + else + { + entity.Status = DeliveryStatus.Failed; + var delayMinutes = entity.AttemptCount switch + { + 1 => 1, + 2 => 5, + 3 => 15, + 4 => 60, + _ => 240 + }; + entity.NextAttemptAt = DateTime.UtcNow.AddMinutes(delayMinutes); + } + await db.SaveChangesAsync(cancellationToken); + } + } + + public async Task CleanupOldItemsAsync(TimeSpan maxAge, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var cutoff = DateTime.UtcNow - maxAge; + await db.DeliveryQueue + .Where(d => (d.Status == DeliveryStatus.Delivered && d.CompletedAt < cutoff) || + (d.Status == DeliveryStatus.Dead && d.LastAttemptAt < cutoff)) + .ExecuteDeleteAsync(cancellationToken); + } + + public async Task GetStatisticsAsync(CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var groups = await db.DeliveryQueue + .GroupBy(d => d.Status) + .Select(g => new { Status = g.Key, Count = g.Count() }) + .ToListAsync(cancellationToken); + + var oldest = await db.DeliveryQueue + .Where(d => d.Status == DeliveryStatus.Pending) + .OrderBy(d => d.CreatedAt) + .Select(d => (DateTime?)d.CreatedAt) + .FirstOrDefaultAsync(cancellationToken); + + return new DeliveryStatistics + { + PendingCount = groups.FirstOrDefault(g => g.Status == DeliveryStatus.Pending)?.Count ?? 0, + ProcessingCount = groups.FirstOrDefault(g => g.Status == DeliveryStatus.Processing)?.Count ?? 0, + DeliveredCount = groups.FirstOrDefault(g => g.Status == DeliveryStatus.Delivered)?.Count ?? 0, + FailedCount = groups.FirstOrDefault(g => g.Status == DeliveryStatus.Failed)?.Count ?? 0, + DeadCount = groups.FirstOrDefault(g => g.Status == DeliveryStatus.Dead)?.Count ?? 0, + OldestPendingItem = oldest + }; + } + + public async Task> GetAllForDiagnosticsAsync(int maxResults = 100, CancellationToken cancellationToken = default) + { + await using var db = await _contextFactory.CreateDbContextAsync(cancellationToken); + var entities = await db.DeliveryQueue + .OrderByDescending(d => d.CreatedAt) + .Take(maxResults) + .ToListAsync(cancellationToken); + return entities.Select(ToModel).ToList(); + } + + private DeliveryQueueEntity ToEntity(DeliveryQueueItem item) => new() + { + Id = item.Id, + ActivityJson = JsonSerializer.Serialize(item.Activity, typeof(KristofferStrube.ActivityStreams.IObjectOrLink), _jsonOptions), + InboxUrl = item.InboxUrl, + TargetActorId = item.TargetActorId, + SenderActorId = item.SenderActorId, + SenderUsername = item.SenderUsername, + Status = item.Status, + AttemptCount = item.AttemptCount, + MaxRetries = item.MaxRetries, + CreatedAt = item.CreatedAt, + NextAttemptAt = item.NextAttemptAt, + LastAttemptAt = item.LastAttemptAt, + CompletedAt = item.CompletedAt, + LastError = item.LastError + }; + + private DeliveryQueueItem ToModel(DeliveryQueueEntity entity) + { + var activity = JsonSerializer.Deserialize( + entity.ActivityJson, _jsonOptions)!; + return new DeliveryQueueItem + { + Id = entity.Id, + Activity = activity, + InboxUrl = entity.InboxUrl, + TargetActorId = entity.TargetActorId, + SenderActorId = entity.SenderActorId, + SenderUsername = entity.SenderUsername, + Status = entity.Status, + AttemptCount = entity.AttemptCount, + MaxRetries = entity.MaxRetries, + CreatedAt = entity.CreatedAt, + NextAttemptAt = entity.NextAttemptAt, + LastAttemptAt = entity.LastAttemptAt, + CompletedAt = entity.CompletedAt, + LastError = entity.LastError + }; + } +} diff --git a/src/Broca.ActivityPub.Persistence.MySql/MySqlPersistenceOptions.cs b/src/Broca.ActivityPub.Persistence.MySql/MySqlPersistenceOptions.cs new file mode 100644 index 0000000..1c92f28 --- /dev/null +++ b/src/Broca.ActivityPub.Persistence.MySql/MySqlPersistenceOptions.cs @@ -0,0 +1,7 @@ +namespace Broca.ActivityPub.Persistence.MySql; + +public class MySqlPersistenceOptions +{ + public string ConnectionString { get; set; } = string.Empty; + public string? BaseUrl { get; set; } +}