diff --git a/api/cmd/main.go b/api/cmd/main.go index 4fff1d2..a21a64c 100644 --- a/api/cmd/main.go +++ b/api/cmd/main.go @@ -26,6 +26,7 @@ import ( "github.com/streamspace-dev/streamspace/api/internal/handlers" "github.com/streamspace-dev/streamspace/api/internal/k8s" "github.com/streamspace-dev/streamspace/api/internal/middleware" + internalplugins "github.com/streamspace-dev/streamspace/api/internal/plugins" "github.com/streamspace-dev/streamspace/api/internal/quota" "github.com/streamspace-dev/streamspace/api/internal/services" "github.com/streamspace-dev/streamspace/api/internal/sync" @@ -36,15 +37,15 @@ import ( func main() { // Configuration from environment port := getEnv("API_PORT", "8000") - tlsCertFile := os.Getenv("TLS_CERT_FILE") // Path to TLS certificate file (PEM format) - tlsKeyFile := os.Getenv("TLS_KEY_FILE") // Path to TLS private key file (PEM format) - agentCACertFile := os.Getenv("AGENT_CA_CERT_FILE") // Path to CA cert for validating agent client certs (enables mTLS) + tlsCertFile := os.Getenv("TLS_CERT_FILE") // Path to TLS certificate file (PEM format) + tlsKeyFile := os.Getenv("TLS_KEY_FILE") // Path to TLS private key file (PEM format) + agentCACertFile := os.Getenv("AGENT_CA_CERT_FILE") // Path to CA cert for validating agent client certs (enables mTLS) requireClientCert := getEnv("REQUIRE_CLIENT_CERT", "false") == "true" // Require client cert (only with mTLS) - rateLimitEnabled := getEnv("RATE_LIMIT_ENABLED", "true") == "true" // Enable rate limiting (default: true) - rateLimitRPM := getEnvInt("RATE_LIMIT_REQUESTS_PER_MINUTE", 60) // Requests per minute (default: 60) + rateLimitEnabled := getEnv("RATE_LIMIT_ENABLED", "true") == "true" // Enable rate limiting (default: true) + rateLimitRPM := getEnvInt("RATE_LIMIT_REQUESTS_PER_MINUTE", 60) // Requests per minute (default: 60) // rateLimitBurst := getEnvInt("RATE_LIMIT_BURST", 10) // Burst capacity (default: 10) - reserved for future use auditLogEnabled := getEnv("AUDIT_LOG_ENABLED", "true") == "true" // Enable audit logging (default: true) - auditLogBodies := getEnv("AUDIT_LOG_BODIES", "false") == "true" // Log request bodies (default: false for privacy) + auditLogBodies := getEnv("AUDIT_LOG_BODIES", "false") == "true" // Log request bodies (default: false for privacy) dbHost := getEnv("DB_HOST", "localhost") dbPort := getEnv("DB_PORT", "5432") dbUser := getEnv("DB_USER", "streamspace") @@ -52,6 +53,7 @@ func main() { dbName := getEnv("DB_NAME", "streamspace") dbSSLMode := getEnv("DB_SSL_MODE", "disable") // SECURITY: Should be "require" in production pluginDir := getEnv("PLUGIN_DIR", "./plugins") + pluginRepositoryURL := getEnv("PLUGIN_REPOSITORY_URL", "https://raw.githubusercontent.com/JoshuaAFerguson/streamspace-plugins/main") log.Println("Starting StreamSpace API Server...") @@ -112,11 +114,20 @@ func main() { } defer redisCache.Close() - // Initialize Kubernetes client + // Initialize Kubernetes client. Optional in v2.0+ — the downstream + // handler signature explicitly accepts nil for standalone-API and + // docker-platform deployments (see line ~360 below). Only hard-fail + // when the operator declared kubernetes as the platform; otherwise + // log + continue with k8sClient=nil. log.Println("Initializing Kubernetes client...") k8sClient, err := k8s.NewClient() if err != nil { - log.Fatalf("Failed to initialize Kubernetes client: %v", err) + platformEnv := os.Getenv("PLATFORM") + if platformEnv == "" || platformEnv == events.PlatformKubernetes { + log.Fatalf("Failed to initialize Kubernetes client: %v", err) + } + log.Printf("Kubernetes client init failed (PLATFORM=%s, continuing without it): %v", platformEnv, err) + k8sClient = nil } // Initialize stub event publisher (NATS removed - WebSocket used instead) @@ -361,7 +372,10 @@ func main() { activityHandler := handlers.NewActivityHandler(k8sClient, activityTracker, database) catalogHandler := handlers.NewCatalogHandler(database) sharingHandler := handlers.NewSharingHandler(database) - pluginHandler := handlers.NewPluginHandler(database, pluginDir) + pluginRuntime := internalplugins.NewRuntimeV2(database, pluginDir) + pluginMarketplace := internalplugins.NewPluginMarketplace(database, pluginRepositoryURL, pluginDir) + pluginMarketplaceHandler := handlers.NewPluginMarketplaceHandler(database, pluginMarketplace, pluginRuntime) + pluginHandler := handlers.NewPluginHandler(database, pluginDir, pluginRuntime) dashboardHandler := handlers.NewDashboardHandler(database, k8sClient) sessionActivityHandler := handlers.NewSessionActivityHandler(database) apiKeyHandler := handlers.NewAPIKeyHandler(database) @@ -395,6 +409,18 @@ func main() { agentWebSocketHandler := handlers.NewAgentWebSocketHandler(agentHub, database) selkiesProxyHandler := handlers.NewSelkiesProxyHandler(database, agentHub, "streamspace") + if err := pluginMarketplace.SyncCatalog(context.Background()); err != nil { + log.Printf("Warning: Initial plugin catalog sync failed: %v", err) + } + if err := pluginRuntime.Start(context.Background()); err != nil { + log.Printf("Warning: Plugin runtime failed to start: %v", err) + } + defer func() { + if err := pluginRuntime.Stop(context.Background()); err != nil { + log.Printf("Warning: Plugin runtime failed to stop cleanly: %v", err) + } + }() + // SECURITY: Initialize webhook authentication webhookSecret := os.Getenv("WEBHOOK_SECRET") if webhookSecret == "" { @@ -403,7 +429,7 @@ func main() { } // Setup routes - setupRoutes(router, apiHandler, userHandler, groupHandler, authHandler, activityHandler, catalogHandler, sharingHandler, pluginHandler, dashboardHandler, sessionActivityHandler, apiKeyHandler, teamHandler, preferencesHandler, notificationsHandler, searchHandler, sessionTemplatesHandler, batchHandler, monitoringHandler, quotasHandler, nodeHandler, wsManager, consoleHandler, collaborationHandler, integrationsHandler, loadBalancingHandler, schedulingHandler, securityHandler, templateVersioningHandler, setupHandler, applicationHandler, auditHandler, configurationHandler, licenseHandler, recordingHandler, agentHandler, agentWebSocketHandler, selkiesProxyHandler, jwtManager, userDB, database, redisCache, webhookSecret, rateLimitEnabled, rateLimitRPM) + setupRoutes(router, apiHandler, userHandler, groupHandler, authHandler, activityHandler, catalogHandler, sharingHandler, pluginHandler, pluginMarketplaceHandler, dashboardHandler, sessionActivityHandler, apiKeyHandler, teamHandler, preferencesHandler, notificationsHandler, searchHandler, sessionTemplatesHandler, batchHandler, monitoringHandler, quotasHandler, nodeHandler, wsManager, consoleHandler, collaborationHandler, integrationsHandler, loadBalancingHandler, schedulingHandler, securityHandler, templateVersioningHandler, setupHandler, applicationHandler, auditHandler, configurationHandler, licenseHandler, recordingHandler, agentHandler, agentWebSocketHandler, selkiesProxyHandler, jwtManager, userDB, database, redisCache, webhookSecret, rateLimitEnabled, rateLimitRPM) // SECURITY: Configure mTLS for agent authentication (optional) var tlsConfig *tls.Config @@ -424,9 +450,9 @@ func main() { // Configure TLS with client certificate validation tlsConfig = &tls.Config{ - ClientCAs: caCertPool, + ClientCAs: caCertPool, ClientAuth: tls.VerifyClientCertIfGiven, // Default: optional client cert - MinVersion: tls.VersionTLS12, // Enforce TLS 1.2+ + MinVersion: tls.VersionTLS12, // Enforce TLS 1.2+ } // If REQUIRE_CLIENT_CERT is true, make client certs mandatory @@ -548,7 +574,7 @@ func main() { log.Println("Graceful shutdown completed") } -func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserHandler, groupHandler *handlers.GroupHandler, authHandler *auth.AuthHandler, activityHandler *handlers.ActivityHandler, catalogHandler *handlers.CatalogHandler, sharingHandler *handlers.SharingHandler, pluginHandler *handlers.PluginHandler, dashboardHandler *handlers.DashboardHandler, sessionActivityHandler *handlers.SessionActivityHandler, apiKeyHandler *handlers.APIKeyHandler, teamHandler *handlers.TeamHandler, preferencesHandler *handlers.PreferencesHandler, notificationsHandler *handlers.NotificationsHandler, searchHandler *handlers.SearchHandler, sessionTemplatesHandler *handlers.SessionTemplatesHandler, batchHandler *handlers.BatchHandler, monitoringHandler *handlers.MonitoringHandler, quotasHandler *handlers.QuotasHandler, nodeHandler *handlers.NodeHandler, wsManager *internalWebsocket.Manager, consoleHandler *handlers.ConsoleHandler, collaborationHandler *handlers.CollaborationHandler, integrationsHandler *handlers.IntegrationsHandler, loadBalancingHandler *handlers.LoadBalancingHandler, schedulingHandler *handlers.SchedulingHandler, securityHandler *handlers.SecurityHandler, templateVersioningHandler *handlers.TemplateVersioningHandler, setupHandler *handlers.SetupHandler, applicationHandler *handlers.ApplicationHandler, auditHandler *handlers.AuditHandler, configurationHandler *handlers.ConfigurationHandler, licenseHandler *handlers.LicenseHandler, recordingHandler *handlers.RecordingHandler, agentHandler *handlers.AgentHandler, agentWebSocketHandler *handlers.AgentWebSocketHandler, selkiesProxyHandler *handlers.SelkiesProxyHandler, jwtManager *auth.JWTManager, userDB *db.UserDB, database *db.Database, redisCache *cache.Cache, webhookSecret string, rateLimitEnabled bool, rateLimitRPM int) { +func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserHandler, groupHandler *handlers.GroupHandler, authHandler *auth.AuthHandler, activityHandler *handlers.ActivityHandler, catalogHandler *handlers.CatalogHandler, sharingHandler *handlers.SharingHandler, pluginHandler *handlers.PluginHandler, pluginMarketplaceHandler *handlers.PluginMarketplaceHandler, dashboardHandler *handlers.DashboardHandler, sessionActivityHandler *handlers.SessionActivityHandler, apiKeyHandler *handlers.APIKeyHandler, teamHandler *handlers.TeamHandler, preferencesHandler *handlers.PreferencesHandler, notificationsHandler *handlers.NotificationsHandler, searchHandler *handlers.SearchHandler, sessionTemplatesHandler *handlers.SessionTemplatesHandler, batchHandler *handlers.BatchHandler, monitoringHandler *handlers.MonitoringHandler, quotasHandler *handlers.QuotasHandler, nodeHandler *handlers.NodeHandler, wsManager *internalWebsocket.Manager, consoleHandler *handlers.ConsoleHandler, collaborationHandler *handlers.CollaborationHandler, integrationsHandler *handlers.IntegrationsHandler, loadBalancingHandler *handlers.LoadBalancingHandler, schedulingHandler *handlers.SchedulingHandler, securityHandler *handlers.SecurityHandler, templateVersioningHandler *handlers.TemplateVersioningHandler, setupHandler *handlers.SetupHandler, applicationHandler *handlers.ApplicationHandler, auditHandler *handlers.AuditHandler, configurationHandler *handlers.ConfigurationHandler, licenseHandler *handlers.LicenseHandler, recordingHandler *handlers.RecordingHandler, agentHandler *handlers.AgentHandler, agentWebSocketHandler *handlers.AgentWebSocketHandler, selkiesProxyHandler *handlers.SelkiesProxyHandler, jwtManager *auth.JWTManager, userDB *db.UserDB, database *db.Database, redisCache *cache.Cache, webhookSecret string, rateLimitEnabled bool, rateLimitRPM int) { // SECURITY: Create authentication middleware authMiddleware := auth.Middleware(jwtManager, userDB) adminMiddleware := auth.RequireRole("admin") @@ -906,6 +932,7 @@ func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserH // Plugin system - using dedicated handler pluginHandler.RegisterRoutes(protected) + pluginMarketplaceHandler.RegisterRoutes(protected) // Installed applications management - using dedicated handler (admin only for management) applicationHandler.RegisterRoutes(protected) @@ -1050,27 +1077,27 @@ func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserH protected.GET("/metrics", operatorMiddleware, h.GetMetrics) } - // v2.0 Agent self-service routes (require mTLS OR API key authentication, not JWT) - // These routes are for agents to register themselves and send heartbeats - // Authentication: mTLS (if configured) or API key fallback - // Rate limited to prevent brute-force attacks - agentRoutes := v1.Group("/agents") - agentRoutes.Use(agentRateLimit(globalRateLimiter, rateLimitEnabled, rateLimitRPM)) // Apply rate limiting first - agentRoutes.Use(agentAuth.RequireAuth()) // Then authentication - { - agentHandler.RegisterRoutes(agentRoutes) - } + // v2.0 Agent self-service routes (require mTLS OR API key authentication, not JWT) + // These routes are for agents to register themselves and send heartbeats + // Authentication: mTLS (if configured) or API key fallback + // Rate limited to prevent brute-force attacks + agentRoutes := v1.Group("/agents") + agentRoutes.Use(agentRateLimit(globalRateLimiter, rateLimitEnabled, rateLimitRPM)) // Apply rate limiting first + agentRoutes.Use(agentAuth.RequireAuth()) // Then authentication + { + agentHandler.RegisterRoutes(agentRoutes) + } - // v2.0 Agent WebSocket connections (require mTLS OR API key authentication, not JWT) - // Agents connect here to receive commands and send status updates - // Authentication: mTLS (if configured) or API key fallback - // Rate limited to prevent connection flooding - agentWSRoutes := v1.Group("") - agentWSRoutes.Use(agentRateLimit(globalRateLimiter, rateLimitEnabled, rateLimitRPM)) // Apply rate limiting first - agentWSRoutes.Use(agentAuth.RequireAuth()) // Then authentication - { - agentWebSocketHandler.RegisterRoutes(agentWSRoutes) - } + // v2.0 Agent WebSocket connections (require mTLS OR API key authentication, not JWT) + // Agents connect here to receive commands and send status updates + // Authentication: mTLS (if configured) or API key fallback + // Rate limited to prevent connection flooding + agentWSRoutes := v1.Group("") + agentWSRoutes.Use(agentRateLimit(globalRateLimiter, rateLimitEnabled, rateLimitRPM)) // Apply rate limiting first + agentWSRoutes.Use(agentAuth.RequireAuth()) // Then authentication + { + agentWebSocketHandler.RegisterRoutes(agentWSRoutes) + } } // WebSocket endpoints (require authentication) diff --git a/api/go.mod b/api/go.mod index 4da7751..66d7b94 100644 --- a/api/go.mod +++ b/api/go.mod @@ -1,8 +1,6 @@ module github.com/streamspace-dev/streamspace/api -go 1.24.0 - -toolchain go1.24.7 +go 1.25.0 require ( github.com/DATA-DOG/go-sqlmock v1.5.2 @@ -20,9 +18,9 @@ require ( github.com/redis/go-redis/v9 v9.16.0 github.com/robfig/cron/v3 v3.0.1 github.com/rs/zerolog v1.34.0 - github.com/stretchr/testify v1.10.0 - golang.org/x/crypto v0.45.0 - golang.org/x/oauth2 v0.28.0 + github.com/stretchr/testify v1.11.1 + golang.org/x/crypto v0.49.0 + golang.org/x/oauth2 v0.36.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.34.2 k8s.io/apimachinery v0.34.2 @@ -44,7 +42,7 @@ require ( github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-jose/go-jose/v4 v4.1.4 // indirect - github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/logr v1.4.3 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect @@ -70,6 +68,7 @@ require ( github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/russellhaering/goxmldsig v1.6.0 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/stretchr/objx v0.5.2 // indirect @@ -80,12 +79,12 @@ require ( go.yaml.in/yaml/v2 v2.4.2 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/arch v0.3.0 // indirect - golang.org/x/net v0.47.0 // indirect - golang.org/x/sys v0.38.0 // indirect - golang.org/x/term v0.37.0 // indirect - golang.org/x/text v0.31.0 // indirect - golang.org/x/time v0.9.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect + golang.org/x/net v0.52.0 // indirect + golang.org/x/sys v0.42.0 // indirect + golang.org/x/term v0.41.0 // indirect + golang.org/x/text v0.35.0 // indirect + golang.org/x/time v0.15.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect diff --git a/api/go.sum b/api/go.sum index 8549032..cf0e091 100644 --- a/api/go.sum +++ b/api/go.sum @@ -44,8 +44,8 @@ github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= github.com/go-jose/go-jose/v4 v4.1.4 h1:moDMcTHmvE6Groj34emNPLs/qtYXRVcd6S7NHbHz3kA= github.com/go-jose/go-jose/v4 v4.1.4/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= @@ -145,8 +145,8 @@ github.com/redis/go-redis/v9 v9.16.0 h1:OotgqgLSRCmzfqChbQyG1PHC3tLNR89DG4jdOERS github.com/redis/go-redis/v9 v9.16.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= -github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= @@ -167,8 +167,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= @@ -189,18 +189,18 @@ golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= -golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= +golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= -golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= -golang.org/x/oauth2 v0.28.0 h1:CrgCKl8PPAVtLnU3c+EDw6x11699EWlsDeWNWKdIOkc= -golang.org/x/oauth2 v0.28.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= +golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= +golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -211,28 +211,28 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= -golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= -golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= +golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= -golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= -golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= -golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= +golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= -golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/api/internal/db/database.go b/api/internal/db/database.go index 8807d91..27fbdd4 100644 --- a/api/internal/db/database.go +++ b/api/internal/db/database.go @@ -77,8 +77,8 @@ import ( "strings" "time" - "golang.org/x/crypto/bcrypt" _ "github.com/lib/pq" + "golang.org/x/crypto/bcrypt" ) // Config holds database configuration @@ -184,10 +184,10 @@ func NewDatabase(config Config) (*Database, error) { // Configure connection pool for optimal performance // These settings balance performance with resource usage - db.SetMaxOpenConns(25) // Maximum number of open connections to the database - db.SetMaxIdleConns(5) // Maximum number of connections in the idle connection pool - db.SetConnMaxLifetime(5 * time.Minute) // Maximum amount of time a connection may be reused (5 minutes) - db.SetConnMaxIdleTime(1 * time.Minute) // Maximum amount of time a connection may be idle (1 minute) + db.SetMaxOpenConns(25) // Maximum number of open connections to the database + db.SetMaxIdleConns(5) // Maximum number of connections in the idle connection pool + db.SetConnMaxLifetime(5 * time.Minute) // Maximum amount of time a connection may be reused (5 minutes) + db.SetConnMaxIdleTime(1 * time.Minute) // Maximum amount of time a connection may be idle (1 minute) // Test connection if err := db.Ping(); err != nil { @@ -772,6 +772,7 @@ func (d *Database) Migrate() error { version VARCHAR(50) NOT NULL, display_name VARCHAR(255), description TEXT, + source_path TEXT, category VARCHAR(100), plugin_type VARCHAR(50) DEFAULT 'extension', icon_url TEXT, @@ -789,6 +790,8 @@ func (d *Database) Migrate() error { `CREATE INDEX IF NOT EXISTS idx_catalog_plugins_category ON catalog_plugins(category)`, `CREATE INDEX IF NOT EXISTS idx_catalog_plugins_type ON catalog_plugins(plugin_type)`, `CREATE INDEX IF NOT EXISTS idx_catalog_plugins_category_rating ON catalog_plugins(category, avg_rating DESC)`, + `CREATE UNIQUE INDEX IF NOT EXISTS idx_catalog_plugins_repository_name ON catalog_plugins(repository_id, name)`, + `ALTER TABLE catalog_plugins ADD COLUMN IF NOT EXISTS source_path TEXT`, // Installed plugins (user-installed plugins) `CREATE TABLE IF NOT EXISTS installed_plugins ( diff --git a/api/internal/handlers/plugins.go b/api/internal/handlers/plugins.go index f596cf9..0a9f35d 100644 --- a/api/internal/handlers/plugins.go +++ b/api/internal/handlers/plugins.go @@ -48,21 +48,21 @@ // // Example Usage Flow: // -// 1. User browses catalog: -// GET /api/plugins/catalog?category=analytics&sort=popular +// 1. User browses catalog: +// GET /api/plugins/catalog?category=analytics&sort=popular // -// 2. User views plugin details: -// GET /api/plugins/catalog/42 -// (View count incremented async) +// 2. User views plugin details: +// GET /api/plugins/catalog/42 +// (View count incremented async) // -// 3. User installs plugin: -// POST /api/plugins/catalog/42/install -// Body: {"config": {"api_key": "..."}} -// (Plugin added to installed_plugins, install count incremented) +// 3. User installs plugin: +// POST /api/plugins/catalog/42/install +// Body: {"config": {"api_key": "..."}} +// (Plugin added to installed_plugins, install count incremented) // -// 4. User enables/disables plugin: -// POST /api/plugins/123/enable -// (Plugin enabled in database, runtime loads it on next restart/reload) +// 4. User enables/disables plugin: +// POST /api/plugins/123/enable +// (Plugin enabled in database, runtime loads it on next restart/reload) package handlers import ( @@ -83,6 +83,7 @@ import ( "github.com/gin-gonic/gin" "github.com/streamspace-dev/streamspace/api/internal/db" "github.com/streamspace-dev/streamspace/api/internal/models" + internalplugins "github.com/streamspace-dev/streamspace/api/internal/plugins" "github.com/streamspace-dev/streamspace/api/internal/validator" ) @@ -100,6 +101,8 @@ type PluginHandler struct { db *db.Database // pluginDir is the directory where plugins are installed. pluginDir string + // runtime manages immediate load/unload behavior when available. + runtime *internalplugins.RuntimeV2 } // NewPluginHandler creates a new plugin handler. @@ -115,27 +118,37 @@ type PluginHandler struct { // // handler := NewPluginHandler(db, "/plugins") // handler.RegisterRoutes(router.Group("/api")) -func NewPluginHandler(database *db.Database, pluginDir string) *PluginHandler { +func NewPluginHandler(database *db.Database, pluginDir string, runtime ...*internalplugins.RuntimeV2) *PluginHandler { // Create plugins directory if it doesn't exist if pluginDir != "" { if err := os.MkdirAll(pluginDir, 0755); err != nil { log.Printf("[PluginHandler] Warning: Failed to create plugins directory: %v", err) } } + + var pluginRuntime *internalplugins.RuntimeV2 + if len(runtime) > 0 { + pluginRuntime = runtime[0] + } return &PluginHandler{ db: database, pluginDir: pluginDir, + runtime: pluginRuntime, } } // downloadPluginFromRepository downloads a plugin from its repository to the local plugins directory. -// It attempts to download as a .tar.gz archive first, falling back to individual files. -func (h *PluginHandler) downloadPluginFromRepository(pluginName string, repoURL string) error { +// It prefers a packaged archive and only falls back to manifest-driven asset fetches. +func (h *PluginHandler) downloadPluginFromRepository(pluginName, repoURL, sourcePath string) error { if h.pluginDir == "" { log.Printf("[PluginHandler] No plugins directory configured, skipping download") return nil } + if sourcePath == "" { + sourcePath = pluginName + } + pluginPath := filepath.Join(h.pluginDir, pluginName) // Create plugin directory @@ -144,15 +157,15 @@ func (h *PluginHandler) downloadPluginFromRepository(pluginName string, repoURL } // Try to download as archive first - archiveURL := fmt.Sprintf("%s/%s/plugin.tar.gz", strings.TrimSuffix(repoURL, "/"), pluginName) + archiveURL := fmt.Sprintf("%s/%s/plugin.tar.gz", strings.TrimSuffix(repoURL, "/"), sourcePath) if err := h.downloadAndExtractArchive(archiveURL, pluginPath); err == nil { log.Printf("[PluginHandler] Downloaded plugin %s as archive", pluginName) return nil } - // Fallback: download individual files - log.Printf("[PluginHandler] Archive not available, downloading individual files for %s", pluginName) - return h.downloadPluginFiles(pluginName, repoURL, pluginPath) + // Fallback: download files referenced by manifest.json. + log.Printf("[PluginHandler] Archive not available, downloading runtime assets for %s", pluginName) + return h.downloadPluginFiles(pluginName, sourcePath, repoURL, pluginPath) } // downloadAndExtractArchive downloads a .tar.gz archive and extracts it to the target directory. @@ -214,31 +227,87 @@ func (h *PluginHandler) downloadAndExtractArchive(url string, targetDir string) return nil } -// downloadPluginFiles downloads individual plugin files from the repository. -func (h *PluginHandler) downloadPluginFiles(pluginName string, repoURL string, targetDir string) error { - // Files to download - files := []string{"plugin.json", "manifest.json", "README.md", "LICENSE"} +// downloadPluginFiles downloads manifest-driven plugin assets from the repository. +func (h *PluginHandler) downloadPluginFiles(pluginName, sourcePath, repoURL, targetDir string) error { + baseURL := strings.TrimSuffix(repoURL, "/") + manifestURL := fmt.Sprintf("%s/%s/manifest.json", baseURL, sourcePath) + manifestPath := filepath.Join(targetDir, "manifest.json") + if err := h.downloadFile(manifestURL, manifestPath); err != nil { + return fmt.Errorf("failed to download manifest.json: %w", err) + } - var downloadedAny bool - for _, file := range files { - fileURL := fmt.Sprintf("%s/%s/%s", strings.TrimSuffix(repoURL, "/"), pluginName, file) - targetPath := filepath.Join(targetDir, file) + manifestBytes, err := os.ReadFile(manifestPath) + if err != nil { + return fmt.Errorf("failed to read manifest.json: %w", err) + } - if err := h.downloadFile(fileURL, targetPath); err != nil { - // Only log error for required files - if file == "plugin.json" || file == "manifest.json" { - log.Printf("[PluginHandler] Warning: Failed to download %s: %v", file, err) - } + var manifest models.PluginManifest + if err := json.Unmarshal(manifestBytes, &manifest); err != nil { + return fmt.Errorf("failed to parse manifest.json: %w", err) + } + + _ = h.downloadFile(fmt.Sprintf("%s/%s/README.md", baseURL, sourcePath), filepath.Join(targetDir, "README.md")) + _ = h.downloadFile(fmt.Sprintf("%s/%s/LICENSE", baseURL, sourcePath), filepath.Join(targetDir, "LICENSE")) + if manifest.Icon != "" { + _ = h.downloadFile( + fmt.Sprintf("%s/%s/%s", baseURL, sourcePath, manifest.Icon), + filepath.Join(targetDir, filepath.Base(manifest.Icon)), + ) + } + + entrypoints := handlerEntrypoints(manifest) + if !handlerHasSharedObjectEntrypoint(entrypoints) { + return fmt.Errorf( + "plugin %s does not expose a runtime-loadable .so entrypoint; publish plugin.tar.gz or a bundle with .so assets", + pluginName, + ) + } + + for _, entry := range entrypoints { + entryURL := fmt.Sprintf("%s/%s/%s", baseURL, sourcePath, entry) + targetPath := filepath.Join(targetDir, filepath.Base(entry)) + if err := h.downloadFile(entryURL, targetPath); err != nil { + return fmt.Errorf("failed to download plugin entrypoint %s: %w", entry, err) + } + } + + return nil +} + +func handlerEntrypoints(manifest models.PluginManifest) []string { + entrypoints := []string{ + manifest.Entrypoints.Main, + manifest.Entrypoints.API, + manifest.Entrypoints.UI, + manifest.Entrypoints.Webhook, + manifest.Entrypoints.CLI, + } + + seen := make(map[string]struct{}, len(entrypoints)) + unique := make([]string, 0, len(entrypoints)) + for _, entry := range entrypoints { + entry = strings.TrimSpace(entry) + if entry == "" { + continue + } + if _, ok := seen[entry]; ok { continue } - downloadedAny = true + seen[entry] = struct{}{} + unique = append(unique, entry) } - if !downloadedAny { - return fmt.Errorf("failed to download any plugin files") + return unique +} + +func handlerHasSharedObjectEntrypoint(entrypoints []string) bool { + for _, entry := range entrypoints { + if strings.HasSuffix(entry, ".so") { + return true + } } - return nil + return false } // downloadFile downloads a single file from URL to the target path. @@ -635,11 +704,11 @@ func (h *PluginHandler) RatePlugin(c *gin.Context) { // } // // Behavior: -// 1. Fetches plugin details from catalog_plugins -// 2. Checks if already installed (returns 409 if yes) -// 3. Inserts into installed_plugins with enabled=true -// 4. Increments install count asynchronously -// 5. Updates plugin_stats table +// 1. Fetches plugin details from catalog_plugins +// 2. Checks if already installed (returns 409 if yes) +// 3. Inserts into installed_plugins with enabled=true +// 4. Increments install count asynchronously +// 5. Updates plugin_stats table // // Side Effects: // - Plugin install count incremented (async, non-blocking) @@ -686,15 +755,16 @@ func (h *PluginHandler) InstallPlugin(c *gin.Context) { var catalogPlugin models.CatalogPlugin var manifestJSON []byte var repoURL sql.NullString + var sourcePath sql.NullString err := h.db.DB().QueryRow(` - SELECT cp.id, cp.name, cp.version, cp.display_name, cp.description, cp.plugin_type, cp.icon_url, cp.manifest, r.url + SELECT cp.id, cp.name, cp.version, cp.display_name, cp.description, cp.plugin_type, cp.icon_url, cp.manifest, r.url, cp.source_path FROM catalog_plugins cp LEFT JOIN repositories r ON cp.repository_id = r.id WHERE cp.id = $1 `, catalogPluginID).Scan( &catalogPlugin.ID, &catalogPlugin.Name, &catalogPlugin.Version, &catalogPlugin.DisplayName, &catalogPlugin.Description, - &catalogPlugin.PluginType, &catalogPlugin.IconURL, &manifestJSON, &repoURL, + &catalogPlugin.PluginType, &catalogPlugin.IconURL, &manifestJSON, &repoURL, &sourcePath, ) if err == sql.ErrNoRows { @@ -738,7 +808,7 @@ func (h *PluginHandler) InstallPlugin(c *gin.Context) { // Download plugin files to local plugins directory if repoURL.Valid && h.pluginDir != "" { go func() { - if err := h.downloadPluginFromRepository(catalogPlugin.Name, repoURL.String); err != nil { + if err := h.downloadPluginFromRepository(catalogPlugin.Name, repoURL.String, sourcePath.String); err != nil { log.Printf("[PluginHandler] Warning: Failed to download plugin files for %s: %v", catalogPlugin.Name, err) } else { log.Printf("[PluginHandler] Plugin files downloaded to %s/%s", h.pluginDir, catalogPlugin.Name) @@ -764,10 +834,31 @@ func (h *PluginHandler) InstallPlugin(c *gin.Context) { `, catalogPlugin.ID, time.Now()) }() - c.JSON(http.StatusCreated, gin.H{ + var warning string + if h.runtime != nil { + var config map[string]interface{} + if len(req.Config) > 0 { + if err := json.Unmarshal(req.Config, &config); err != nil { + warning = "Plugin installed, but config could not be parsed for runtime load." + } + } + if config == nil { + config = make(map[string]interface{}) + } + if err := h.runtime.LoadPluginWithConfig(c.Request.Context(), catalogPlugin.Name, catalogPlugin.Version, config, catalogPlugin.Manifest); err != nil { + warning = err.Error() + } + } + + response := gin.H{ "message": "Plugin installed successfully", "pluginId": installedID, - }) + } + if warning != "" { + response["warning"] = warning + } + + c.JSON(http.StatusCreated, response) } // ListInstalledPlugins lists all installed plugins. @@ -1031,7 +1122,26 @@ func (h *PluginHandler) UpdateInstalledPlugin(c *gin.Context) { return } - c.JSON(http.StatusOK, gin.H{"message": "Plugin updated successfully"}) + var warning string + if h.runtime != nil { + var pluginName string + if err := h.db.DB().QueryRow(`SELECT name FROM installed_plugins WHERE id = $1`, id).Scan(&pluginName); err == nil { + if req.Enabled != nil && !*req.Enabled { + if err := h.runtime.UnloadPlugin(c.Request.Context(), pluginName); err != nil { + warning = err.Error() + } + } else if err := h.runtime.ReloadPlugin(c.Request.Context(), pluginName); err != nil { + warning = err.Error() + } + } + } + + response := gin.H{"message": "Plugin updated successfully"} + if warning != "" { + response["warning"] = warning + } + + c.JSON(http.StatusOK, response) } // UninstallPlugin removes a plugin from the system. @@ -1084,6 +1194,13 @@ func (h *PluginHandler) UninstallPlugin(c *gin.Context) { return } + warning := "" + if h.runtime != nil { + if err := h.runtime.UnloadPlugin(c.Request.Context(), pluginName); err != nil { + warning = err.Error() + } + } + // Remove plugin files from plugins directory if h.pluginDir != "" && pluginName != "" { pluginPath := filepath.Join(h.pluginDir, pluginName) @@ -1094,7 +1211,12 @@ func (h *PluginHandler) UninstallPlugin(c *gin.Context) { } } - c.JSON(http.StatusOK, gin.H{"message": "Plugin uninstalled successfully"}) + response := gin.H{"message": "Plugin uninstalled successfully"} + if warning != "" { + response["warning"] = warning + } + + c.JSON(http.StatusOK, response) } // EnablePlugin enables an installed plugin. @@ -1119,6 +1241,16 @@ func (h *PluginHandler) UninstallPlugin(c *gin.Context) { func (h *PluginHandler) EnablePlugin(c *gin.Context) { id := c.Param("id") + var pluginName string + if err := h.db.DB().QueryRow(`SELECT name FROM installed_plugins WHERE id = $1`, id).Scan(&pluginName); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "Plugin not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch plugin", "details": err.Error()}) + return + } + result, err := h.db.DB().Exec(` UPDATE installed_plugins SET enabled = true, updated_at = NOW() @@ -1136,7 +1268,14 @@ func (h *PluginHandler) EnablePlugin(c *gin.Context) { return } - c.JSON(http.StatusOK, gin.H{"message": "Plugin enabled successfully"}) + response := gin.H{"message": "Plugin enabled successfully"} + if h.runtime != nil { + if err := h.runtime.LoadPluginByName(c.Request.Context(), pluginName); err != nil { + response["warning"] = err.Error() + } + } + + c.JSON(http.StatusOK, response) } // DisablePlugin disables an installed plugin. @@ -1161,6 +1300,16 @@ func (h *PluginHandler) EnablePlugin(c *gin.Context) { func (h *PluginHandler) DisablePlugin(c *gin.Context) { id := c.Param("id") + var pluginName string + if err := h.db.DB().QueryRow(`SELECT name FROM installed_plugins WHERE id = $1`, id).Scan(&pluginName); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "Plugin not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to fetch plugin", "details": err.Error()}) + return + } + result, err := h.db.DB().Exec(` UPDATE installed_plugins SET enabled = false, updated_at = NOW() @@ -1178,5 +1327,12 @@ func (h *PluginHandler) DisablePlugin(c *gin.Context) { return } - c.JSON(http.StatusOK, gin.H{"message": "Plugin disabled successfully"}) + response := gin.H{"message": "Plugin disabled successfully"} + if h.runtime != nil { + if err := h.runtime.UnloadPlugin(c.Request.Context(), pluginName); err != nil { + response["warning"] = err.Error() + } + } + + c.JSON(http.StatusOK, response) } diff --git a/api/internal/plugins/api_registry.go b/api/internal/plugins/api_registry.go index 7abef00..176e331 100644 --- a/api/internal/plugins/api_registry.go +++ b/api/internal/plugins/api_registry.go @@ -493,7 +493,7 @@ type PluginAPI struct { // // Creates a scoped API interface for a specific plugin, with automatic // namespace isolation. This is called by the plugin runtime during -//initialization, not by plugins directly. +// initialization, not by plugins directly. // // Parameters: // - registry: The global API registry @@ -562,7 +562,37 @@ type EndpointOptions struct { // Permissions: []string{"plugin.slack.send"}, // Description: "Send a Slack message", // }) -func (pa *PluginAPI) RegisterEndpoint(opts EndpointOptions) error { +func (pa *PluginAPI) RegisterEndpoint(args ...interface{}) error { + var opts EndpointOptions + switch len(args) { + case 1: + var ok bool + opts, ok = args[0].(EndpointOptions) + if !ok { + return fmt.Errorf("RegisterEndpoint requires EndpointOptions or method/path/handler") + } + case 3: + method, ok := args[0].(string) + if !ok { + return fmt.Errorf("RegisterEndpoint method must be a string") + } + path, ok := args[1].(string) + if !ok { + return fmt.Errorf("RegisterEndpoint path must be a string") + } + handler, ok := args[2].(gin.HandlerFunc) + if !ok { + return fmt.Errorf("RegisterEndpoint handler must be a gin.HandlerFunc") + } + opts = EndpointOptions{ + Method: method, + Path: path, + Handler: handler, + } + default: + return fmt.Errorf("RegisterEndpoint requires EndpointOptions or method/path/handler") + } + // Ensure path starts with / (normalize input) if len(opts.Path) == 0 || opts.Path[0] != '/' { opts.Path = "/" + opts.Path diff --git a/api/internal/plugins/database.go b/api/internal/plugins/database.go index 278b90b..fb0a0b3 100644 --- a/api/internal/plugins/database.go +++ b/api/internal/plugins/database.go @@ -186,8 +186,11 @@ package plugins import ( + "bytes" "database/sql" "fmt" + "sort" + "strings" "github.com/streamspace-dev/streamspace/api/internal/db" ) @@ -447,6 +450,45 @@ func (pd *PluginDatabase) QueryRow(query string, args ...interface{}) *sql.Row { return pd.db.DB().QueryRow(query, args...) } +// QueryInt is a convenience helper for scalar integer queries such as COUNT(*). +func (pd *PluginDatabase) QueryInt(query string, args ...interface{}) (int, error) { + var value int + err := pd.QueryRow(query, args...).Scan(&value) + return value, err +} + +// Insert inserts a simple key/value payload into the specified table. +func (pd *PluginDatabase) Insert(tableName string, values map[string]interface{}) error { + if len(values) == 0 { + return fmt.Errorf("no values provided for insert into %s", tableName) + } + + cols := make([]string, 0, len(values)) + for col := range values { + cols = append(cols, col) + } + sort.Strings(cols) + + args := make([]interface{}, 0, len(cols)) + placeholders := make([]string, 0, len(cols)) + for i, col := range cols { + args = append(args, values[col]) + placeholders = append(placeholders, fmt.Sprintf("$%d", i+1)) + } + + var query bytes.Buffer + query.WriteString("INSERT INTO ") + query.WriteString(tableName) + query.WriteString(" (") + query.WriteString(strings.Join(cols, ", ")) + query.WriteString(") VALUES (") + query.WriteString(strings.Join(placeholders, ", ")) + query.WriteString(")") + + _, err := pd.Exec(query.String(), args...) + return err +} + // Transaction executes a function within a database transaction. // // This method provides ACID guarantees for multiple SQL operations, diff --git a/api/internal/plugins/discovery.go b/api/internal/plugins/discovery.go index 6d0f130..072a63e 100644 --- a/api/internal/plugins/discovery.go +++ b/api/internal/plugins/discovery.go @@ -203,6 +203,7 @@ package plugins import ( + "encoding/json" "fmt" "log" "os" @@ -242,9 +243,9 @@ import ( // // Load specific plugin // handler, _ := discovery.LoadPlugin("analytics") type PluginDiscovery struct { - pluginDirs []string - builtinPlugins map[string]PluginFactory - dynamicPlugins map[string]*plugin.Plugin + pluginDirs []string + builtinPlugins map[string]PluginFactory + dynamicPlugins map[string]*plugin.Plugin } // PluginFactory is a function that creates a new plugin instance @@ -255,8 +256,8 @@ func NewPluginDiscovery(pluginDirs ...string) *PluginDiscovery { if len(pluginDirs) == 0 { // Default plugin directories pluginDirs = []string{ - "/plugins", // Container path - "./plugins", // Local development + "/plugins", // Container path + "./plugins", // Local development "/usr/local/share/streamspace/plugins", // System install } } @@ -414,6 +415,45 @@ func (pd *PluginDiscovery) findPluginFile(name string) string { return subPath } } + + if manifestPath := filepath.Join(dir, name, "manifest.json"); pd.hasFile(manifestPath) { + if entrypointPath := pd.findPluginFromManifest(manifestPath); entrypointPath != "" { + return entrypointPath + } + } + } + + return "" +} + +func (pd *PluginDiscovery) hasFile(path string) bool { + info, err := os.Stat(path) + return err == nil && !info.IsDir() +} + +func (pd *PluginDiscovery) findPluginFromManifest(manifestPath string) string { + manifestBytes, err := os.ReadFile(manifestPath) + if err != nil { + return "" + } + + var manifest struct { + Entrypoints struct { + Main string `json:"main"` + } `json:"entrypoints"` + } + if err := json.Unmarshal(manifestBytes, &manifest); err != nil { + return "" + } + + entrypoint := strings.TrimSpace(manifest.Entrypoints.Main) + if entrypoint == "" || !strings.HasSuffix(entrypoint, ".so") { + return "" + } + + path := filepath.Join(filepath.Dir(manifestPath), filepath.Clean(entrypoint)) + if pd.hasFile(path) { + return path } return "" diff --git a/api/internal/plugins/discovery_test.go b/api/internal/plugins/discovery_test.go new file mode 100644 index 0000000..ce904d1 --- /dev/null +++ b/api/internal/plugins/discovery_test.go @@ -0,0 +1,40 @@ +package plugins + +import ( + "os" + "path/filepath" + "testing" +) + +func TestFindPluginFileUsesManifestEntrypointWhenBundleUsesCustomSharedObjectName(t *testing.T) { + tempDir := t.TempDir() + pluginDir := filepath.Join(tempDir, "streamspace-example") + if err := os.MkdirAll(pluginDir, 0o755); err != nil { + t.Fatalf("mkdir plugin dir: %v", err) + } + + manifest := `{ + "name": "streamspace-example", + "entrypoints": { + "main": "bundle/plugin-runtime.so" + } +}` + if err := os.WriteFile(filepath.Join(pluginDir, "manifest.json"), []byte(manifest), 0o644); err != nil { + t.Fatalf("write manifest: %v", err) + } + + bundleDir := filepath.Join(pluginDir, "bundle") + if err := os.MkdirAll(bundleDir, 0o755); err != nil { + t.Fatalf("mkdir bundle dir: %v", err) + } + + expectedPath := filepath.Join(bundleDir, "plugin-runtime.so") + if err := os.WriteFile(expectedPath, []byte(""), 0o644); err != nil { + t.Fatalf("write shared object placeholder: %v", err) + } + + discovery := NewPluginDiscovery(tempDir) + if actual := discovery.findPluginFile("streamspace-example"); actual != expectedPath { + t.Fatalf("expected %s, got %s", expectedPath, actual) + } +} diff --git a/api/internal/plugins/logger.go b/api/internal/plugins/logger.go index 4774a1a..d0c0a6b 100644 --- a/api/internal/plugins/logger.go +++ b/api/internal/plugins/logger.go @@ -138,57 +138,70 @@ func (pl *PluginLogger) log(level string, message string, data map[string]interf // // Use for detailed diagnostic information during development. // Typically disabled in production. -func (pl *PluginLogger) Debug(message string, data ...map[string]interface{}) { - var d map[string]interface{} - if len(data) > 0 { - d = data[0] - } - pl.log("DEBUG", message, d) +func (pl *PluginLogger) Debug(message string, data ...interface{}) { + pl.log("DEBUG", message, normalizeLogData(data...)) } // Info logs an informational message. // // Use for general operational messages (startup, shutdown, state changes). -func (pl *PluginLogger) Info(message string, data ...map[string]interface{}) { - var d map[string]interface{} - if len(data) > 0 { - d = data[0] - } - pl.log("INFO", message, d) +func (pl *PluginLogger) Info(message string, data ...interface{}) { + pl.log("INFO", message, normalizeLogData(data...)) } // Warn logs a warning message. // // Use for potentially problematic situations that don't prevent operation. -func (pl *PluginLogger) Warn(message string, data ...map[string]interface{}) { - var d map[string]interface{} - if len(data) > 0 { - d = data[0] - } - pl.log("WARN", message, d) +func (pl *PluginLogger) Warn(message string, data ...interface{}) { + pl.log("WARN", message, normalizeLogData(data...)) } // Error logs an error message. // // Use for error conditions that are handled gracefully. -func (pl *PluginLogger) Error(message string, data ...map[string]interface{}) { - var d map[string]interface{} - if len(data) > 0 { - d = data[0] - } - pl.log("ERROR", message, d) +func (pl *PluginLogger) Error(message string, data ...interface{}) { + pl.log("ERROR", message, normalizeLogData(data...)) } // Fatal logs a fatal error message. // // NOTE: Unlike log.Fatal(), this does NOT exit the process. // It only logs at FATAL level to indicate critical plugin errors. -func (pl *PluginLogger) Fatal(message string, data ...map[string]interface{}) { - var d map[string]interface{} - if len(data) > 0 { - d = data[0] +func (pl *PluginLogger) Fatal(message string, data ...interface{}) { + pl.log("FATAL", message, normalizeLogData(data...)) +} + +// normalizeLogData accepts either a single structured map or alternating +// key/value pairs to keep plugin logging ergonomic across older plugin code. +func normalizeLogData(args ...interface{}) map[string]interface{} { + if len(args) == 0 { + return nil + } + + if len(args) == 1 { + if fields, ok := args[0].(map[string]interface{}); ok { + return fields + } + return map[string]interface{}{"value": args[0]} + } + + fields := make(map[string]interface{}, (len(args)+1)/2) + for i := 0; i < len(args); i += 2 { + key := "arg" + if s, ok := args[i].(string); ok && s != "" { + key = s + } else if i > 0 { + key = "arg" + } + + if i+1 < len(args) { + fields[key] = args[i+1] + } else { + fields[key] = nil + } } - pl.log("FATAL", message, d) + + return fields } // WithField returns a logger with a pre-configured field. diff --git a/api/internal/plugins/marketplace.go b/api/internal/plugins/marketplace.go index feb66d4..84c99f6 100644 --- a/api/internal/plugins/marketplace.go +++ b/api/internal/plugins/marketplace.go @@ -153,6 +153,7 @@ import ( "github.com/streamspace-dev/streamspace/api/internal/db" "github.com/streamspace-dev/streamspace/api/internal/models" + "gopkg.in/yaml.v3" ) // PluginMarketplace manages plugin discovery, download, and installation. @@ -202,18 +203,36 @@ type PluginMarketplace struct { // This combination allows the UI to show "Install", "Installed", or "Update Available" // buttons dynamically without extra database queries. type MarketplacePlugin struct { - Name string `json:"name"` - Version string `json:"version"` - DisplayName string `json:"displayName"` - Description string `json:"description"` - Author string `json:"author"` - Category string `json:"category"` - Tags []string `json:"tags"` - IconURL string `json:"iconUrl"` - Manifest models.PluginManifest `json:"manifest"` - DownloadURL string `json:"downloadUrl"` - Installed bool `json:"installed"` - Enabled bool `json:"enabled"` + Name string `json:"name"` + Version string `json:"version"` + DisplayName string `json:"displayName"` + Description string `json:"description"` + Author string `json:"author"` + Category string `json:"category"` + Tags []string `json:"tags"` + IconURL string `json:"iconUrl"` + Manifest models.PluginManifest `json:"manifest"` + DownloadURL string `json:"downloadUrl"` + Path string `json:"path"` + Installed bool `json:"installed"` + Enabled bool `json:"enabled"` +} + +type pluginCatalogYAML struct { + Spec struct { + Plugins []struct { + Name string `yaml:"name"` + Version string `yaml:"version"` + DisplayName string `yaml:"displayName"` + Description string `yaml:"description"` + Author string `yaml:"author"` + Category string `yaml:"category"` + Type string `yaml:"type"` + Tags []string `yaml:"tags"` + Path string `yaml:"path"` + Icon string `yaml:"icon"` + } `yaml:"plugins"` + } `yaml:"spec"` } // NewPluginMarketplace creates a new plugin marketplace instance. @@ -339,22 +358,9 @@ func (m *PluginMarketplace) SyncCatalog(ctx context.Context) error { log.Println("[Plugin Marketplace] Syncing plugin catalog from repository...") - // Fetch catalog from repository - catalogURL := fmt.Sprintf("%s/catalog.json", m.repositoryURL) - resp, err := http.Get(catalogURL) + plugins, err := m.fetchCatalog(ctx) if err != nil { - return fmt.Errorf("failed to fetch plugin catalog: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("failed to fetch catalog: HTTP %d", resp.StatusCode) - } - - // Parse catalog - var plugins []*MarketplacePlugin - if err := json.NewDecoder(resp.Body).Decode(&plugins); err != nil { - return fmt.Errorf("failed to parse plugin catalog: %w", err) + return err } // Update local cache @@ -645,7 +651,8 @@ func (m *PluginMarketplace) UninstallPlugin(ctx context.Context, name string) er // - Smaller bandwidth (gzip compression) // // **Example Archive URL**: -// https://github.com/JoshuaAFerguson/streamspace-plugins/releases/download/v1.2.3/streamspace-analytics.tar.gz +// +// https://github.com/JoshuaAFerguson/streamspace-plugins/releases/download/v1.2.3/streamspace-analytics.tar.gz // // **Archive Contents**: // @@ -694,11 +701,9 @@ func (m *PluginMarketplace) downloadPlugin(ctx context.Context, plugin *Marketpl return fmt.Errorf("failed to create plugin directory: %w", err) } - // Download plugin archive downloadURL := plugin.DownloadURL if downloadURL == "" { - // Default: GitHub raw content - downloadURL = fmt.Sprintf("%s/%s/plugin.tar.gz", m.repositoryURL, plugin.Name) + return m.downloadPluginFiles(plugin, pluginPath) } resp, err := http.Get(downloadURL) @@ -708,6 +713,9 @@ func (m *PluginMarketplace) downloadPlugin(ctx context.Context, plugin *Marketpl defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + if strings.HasSuffix(downloadURL, ".tar.gz") || strings.HasSuffix(downloadURL, ".tgz") { + return m.downloadPluginFiles(plugin, pluginPath) + } return fmt.Errorf("download failed: HTTP %d", resp.StatusCode) } @@ -718,7 +726,7 @@ func (m *PluginMarketplace) downloadPlugin(ctx context.Context, plugin *Marketpl } } else { // Fallback: Download individual files - if err := m.downloadPluginFiles(plugin.Name, pluginPath); err != nil { + if err := m.downloadPluginFiles(plugin, pluginPath); err != nil { return fmt.Errorf("failed to download plugin files: %w", err) } } @@ -774,29 +782,198 @@ func (m *PluginMarketplace) downloadPlugin(ctx context.Context, plugin *Marketpl // - pluginPath: Local directory to save files // // Returns error if manifest.json download fails, nil otherwise. -func (m *PluginMarketplace) downloadPluginFiles(pluginName, pluginPath string) error { +func (m *PluginMarketplace) downloadPluginFiles(plugin *MarketplacePlugin, pluginPath string) error { + pluginSourcePath := plugin.Path + if pluginSourcePath == "" { + pluginSourcePath = plugin.Name + } + // Download manifest.json - manifestURL := fmt.Sprintf("%s/%s/manifest.json", m.repositoryURL, pluginName) - if err := m.downloadFile(manifestURL, filepath.Join(pluginPath, "manifest.json")); err != nil { + manifestURL := fmt.Sprintf("%s/%s/manifest.json", m.repositoryURL, pluginSourcePath) + manifestPath := filepath.Join(pluginPath, "manifest.json") + if err := m.downloadFile(manifestURL, manifestPath); err != nil { + return err + } + + manifestBytes, err := os.ReadFile(manifestPath) + if err != nil { return err } + var manifest models.PluginManifest + if err := json.Unmarshal(manifestBytes, &manifest); err != nil { + return fmt.Errorf("failed to parse manifest.json for %s: %w", plugin.Name, err) + } + // Download README.md - readmeURL := fmt.Sprintf("%s/%s/README.md", m.repositoryURL, pluginName) + readmeURL := fmt.Sprintf("%s/%s/README.md", m.repositoryURL, pluginSourcePath) _ = m.downloadFile(readmeURL, filepath.Join(pluginPath, "README.md")) // Optional, ignore errors - // Download plugin code (could be .go, .js, etc.) - // Try multiple extensions - for _, ext := range []string{".go", ".js", ".py", "_plugin.go"} { - codeURL := fmt.Sprintf("%s/%s/%s%s", m.repositoryURL, pluginName, pluginName, ext) - if err := m.downloadFile(codeURL, filepath.Join(pluginPath, pluginName+ext)); err == nil { - break // Success + if manifest.Icon != "" { + iconURL := fmt.Sprintf("%s/%s/%s", m.repositoryURL, pluginSourcePath, manifest.Icon) + _ = m.downloadFile(iconURL, filepath.Join(pluginPath, filepath.Base(manifest.Icon))) + } + + entrypoints := uniqueEntrypoints(manifest) + if !hasSharedObjectEntrypoint(entrypoints) { + return fmt.Errorf( + "plugin %s does not expose a runtime-loadable .so entrypoint; publish plugin.tar.gz or a bundle with .so assets", + plugin.Name, + ) + } + + for _, entry := range entrypoints { + entryURL := fmt.Sprintf("%s/%s/%s", m.repositoryURL, pluginSourcePath, entry) + if err := m.downloadFile(entryURL, filepath.Join(pluginPath, filepath.Base(entry))); err != nil { + return fmt.Errorf("failed to download plugin entrypoint %s: %w", entry, err) } } return nil } +func (m *PluginMarketplace) fetchCatalog(ctx context.Context) ([]*MarketplacePlugin, error) { + yamlURL := fmt.Sprintf("%s/catalog.yaml", m.repositoryURL) + resp, err := http.Get(yamlURL) + if err == nil && resp != nil { + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + var catalog pluginCatalogYAML + if err := yaml.NewDecoder(resp.Body).Decode(&catalog); err != nil { + return nil, fmt.Errorf("failed to parse plugin catalog yaml: %w", err) + } + + plugins := make([]*MarketplacePlugin, 0, len(catalog.Spec.Plugins)) + for _, item := range catalog.Spec.Plugins { + plugin := &MarketplacePlugin{ + Name: item.Name, + Version: item.Version, + DisplayName: item.DisplayName, + Description: item.Description, + Author: item.Author, + Category: item.Category, + Tags: item.Tags, + Path: item.Path, + } + + if plugin.Path == "" { + plugin.Path = plugin.Name + } + + manifest, manifestErr := m.fetchManifest(ctx, plugin.Path) + if manifestErr != nil { + log.Printf("[Plugin Marketplace] Warning: Failed to fetch manifest for %s: %v", plugin.Name, manifestErr) + } else { + plugin.Manifest = manifest + if plugin.Version == "" { + plugin.Version = manifest.Version + } + if plugin.DisplayName == "" { + plugin.DisplayName = manifest.DisplayName + } + if plugin.Description == "" { + plugin.Description = manifest.Description + } + if plugin.Author == "" { + plugin.Author = manifest.Author + } + if plugin.Category == "" { + plugin.Category = manifest.Category + } + if len(plugin.Tags) == 0 { + plugin.Tags = manifest.Tags + } + if plugin.IconURL == "" && manifest.Icon != "" { + plugin.IconURL = fmt.Sprintf("%s/%s/%s", m.repositoryURL, plugin.Path, manifest.Icon) + } + } + + plugins = append(plugins, plugin) + } + + return plugins, nil + } + } + + catalogURL := fmt.Sprintf("%s/catalog.json", m.repositoryURL) + resp, err = http.Get(catalogURL) + if err != nil { + return nil, fmt.Errorf("failed to fetch plugin catalog: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to fetch catalog: HTTP %d", resp.StatusCode) + } + + var plugins []*MarketplacePlugin + if err := json.NewDecoder(resp.Body).Decode(&plugins); err != nil { + return nil, fmt.Errorf("failed to parse plugin catalog json: %w", err) + } + + return plugins, nil +} + +func (m *PluginMarketplace) fetchManifest(ctx context.Context, pluginPath string) (models.PluginManifest, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/%s/manifest.json", m.repositoryURL, pluginPath), nil) + if err != nil { + return models.PluginManifest{}, err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return models.PluginManifest{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return models.PluginManifest{}, fmt.Errorf("manifest request returned HTTP %d", resp.StatusCode) + } + + var manifest models.PluginManifest + if err := json.NewDecoder(resp.Body).Decode(&manifest); err != nil { + return models.PluginManifest{}, err + } + + return manifest, nil +} + +func uniqueEntrypoints(manifest models.PluginManifest) []string { + entrypoints := []string{ + manifest.Entrypoints.Main, + manifest.Entrypoints.API, + manifest.Entrypoints.UI, + manifest.Entrypoints.Webhook, + manifest.Entrypoints.CLI, + } + + seen := make(map[string]struct{}, len(entrypoints)) + unique := make([]string, 0, len(entrypoints)) + for _, entry := range entrypoints { + entry = strings.TrimSpace(entry) + if entry == "" { + continue + } + if _, ok := seen[entry]; ok { + continue + } + seen[entry] = struct{}{} + unique = append(unique, entry) + } + + return unique +} + +func hasSharedObjectEntrypoint(entrypoints []string) bool { + for _, entry := range entrypoints { + if strings.HasSuffix(entry, ".so") { + return true + } + } + + return false +} + // downloadFile downloads a single file from URL to local path. // // This is a simple HTTP GET → file write operation with minimal error handling. @@ -1122,23 +1299,24 @@ func (m *PluginMarketplace) updateDatabaseCatalog(ctx context.Context, plugins [ // Upsert to catalog _, err = m.db.DB().ExecContext(ctx, ` INSERT INTO catalog_plugins ( - repository_id, name, version, display_name, description, + repository_id, name, version, display_name, description, source_path, category, plugin_type, icon_url, manifest, tags, created_at, updated_at ) - VALUES (1, $1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW()) - ON CONFLICT (name) + VALUES (1, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW()) + ON CONFLICT (repository_id, name) DO UPDATE SET version = $2, display_name = $3, description = $4, - category = $5, - plugin_type = $6, - icon_url = $7, - manifest = $8, - tags = $9, + source_path = $5, + category = $6, + plugin_type = $7, + icon_url = $8, + manifest = $9, + tags = $10, updated_at = NOW() `, plugin.Name, plugin.Version, plugin.DisplayName, plugin.Description, - plugin.Category, plugin.Manifest.Type, plugin.IconURL, manifestJSON, plugin.Tags) + plugin.Path, plugin.Category, plugin.Manifest.Type, plugin.IconURL, manifestJSON, plugin.Tags) if err != nil { log.Printf("[Plugin Marketplace] Error updating catalog for %s: %v", plugin.Name, err) diff --git a/api/internal/plugins/marketplace_test.go b/api/internal/plugins/marketplace_test.go new file mode 100644 index 0000000..b572565 --- /dev/null +++ b/api/internal/plugins/marketplace_test.go @@ -0,0 +1,134 @@ +package plugins + +import ( + "context" + "io" + "net/http" + "os" + "strings" + "testing" +) + +func TestFetchCatalogSupportsYAMLCatalogWithManifestHydration(t *testing.T) { + oldTransport := http.DefaultTransport + http.DefaultTransport = roundTripFunc(func(req *http.Request) (*http.Response, error) { + switch req.URL.String() { + case "https://plugins.example/catalog.yaml": + return textResponse(` +spec: + plugins: + - name: streamspace-slack + version: "1.0.0" + displayName: Slack Integration + description: Slack notifications + author: StreamSpace Team + category: official + path: streamspace-slack + tags: [notifications, slack] +`) + case "https://plugins.example/streamspace-slack/manifest.json": + return textResponse(`{ + "name": "streamspace-slack", + "version": "1.0.1", + "displayName": "Slack Integration", + "description": "Send notifications to Slack", + "author": "StreamSpace Team", + "category": "Integrations", + "icon": "slack.png", + "tags": ["notifications", "slack"], + "entrypoints": {"main": "slack_plugin.go"} +}`) + default: + return &http.Response{ + StatusCode: http.StatusNotFound, + Body: io.NopCloser(strings.NewReader("not found")), + Header: make(http.Header), + }, nil + } + }) + defer func() { + http.DefaultTransport = oldTransport + }() + + marketplace := &PluginMarketplace{ + repositoryURL: "https://plugins.example", + } + + plugins, err := marketplace.fetchCatalog(context.Background()) + if err != nil { + t.Fatalf("fetchCatalog returned error: %v", err) + } + if len(plugins) != 1 { + t.Fatalf("expected 1 plugin, got %d", len(plugins)) + } + + plugin := plugins[0] + if plugin.Name != "streamspace-slack" { + t.Fatalf("expected plugin name streamspace-slack, got %s", plugin.Name) + } + if plugin.Version != "1.0.0" { + t.Fatalf("expected catalog version to be preserved, got %s", plugin.Version) + } + if plugin.Manifest.Entrypoints.Main != "slack_plugin.go" { + t.Fatalf("expected manifest main entrypoint slack_plugin.go, got %s", plugin.Manifest.Entrypoints.Main) + } + expectedIconURL := "https://plugins.example/streamspace-slack/slack.png" + if plugin.IconURL != expectedIconURL { + t.Fatalf("expected icon URL %s, got %s", expectedIconURL, plugin.IconURL) + } +} + +func TestDownloadPluginFilesRejectsSourceOnlyManifestEntrypoints(t *testing.T) { + oldTransport := http.DefaultTransport + http.DefaultTransport = roundTripFunc(func(req *http.Request) (*http.Response, error) { + switch req.URL.String() { + case "https://plugins.example/streamspace-slack/manifest.json": + return textResponse(`{ + "name": "streamspace-slack", + "entrypoints": {"main": "slack_plugin.go"} +}`) + case "https://plugins.example/streamspace-slack/README.md": + return textResponse("# Slack") + default: + return &http.Response{ + StatusCode: http.StatusNotFound, + Body: io.NopCloser(strings.NewReader("not found")), + Header: make(http.Header), + }, nil + } + }) + defer func() { + http.DefaultTransport = oldTransport + }() + + pluginDir := t.TempDir() + marketplace := &PluginMarketplace{ + repositoryURL: "https://plugins.example", + } + + err := marketplace.downloadPluginFiles(&MarketplacePlugin{ + Name: "streamspace-slack", + Path: "streamspace-slack", + }, pluginDir) + if err == nil || !strings.Contains(err.Error(), ".so entrypoint") { + t.Fatalf("expected .so entrypoint error, got %v", err) + } + + if _, statErr := os.Stat(pluginDir + "/manifest.json"); statErr != nil { + t.Fatalf("expected manifest.json to be downloaded before validation: %v", statErr) + } +} + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + +func textResponse(body string) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(body)), + Header: make(http.Header), + }, nil +} diff --git a/api/internal/plugins/runtime.go b/api/internal/plugins/runtime.go index dc4c554..85fa7b5 100644 --- a/api/internal/plugins/runtime.go +++ b/api/internal/plugins/runtime.go @@ -34,6 +34,7 @@ // 7. **Runtime**: Plugin handles events, serves API requests, runs jobs // 8. **Disabling**: Plugin stops receiving new events (OnDisable hook) // 9. **OnUnload Hook**: Plugin cleans up resources +// // 10. **Unloading**: Plugin removed from memory, all resources released // // # Concurrency Model @@ -561,13 +562,15 @@ type PluginContext struct { Manifest models.PluginManifest // Platform APIs - Database *PluginDatabase - Events *PluginEvents - API *PluginAPI - UI *PluginUI - Storage *PluginStorage - Logger *PluginLogger - Scheduler *PluginScheduler + Database *PluginDatabase + Events *PluginEvents + API *PluginAPI + APIRegistry *PluginAPI + UI *PluginUI + UIRegistry *PluginUI + Storage *PluginStorage + Logger *PluginLogger + Scheduler *PluginScheduler // Platform state runtime *Runtime @@ -808,7 +811,9 @@ func (r *Runtime) LoadPlugin(ctx context.Context, name, version string, config m pluginCtx.Database = NewPluginDatabase(r.db, name) pluginCtx.Events = NewPluginEvents(r.eventBus, name) pluginCtx.API = NewPluginAPI(r.apiRegistry, name) + pluginCtx.APIRegistry = pluginCtx.API pluginCtx.UI = NewPluginUI(r.uiRegistry, name) + pluginCtx.UIRegistry = pluginCtx.UI pluginCtx.Storage = NewPluginStorage(r.db, name) pluginCtx.Logger = NewPluginLogger(name) pluginCtx.Scheduler = NewPluginScheduler(r.scheduler, name) diff --git a/api/internal/plugins/runtime_v2.go b/api/internal/plugins/runtime_v2.go index 8abbdf8..439a375 100644 --- a/api/internal/plugins/runtime_v2.go +++ b/api/internal/plugins/runtime_v2.go @@ -6,11 +6,11 @@ // Design Rationale - Why RuntimeV2: // // RuntimeV2 is an evolution of the original Runtime that adds: -// 1. Automatic discovery of available plugins (filesystem + built-in) -// 2. Database-driven plugin loading (loads only enabled plugins) -// 3. Auto-start capability (plugins load on API startup) -// 4. Integrated event bus for inter-plugin communication -// 5. Centralized registries (API, UI, Events, Scheduler) +// 1. Automatic discovery of available plugins (filesystem + built-in) +// 2. Database-driven plugin loading (loads only enabled plugins) +// 3. Auto-start capability (plugins load on API startup) +// 4. Integrated event bus for inter-plugin communication +// 5. Centralized registries (API, UI, Events, Scheduler) // // Plugin Lifecycle Flow: // @@ -77,15 +77,15 @@ // // RuntimeV2 supports two plugin loading modes: // -// 1. Auto-start (default): Automatically loads all enabled plugins from database -// - Best for: Production deployments -// - Use case: Plugins are managed via UI/API, enabled state in database -// - Example: Admin enables "slack-notifications" via UI → loads on restart +// 1. Auto-start (default): Automatically loads all enabled plugins from database +// - Best for: Production deployments +// - Use case: Plugins are managed via UI/API, enabled state in database +// - Example: Admin enables "slack-notifications" via UI → loads on restart // -// 2. Manual loading: Plugins must be loaded via API calls -// - Best for: Development, testing, debugging -// - Use case: Fine-grained control over plugin loading -// - Example: Load specific plugin version for testing +// 2. Manual loading: Plugins must be loaded via API calls +// - Best for: Development, testing, debugging +// - Use case: Fine-grained control over plugin loading +// - Example: Load specific plugin version for testing // // Database Schema Integration: // @@ -236,7 +236,7 @@ type RuntimeV2 struct { // // Thread Safety: Constructor is not thread-safe. Do not call concurrently. func NewRuntimeV2(database *db.Database, pluginDirs ...string) *RuntimeV2 { - return &RuntimeV2{ + runtime := &RuntimeV2{ db: database, discovery: NewPluginDiscovery(pluginDirs...), plugins: make(map[string]*LoadedPlugin), @@ -246,6 +246,10 @@ func NewRuntimeV2(database *db.Database, pluginDirs ...string) *RuntimeV2 { uiRegistry: NewUIRegistry(), autoStart: true, } + + GetGlobalRegistry().ApplyToDiscovery(runtime.discovery) + + return runtime } // SetAutoStart enables/disables automatic plugin loading on Start(). @@ -293,8 +297,8 @@ func (r *RuntimeV2) SetAutoStart(enabled bool) { // // The plugin becomes available for loading but is not automatically loaded. // To load the plugin, either: -// 1. Enable it in database (for auto-start mode) -// 2. Call LoadPluginWithConfig manually +// 1. Enable it in database (for auto-start mode) +// 2. Call LoadPluginWithConfig manually // // Example: // @@ -633,7 +637,9 @@ func (r *RuntimeV2) LoadPluginWithConfig(ctx context.Context, name, version stri pluginCtx.Database = NewPluginDatabase(r.db, name) pluginCtx.Events = NewPluginEvents(r.eventBus, name) pluginCtx.API = NewPluginAPI(r.apiRegistry, name) + pluginCtx.APIRegistry = pluginCtx.API pluginCtx.UI = NewPluginUI(r.uiRegistry, name) + pluginCtx.UIRegistry = pluginCtx.UI pluginCtx.Storage = NewPluginStorage(r.db, name) pluginCtx.Logger = NewPluginLogger(name) pluginCtx.Scheduler = NewPluginScheduler(r.scheduler, name) diff --git a/api/internal/plugins/runtime_v2_test.go b/api/internal/plugins/runtime_v2_test.go new file mode 100644 index 0000000..9f647c6 --- /dev/null +++ b/api/internal/plugins/runtime_v2_test.go @@ -0,0 +1,26 @@ +package plugins + +import "testing" + +func TestNewRuntimeV2IncludesGloballyRegisteredPluginsInDiscovery(t *testing.T) { + const pluginName = "test-runtimev2-global-plugin" + + Register(pluginName, func() PluginHandler { + return &BasePlugin{Name: pluginName} + }) + + runtime := NewRuntimeV2(nil) + available := runtime.ListAvailablePlugins() + + found := false + for _, name := range available { + if name == pluginName { + found = true + break + } + } + + if !found { + t.Fatalf("expected %s to be discoverable via global registry", pluginName) + } +} diff --git a/api/internal/plugins/scheduler.go b/api/internal/plugins/scheduler.go index 17b1af6..a60cb63 100644 --- a/api/internal/plugins/scheduler.go +++ b/api/internal/plugins/scheduler.go @@ -273,7 +273,28 @@ func NewPluginScheduler(cronInstance *cron.Cron, pluginName string) *PluginSched // - job: Function to execute on schedule // // Returns nil on success, error if cron expression is invalid. -func (ps *PluginScheduler) Schedule(jobName string, cronExpr string, job func()) error { +func (ps *PluginScheduler) Schedule(args ...interface{}) error { + var jobName string + var cronExpr string + var job func() + + switch len(args) { + case 2: + cronExpr, _ = args[0].(string) + job, _ = args[1].(func()) + jobName = cronExpr + case 3: + jobName, _ = args[0].(string) + cronExpr, _ = args[1].(string) + job, _ = args[2].(func()) + default: + return fmt.Errorf("schedule expects either (cronExpr, job) or (jobName, cronExpr, job)") + } + + if cronExpr == "" || job == nil { + return fmt.Errorf("invalid schedule arguments") + } + // Remove existing job if any if existingID, exists := ps.jobIDs[jobName]; exists { ps.cron.Remove(existingID) diff --git a/api/internal/plugins/ui_registry.go b/api/internal/plugins/ui_registry.go index 263984d..34f48e6 100644 --- a/api/internal/plugins/ui_registry.go +++ b/api/internal/plugins/ui_registry.go @@ -756,18 +756,24 @@ type WidgetOptions struct { // Registers a widget for display on the user home dashboard. // // Returns: error if widget ID conflicts, nil on success -func (pu *PluginUI) RegisterWidget(opts WidgetOptions) error { - widget := &UIWidget{ - ID: opts.ID, - Title: opts.Title, - Component: opts.Component, - Position: opts.Position, - Width: opts.Width, - Icon: opts.Icon, - Permissions: opts.Permissions, +func (pu *PluginUI) RegisterWidget(input interface{}) error { + switch opts := input.(type) { + case WidgetOptions: + widget := &UIWidget{ + ID: opts.ID, + Title: opts.Title, + Component: opts.Component, + Position: opts.Position, + Width: opts.Width, + Icon: opts.Icon, + Permissions: opts.Permissions, + } + return pu.registry.RegisterWidget(pu.pluginName, widget) + case *UIWidget: + return pu.registry.RegisterWidget(pu.pluginName, opts) + default: + return fmt.Errorf("RegisterWidget requires WidgetOptions or *UIWidget") } - - return pu.registry.RegisterWidget(pu.pluginName, widget) } // RegisterAdminWidget registers an admin dashboard widget. @@ -834,19 +840,25 @@ type AdminPageOptions struct { } // RegisterAdminPage registers an admin page at /admin/plugins/{name}/{path}. -func (pu *PluginUI) RegisterAdminPage(opts AdminPageOptions) error { - page := &UIAdminPage{ - ID: opts.ID, - Title: opts.Title, - Path: opts.Path, - Component: opts.Component, - Icon: opts.Icon, - MenuLabel: opts.MenuLabel, - Permissions: opts.Permissions, - Order: opts.Order, +func (pu *PluginUI) RegisterAdminPage(input interface{}) error { + switch opts := input.(type) { + case AdminPageOptions: + page := &UIAdminPage{ + ID: opts.ID, + Title: opts.Title, + Path: opts.Path, + Component: opts.Component, + Icon: opts.Icon, + MenuLabel: opts.MenuLabel, + Permissions: opts.Permissions, + Order: opts.Order, + } + return pu.registry.RegisterAdminPage(pu.pluginName, page) + case *UIAdminPage: + return pu.registry.RegisterAdminPage(pu.pluginName, opts) + default: + return fmt.Errorf("RegisterAdminPage requires AdminPageOptions or *UIAdminPage") } - - return pu.registry.RegisterAdminPage(pu.pluginName, page) } // MenuItemOptions contains options for registering a menu item. @@ -866,16 +878,22 @@ type MenuItemOptions struct { } // RegisterMenuItem registers a navigation menu item. -func (pu *PluginUI) RegisterMenuItem(opts MenuItemOptions) error { - item := &UIMenuItem{ - ID: opts.ID, - Label: opts.Label, - Path: opts.Path, - Icon: opts.Icon, - Component: opts.Component, - Order: opts.Order, - Permissions: opts.Permissions, +func (pu *PluginUI) RegisterMenuItem(input interface{}) error { + switch opts := input.(type) { + case MenuItemOptions: + item := &UIMenuItem{ + ID: opts.ID, + Label: opts.Label, + Path: opts.Path, + Icon: opts.Icon, + Component: opts.Component, + Order: opts.Order, + Permissions: opts.Permissions, + } + return pu.registry.RegisterMenuItem(pu.pluginName, item) + case *UIMenuItem: + return pu.registry.RegisterMenuItem(pu.pluginName, opts) + default: + return fmt.Errorf("RegisterMenuItem requires MenuItemOptions or *UIMenuItem") } - - return pu.registry.RegisterMenuItem(pu.pluginName, item) } diff --git a/api/pkg/pluginsdk/sdk.go b/api/pkg/pluginsdk/sdk.go new file mode 100644 index 0000000..d37edff --- /dev/null +++ b/api/pkg/pluginsdk/sdk.go @@ -0,0 +1,47 @@ +package plugins + +import internalplugins "github.com/streamspace-dev/streamspace/api/internal/plugins" + +type ( + APIRegistry = internalplugins.APIRegistry + BasePlugin = internalplugins.BasePlugin + EventBus = internalplugins.EventBus + GlobalPluginRegistry = internalplugins.GlobalPluginRegistry + LoadedPlugin = internalplugins.LoadedPlugin + Plugin = internalplugins.PluginHandler + PluginAPI = internalplugins.PluginAPI + PluginContext = internalplugins.PluginContext + PluginDatabase = internalplugins.PluginDatabase + PluginEvents = internalplugins.PluginEvents + PluginFactory = internalplugins.PluginFactory + PluginHandler = internalplugins.PluginHandler + PluginLogger = internalplugins.PluginLogger + PluginScheduler = internalplugins.PluginScheduler + PluginStorage = internalplugins.PluginStorage + PluginUI = internalplugins.PluginUI + RuntimeV2 = internalplugins.RuntimeV2 + UIAdminPage = internalplugins.UIAdminPage + UIMenuItem = internalplugins.UIMenuItem + UIRegistry = internalplugins.UIRegistry + UIWidget = internalplugins.UIWidget +) + +func GetGlobalRegistry() *GlobalPluginRegistry { + return internalplugins.GetGlobalRegistry() +} + +// Register accepts either a factory function or a concrete plugin instance. +func Register(name string, plugin interface{}) { + switch p := plugin.(type) { + case PluginFactory: + internalplugins.Register(name, p) + case func() PluginHandler: + internalplugins.Register(name, internalplugins.PluginFactory(p)) + case PluginHandler: + internalplugins.Register(name, func() internalplugins.PluginHandler { + return p + }) + default: + panic("plugins.Register requires a plugin factory or PluginHandler") + } +}