diff --git a/Makefile b/Makefile index 6ebaf4e..f167d48 100644 --- a/Makefile +++ b/Makefile @@ -35,9 +35,8 @@ stop-buildx: build-image: setup-buildx @echo "Building image for AMD64 and 386 🔨" - @docker buildx build --platform linux/amd64,linux/386 -t $(IMAGE_FULL_NAME) . + @docker buildx build --platform linux/amd64,linux/386 -t $(IMAGE_FULL_NAME) --load . @echo "Image built successfully 🎉" - @echo "Note: Multi-platform images are not loaded locally. Use --push to push to registry." build-image-arm64: setup-buildx @echo "Building image for ARM64 🔨" diff --git a/README.md b/README.md index 0ea5881..0c2866c 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,13 @@ A lightweight Go service for processing and serving images with YAML-configurabl ## Features +- **Presigned Uploads**: `/v1/uploads/presign` - secure direct-to-S3 uploads with validation - **Original Image Serving**: `/originals/{type}/{image_id}` - serve original images directly from storage - **Thumbnail Generation**: `/thumb/{type}/{image_id}` - on-demand thumbnail generation -- **YAML Configuration**: Different processing rules per image type (avatar, photo, banner) +- **Unified Configuration**: Profile-based YAML config combining upload and processing rules - **Multiple Formats**: Convert images to WebP, JPEG, PNG with configurable quality -- **S3 Integration**: Fetch and store images in AWS S3 with folder organization +- **Video Support**: Ready for video upload and processing (processing features coming soon) +- **S3 Integration**: Direct S3 uploads with multipart support for large files - **CDN-Optimized**: Cache-Control and ETag headers for optimal CDN performance - **Graceful Shutdown**: Production-ready server lifecycle management @@ -19,17 +21,147 @@ A lightweight Go service for processing and serving images with YAML-configurabl ## API Endpoints +### Presigned Uploads +``` +POST /v1/uploads/presign +``` +Generates presigned URLs for secure direct-to-S3 uploads. + +**Request Body:** +```json +{ + "key_base": "unique-file-id", + "ext": "jpg", + "mime": "image/jpeg", + "size_bytes": 1024000, + "kind": "image", + "profile": "avatar", + "multipart": "auto" +} +``` + +**Response for Single Upload:** +```json +{ + "object_key": "originals/avatars/ab/unique-file-id.jpg", + "upload": { + "single": { + "method": "PUT", + "url": "https://presigned-s3-url", + "headers": { + "Content-Type": "image/jpeg", + "If-None-Match": "*" + }, + "expires_at": "2024-01-01T12:00:00Z" + } + } +} +``` + +**Response for Multipart Upload:** +```json +{ + "object_key": "originals/avatars/ab/unique-file-id.jpg", + "upload": { + "multipart": { + "upload_id": "abc123xyz", + "part_size": 8388608, + "parts": [ + { + "part_number": 1, + "method": "PUT", + "url": "https://presigned-s3-part-url-1", + "headers": {"Content-Type": "image/jpeg"}, + "expires_at": "2024-01-01T12:00:00Z" + } + ], + "complete": { + "method": "POST", + "url": "https://your-api/v1/uploads/originals%2Favatars%2Fab%2Funique-file-id.jpg/complete/abc123xyz", + "headers": {"Content-Type": "application/json"}, + "expires_at": "2024-01-01T12:00:00Z" + }, + "abort": { + "method": "DELETE", + "url": "https://your-api/v1/uploads/originals%2Favatars%2Fab%2Funique-file-id.jpg/abort/abc123xyz", + "headers": {}, + "expires_at": "2024-01-01T12:00:00Z" + } + } + } +} +``` + +**Parameters:** +- `key_base`: Unique identifier for the file +- `ext`: File extension (optional, for backward compatibility) +- `mime`: MIME type of the file +- `size_bytes`: File size in bytes +- `kind`: Media type (`image` or `video`) +- `profile`: Configuration profile to use (`avatar`, `photo`, `video`, etc.) +- `multipart`: Upload strategy (`auto`, `force`, or `off`) + +### Multipart Upload Completion +``` +POST /v1/uploads/{object_key}/complete/{upload_id} +``` +Completes a multipart upload by providing the ETags for all uploaded parts. + +**Request Body:** +```json +{ + "parts": [ + { + "part_number": 1, + "etag": "\"d41d8cd98f00b204e9800998ecf8427e\"" + }, + { + "part_number": 2, + "etag": "\"098f6bcd4621d373cade4e832627b4f6\"" + } + ] +} +``` + +**Response:** +```json +{ + "status": "completed", + "object_key": "originals/avatars/ab/unique-file-id.jpg" +} +``` + +### Multipart Upload Abort +``` +DELETE /v1/uploads/{object_key}/abort/{upload_id} +``` +Aborts a multipart upload and cleans up any uploaded parts. + +**Response:** +```json +{ + "status": "aborted", + "upload_id": "abc123xyz" +} +``` + ### Thumbnails ``` GET /thumb/{type}/{image_id}?width=512 +POST /thumb/{type}/{image_id} ``` -Generates and serves thumbnails with configurable dimensions. +Generates and serves thumbnails with configurable dimensions. POST requests require authentication. -**Parameters:** +**GET Parameters:** - `type`: Image category (avatar, photo, banner, or any configured type) - `image_id`: Unique identifier for the image - `width`: Image width in pixels (optional, defaults to the type's `default_size` from storage config) +**POST Parameters:** +- Requires authentication (API key) +- Request body should contain the image data +- Used for uploading images to be processed + ### Original Images ``` GET /originals/{type}/{image_id} @@ -50,12 +182,22 @@ Returns service health status. ### Storage Configuration (storage-config.yaml) -MediaFlow uses YAML configuration to define processing rules per image type: +MediaFlow uses YAML configuration to define profiles that combine upload settings and processing rules: ```yaml -storage_options: +profiles: avatar: - origin_folder: "originals/avatars" + # Upload configuration + kind: "image" + allowed_mimes: ["image/jpeg", "image/png", "image/webp"] + size_max_bytes: 5242880 # 5MB + multipart_threshold_mb: 15 + part_size_mb: 8 + token_ttl_seconds: 900 # 15 minutes + storage_path: "originals/avatars/{shard?}/{key_base}" + enable_sharding: true + + # Processing configuration thumb_folder: "thumbnails/avatars" sizes: ["128", "256"] default_size: "256" @@ -63,23 +205,46 @@ storage_options: convert_to: "webp" photo: - origin_folder: "originals/photos" + kind: "image" + allowed_mimes: ["image/jpeg", "image/png", "image/webp"] + size_max_bytes: 20971520 # 20MB + multipart_threshold_mb: 15 + part_size_mb: 8 + token_ttl_seconds: 900 + storage_path: "originals/photos/{shard?}/{key_base}" + enable_sharding: true + thumb_folder: "thumbnails/photos" sizes: ["256", "512", "1024"] default_size: "256" quality: 90 convert_to: "webp" - banner: - origin_folder: "originals/banners" - thumb_folder: "thumbnails/banners" - sizes: ["512", "1024", "2048"] - default_size: "512" - quality: 95 - convert_to: "webp" - + video: + kind: "video" + allowed_mimes: ["video/mp4", "video/quicktime", "video/webm"] + size_max_bytes: 104857600 # 100MB + multipart_threshold_mb: 15 + part_size_mb: 8 + token_ttl_seconds: 1800 # 30 minutes + storage_path: "originals/videos/{shard?}/{key_base}" + enable_sharding: true + + thumb_folder: "posters/videos" # Video thumbnails + proxy_folder: "proxies/videos" # Compressed versions + formats: ["mp4", "webm"] + quality: 80 + default: - origin_folder: "originals" + kind: "image" + allowed_mimes: ["image/jpeg", "image/png"] + size_max_bytes: 10485760 # 10MB + multipart_threshold_mb: 15 + part_size_mb: 8 + token_ttl_seconds: 900 + storage_path: "originals/{shard?}/{key_base}" + enable_sharding: true + thumb_folder: "thumbnails" sizes: ["256", "512"] default_size: "256" @@ -87,6 +252,50 @@ storage_options: convert_to: "webp" ``` +### Configuration Fields + +#### Upload Configuration +- `kind`: Media type (`image` or `video`) +- `allowed_mimes`: Array of allowed MIME types +- `size_max_bytes`: Maximum file size in bytes +- `multipart_threshold_mb`: Size threshold for multipart uploads +- `part_size_mb`: Size of each multipart chunk +- `token_ttl_seconds`: Presigned URL expiration time +- `storage_path`: Template for where files are stored in S3 (supports `{key_base}`, `{ext}`, `{shard}`, `{shard?}`) +- `enable_sharding`: Whether to use sharding for load distribution + +#### Processing Configuration +- `thumb_folder`: Folder for storing thumbnails +- `sizes`: Available thumbnail sizes +- `default_size`: Default thumbnail size if none specified +- `quality`: Image compression quality (1-100) +- `convert_to`: Format to convert images to (`webp`, `jpeg`, etc.) + +#### Storage Path Templates +The `storage_path` field uses a template system to define where files are stored: +- `{key_base}`: The unique file identifier +- `{ext}`: File extension +- `{shard}`: Shard value (only when `enable_sharding: true`) +- `{shard?}`: Optional shard (removed when `enable_sharding: false`) + +**Sharding Modes:** + +**Auto-sharding** (`enable_sharding: true`): +- `"originals/{shard?}/{key_base}"` → `originals/ab/my-file.jpg` +- Shards auto-generated from key_base hash +- Clients can optionally provide custom shard in request + +**Fixed organization** (`enable_sharding: false`): +- `"originals/user123/{key_base}"` → `originals/user123/my-file.jpg` +- `"uploads/{year}/{month}/{key_base}"` → Custom organization +- Any `{shard}` placeholders are removed +- Custom shards in requests are ignored + +Examples: +- `"originals/{key_base}"` → `originals/my-file.jpg` +- `"uploads/{shard?}/{key_base}"` → `uploads/ab/my-file.jpg` (with sharding) +- `"users/team-marketing/{key_base}"` → Fixed custom prefix + ### Environment Variables Create a `.env` file for local development: diff --git a/examples/storage-config.yaml b/examples/storage-config.yaml index 1132741..c7d77a8 100644 --- a/examples/storage-config.yaml +++ b/examples/storage-config.yaml @@ -1,6 +1,16 @@ -storage_options: +profiles: avatar: - origin_folder: "originals/avatars" + # Upload configuration + kind: "image" + allowed_mimes: ["image/jpeg", "image/png", "image/webp"] + size_max_bytes: 5242880 # 5MB + multipart_threshold_mb: 15 + part_size_mb: 8 + token_ttl_seconds: 900 # 15 minutes + storage_path: "originals/avatars/{shard?}/{key_base}" + enable_sharding: true + + # Processing configuration thumb_folder: "thumbnails/avatars" sizes: ["128", "256"] default_size: "256" @@ -8,7 +18,17 @@ storage_options: convert_to: "webp" photo: - origin_folder: "originals/photos" + # Upload configuration + kind: "image" + allowed_mimes: ["image/jpeg", "image/png", "image/webp"] + size_max_bytes: 20971520 # 20MB + multipart_threshold_mb: 15 + part_size_mb: 8 + token_ttl_seconds: 900 + storage_path: "originals/photos/{shard?}/{key_base}" + enable_sharding: true + + # Processing configuration thumb_folder: "thumbnails/photos" sizes: ["256", "512", "1024"] default_size: "256" @@ -16,76 +36,54 @@ storage_options: convert_to: "webp" banner: - origin_folder: "originals/banners" - thumb_folder: "thumbnails/banners" - sizes: ["512", "1024", "2048"] - default_size: "512" - quality: 95 - convert_to: "webp" - - default: - origin_folder: "originals" - thumb_folder: "thumbnails" - sizes: ["256", "512"] - default_size: "256" - quality: 90 - convert_to: "webp" - - -upload_options: - avatar: - kind: "image" - allowed_mimes: ["image/jpeg", "image/png", "image/webp"] - size_max_bytes: 5242880 # 5MB - multipart_threshold_mb: 15 - part_size_mb: 8 - token_ttl_seconds: 900 # 15 minutes - path_template: "raw/{shard?}/{key_base}.{ext}" - enable_sharding: true - - photo: + # Upload configuration kind: "image" allowed_mimes: ["image/jpeg", "image/png", "image/webp"] - size_max_bytes: 20971520 # 20MB + size_max_bytes: 10485760 # 10MB multipart_threshold_mb: 15 part_size_mb: 8 token_ttl_seconds: 900 - path_template: "raw/{shard?}/{key_base}.{ext}" + storage_path: "originals/banners/{shard?}/{key_base}" enable_sharding: true + + # Processing configuration + thumb_folder: "thumbnails/banners" + sizes: ["512", "1024", "2048"] + default_size: "512" + quality: 95 + convert_to: "webp" video: + # Upload configuration kind: "video" allowed_mimes: ["video/mp4", "video/quicktime", "video/webm"] size_max_bytes: 104857600 # 100MB multipart_threshold_mb: 15 part_size_mb: 8 token_ttl_seconds: 1800 # 30 minutes - path_template: "raw/{shard?}/{key_base}.{ext}" + storage_path: "originals/videos/{shard?}/{key_base}" enable_sharding: true + + # Processing configuration (future implementation) + proxy_folder: "proxies/videos" + formats: ["mp4", "webm"] + quality: 80 default: + # Upload configuration kind: "image" allowed_mimes: ["image/jpeg", "image/png"] size_max_bytes: 10485760 # 10MB multipart_threshold_mb: 15 part_size_mb: 8 token_ttl_seconds: 900 - path_template: "raw/{shard?}/{key_base}.{ext}" + storage_path: "originals/{shard?}/{key_base}" enable_sharding: true + + # Processing configuration + thumb_folder: "thumbnails" + sizes: ["256", "512"] + default_size: "256" + quality: 90 + convert_to: "webp" -videos: - product: - origin_folder: "originals/videos/products" - constraints: - max_fps: 30 - max_size_mb: 500 - proxy: - enabled: true - folder: "proxies/videos/products" - height: 240 - preview_seconds: 4 - posters: - folder: "posters/videos" - time_percent: 10 - format: "jpg" - quality: 90 \ No newline at end of file diff --git a/features/mediaflow_rolling_batch_parts.MD b/features/mediaflow_rolling_batch_parts.MD index 7217727..79bbced 100644 --- a/features/mediaflow_rolling_batch_parts.MD +++ b/features/mediaflow_rolling_batch_parts.MD @@ -78,9 +78,9 @@ New endpoint for requesting additional part batches as needed. ## 4) Configuration -### 4.1 Upload Config Extensions +### 4.1 Profile Config Extensions ```yaml -upload_options: +profiles: video: # ... existing config initial_batch_size: 10 # Parts in first response diff --git a/internal/api/image.go b/internal/api/image.go index 7127de0..e873d92 100644 --- a/internal/api/image.go +++ b/internal/api/image.go @@ -65,10 +65,14 @@ func (h *ImageAPI) HandleThumbnailTypes(w http.ResponseWriter, r *http.Request) } func (h *ImageAPI) HandleThumbnailType(w http.ResponseWriter, r *http.Request, imageData []byte, thumbType, imagePath string) { - so := h.storageConfig.GetStorageOptions(thumbType) + profile := h.storageConfig.GetProfile(thumbType) + if profile == nil { + response.JSON(fmt.Sprintf("Profile '%s' not found", thumbType)).WriteError(w, http.StatusNotFound) + return + } baseName := utils.BaseName(imagePath) if r.Method == http.MethodPost { - err := h.imageService.UploadImage(h.ctx, so, imageData, thumbType, baseName) + err := h.imageService.UploadImage(h.ctx, profile, imageData, thumbType, baseName) if err != nil { response.JSON(err.Error()).WriteError(w, http.StatusInternalServerError) return @@ -81,18 +85,18 @@ func (h *ImageAPI) HandleThumbnailType(w http.ResponseWriter, r *http.Request, i response.JSON(err.Error()).WriteError(w, http.StatusBadRequest) return } - imageData, err := h.imageService.GetImage(h.ctx, so, false, baseName, size) + imageData, err := h.imageService.GetImage(h.ctx, profile, false, baseName, size) if err != nil { response.JSON(err.Error()).WriteError(w, http.StatusInternalServerError) return } - cd := so.CacheDuration + cd := profile.CacheDuration if cd == 0 { // 24 hours cd = 86400 } - w.Header().Set("Content-Type", "image/"+so.ConvertTo) + w.Header().Set("Content-Type", "image/"+profile.ConvertTo) w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d", cd)) w.Header().Set("ETag", fmt.Sprintf(`"%s/%s_%s"`, thumbType, baseName, size)) w.Write(imageData) //nolint:errcheck @@ -105,15 +109,19 @@ func (h *ImageAPI) HandleOriginals(w http.ResponseWriter, r *http.Request) { fileName := parts[1] baseName := utils.BaseName(fileName) - so := h.storageConfig.GetStorageOptions(thumbType) + profile := h.storageConfig.GetProfile(thumbType) + if profile == nil { + response.JSON(fmt.Sprintf("Profile '%s' not found", thumbType)).WriteError(w, http.StatusNotFound) + return + } if r.Method == http.MethodGet { - imageData, err := h.imageService.GetImage(h.ctx, so, true, baseName, "") + imageData, err := h.imageService.GetImage(h.ctx, profile, true, baseName, "") if err != nil { response.JSON(err.Error()).WriteError(w, http.StatusInternalServerError) return } - w.Header().Set("Content-Type", "image/"+so.ConvertTo) - w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d", so.CacheDuration)) + w.Header().Set("Content-Type", "image/"+profile.ConvertTo) + w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d", profile.CacheDuration)) w.Header().Set("ETag", fmt.Sprintf(`"%s/%s"`, thumbType, baseName)) w.Write(imageData) //nolint:errcheck } diff --git a/internal/config/config.go b/internal/config/config.go index 40fb874..f557123 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -36,30 +36,35 @@ func Load() *Config { } } -type StorageOptions struct { - OriginFolder string `yaml:"origin_folder"` - ThumbFolder string `yaml:"thumb_folder"` - Sizes []string `yaml:"sizes"` - DefaultSize string `yaml:"default_size"` - Quality int `yaml:"quality"` - ConvertTo string `yaml:"convert_to"` - CacheDuration int `yaml:"cache_duration"` // in seconds +// Profile combines upload and processing configuration +type Profile struct { + // Upload configuration + Kind string `yaml:"kind"` + AllowedMimes []string `yaml:"allowed_mimes"` + SizeMaxBytes int64 `yaml:"size_max_bytes"` + MultipartThresholdMB int64 `yaml:"multipart_threshold_mb"` + PartSizeMB int64 `yaml:"part_size_mb"` + TokenTTLSeconds int64 `yaml:"token_ttl_seconds"` + StoragePath string `yaml:"storage_path"` + EnableSharding bool `yaml:"enable_sharding"` + + // Processing configuration (shared) + ThumbFolder string `yaml:"thumb_folder,omitempty"` + Quality int `yaml:"quality,omitempty"` + CacheDuration int `yaml:"cache_duration,omitempty"` // in seconds + + // Processing configuration (images) + Sizes []string `yaml:"sizes,omitempty"` + DefaultSize string `yaml:"default_size,omitempty"` + ConvertTo string `yaml:"convert_to,omitempty"` + + // Processing configuration (videos) + ProxyFolder string `yaml:"proxy_folder,omitempty"` + Formats []string `yaml:"formats,omitempty"` } type StorageConfig struct { - StorageOptions map[string]StorageOptions `yaml:"storage_options"` - UploadOptions map[string]UploadOptions `yaml:"upload_options,omitempty"` -} - -type UploadOptions struct { - Kind string `yaml:"kind"` - AllowedMimes []string `yaml:"allowed_mimes"` - SizeMaxBytes int64 `yaml:"size_max_bytes"` - MultipartThresholdMB int64 `yaml:"multipart_threshold_mb"` - PartSizeMB int64 `yaml:"part_size_mb"` - TokenTTLSeconds int64 `yaml:"token_ttl_seconds"` - PathTemplate string `yaml:"path_template"` - EnableSharding bool `yaml:"enable_sharding"` + Profiles map[string]Profile `yaml:"profiles"` } func LoadStorageConfig(s3 *s3.Client, config *Config) (*StorageConfig, error) { @@ -96,46 +101,61 @@ func LoadStorageConfig(s3 *s3.Client, config *Config) (*StorageConfig, error) { return nil, fmt.Errorf("failed to parse storage config: %w", err) } + // Validate that all profiles have required storage_path field + if err := validateStorageConfig(&storageConfig); err != nil { + return nil, err + } + return &storageConfig, nil } -func (sc *StorageConfig) GetStorageOptions(imageType string) *StorageOptions { - if options, exists := sc.StorageOptions[imageType]; exists { - return &options - } - - // Return default if type not found - if defaultOptions, exists := sc.StorageOptions["default"]; exists { - return &defaultOptions +// validateStorageConfig ensures all profiles have required fields +func validateStorageConfig(config *StorageConfig) error { + for profileName, profile := range config.Profiles { + if profile.StoragePath == "" { + return fmt.Errorf("profile '%s' is missing required 'storage_path' field", profileName) + } } - - // Fallback to hardcoded default - return DefaultStorageOptions() + return nil } -func (sc *StorageConfig) GetUploadOptions(profile string) *UploadOptions { - if options, exists := sc.UploadOptions[profile]; exists { - return &options +// GetProfile returns a profile by name +func (sc *StorageConfig) GetProfile(profileName string) *Profile { + if profile, exists := sc.Profiles[profileName]; exists { + return &profile } - // Return default if type not found - if defaultOptions, exists := sc.UploadOptions["default"]; exists { - return &defaultOptions + // Return default profile if explicitly requested and exists + if profileName == "default" { + if defaultProfile, exists := sc.Profiles["default"]; exists { + return &defaultProfile + } + // Fallback to hardcoded default + return DefaultProfile() } - // Return nil if no upload options configured + // Return nil for non-existent profiles return nil } -func DefaultStorageOptions() *StorageOptions { - return &StorageOptions{ - OriginFolder: "originals", - ThumbFolder: "thumbnails", - Sizes: []string{"256", "512", "1024"}, - Quality: 90, + +func DefaultProfile() *Profile { + return &Profile{ + Kind: "image", + AllowedMimes: []string{"image/jpeg", "image/png"}, + SizeMaxBytes: 10485760, // 10MB + MultipartThresholdMB: 15, + PartSizeMB: 8, + TokenTTLSeconds: 900, + StoragePath: "originals/{shard?}/{key_base}", + EnableSharding: true, + ThumbFolder: "thumbnails", + Sizes: []string{"256", "512", "1024"}, + Quality: 90, } } + func getEnv(key, defaultValue string) string { if value := os.Getenv(key); value != "" { return value diff --git a/internal/s3/client.go b/internal/s3/client.go index b79144a..cdb67ee 100644 --- a/internal/s3/client.go +++ b/internal/s3/client.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" + s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types" ) type Client struct { @@ -140,5 +141,43 @@ func (c *Client) PresignUploadPart(ctx context.Context, key, uploadID string, pa return request.URL, nil } -// Note: AWS SDK doesn't support presigning CompleteMultipartUpload and AbortMultipartUpload -// These operations must be done server-side or using direct API calls +// CompleteMultipartUpload completes a multipart upload +func (c *Client) CompleteMultipartUpload(ctx context.Context, key, uploadID string, parts []PartInfo) error { + completedParts := make([]s3Types.CompletedPart, len(parts)) + for i, part := range parts { + completedParts[i] = s3Types.CompletedPart{ + ETag: aws.String(part.ETag), + PartNumber: aws.Int32(int32(part.PartNumber)), + } + } + + input := &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + UploadId: aws.String(uploadID), + MultipartUpload: &s3Types.CompletedMultipartUpload{ + Parts: completedParts, + }, + } + + _, err := c.s3Client.CompleteMultipartUpload(ctx, input) + return err +} + +// AbortMultipartUpload aborts a multipart upload +func (c *Client) AbortMultipartUpload(ctx context.Context, key, uploadID string) error { + input := &s3.AbortMultipartUploadInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + UploadId: aws.String(uploadID), + } + + _, err := c.s3Client.AbortMultipartUpload(ctx, input) + return err +} + +// PartInfo represents a completed part for multipart upload +type PartInfo struct { + ETag string + PartNumber int +} diff --git a/internal/s3/client_test.go b/internal/s3/client_test.go new file mode 100644 index 0000000..599dbbe --- /dev/null +++ b/internal/s3/client_test.go @@ -0,0 +1,62 @@ +package s3 + +import ( + "context" + "testing" +) + +func TestPartInfo_Struct(t *testing.T) { + // Test that PartInfo struct has the expected fields + part := PartInfo{ + ETag: "test-etag", + PartNumber: 1, + } + + if part.ETag != "test-etag" { + t.Errorf("Expected ETag 'test-etag', got '%s'", part.ETag) + } + + if part.PartNumber != 1 { + t.Errorf("Expected PartNumber 1, got %d", part.PartNumber) + } +} + +func TestClient_CompleteMultipartUpload_Interface(t *testing.T) { + // Test that CompleteMultipartUpload method exists and has correct signature + // This is a compilation test to ensure the interface is correct + + // We can't easily test the actual AWS S3 calls without mocking or integration tests, + // but we can verify the method signature compiles correctly + var client *Client + if client != nil { + ctx := context.Background() + parts := []PartInfo{ + {ETag: "etag1", PartNumber: 1}, + {ETag: "etag2", PartNumber: 2}, + } + + // This should compile without errors + _ = client.CompleteMultipartUpload(ctx, "test-key", "test-upload-id", parts) + } +} + +func TestClient_AbortMultipartUpload_Interface(t *testing.T) { + // Test that AbortMultipartUpload method exists and has correct signature + // This is a compilation test to ensure the interface is correct + + var client *Client + if client != nil { + ctx := context.Background() + + // This should compile without errors + _ = client.AbortMultipartUpload(ctx, "test-key", "test-upload-id") + } +} + +// Note: Full integration tests for S3 client methods would require: +// 1. AWS credentials and real S3 bucket +// 2. Mocking the AWS SDK (complex) +// 3. Or using localstack/minio for testing +// +// The main logic testing is covered in the service layer tests +// which use the S3Client interface with mocks. \ No newline at end of file diff --git a/internal/service/image.go b/internal/service/image.go index d135a1f..0a10186 100644 --- a/internal/service/image.go +++ b/internal/service/image.go @@ -3,6 +3,7 @@ package service import ( "bytes" "context" + "crypto/sha1" "fmt" "io" "mime/multipart" @@ -41,9 +42,51 @@ func NewImageService(cfg *config.Config) *ImageService { } } -func (s *ImageService) UploadImage(ctx context.Context, so *config.StorageOptions, imageData []byte, thumbType, imagePath string) error { - orig_path := fmt.Sprintf("%s/%s", so.OriginFolder, imagePath) - convertType := so.ConvertTo +// buildStoragePath builds the storage path from template and filename +// This derives the path where files are actually stored based on the upload template +func (s *ImageService) buildStoragePath(template, filename string, enableSharding bool) string { + // Extract key_base and ext from filename + parts := strings.Split(filename, ".") + keyBase := parts[0] + ext := "" + if len(parts) > 1 { + ext = parts[len(parts)-1] + } + + // Generate shard if sharding is enabled + shard := "" + if enableSharding { + shard = s.generateShard(keyBase) + } + + // Template replacement for image retrieval + path := template + path = strings.ReplaceAll(path, "{key_base}", keyBase) + path = strings.ReplaceAll(path, "{ext}", ext) + + // Handle shard placeholders + if shard != "" { + path = strings.ReplaceAll(path, "{shard?}", shard) + path = strings.ReplaceAll(path, "{shard}", shard) + } else { + // Remove shard placeholders if no shard + path = strings.ReplaceAll(path, "/{shard?}", "") + path = strings.ReplaceAll(path, "{shard?}/", "") + path = strings.ReplaceAll(path, "{shard?}", "") + } + + return path +} + +// generateShard creates a shard from key_base using SHA1 hash (same logic as upload service) +func (s *ImageService) generateShard(keyBase string) string { + hash := sha1.Sum([]byte(keyBase)) + return fmt.Sprintf("%02x", hash[:1]) // First 2 hex characters +} + +func (s *ImageService) UploadImage(ctx context.Context, profile *config.Profile, imageData []byte, thumbType, imagePath string) error { + orig_path := s.buildStoragePath(profile.StoragePath, imagePath, profile.EnableSharding) + convertType := profile.ConvertTo // Upload original image in parallel with thumbnail generation origUploadChan := make(chan error, 1) @@ -64,11 +107,11 @@ func (s *ImageService) UploadImage(ctx context.Context, so *config.StorageOption err error } - thumbJobs := make(chan thumbnailJob, len(so.Sizes)) - uploadErrors := make(chan error, len(so.Sizes)) + thumbJobs := make(chan thumbnailJob, len(profile.Sizes)) + uploadErrors := make(chan error, len(profile.Sizes)) // Generate thumbnails in parallel - for _, sizeStr := range so.Sizes { + for _, sizeStr := range profile.Sizes { go func(size string) { sizeInt, err := strconv.Atoi(size) if err != nil { @@ -76,14 +119,14 @@ func (s *ImageService) UploadImage(ctx context.Context, so *config.StorageOption return } - thumbnailData, err := s.generateThumbnail(imageData, sizeInt, so.Quality, convertType) + thumbnailData, err := s.generateThumbnail(imageData, sizeInt, profile.Quality, convertType) if err != nil { thumbJobs <- thumbnailJob{sizeStr: size, err: fmt.Errorf("failed to generate thumbnail for size %s: %w", size, err)} return } thumbSizePath := s.createThumbnailPathForSize(imagePath, size, convertType) - thumbFullPath := fmt.Sprintf("%s/%s", so.ThumbFolder, thumbSizePath) + thumbFullPath := fmt.Sprintf("%s/%s", profile.ThumbFolder, thumbSizePath) thumbJobs <- thumbnailJob{ sizeStr: size, @@ -95,7 +138,7 @@ func (s *ImageService) UploadImage(ctx context.Context, so *config.StorageOption } // Upload thumbnails in parallel as they're generated - for i := 0; i < len(so.Sizes); i++ { + for i := 0; i < len(profile.Sizes); i++ { go func() { job := <-thumbJobs if job.err != nil { @@ -118,7 +161,7 @@ func (s *ImageService) UploadImage(ctx context.Context, so *config.StorageOption } // Wait for all thumbnail uploads - for i := 0; i < len(so.Sizes); i++ { + for i := 0; i < len(profile.Sizes); i++ { if err := <-uploadErrors; err != nil { return err } @@ -165,19 +208,19 @@ func (s *ImageService) createThumbnailPathForSize(originalPath, size, newType st } // GetImage gets the image from the S3 bucket -func (s *ImageService) GetImage(ctx context.Context, so *config.StorageOptions, original bool, baseImageName, size string) ([]byte, error) { +func (s *ImageService) GetImage(ctx context.Context, profile *config.Profile, original bool, baseImageName, size string) ([]byte, error) { var path string if original { - path = fmt.Sprintf("%s/%s", so.OriginFolder, baseImageName) + path = s.buildStoragePath(profile.StoragePath, baseImageName, profile.EnableSharding) } else { if size == "" && !original { - if so.DefaultSize == "" { + if profile.DefaultSize == "" { return nil, fmt.Errorf("please specify a size, as `default_size` is not set for this configuration") } - size = so.DefaultSize + size = profile.DefaultSize } // example -> folder/file_size.ext - path = fmt.Sprintf("%s/%s_%s.%s", so.ThumbFolder, baseImageName, size, so.ConvertTo) + path = fmt.Sprintf("%s/%s_%s.%s", profile.ThumbFolder, baseImageName, size, profile.ConvertTo) } imageData, err := s.S3Client.GetObject(ctx, path) diff --git a/internal/upload/handlers.go b/internal/upload/handlers.go index 2bbb5e1..3440728 100644 --- a/internal/upload/handlers.go +++ b/internal/upload/handlers.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "strings" "mediaflow/internal/config" ) @@ -63,27 +64,34 @@ func (h *Handler) HandlePresign(w http.ResponseWriter, r *http.Request) { return } - // Get upload options for the profile - uploadOptions := h.storageConfig.GetUploadOptions(req.Profile) - if uploadOptions == nil { - h.writeError(w, http.StatusBadRequest, ErrBadRequest, fmt.Sprintf("No upload configuration for profile: %s", req.Profile), "Configure upload_options in your storage config") + // Get profile configuration + profile := h.storageConfig.GetProfile(req.Profile) + if profile == nil { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, fmt.Sprintf("No configuration for profile: %s", req.Profile), "Configure profile in your storage config") return } - // Validate kind matches upload options - if uploadOptions.Kind != req.Kind { - h.writeError(w, http.StatusBadRequest, ErrBadRequest, fmt.Sprintf("Kind mismatch: expected %s, got %s", uploadOptions.Kind, req.Kind), "") + // Validate kind matches profile + if profile.Kind != req.Kind { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, fmt.Sprintf("Kind mismatch: expected %s, got %s", profile.Kind, req.Kind), "") return } + // Construct base URL from request + scheme := "http" + if r.TLS != nil { + scheme = "https" + } + baseURL := fmt.Sprintf("%s://%s", scheme, r.Host) + // Generate presigned upload - presignResp, err := h.uploadService.PresignUpload(h.ctx, &req, uploadOptions) + presignResp, err := h.uploadService.PresignUpload(h.ctx, &req, profile, baseURL) if err != nil { if err.Error() == fmt.Sprintf("mime type not allowed: %s", req.Mime) { h.writeError(w, http.StatusBadRequest, ErrMimeNotAllowed, err.Error(), "Check allowed_mimes in upload configuration") return } - if err.Error() == fmt.Sprintf("file size exceeds maximum: %d > %d", req.SizeBytes, uploadOptions.SizeMaxBytes) { + if err.Error() == fmt.Sprintf("file size exceeds maximum: %d > %d", req.SizeBytes, profile.SizeMaxBytes) { h.writeError(w, http.StatusBadRequest, ErrSizeTooLarge, err.Error(), "Reduce file size or check size_max_bytes in configuration") return } @@ -99,8 +107,84 @@ func (h *Handler) HandlePresign(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(presignResp) } -// Note: Part presigning is now handled via batch presigning in the main endpoint -// No separate part endpoint needed for stateless design +// HandleCompleteMultipart handles POST /v1/uploads/{object_key}/complete/{upload_id} +func (h *Handler) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + h.writeError(w, http.StatusMethodNotAllowed, ErrBadRequest, "Method not allowed", "") + return + } + + // Extract object_key and upload_id from URL path + path := strings.TrimPrefix(r.URL.Path, "/v1/uploads/") + parts := strings.Split(path, "/complete/") + if len(parts) != 2 { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid URL format", "Expected /v1/uploads/{object_key}/complete/{upload_id}") + return + } + + objectKey := parts[0] + uploadID := parts[1] + + // Parse request body + var req CompleteMultipartRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid request body", "") + return + } + + // Validate required fields + if len(req.Parts) == 0 { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "parts is required and cannot be empty", "") + return + } + + // Complete the multipart upload + err := h.uploadService.CompleteMultipartUpload(h.ctx, objectKey, uploadID, &req) + if err != nil { + fmt.Printf("Complete multipart error: %v\n", err) + h.writeError(w, http.StatusInternalServerError, ErrBadRequest, fmt.Sprintf("Failed to complete multipart upload: %v", err), "") + return + } + + // Return success response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + response := map[string]string{"status": "completed", "object_key": objectKey} + _ = json.NewEncoder(w).Encode(response) +} + +// HandleAbortMultipart handles DELETE /v1/uploads/{object_key}/abort/{upload_id} +func (h *Handler) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + h.writeError(w, http.StatusMethodNotAllowed, ErrBadRequest, "Method not allowed", "") + return + } + + // Extract object_key and upload_id from URL path + path := strings.TrimPrefix(r.URL.Path, "/v1/uploads/") + parts := strings.Split(path, "/abort/") + if len(parts) != 2 { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid URL format", "Expected /v1/uploads/{object_key}/abort/{upload_id}") + return + } + + objectKey := parts[0] + uploadID := parts[1] + + // Abort the multipart upload + err := h.uploadService.AbortMultipartUpload(h.ctx, objectKey, uploadID) + if err != nil { + fmt.Printf("Abort multipart error: %v\n", err) + h.writeError(w, http.StatusInternalServerError, ErrBadRequest, fmt.Sprintf("Failed to abort multipart upload: %v", err), "") + return + } + + // Return success response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + response := map[string]string{"status": "aborted", "upload_id": uploadID} + _ = json.NewEncoder(w).Encode(response) +} // writeError writes a standardized error response func (h *Handler) writeError(w http.ResponseWriter, statusCode int, code, message, hint string) { diff --git a/internal/upload/handlers_test.go b/internal/upload/handlers_test.go index 75b7276..0bb416a 100644 --- a/internal/upload/handlers_test.go +++ b/internal/upload/handlers_test.go @@ -60,21 +60,28 @@ func (h *TestHandler) HandlePresign(w http.ResponseWriter, r *http.Request) { return } - // Get upload options for the profile - uploadOptions := h.storageConfig.GetUploadOptions(req.Profile) - if uploadOptions == nil { - h.writeError(w, http.StatusBadRequest, ErrBadRequest, fmt.Sprintf("No upload configuration for profile: %s", req.Profile), "Configure upload_options in your storage config") + // Get profile configuration + profile := h.storageConfig.GetProfile(req.Profile) + if profile == nil { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, fmt.Sprintf("No configuration for profile: %s", req.Profile), "Configure profile in your storage config") return } - // Validate kind matches upload options - if uploadOptions.Kind != req.Kind { - h.writeError(w, http.StatusBadRequest, ErrBadRequest, fmt.Sprintf("Kind mismatch: expected %s, got %s", uploadOptions.Kind, req.Kind), "") + // Validate kind matches profile + if profile.Kind != req.Kind { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, fmt.Sprintf("Kind mismatch: expected %s, got %s", profile.Kind, req.Kind), "") return } + // Construct base URL from request + scheme := "http" + if r.TLS != nil { + scheme = "https" + } + baseURL := fmt.Sprintf("%s://%s", scheme, r.Host) + // Generate presigned upload - presignResp, err := h.uploadService.PresignUpload(h.ctx, &req, uploadOptions) + presignResp, err := h.uploadService.PresignUpload(h.ctx, &req, profile, baseURL) if err != nil { errStr := err.Error() if strings.Contains(errStr, "mime type not allowed:") { @@ -95,6 +102,81 @@ func (h *TestHandler) HandlePresign(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(presignResp) } +func (h *TestHandler) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + h.writeError(w, http.StatusMethodNotAllowed, ErrBadRequest, "Method not allowed", "") + return + } + + // Extract object_key and upload_id from URL path + path := strings.TrimPrefix(r.URL.Path, "/v1/uploads/") + parts := strings.Split(path, "/complete/") + if len(parts) != 2 { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid URL format", "Expected /v1/uploads/{object_key}/complete/{upload_id}") + return + } + + objectKey := parts[0] + uploadID := parts[1] + + // Parse request body + var req CompleteMultipartRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid request body", "") + return + } + + // Validate required fields + if len(req.Parts) == 0 { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "parts is required and cannot be empty", "") + return + } + + // Complete the multipart upload + err := h.uploadService.CompleteMultipartUpload(h.ctx, objectKey, uploadID, &req) + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrBadRequest, fmt.Sprintf("Failed to complete multipart upload: %v", err), "") + return + } + + // Return success response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + response := map[string]string{"status": "completed", "object_key": objectKey} + _ = json.NewEncoder(w).Encode(response) +} + +func (h *TestHandler) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + h.writeError(w, http.StatusMethodNotAllowed, ErrBadRequest, "Method not allowed", "") + return + } + + // Extract object_key and upload_id from URL path + path := strings.TrimPrefix(r.URL.Path, "/v1/uploads/") + parts := strings.Split(path, "/abort/") + if len(parts) != 2 { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid URL format", "Expected /v1/uploads/{object_key}/abort/{upload_id}") + return + } + + objectKey := parts[0] + uploadID := parts[1] + + // Abort the multipart upload + err := h.uploadService.AbortMultipartUpload(h.ctx, objectKey, uploadID) + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrBadRequest, fmt.Sprintf("Failed to abort multipart upload: %v", err), "") + return + } + + // Return success response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + response := map[string]string{"status": "aborted", "upload_id": uploadID} + _ = json.NewEncoder(w).Encode(response) +} + func (h *TestHandler) writeError(w http.ResponseWriter, statusCode int, code, message, hint string) { errorResp := ErrorResponse{ Code: code, @@ -109,26 +191,30 @@ func (h *TestHandler) writeError(w http.ResponseWriter, statusCode int, code, me // UploadService interface for dependency injection type UploadService interface { - PresignUpload(ctx context.Context, req *PresignRequest, uploadOptions *config.UploadOptions) (*PresignResponse, error) + PresignUpload(ctx context.Context, req *PresignRequest, profile *config.Profile, baseURL string) (*PresignResponse, error) + CompleteMultipartUpload(ctx context.Context, objectKey, uploadID string, req *CompleteMultipartRequest) error + AbortMultipartUpload(ctx context.Context, objectKey, uploadID string) error } // MockUploadService implements the upload service interface for testing type MockUploadService struct { - presignUploadFunc func(ctx context.Context, req *PresignRequest, uploadOptions *config.UploadOptions) (*PresignResponse, error) + presignUploadFunc func(ctx context.Context, req *PresignRequest, profile *config.Profile, baseURL string) (*PresignResponse, error) + completeMultipartUploadFunc func(ctx context.Context, objectKey, uploadID string, req *CompleteMultipartRequest) error + abortMultipartUploadFunc func(ctx context.Context, objectKey, uploadID string) error } -func (m *MockUploadService) PresignUpload(ctx context.Context, req *PresignRequest, uploadOptions *config.UploadOptions) (*PresignResponse, error) { +func (m *MockUploadService) PresignUpload(ctx context.Context, req *PresignRequest, profile *config.Profile, baseURL string) (*PresignResponse, error) { if m.presignUploadFunc != nil { - return m.presignUploadFunc(ctx, req, uploadOptions) + return m.presignUploadFunc(ctx, req, profile, baseURL) } // Default mock response return &PresignResponse{ - ObjectKey: "raw/ab/test-key.jpg", + ObjectKey: "originals/ab/test-key.jpg", Upload: &UploadDetails{ Single: &SingleUpload{ Method: "PUT", - URL: "https://test.s3.amazonaws.com/bucket/raw/ab/test-key.jpg", + URL: "https://test.s3.amazonaws.com/bucket/originals/ab/test-key.jpg", Headers: map[string]string{"Content-Type": "image/jpeg"}, ExpiresAt: time.Now().Add(15 * time.Minute), }, @@ -136,11 +222,25 @@ func (m *MockUploadService) PresignUpload(ctx context.Context, req *PresignReque }, nil } +func (m *MockUploadService) CompleteMultipartUpload(ctx context.Context, objectKey, uploadID string, req *CompleteMultipartRequest) error { + if m.completeMultipartUploadFunc != nil { + return m.completeMultipartUploadFunc(ctx, objectKey, uploadID, req) + } + return nil +} + +func (m *MockUploadService) AbortMultipartUpload(ctx context.Context, objectKey, uploadID string) error { + if m.abortMultipartUploadFunc != nil { + return m.abortMultipartUploadFunc(ctx, objectKey, uploadID) + } + return nil +} + func TestHandler_HandlePresign_Success(t *testing.T) { // Setup mockService := &MockUploadService{} storageConfig := &config.StorageConfig{ - UploadOptions: map[string]config.UploadOptions{ + Profiles: map[string]config.Profile{ "avatar": { Kind: "image", AllowedMimes: []string{"image/jpeg", "image/png"}, @@ -148,7 +248,7 @@ func TestHandler_HandlePresign_Success(t *testing.T) { MultipartThresholdMB: 15, PartSizeMB: 8, TokenTTLSeconds: 900, - PathTemplate: "raw/{shard?}/{key_base}.{ext}", + StoragePath: "originals/{shard?}/{key_base}.{ext}", EnableSharding: true, }, }, @@ -201,7 +301,7 @@ func TestHandler_HandlePresign_Success(t *testing.T) { func TestHandler_HandlePresign_ValidationErrors(t *testing.T) { // Setup storageConfig := &config.StorageConfig{ - UploadOptions: map[string]config.UploadOptions{ + Profiles: map[string]config.Profile{ "avatar": { Kind: "image", AllowedMimes: []string{"image/jpeg", "image/png"}, @@ -209,7 +309,7 @@ func TestHandler_HandlePresign_ValidationErrors(t *testing.T) { MultipartThresholdMB: 15, PartSizeMB: 8, TokenTTLSeconds: 900, - PathTemplate: "raw/{shard?}/{key_base}.{ext}", + StoragePath: "originals/{shard?}/{key_base}.{ext}", EnableSharding: true, }, }, @@ -378,7 +478,7 @@ func TestHandler_HandlePresign_ValidationErrors(t *testing.T) { func TestHandler_HandlePresign_ServiceErrors(t *testing.T) { // Setup storageConfig := &config.StorageConfig{ - UploadOptions: map[string]config.UploadOptions{ + Profiles: map[string]config.Profile{ "avatar": { Kind: "image", AllowedMimes: []string{"image/jpeg", "image/png"}, @@ -386,7 +486,7 @@ func TestHandler_HandlePresign_ServiceErrors(t *testing.T) { MultipartThresholdMB: 15, PartSizeMB: 8, TokenTTLSeconds: 900, - PathTemplate: "raw/{shard?}/{key_base}.{ext}", + StoragePath: "originals/{shard?}/{key_base}.{ext}", EnableSharding: true, }, }, @@ -421,7 +521,7 @@ func TestHandler_HandlePresign_ServiceErrors(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mockService := &MockUploadService{ - presignUploadFunc: func(ctx context.Context, req *PresignRequest, uploadOptions *config.UploadOptions) (*PresignResponse, error) { + presignUploadFunc: func(ctx context.Context, req *PresignRequest, profile *config.Profile, baseURL string) (*PresignResponse, error) { return nil, tt.serviceError }, } @@ -467,7 +567,7 @@ func TestHandler_HandlePresign_ServiceErrors(t *testing.T) { func TestHandler_HandlePresign_InvalidJSON(t *testing.T) { storageConfig := &config.StorageConfig{ - UploadOptions: map[string]config.UploadOptions{ + Profiles: map[string]config.Profile{ "avatar": { Kind: "image", }, @@ -535,3 +635,329 @@ func TestHandler_writeError(t *testing.T) { t.Errorf("Expected hint 'Test hint', got '%s'", errorResp.Hint) } } + +func TestHandler_HandleCompleteMultipart_Success(t *testing.T) { + called := false + var capturedObjectKey, capturedUploadID string + var capturedRequest *CompleteMultipartRequest + + mockService := &MockUploadService{ + completeMultipartUploadFunc: func(ctx context.Context, objectKey, uploadID string, req *CompleteMultipartRequest) error { + called = true + capturedObjectKey = objectKey + capturedUploadID = uploadID + capturedRequest = req + return nil + }, + } + + handler := &TestHandler{ + uploadService: mockService, + ctx: context.Background(), + } + + requestBody := CompleteMultipartRequest{ + Parts: []CompletedPart{ + {PartNumber: 1, ETag: "etag1"}, + {PartNumber: 2, ETag: "etag2"}, + }, + } + + body, _ := json.Marshal(requestBody) + req := httptest.NewRequest("POST", "/v1/uploads/raw/test-object-key.jpg/complete/test-upload-id", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + + rr := httptest.NewRecorder() + handler.HandleCompleteMultipart(rr, req) + + if rr.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d. Body: %s", http.StatusOK, rr.Code, rr.Body.String()) + } + + if !called { + t.Errorf("Expected service method to be called") + } + + if capturedObjectKey != "raw/test-object-key.jpg" { + t.Errorf("Expected object key 'raw/test-object-key.jpg', got '%s'", capturedObjectKey) + } + + if capturedUploadID != "test-upload-id" { + t.Errorf("Expected upload ID 'test-upload-id', got '%s'", capturedUploadID) + } + + if len(capturedRequest.Parts) != 2 { + t.Errorf("Expected 2 parts, got %d", len(capturedRequest.Parts)) + } + + var response map[string]string + if err := json.Unmarshal(rr.Body.Bytes(), &response); err != nil { + t.Errorf("Failed to parse response: %v", err) + } + + if response["status"] != "completed" { + t.Errorf("Expected status 'completed', got '%s'", response["status"]) + } + + if response["object_key"] != "raw/test-object-key.jpg" { + t.Errorf("Expected object_key 'raw/test-object-key.jpg', got '%s'", response["object_key"]) + } +} + +func TestHandler_HandleCompleteMultipart_ValidationErrors(t *testing.T) { + handler := &TestHandler{ + uploadService: &MockUploadService{}, + ctx: context.Background(), + } + + tests := []struct { + name string + method string + url string + requestBody interface{} + expectedStatus int + expectedCode string + }{ + { + name: "Invalid method", + method: "GET", + url: "/v1/uploads/raw/test-object-key.jpg/complete/test-upload-id", + requestBody: map[string]interface{}{}, + expectedStatus: http.StatusMethodNotAllowed, + expectedCode: ErrBadRequest, + }, + { + name: "Invalid URL format", + method: "POST", + url: "/v1/uploads/invalid-url", + requestBody: map[string]interface{}{}, + expectedStatus: http.StatusBadRequest, + expectedCode: ErrBadRequest, + }, + { + name: "Empty parts", + method: "POST", + url: "/v1/uploads/raw/test-object-key.jpg/complete/test-upload-id", + requestBody: map[string]interface{}{ + "parts": []interface{}{}, + }, + expectedStatus: http.StatusBadRequest, + expectedCode: ErrBadRequest, + }, + { + name: "Invalid JSON", + method: "POST", + url: "/v1/uploads/raw/test-object-key.jpg/complete/test-upload-id", + requestBody: "invalid json", + expectedStatus: http.StatusBadRequest, + expectedCode: ErrBadRequest, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var body []byte + if tt.requestBody != nil { + if s, ok := tt.requestBody.(string); ok && s == "invalid json" { + body = []byte(s) + } else { + body, _ = json.Marshal(tt.requestBody) + } + } + + req := httptest.NewRequest(tt.method, tt.url, bytes.NewReader(body)) + if tt.method == "POST" { + req.Header.Set("Content-Type", "application/json") + } + + rr := httptest.NewRecorder() + handler.HandleCompleteMultipart(rr, req) + + if rr.Code != tt.expectedStatus { + t.Errorf("Expected status %d, got %d. Body: %s", tt.expectedStatus, rr.Code, rr.Body.String()) + } + + var errorResp ErrorResponse + if err := json.Unmarshal(rr.Body.Bytes(), &errorResp); err != nil { + t.Errorf("Failed to parse error response: %v", err) + } + + if errorResp.Code != tt.expectedCode { + t.Errorf("Expected error code '%s', got '%s'", tt.expectedCode, errorResp.Code) + } + }) + } +} + +func TestHandler_HandleAbortMultipart_Success(t *testing.T) { + called := false + var capturedObjectKey, capturedUploadID string + + mockService := &MockUploadService{ + abortMultipartUploadFunc: func(ctx context.Context, objectKey, uploadID string) error { + called = true + capturedObjectKey = objectKey + capturedUploadID = uploadID + return nil + }, + } + + handler := &TestHandler{ + uploadService: mockService, + ctx: context.Background(), + } + + req := httptest.NewRequest("DELETE", "/v1/uploads/raw/test-object-key.jpg/abort/test-upload-id", nil) + + rr := httptest.NewRecorder() + handler.HandleAbortMultipart(rr, req) + + if rr.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d. Body: %s", http.StatusOK, rr.Code, rr.Body.String()) + } + + if !called { + t.Errorf("Expected service method to be called") + } + + if capturedObjectKey != "raw/test-object-key.jpg" { + t.Errorf("Expected object key 'raw/test-object-key.jpg', got '%s'", capturedObjectKey) + } + + if capturedUploadID != "test-upload-id" { + t.Errorf("Expected upload ID 'test-upload-id', got '%s'", capturedUploadID) + } + + var response map[string]string + if err := json.Unmarshal(rr.Body.Bytes(), &response); err != nil { + t.Errorf("Failed to parse response: %v", err) + } + + if response["status"] != "aborted" { + t.Errorf("Expected status 'aborted', got '%s'", response["status"]) + } + + if response["upload_id"] != "test-upload-id" { + t.Errorf("Expected upload_id 'test-upload-id', got '%s'", response["upload_id"]) + } +} + +func TestHandler_HandleAbortMultipart_ValidationErrors(t *testing.T) { + handler := &TestHandler{ + uploadService: &MockUploadService{}, + ctx: context.Background(), + } + + tests := []struct { + name string + method string + url string + expectedStatus int + expectedCode string + }{ + { + name: "Invalid method", + method: "POST", + url: "/v1/uploads/raw/test-object-key.jpg/abort/test-upload-id", + expectedStatus: http.StatusMethodNotAllowed, + expectedCode: ErrBadRequest, + }, + { + name: "Invalid URL format", + method: "DELETE", + url: "/v1/uploads/invalid-url", + expectedStatus: http.StatusBadRequest, + expectedCode: ErrBadRequest, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest(tt.method, tt.url, nil) + + rr := httptest.NewRecorder() + handler.HandleAbortMultipart(rr, req) + + if rr.Code != tt.expectedStatus { + t.Errorf("Expected status %d, got %d. Body: %s", tt.expectedStatus, rr.Code, rr.Body.String()) + } + + var errorResp ErrorResponse + if err := json.Unmarshal(rr.Body.Bytes(), &errorResp); err != nil { + t.Errorf("Failed to parse error response: %v", err) + } + + if errorResp.Code != tt.expectedCode { + t.Errorf("Expected error code '%s', got '%s'", tt.expectedCode, errorResp.Code) + } + }) + } +} + +func TestHandler_HandleCompleteMultipart_ServiceError(t *testing.T) { + mockService := &MockUploadService{ + completeMultipartUploadFunc: func(ctx context.Context, objectKey, uploadID string, req *CompleteMultipartRequest) error { + return fmt.Errorf("service error") + }, + } + + handler := &TestHandler{ + uploadService: mockService, + ctx: context.Background(), + } + + requestBody := CompleteMultipartRequest{ + Parts: []CompletedPart{{PartNumber: 1, ETag: "etag1"}}, + } + + body, _ := json.Marshal(requestBody) + req := httptest.NewRequest("POST", "/v1/uploads/raw/test-object-key.jpg/complete/test-upload-id", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + + rr := httptest.NewRecorder() + handler.HandleCompleteMultipart(rr, req) + + if rr.Code != http.StatusInternalServerError { + t.Errorf("Expected status %d, got %d", http.StatusInternalServerError, rr.Code) + } + + var errorResp ErrorResponse + if err := json.Unmarshal(rr.Body.Bytes(), &errorResp); err != nil { + t.Errorf("Failed to parse error response: %v", err) + } + + if errorResp.Code != ErrBadRequest { + t.Errorf("Expected error code '%s', got '%s'", ErrBadRequest, errorResp.Code) + } +} + +func TestHandler_HandleAbortMultipart_ServiceError(t *testing.T) { + mockService := &MockUploadService{ + abortMultipartUploadFunc: func(ctx context.Context, objectKey, uploadID string) error { + return fmt.Errorf("service error") + }, + } + + handler := &TestHandler{ + uploadService: mockService, + ctx: context.Background(), + } + + req := httptest.NewRequest("DELETE", "/v1/uploads/raw/test-object-key.jpg/abort/test-upload-id", nil) + + rr := httptest.NewRecorder() + handler.HandleAbortMultipart(rr, req) + + if rr.Code != http.StatusInternalServerError { + t.Errorf("Expected status %d, got %d", http.StatusInternalServerError, rr.Code) + } + + var errorResp ErrorResponse + if err := json.Unmarshal(rr.Body.Bytes(), &errorResp); err != nil { + t.Errorf("Failed to parse error response: %v", err) + } + + if errorResp.Code != ErrBadRequest { + t.Errorf("Expected error code '%s', got '%s'", ErrBadRequest, errorResp.Code) + } +} diff --git a/internal/upload/integration_test.go b/internal/upload/integration_test.go index 039b09d..52eb213 100644 --- a/internal/upload/integration_test.go +++ b/internal/upload/integration_test.go @@ -12,6 +12,7 @@ import ( "mediaflow/internal/auth" "mediaflow/internal/config" + "mediaflow/internal/s3" ) // Integration tests that test the complete upload flow with authentication @@ -22,7 +23,7 @@ func TestUploadIntegration_WithAuth(t *testing.T) { } storageConfig := &config.StorageConfig{ - UploadOptions: map[string]config.UploadOptions{ + Profiles: map[string]config.Profile{ "avatar": { Kind: "image", AllowedMimes: []string{"image/jpeg", "image/png"}, @@ -30,7 +31,7 @@ func TestUploadIntegration_WithAuth(t *testing.T) { MultipartThresholdMB: 15, PartSizeMB: 8, TokenTTLSeconds: 900, - PathTemplate: "raw/{shard?}/{key_base}.{ext}", + StoragePath: "originals/{shard?}/{key_base}.{ext}", EnableSharding: true, }, }, @@ -38,13 +39,13 @@ func TestUploadIntegration_WithAuth(t *testing.T) { // Create a mock upload service mockService := &MockUploadService{ - presignUploadFunc: func(ctx context.Context, req *PresignRequest, uploadOptions *config.UploadOptions) (*PresignResponse, error) { + presignUploadFunc: func(ctx context.Context, req *PresignRequest, profile *config.Profile, baseURL string) (*PresignResponse, error) { return &PresignResponse{ - ObjectKey: "raw/ab/test-key.jpg", + ObjectKey: "originals/ab/test-key.jpg", Upload: &UploadDetails{ Single: &SingleUpload{ Method: "PUT", - URL: "https://test.s3.amazonaws.com/bucket/raw/ab/test-key.jpg", + URL: "https://test.s3.amazonaws.com/bucket/originals/ab/test-key.jpg", Headers: map[string]string{"Content-Type": "image/jpeg"}, ExpiresAt: time.Now().Add(15 * time.Minute), }, @@ -191,7 +192,7 @@ func TestUploadIntegration_ValidationFlow(t *testing.T) { } storageConfig := &config.StorageConfig{ - UploadOptions: map[string]config.UploadOptions{ + Profiles: map[string]config.Profile{ "avatar": { Kind: "image", AllowedMimes: []string{"image/jpeg", "image/png"}, @@ -199,7 +200,7 @@ func TestUploadIntegration_ValidationFlow(t *testing.T) { MultipartThresholdMB: 15, PartSizeMB: 8, TokenTTLSeconds: 900, - PathTemplate: "raw/{shard?}/{key_base}.{ext}", + StoragePath: "originals/{shard?}/{key_base}.{ext}", EnableSharding: true, }, }, @@ -318,8 +319,8 @@ func TestUploadIntegration_ValidationFlow(t *testing.T) { } // Verify object key follows template pattern - if !strings.Contains(response.ObjectKey, "raw/") { - t.Errorf("Expected object key to contain 'raw/', got: %s", response.ObjectKey) + if !strings.Contains(response.ObjectKey, "originals/") { + t.Errorf("Expected object key to contain 'originals/', got: %s", response.ObjectKey) } if !strings.Contains(response.ObjectKey, ".jpg") { @@ -346,7 +347,7 @@ func TestUploadIntegration_MultipartStrategy(t *testing.T) { } storageConfig := &config.StorageConfig{ - UploadOptions: map[string]config.UploadOptions{ + Profiles: map[string]config.Profile{ "video": { Kind: "video", AllowedMimes: []string{"video/mp4"}, @@ -354,7 +355,7 @@ func TestUploadIntegration_MultipartStrategy(t *testing.T) { MultipartThresholdMB: 15, // 15MB threshold PartSizeMB: 8, // 8MB parts TokenTTLSeconds: 900, - PathTemplate: "raw/{shard?}/{key_base}.{ext}", + StoragePath: "originals/{shard?}/{key_base}.{ext}", EnableSharding: true, }, }, @@ -441,4 +442,240 @@ func TestUploadIntegration_MultipartStrategy(t *testing.T) { t.Errorf("Expected non-empty URL for part %d", part.PartNumber) } } + + // Verify complete and abort URLs are present + if response.Upload.Multipart.Complete == nil { + t.Errorf("Expected complete URL to be populated") + } else { + if response.Upload.Multipart.Complete.Method != "POST" { + t.Errorf("Expected complete method to be POST, got %s", response.Upload.Multipart.Complete.Method) + } + if !strings.Contains(response.Upload.Multipart.Complete.URL, "/complete/") { + t.Errorf("Complete URL should contain '/complete/', got: %s", response.Upload.Multipart.Complete.URL) + } + } + + if response.Upload.Multipart.Abort == nil { + t.Errorf("Expected abort URL to be populated") + } else { + if response.Upload.Multipart.Abort.Method != "DELETE" { + t.Errorf("Expected abort method to be DELETE, got %s", response.Upload.Multipart.Abort.Method) + } + if !strings.Contains(response.Upload.Multipart.Abort.URL, "/abort/") { + t.Errorf("Abort URL should contain '/abort/', got: %s", response.Upload.Multipart.Abort.URL) + } + } +} + +func TestUploadIntegration_CompleteMultipartFlow(t *testing.T) { + // Setup configuration + cfg := &config.Config{ + APIKey: "test-api-key", + } + + storageConfig := &config.StorageConfig{ + Profiles: map[string]config.Profile{ + "video": { + Kind: "video", + AllowedMimes: []string{"video/mp4"}, + SizeMaxBytes: 100 * 1024 * 1024, + MultipartThresholdMB: 15, + PartSizeMB: 8, + TokenTTLSeconds: 900, + StoragePath: "originals/{key_base}.{ext}", + EnableSharding: false, + }, + }, + } + + // Create mock S3 client + mockS3 := &MockS3Client{ + createMultipartUploadFunc: func(ctx context.Context, key string, headers map[string]string) (string, error) { + return "test-upload-id", nil + }, + presignUploadPartFunc: func(ctx context.Context, key, uploadID string, partNumber int32, expires time.Duration) (string, error) { + return "https://test.s3.amazonaws.com/bucket/" + key + "?partNumber=" + string(rune(partNumber+'0')), nil + }, + completeMultipartUploadFunc: func(ctx context.Context, key, uploadID string, parts []s3.PartInfo) error { + // Verify the expected parameters + if key != "originals/test-video.mp4" { + t.Errorf("Expected key 'originals/test-video.mp4', got '%s'", key) + } + if uploadID != "test-upload-id" { + t.Errorf("Expected upload ID 'test-upload-id', got '%s'", uploadID) + } + if len(parts) != 2 { + t.Errorf("Expected 2 parts, got %d", len(parts)) + } + return nil + }, + } + + realService := NewService(mockS3, &config.Config{S3Bucket: "test-bucket"}) + + handler := &Handler{ + uploadService: realService, + storageConfig: storageConfig, + ctx: context.Background(), + } + + // Wrap with auth middleware + authConfig := &auth.Config{APIKey: cfg.APIKey} + middleware := auth.APIKeyMiddleware(authConfig) + + // Test complete multipart upload + requestBody := CompleteMultipartRequest{ + Parts: []CompletedPart{ + {PartNumber: 1, ETag: "etag1"}, + {PartNumber: 2, ETag: "etag2"}, + }, + } + + body, _ := json.Marshal(requestBody) + req := httptest.NewRequest("POST", "/v1/uploads/originals/test-video.mp4/complete/test-upload-id", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer test-api-key") + + authenticatedHandler := middleware(http.HandlerFunc(handler.HandleCompleteMultipart)) + rr := httptest.NewRecorder() + authenticatedHandler.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d. Body: %s", http.StatusOK, rr.Code, rr.Body.String()) + } + + var response map[string]string + if err := json.Unmarshal(rr.Body.Bytes(), &response); err != nil { + t.Errorf("Failed to parse response: %v", err) + } + + if response["status"] != "completed" { + t.Errorf("Expected status 'completed', got '%s'", response["status"]) + } + + if response["object_key"] != "originals/test-video.mp4" { + t.Errorf("Expected object_key 'originals/test-video.mp4', got '%s'", response["object_key"]) + } +} + +func TestUploadIntegration_AbortMultipartFlow(t *testing.T) { + // Setup configuration + cfg := &config.Config{ + APIKey: "test-api-key", + } + + // Create mock S3 client + mockS3 := &MockS3Client{ + abortMultipartUploadFunc: func(ctx context.Context, key, uploadID string) error { + // Verify the expected parameters + if key != "originals/test-video.mp4" { + t.Errorf("Expected key 'originals/test-video.mp4', got '%s'", key) + } + if uploadID != "test-upload-id" { + t.Errorf("Expected upload ID 'test-upload-id', got '%s'", uploadID) + } + return nil + }, + } + + realService := NewService(mockS3, &config.Config{S3Bucket: "test-bucket"}) + + handler := &Handler{ + uploadService: realService, + storageConfig: &config.StorageConfig{}, + ctx: context.Background(), + } + + // Wrap with auth middleware + authConfig := &auth.Config{APIKey: cfg.APIKey} + middleware := auth.APIKeyMiddleware(authConfig) + + req := httptest.NewRequest("DELETE", "/v1/uploads/originals/test-video.mp4/abort/test-upload-id", nil) + req.Header.Set("Authorization", "Bearer test-api-key") + + authenticatedHandler := middleware(http.HandlerFunc(handler.HandleAbortMultipart)) + rr := httptest.NewRecorder() + authenticatedHandler.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d. Body: %s", http.StatusOK, rr.Code, rr.Body.String()) + } + + var response map[string]string + if err := json.Unmarshal(rr.Body.Bytes(), &response); err != nil { + t.Errorf("Failed to parse response: %v", err) + } + + if response["status"] != "aborted" { + t.Errorf("Expected status 'aborted', got '%s'", response["status"]) + } + + if response["upload_id"] != "test-upload-id" { + t.Errorf("Expected upload_id 'test-upload-id', got '%s'", response["upload_id"]) + } +} + +func TestUploadIntegration_CompleteMultipartAuth(t *testing.T) { + // Test authentication for complete endpoint + cfg := &config.Config{ + APIKey: "test-api-key", + } + + mockS3 := &MockS3Client{} + realService := NewService(mockS3, &config.Config{S3Bucket: "test-bucket"}) + + handler := &Handler{ + uploadService: realService, + storageConfig: &config.StorageConfig{}, + ctx: context.Background(), + } + + authConfig := &auth.Config{APIKey: cfg.APIKey} + middleware := auth.APIKeyMiddleware(authConfig) + + requestBody := CompleteMultipartRequest{ + Parts: []CompletedPart{{PartNumber: 1, ETag: "etag1"}}, + } + + tests := []struct { + name string + authHeader string + expectedStatus int + }{ + { + name: "Valid auth", + authHeader: "Bearer test-api-key", + expectedStatus: http.StatusOK, + }, + { + name: "Invalid auth", + authHeader: "Bearer wrong-key", + expectedStatus: http.StatusUnauthorized, + }, + { + name: "No auth", + authHeader: "", + expectedStatus: http.StatusUnauthorized, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + body, _ := json.Marshal(requestBody) + req := httptest.NewRequest("POST", "/v1/uploads/originals/test-video.mp4/complete/test-upload-id", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + + if tt.authHeader != "" { + req.Header.Set("Authorization", tt.authHeader) + } + + authenticatedHandler := middleware(http.HandlerFunc(handler.HandleCompleteMultipart)) + rr := httptest.NewRecorder() + authenticatedHandler.ServeHTTP(rr, req) + + if rr.Code != tt.expectedStatus { + t.Errorf("Expected status %d, got %d. Body: %s", tt.expectedStatus, rr.Code, rr.Body.String()) + } + }) + } } \ No newline at end of file diff --git a/internal/upload/interfaces.go b/internal/upload/interfaces.go index 3ab10c5..15d1003 100644 --- a/internal/upload/interfaces.go +++ b/internal/upload/interfaces.go @@ -3,6 +3,8 @@ package upload import ( "context" "time" + + "mediaflow/internal/s3" ) // S3Client interface for dependency injection and testing @@ -10,4 +12,6 @@ type S3Client interface { CreateMultipartUpload(ctx context.Context, key string, headers map[string]string) (string, error) PresignPutObject(ctx context.Context, key string, expires time.Duration, headers map[string]string) (string, error) PresignUploadPart(ctx context.Context, key, uploadID string, partNumber int32, expires time.Duration) (string, error) + CompleteMultipartUpload(ctx context.Context, key, uploadID string, parts []s3.PartInfo) error + AbortMultipartUpload(ctx context.Context, key, uploadID string) error } \ No newline at end of file diff --git a/internal/upload/service.go b/internal/upload/service.go index bc305cf..236b61a 100644 --- a/internal/upload/service.go +++ b/internal/upload/service.go @@ -9,6 +9,7 @@ import ( "time" "mediaflow/internal/config" + "mediaflow/internal/s3" ) type Service struct { @@ -24,35 +25,39 @@ func NewService(s3Client S3Client, config *config.Config) *Service { } // PresignUpload generates presigned URLs for upload based on the request -func (s *Service) PresignUpload(ctx context.Context, req *PresignRequest, uploadOptions *config.UploadOptions) (*PresignResponse, error) { +func (s *Service) PresignUpload(ctx context.Context, req *PresignRequest, profile *config.Profile, baseURL string) (*PresignResponse, error) { // Validate MIME type - if !s.isMimeAllowed(req.Mime, uploadOptions.AllowedMimes) { + if !s.isMimeAllowed(req.Mime, profile.AllowedMimes) { return nil, fmt.Errorf("mime type not allowed: %s", req.Mime) } // Validate file size - if req.SizeBytes > uploadOptions.SizeMaxBytes { - return nil, fmt.Errorf("file size exceeds maximum: %d > %d", req.SizeBytes, uploadOptions.SizeMaxBytes) + if req.SizeBytes > profile.SizeMaxBytes { + return nil, fmt.Errorf("file size exceeds maximum: %d > %d", req.SizeBytes, profile.SizeMaxBytes) } - // Generate shard if not provided and sharding is enabled - shard := req.Shard - if shard == "" && uploadOptions.EnableSharding { - shard = GenerateShard(req.KeyBase) + // Generate shard only if auto-sharding is enabled + shard := "" + if profile.EnableSharding { + shard = req.Shard + if shard == "" { + shard = GenerateShard(req.KeyBase) + } } + // Note: If EnableSharding is false, any shard in request is ignored // Build object key from template - objectKey := s.buildObjectKey(uploadOptions.PathTemplate, req.KeyBase, req.Ext, shard) + objectKey := s.buildObjectKey(profile.StoragePath, req.KeyBase, req.Ext, shard) // Determine upload strategy - strategy := s.determineStrategy(req.Multipart, req.SizeBytes, uploadOptions.MultipartThresholdMB) + strategy := s.determineStrategy(req.Multipart, req.SizeBytes, profile.MultipartThresholdMB) // Create required headers headers := s.buildRequiredHeaders(req.Mime) // Create presigned URLs based on strategy - expiresAt := time.Now().Add(time.Duration(uploadOptions.TokenTTLSeconds) * time.Second) - uploadDetails, err := s.createUploadDetails(ctx, strategy, objectKey, headers, expiresAt, uploadOptions.PartSizeMB, req.SizeBytes) + expiresAt := time.Now().Add(time.Duration(profile.TokenTTLSeconds) * time.Second) + uploadDetails, err := s.createUploadDetails(ctx, strategy, objectKey, headers, expiresAt, profile.PartSizeMB, req.SizeBytes, baseURL) if err != nil { return nil, fmt.Errorf("failed to create upload details: %w", err) } @@ -124,7 +129,7 @@ func (s *Service) buildRequiredHeaders(mime string) map[string]string { return headers } -func (s *Service) createUploadDetails(ctx context.Context, strategy, objectKey string, headers map[string]string, expiresAt time.Time, partSizeMB int64, totalSizeBytes int64) (*UploadDetails, error) { +func (s *Service) createUploadDetails(ctx context.Context, strategy, objectKey string, headers map[string]string, expiresAt time.Time, partSizeMB int64, totalSizeBytes int64, baseURL string) (*UploadDetails, error) { expires := time.Until(expiresAt) if strategy == "single" { @@ -183,17 +188,54 @@ func (s *Service) createUploadDetails(ctx context.Context, strategy, objectKey s } } + // Generate server-side URLs for complete and abort operations + if baseURL == "" { + baseURL = "http://localhost:8080" // Default fallback + } + + completeURL := fmt.Sprintf("%s/v1/uploads/%s/complete/%s", baseURL, objectKey, uploadID) + abortURL := fmt.Sprintf("%s/v1/uploads/%s/abort/%s", baseURL, objectKey, uploadID) + return &UploadDetails{ Multipart: &MultipartUpload{ UploadID: uploadID, PartSize: partSizeBytes, Parts: parts, - // Note: Complete and Abort operations aren't presignable, - // client must handle these via direct API calls + Complete: &UploadAction{ + Method: "POST", + URL: completeURL, + Headers: map[string]string{"Content-Type": "application/json"}, + ExpiresAt: expiresAt, + }, + Abort: &UploadAction{ + Method: "DELETE", + URL: abortURL, + Headers: map[string]string{}, + ExpiresAt: expiresAt, + }, }, }, nil } +// CompleteMultipartUpload completes a multipart upload +func (s *Service) CompleteMultipartUpload(ctx context.Context, objectKey, uploadID string, req *CompleteMultipartRequest) error { + // Convert request parts to s3.PartInfo + parts := make([]s3.PartInfo, len(req.Parts)) + for i, part := range req.Parts { + parts[i] = s3.PartInfo{ + PartNumber: part.PartNumber, + ETag: part.ETag, + } + } + + return s.s3Client.CompleteMultipartUpload(ctx, objectKey, uploadID, parts) +} + +// AbortMultipartUpload aborts a multipart upload +func (s *Service) AbortMultipartUpload(ctx context.Context, objectKey, uploadID string) error { + return s.s3Client.AbortMultipartUpload(ctx, objectKey, uploadID) +} + // GenerateShard creates a shard from key_base using SHA1 hash func GenerateShard(keyBase string) string { hash := sha1.Sum([]byte(keyBase)) diff --git a/internal/upload/service_test.go b/internal/upload/service_test.go index efae285..2d31a4f 100644 --- a/internal/upload/service_test.go +++ b/internal/upload/service_test.go @@ -2,17 +2,21 @@ package upload import ( "context" + "strings" "testing" "time" "mediaflow/internal/config" + "mediaflow/internal/s3" ) // MockS3Client implements S3Client interface for testing type MockS3Client struct { - createMultipartUploadFunc func(ctx context.Context, key string, headers map[string]string) (string, error) - presignPutObjectFunc func(ctx context.Context, key string, expires time.Duration, headers map[string]string) (string, error) - presignUploadPartFunc func(ctx context.Context, key, uploadID string, partNumber int32, expires time.Duration) (string, error) + createMultipartUploadFunc func(ctx context.Context, key string, headers map[string]string) (string, error) + presignPutObjectFunc func(ctx context.Context, key string, expires time.Duration, headers map[string]string) (string, error) + presignUploadPartFunc func(ctx context.Context, key, uploadID string, partNumber int32, expires time.Duration) (string, error) + completeMultipartUploadFunc func(ctx context.Context, key, uploadID string, parts []s3.PartInfo) error + abortMultipartUploadFunc func(ctx context.Context, key, uploadID string) error } func (m *MockS3Client) CreateMultipartUpload(ctx context.Context, key string, headers map[string]string) (string, error) { @@ -36,6 +40,20 @@ func (m *MockS3Client) PresignUploadPart(ctx context.Context, key, uploadID stri return "https://test.s3.amazonaws.com/bucket/" + key + "?partNumber=" + string(rune(partNumber)), nil } +func (m *MockS3Client) CompleteMultipartUpload(ctx context.Context, key, uploadID string, parts []s3.PartInfo) error { + if m.completeMultipartUploadFunc != nil { + return m.completeMultipartUploadFunc(ctx, key, uploadID, parts) + } + return nil +} + +func (m *MockS3Client) AbortMultipartUpload(ctx context.Context, key, uploadID string) error { + if m.abortMultipartUploadFunc != nil { + return m.abortMultipartUploadFunc(ctx, key, uploadID) + } + return nil +} + func TestGenerateShard(t *testing.T) { tests := []struct { keyBase string @@ -213,19 +231,19 @@ func TestService_buildObjectKey(t *testing.T) { }{ { name: "With shard", - template: "raw/{shard?}/{key_base}.{ext}", + template: "originals/{shard?}/{key_base}.{ext}", keyBase: "test-key", ext: "jpg", shard: "ab", - expected: "raw/ab/test-key.jpg", + expected: "originals/ab/test-key.jpg", }, { name: "Without shard", - template: "raw/{shard?}/{key_base}.{ext}", + template: "originals/{shard?}/{key_base}.{ext}", keyBase: "test-key", ext: "jpg", shard: "", - expected: "raw/test-key.jpg", + expected: "originals/test-key.jpg", }, { name: "Simple template", @@ -252,13 +270,14 @@ func TestService_PresignUpload_Validation(t *testing.T) { cfg := &config.Config{S3Bucket: "test-bucket"} service := NewService(mockS3, cfg) - uploadOptions := &config.UploadOptions{ + profile := &config.Profile{ + Kind: "image", AllowedMimes: []string{"image/jpeg", "image/png"}, SizeMaxBytes: 5 * 1024 * 1024, // 5MB MultipartThresholdMB: 15, PartSizeMB: 8, TokenTTLSeconds: 900, - PathTemplate: "raw/{shard?}/{key_base}.{ext}", + StoragePath: "originals/{shard?}/{key_base}.{ext}", EnableSharding: true, } @@ -314,7 +333,7 @@ func TestService_PresignUpload_Validation(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - result, err := service.PresignUpload(ctx, tt.request, uploadOptions) + result, err := service.PresignUpload(ctx, tt.request, profile, "https://test-api.com") if tt.expectError { if err == nil { @@ -351,13 +370,14 @@ func TestService_PresignUpload_SingleStrategy(t *testing.T) { cfg := &config.Config{S3Bucket: "test-bucket"} service := NewService(mockS3, cfg) - uploadOptions := &config.UploadOptions{ + profile := &config.Profile{ + Kind: "image", AllowedMimes: []string{"image/jpeg"}, SizeMaxBytes: 5 * 1024 * 1024, MultipartThresholdMB: 15, PartSizeMB: 8, TokenTTLSeconds: 900, - PathTemplate: "raw/{key_base}.{ext}", + StoragePath: "originals/{key_base}.{ext}", EnableSharding: false, } @@ -372,7 +392,7 @@ func TestService_PresignUpload_SingleStrategy(t *testing.T) { } ctx := context.Background() - result, err := service.PresignUpload(ctx, request, uploadOptions) + result, err := service.PresignUpload(ctx, request, profile, "https://test-api.com") if err != nil { t.Errorf("Unexpected error: %v", err) @@ -390,8 +410,8 @@ func TestService_PresignUpload_SingleStrategy(t *testing.T) { t.Errorf("Expected PUT method, got %s", result.Upload.Single.Method) } - if result.ObjectKey != "raw/test-key.jpg" { - t.Errorf("Expected object key 'raw/test-key.jpg', got '%s'", result.ObjectKey) + if result.ObjectKey != "originals/test-key.jpg" { + t.Errorf("Expected object key 'originals/test-key.jpg', got '%s'", result.ObjectKey) } } @@ -407,13 +427,14 @@ func TestService_PresignUpload_MultipartStrategy(t *testing.T) { cfg := &config.Config{S3Bucket: "test-bucket"} service := NewService(mockS3, cfg) - uploadOptions := &config.UploadOptions{ + profile := &config.Profile{ + Kind: "video", AllowedMimes: []string{"video/mp4"}, SizeMaxBytes: 100 * 1024 * 1024, MultipartThresholdMB: 15, PartSizeMB: 8, TokenTTLSeconds: 900, - PathTemplate: "raw/{key_base}.{ext}", + StoragePath: "originals/{key_base}.{ext}", EnableSharding: false, } @@ -428,7 +449,7 @@ func TestService_PresignUpload_MultipartStrategy(t *testing.T) { } ctx := context.Background() - result, err := service.PresignUpload(ctx, request, uploadOptions) + result, err := service.PresignUpload(ctx, request, profile, "https://test-api.com") if err != nil { t.Errorf("Unexpected error: %v", err) @@ -460,4 +481,172 @@ func TestService_PresignUpload_MultipartStrategy(t *testing.T) { t.Errorf("Expected PUT method for part, got %s", part.Method) } } + + // Check complete and abort URLs are populated + if result.Upload.Multipart.Complete == nil { + t.Errorf("Expected complete URL to be populated") + } else { + if result.Upload.Multipart.Complete.Method != "POST" { + t.Errorf("Expected complete method to be POST, got %s", result.Upload.Multipart.Complete.Method) + } + expectedCompleteURL := "https://test-api.com/v1/uploads/originals/test-video.mp4/complete/test-upload-id" + if result.Upload.Multipart.Complete.URL != expectedCompleteURL { + t.Errorf("Expected complete URL '%s', got '%s'", expectedCompleteURL, result.Upload.Multipart.Complete.URL) + } + } + + if result.Upload.Multipart.Abort == nil { + t.Errorf("Expected abort URL to be populated") + } else { + if result.Upload.Multipart.Abort.Method != "DELETE" { + t.Errorf("Expected abort method to be DELETE, got %s", result.Upload.Multipart.Abort.Method) + } + expectedAbortURL := "https://test-api.com/v1/uploads/originals/test-video.mp4/abort/test-upload-id" + if result.Upload.Multipart.Abort.URL != expectedAbortURL { + t.Errorf("Expected abort URL '%s', got '%s'", expectedAbortURL, result.Upload.Multipart.Abort.URL) + } + } +} + +func TestService_CompleteMultipartUpload(t *testing.T) { + called := false + var capturedParts []s3.PartInfo + + mockS3 := &MockS3Client{ + completeMultipartUploadFunc: func(ctx context.Context, key, uploadID string, parts []s3.PartInfo) error { + called = true + capturedParts = parts + if key != "test-object-key" { + t.Errorf("Expected key 'test-object-key', got '%s'", key) + } + if uploadID != "test-upload-id" { + t.Errorf("Expected upload ID 'test-upload-id', got '%s'", uploadID) + } + return nil + }, + } + + cfg := &config.Config{S3Bucket: "test-bucket"} + service := NewService(mockS3, cfg) + + request := &CompleteMultipartRequest{ + Parts: []CompletedPart{ + {PartNumber: 1, ETag: "etag1"}, + {PartNumber: 2, ETag: "etag2"}, + }, + } + + ctx := context.Background() + err := service.CompleteMultipartUpload(ctx, "test-object-key", "test-upload-id", request) + + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if !called { + t.Errorf("Expected S3 CompleteMultipartUpload to be called") + } + + if len(capturedParts) != 2 { + t.Errorf("Expected 2 parts, got %d", len(capturedParts)) + } + + if capturedParts[0].PartNumber != 1 || capturedParts[0].ETag != "etag1" { + t.Errorf("Part 1 mismatch: expected {1, etag1}, got {%d, %s}", capturedParts[0].PartNumber, capturedParts[0].ETag) + } + + if capturedParts[1].PartNumber != 2 || capturedParts[1].ETag != "etag2" { + t.Errorf("Part 2 mismatch: expected {2, etag2}, got {%d, %s}", capturedParts[1].PartNumber, capturedParts[1].ETag) + } +} + +func TestService_AbortMultipartUpload(t *testing.T) { + called := false + + mockS3 := &MockS3Client{ + abortMultipartUploadFunc: func(ctx context.Context, key, uploadID string) error { + called = true + if key != "test-object-key" { + t.Errorf("Expected key 'test-object-key', got '%s'", key) + } + if uploadID != "test-upload-id" { + t.Errorf("Expected upload ID 'test-upload-id', got '%s'", uploadID) + } + return nil + }, + } + + cfg := &config.Config{S3Bucket: "test-bucket"} + service := NewService(mockS3, cfg) + + ctx := context.Background() + err := service.AbortMultipartUpload(ctx, "test-object-key", "test-upload-id") + + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if !called { + t.Errorf("Expected S3 AbortMultipartUpload to be called") + } +} + +func TestService_PresignUpload_WithURLEncoding(t *testing.T) { + mockS3 := &MockS3Client{ + createMultipartUploadFunc: func(ctx context.Context, key string, headers map[string]string) (string, error) { + return "test-upload-id", nil + }, + presignUploadPartFunc: func(ctx context.Context, key, uploadID string, partNumber int32, expires time.Duration) (string, error) { + return "https://test.s3.amazonaws.com/bucket/" + key, nil + }, + } + cfg := &config.Config{S3Bucket: "test-bucket"} + service := NewService(mockS3, cfg) + + profile := &config.Profile{ + Kind: "image", + AllowedMimes: []string{"image/jpeg"}, + SizeMaxBytes: 100 * 1024 * 1024, + MultipartThresholdMB: 15, + PartSizeMB: 8, + TokenTTLSeconds: 900, + StoragePath: "originals/{shard?}/{key_base}.{ext}", + EnableSharding: true, + } + + request := &PresignRequest{ + KeyBase: "test-key-with-chars", + Ext: "jpg", + Mime: "image/jpeg", + SizeBytes: 50 * 1024 * 1024, // Force multipart + Kind: "image", + Profile: "avatar", + Multipart: "force", + } + + ctx := context.Background() + result, err := service.PresignUpload(ctx, request, profile, "https://test-api.com") + + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Object key should contain shard due to sharding being enabled + expectedPrefix := "originals/" + if !strings.HasPrefix(result.ObjectKey, expectedPrefix) { + t.Errorf("Expected object key to start with '%s', got '%s'", expectedPrefix, result.ObjectKey) + } + + // URLs should be properly URL encoded + if result.Upload.Multipart.Complete != nil { + if !strings.Contains(result.Upload.Multipart.Complete.URL, "/v1/uploads/") { + t.Errorf("Complete URL should contain '/v1/uploads/', got: %s", result.Upload.Multipart.Complete.URL) + } + } + + if result.Upload.Multipart.Abort != nil { + if !strings.Contains(result.Upload.Multipart.Abort.URL, "/v1/uploads/") { + t.Errorf("Abort URL should contain '/v1/uploads/', got: %s", result.Upload.Multipart.Abort.URL) + } + } } \ No newline at end of file diff --git a/internal/upload/types.go b/internal/upload/types.go index f618a0d..7d24215 100644 --- a/internal/upload/types.go +++ b/internal/upload/types.go @@ -78,7 +78,7 @@ type UploadConfig struct { TokenTTLSeconds int64 `yaml:"token_ttl_seconds"` SigningAlgorithm string `yaml:"signing_alg"` ActiveKeyID string `yaml:"active_kid"` - PathTemplateRaw string `yaml:"path_template_raw"` + StoragePathRaw string `yaml:"storage_path_raw"` EnableSharding bool `yaml:"enable_sharding"` Policies []UploadPolicy `yaml:"policies"` } @@ -91,6 +91,17 @@ type ErrorResponse struct { RetryAfterSeconds int `json:"retry_after_seconds,omitempty"` } +// CompleteMultipartRequest represents the request to complete a multipart upload +type CompleteMultipartRequest struct { + Parts []CompletedPart `json:"parts" validate:"required,min=1"` +} + +// CompletedPart represents a completed part with its ETag +type CompletedPart struct { + PartNumber int `json:"part_number" validate:"required,min=1"` + ETag string `json:"etag" validate:"required"` +} + // Standard error codes const ( ErrUnauthorized = "unauthorized" diff --git a/main.go b/main.go index fffefa9..b564f00 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -19,6 +20,18 @@ import ( "mediaflow/internal/upload" ) +// methodBasedAuth applies authentication middleware only to specific HTTP methods +func methodBasedAuth(authMiddleware func(http.Handler) http.Handler, handler http.HandlerFunc) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost || r.Method == http.MethodPut || r.Method == http.MethodPatch || r.Method == http.MethodDelete { + authMiddleware(http.HandlerFunc(handler)).ServeHTTP(w, r) + } else { + // No authentication for read methods (GET, HEAD, OPTIONS) + handler(w, r) + } + }) +} + func main() { cfg := config.Load() ctx := context.Background() @@ -30,7 +43,7 @@ func main() { } imageAPI := api.NewImageAPI(ctx, imageService, storageConfig) - // Setup upload service and handlers + // Setup upload service and handlers uploadService := upload.NewService(imageService.S3Client, cfg) uploadHandler := upload.NewHandler(ctx, uploadService, storageConfig) @@ -40,13 +53,22 @@ func main() { mux := http.NewServeMux() - // Image APIs (no auth required) - mux.HandleFunc("/thumb/{type}/{image_id}", imageAPI.HandleThumbnailTypes) - mux.HandleFunc("/originals/{type}/{image_id}", imageAPI.HandleOriginals) - + // Image APIs + mux.Handle("/thumb/{type}/{image_id}", methodBasedAuth(authMiddleware, imageAPI.HandleThumbnailTypes)) + mux.Handle("/originals/{type}/{image_id}", authMiddleware(http.HandlerFunc(imageAPI.HandleOriginals))) + // Upload APIs (auth required) mux.Handle("/v1/uploads/presign", authMiddleware(http.HandlerFunc(uploadHandler.HandlePresign))) - + mux.HandleFunc("/v1/uploads/", func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/complete/") { + authMiddleware(http.HandlerFunc(uploadHandler.HandleCompleteMultipart)).ServeHTTP(w, r) + } else if r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/abort/") { + authMiddleware(http.HandlerFunc(uploadHandler.HandleAbortMultipart)).ServeHTTP(w, r) + } else { + http.NotFound(w, r) + } + }) + // Health check mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { response.JSON("OK").Write(w)