Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 60 additions & 33 deletions api/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/streamspace-dev/streamspace/api/internal/handlers"
"github.com/streamspace-dev/streamspace/api/internal/k8s"
"github.com/streamspace-dev/streamspace/api/internal/middleware"
internalplugins "github.com/streamspace-dev/streamspace/api/internal/plugins"
"github.com/streamspace-dev/streamspace/api/internal/quota"
"github.com/streamspace-dev/streamspace/api/internal/services"
"github.com/streamspace-dev/streamspace/api/internal/sync"
Expand All @@ -36,22 +37,23 @@ import (
func main() {
// Configuration from environment
port := getEnv("API_PORT", "8000")
tlsCertFile := os.Getenv("TLS_CERT_FILE") // Path to TLS certificate file (PEM format)
tlsKeyFile := os.Getenv("TLS_KEY_FILE") // Path to TLS private key file (PEM format)
agentCACertFile := os.Getenv("AGENT_CA_CERT_FILE") // Path to CA cert for validating agent client certs (enables mTLS)
tlsCertFile := os.Getenv("TLS_CERT_FILE") // Path to TLS certificate file (PEM format)
tlsKeyFile := os.Getenv("TLS_KEY_FILE") // Path to TLS private key file (PEM format)
agentCACertFile := os.Getenv("AGENT_CA_CERT_FILE") // Path to CA cert for validating agent client certs (enables mTLS)
requireClientCert := getEnv("REQUIRE_CLIENT_CERT", "false") == "true" // Require client cert (only with mTLS)
rateLimitEnabled := getEnv("RATE_LIMIT_ENABLED", "true") == "true" // Enable rate limiting (default: true)
rateLimitRPM := getEnvInt("RATE_LIMIT_REQUESTS_PER_MINUTE", 60) // Requests per minute (default: 60)
rateLimitEnabled := getEnv("RATE_LIMIT_ENABLED", "true") == "true" // Enable rate limiting (default: true)
rateLimitRPM := getEnvInt("RATE_LIMIT_REQUESTS_PER_MINUTE", 60) // Requests per minute (default: 60)
// rateLimitBurst := getEnvInt("RATE_LIMIT_BURST", 10) // Burst capacity (default: 10) - reserved for future use
auditLogEnabled := getEnv("AUDIT_LOG_ENABLED", "true") == "true" // Enable audit logging (default: true)
auditLogBodies := getEnv("AUDIT_LOG_BODIES", "false") == "true" // Log request bodies (default: false for privacy)
auditLogBodies := getEnv("AUDIT_LOG_BODIES", "false") == "true" // Log request bodies (default: false for privacy)
dbHost := getEnv("DB_HOST", "localhost")
dbPort := getEnv("DB_PORT", "5432")
dbUser := getEnv("DB_USER", "streamspace")
dbPassword := getEnv("DB_PASSWORD", "streamspace")
dbName := getEnv("DB_NAME", "streamspace")
dbSSLMode := getEnv("DB_SSL_MODE", "disable") // SECURITY: Should be "require" in production
pluginDir := getEnv("PLUGIN_DIR", "./plugins")
pluginRepositoryURL := getEnv("PLUGIN_REPOSITORY_URL", "https://raw.githubusercontent.com/JoshuaAFerguson/streamspace-plugins/main")

log.Println("Starting StreamSpace API Server...")

Expand Down Expand Up @@ -112,11 +114,20 @@ func main() {
}
defer redisCache.Close()

// Initialize Kubernetes client
// Initialize Kubernetes client. Optional in v2.0+ — the downstream
// handler signature explicitly accepts nil for standalone-API and
// docker-platform deployments (see line ~360 below). Only hard-fail
// when the operator declared kubernetes as the platform; otherwise
// log + continue with k8sClient=nil.
log.Println("Initializing Kubernetes client...")
k8sClient, err := k8s.NewClient()
if err != nil {
log.Fatalf("Failed to initialize Kubernetes client: %v", err)
platformEnv := os.Getenv("PLATFORM")
if platformEnv == "" || platformEnv == events.PlatformKubernetes {
log.Fatalf("Failed to initialize Kubernetes client: %v", err)
}
log.Printf("Kubernetes client init failed (PLATFORM=%s, continuing without it): %v", platformEnv, err)
k8sClient = nil
}

// Initialize stub event publisher (NATS removed - WebSocket used instead)
Expand Down Expand Up @@ -361,7 +372,10 @@ func main() {
activityHandler := handlers.NewActivityHandler(k8sClient, activityTracker, database)
catalogHandler := handlers.NewCatalogHandler(database)
sharingHandler := handlers.NewSharingHandler(database)
pluginHandler := handlers.NewPluginHandler(database, pluginDir)
pluginRuntime := internalplugins.NewRuntimeV2(database, pluginDir)
pluginMarketplace := internalplugins.NewPluginMarketplace(database, pluginRepositoryURL, pluginDir)
pluginMarketplaceHandler := handlers.NewPluginMarketplaceHandler(database, pluginMarketplace, pluginRuntime)
pluginHandler := handlers.NewPluginHandler(database, pluginDir, pluginRuntime)
dashboardHandler := handlers.NewDashboardHandler(database, k8sClient)
sessionActivityHandler := handlers.NewSessionActivityHandler(database)
apiKeyHandler := handlers.NewAPIKeyHandler(database)
Expand Down Expand Up @@ -395,6 +409,18 @@ func main() {
agentWebSocketHandler := handlers.NewAgentWebSocketHandler(agentHub, database)
selkiesProxyHandler := handlers.NewSelkiesProxyHandler(database, agentHub, "streamspace")

if err := pluginMarketplace.SyncCatalog(context.Background()); err != nil {
log.Printf("Warning: Initial plugin catalog sync failed: %v", err)
}
if err := pluginRuntime.Start(context.Background()); err != nil {
log.Printf("Warning: Plugin runtime failed to start: %v", err)
}
defer func() {
if err := pluginRuntime.Stop(context.Background()); err != nil {
log.Printf("Warning: Plugin runtime failed to stop cleanly: %v", err)
}
}()

// SECURITY: Initialize webhook authentication
webhookSecret := os.Getenv("WEBHOOK_SECRET")
if webhookSecret == "" {
Expand All @@ -403,7 +429,7 @@ func main() {
}

// Setup routes
setupRoutes(router, apiHandler, userHandler, groupHandler, authHandler, activityHandler, catalogHandler, sharingHandler, pluginHandler, dashboardHandler, sessionActivityHandler, apiKeyHandler, teamHandler, preferencesHandler, notificationsHandler, searchHandler, sessionTemplatesHandler, batchHandler, monitoringHandler, quotasHandler, nodeHandler, wsManager, consoleHandler, collaborationHandler, integrationsHandler, loadBalancingHandler, schedulingHandler, securityHandler, templateVersioningHandler, setupHandler, applicationHandler, auditHandler, configurationHandler, licenseHandler, recordingHandler, agentHandler, agentWebSocketHandler, selkiesProxyHandler, jwtManager, userDB, database, redisCache, webhookSecret, rateLimitEnabled, rateLimitRPM)
setupRoutes(router, apiHandler, userHandler, groupHandler, authHandler, activityHandler, catalogHandler, sharingHandler, pluginHandler, pluginMarketplaceHandler, dashboardHandler, sessionActivityHandler, apiKeyHandler, teamHandler, preferencesHandler, notificationsHandler, searchHandler, sessionTemplatesHandler, batchHandler, monitoringHandler, quotasHandler, nodeHandler, wsManager, consoleHandler, collaborationHandler, integrationsHandler, loadBalancingHandler, schedulingHandler, securityHandler, templateVersioningHandler, setupHandler, applicationHandler, auditHandler, configurationHandler, licenseHandler, recordingHandler, agentHandler, agentWebSocketHandler, selkiesProxyHandler, jwtManager, userDB, database, redisCache, webhookSecret, rateLimitEnabled, rateLimitRPM)

// SECURITY: Configure mTLS for agent authentication (optional)
var tlsConfig *tls.Config
Expand All @@ -424,9 +450,9 @@ func main() {

// Configure TLS with client certificate validation
tlsConfig = &tls.Config{
ClientCAs: caCertPool,
ClientCAs: caCertPool,
ClientAuth: tls.VerifyClientCertIfGiven, // Default: optional client cert
MinVersion: tls.VersionTLS12, // Enforce TLS 1.2+
MinVersion: tls.VersionTLS12, // Enforce TLS 1.2+
}

// If REQUIRE_CLIENT_CERT is true, make client certs mandatory
Expand Down Expand Up @@ -548,7 +574,7 @@ func main() {
log.Println("Graceful shutdown completed")
}

func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserHandler, groupHandler *handlers.GroupHandler, authHandler *auth.AuthHandler, activityHandler *handlers.ActivityHandler, catalogHandler *handlers.CatalogHandler, sharingHandler *handlers.SharingHandler, pluginHandler *handlers.PluginHandler, dashboardHandler *handlers.DashboardHandler, sessionActivityHandler *handlers.SessionActivityHandler, apiKeyHandler *handlers.APIKeyHandler, teamHandler *handlers.TeamHandler, preferencesHandler *handlers.PreferencesHandler, notificationsHandler *handlers.NotificationsHandler, searchHandler *handlers.SearchHandler, sessionTemplatesHandler *handlers.SessionTemplatesHandler, batchHandler *handlers.BatchHandler, monitoringHandler *handlers.MonitoringHandler, quotasHandler *handlers.QuotasHandler, nodeHandler *handlers.NodeHandler, wsManager *internalWebsocket.Manager, consoleHandler *handlers.ConsoleHandler, collaborationHandler *handlers.CollaborationHandler, integrationsHandler *handlers.IntegrationsHandler, loadBalancingHandler *handlers.LoadBalancingHandler, schedulingHandler *handlers.SchedulingHandler, securityHandler *handlers.SecurityHandler, templateVersioningHandler *handlers.TemplateVersioningHandler, setupHandler *handlers.SetupHandler, applicationHandler *handlers.ApplicationHandler, auditHandler *handlers.AuditHandler, configurationHandler *handlers.ConfigurationHandler, licenseHandler *handlers.LicenseHandler, recordingHandler *handlers.RecordingHandler, agentHandler *handlers.AgentHandler, agentWebSocketHandler *handlers.AgentWebSocketHandler, selkiesProxyHandler *handlers.SelkiesProxyHandler, jwtManager *auth.JWTManager, userDB *db.UserDB, database *db.Database, redisCache *cache.Cache, webhookSecret string, rateLimitEnabled bool, rateLimitRPM int) {
func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserHandler, groupHandler *handlers.GroupHandler, authHandler *auth.AuthHandler, activityHandler *handlers.ActivityHandler, catalogHandler *handlers.CatalogHandler, sharingHandler *handlers.SharingHandler, pluginHandler *handlers.PluginHandler, pluginMarketplaceHandler *handlers.PluginMarketplaceHandler, dashboardHandler *handlers.DashboardHandler, sessionActivityHandler *handlers.SessionActivityHandler, apiKeyHandler *handlers.APIKeyHandler, teamHandler *handlers.TeamHandler, preferencesHandler *handlers.PreferencesHandler, notificationsHandler *handlers.NotificationsHandler, searchHandler *handlers.SearchHandler, sessionTemplatesHandler *handlers.SessionTemplatesHandler, batchHandler *handlers.BatchHandler, monitoringHandler *handlers.MonitoringHandler, quotasHandler *handlers.QuotasHandler, nodeHandler *handlers.NodeHandler, wsManager *internalWebsocket.Manager, consoleHandler *handlers.ConsoleHandler, collaborationHandler *handlers.CollaborationHandler, integrationsHandler *handlers.IntegrationsHandler, loadBalancingHandler *handlers.LoadBalancingHandler, schedulingHandler *handlers.SchedulingHandler, securityHandler *handlers.SecurityHandler, templateVersioningHandler *handlers.TemplateVersioningHandler, setupHandler *handlers.SetupHandler, applicationHandler *handlers.ApplicationHandler, auditHandler *handlers.AuditHandler, configurationHandler *handlers.ConfigurationHandler, licenseHandler *handlers.LicenseHandler, recordingHandler *handlers.RecordingHandler, agentHandler *handlers.AgentHandler, agentWebSocketHandler *handlers.AgentWebSocketHandler, selkiesProxyHandler *handlers.SelkiesProxyHandler, jwtManager *auth.JWTManager, userDB *db.UserDB, database *db.Database, redisCache *cache.Cache, webhookSecret string, rateLimitEnabled bool, rateLimitRPM int) {
// SECURITY: Create authentication middleware
authMiddleware := auth.Middleware(jwtManager, userDB)
adminMiddleware := auth.RequireRole("admin")
Expand Down Expand Up @@ -906,6 +932,7 @@ func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserH

// Plugin system - using dedicated handler
pluginHandler.RegisterRoutes(protected)
pluginMarketplaceHandler.RegisterRoutes(protected)

// Installed applications management - using dedicated handler (admin only for management)
applicationHandler.RegisterRoutes(protected)
Expand Down Expand Up @@ -1050,27 +1077,27 @@ func setupRoutes(router *gin.Engine, h *api.Handler, userHandler *handlers.UserH
protected.GET("/metrics", operatorMiddleware, h.GetMetrics)
}

// v2.0 Agent self-service routes (require mTLS OR API key authentication, not JWT)
// These routes are for agents to register themselves and send heartbeats
// Authentication: mTLS (if configured) or API key fallback
// Rate limited to prevent brute-force attacks
agentRoutes := v1.Group("/agents")
agentRoutes.Use(agentRateLimit(globalRateLimiter, rateLimitEnabled, rateLimitRPM)) // Apply rate limiting first
agentRoutes.Use(agentAuth.RequireAuth()) // Then authentication
{
agentHandler.RegisterRoutes(agentRoutes)
}
// v2.0 Agent self-service routes (require mTLS OR API key authentication, not JWT)
// These routes are for agents to register themselves and send heartbeats
// Authentication: mTLS (if configured) or API key fallback
// Rate limited to prevent brute-force attacks
agentRoutes := v1.Group("/agents")
agentRoutes.Use(agentRateLimit(globalRateLimiter, rateLimitEnabled, rateLimitRPM)) // Apply rate limiting first
agentRoutes.Use(agentAuth.RequireAuth()) // Then authentication
{
agentHandler.RegisterRoutes(agentRoutes)
}

// v2.0 Agent WebSocket connections (require mTLS OR API key authentication, not JWT)
// Agents connect here to receive commands and send status updates
// Authentication: mTLS (if configured) or API key fallback
// Rate limited to prevent connection flooding
agentWSRoutes := v1.Group("")
agentWSRoutes.Use(agentRateLimit(globalRateLimiter, rateLimitEnabled, rateLimitRPM)) // Apply rate limiting first
agentWSRoutes.Use(agentAuth.RequireAuth()) // Then authentication
{
agentWebSocketHandler.RegisterRoutes(agentWSRoutes)
}
// v2.0 Agent WebSocket connections (require mTLS OR API key authentication, not JWT)
// Agents connect here to receive commands and send status updates
// Authentication: mTLS (if configured) or API key fallback
// Rate limited to prevent connection flooding
agentWSRoutes := v1.Group("")
agentWSRoutes.Use(agentRateLimit(globalRateLimiter, rateLimitEnabled, rateLimitRPM)) // Apply rate limiting first
agentWSRoutes.Use(agentAuth.RequireAuth()) // Then authentication
{
agentWebSocketHandler.RegisterRoutes(agentWSRoutes)
}
}

// WebSocket endpoints (require authentication)
Expand Down
25 changes: 12 additions & 13 deletions api/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/streamspace-dev/streamspace/api

go 1.24.0

toolchain go1.24.7
go 1.25.0

require (
github.com/DATA-DOG/go-sqlmock v1.5.2
Expand All @@ -20,9 +18,9 @@ require (
github.com/redis/go-redis/v9 v9.16.0
github.com/robfig/cron/v3 v3.0.1
github.com/rs/zerolog v1.34.0
github.com/stretchr/testify v1.10.0
golang.org/x/crypto v0.45.0
golang.org/x/oauth2 v0.28.0
github.com/stretchr/testify v1.11.1
golang.org/x/crypto v0.49.0
golang.org/x/oauth2 v0.36.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.34.2
k8s.io/apimachinery v0.34.2
Expand All @@ -44,7 +42,7 @@ require (
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-jose/go-jose/v4 v4.1.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
Expand All @@ -70,6 +68,7 @@ require (
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/russellhaering/goxmldsig v1.6.0 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stretchr/objx v0.5.2 // indirect
Expand All @@ -80,12 +79,12 @@ require (
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/text v0.31.0 // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/term v0.41.0 // indirect
golang.org/x/text v0.35.0 // indirect
golang.org/x/time v0.15.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
Expand Down
Loading
Loading