diff --git a/internal/target/dokploy/apply.go b/internal/target/dokploy/apply.go index 117ef5f..7f20ecc 100644 --- a/internal/target/dokploy/apply.go +++ b/internal/target/dokploy/apply.go @@ -2,6 +2,7 @@ package dokploy import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -237,6 +238,7 @@ type appCache struct { ComposeAppName string DiscoveryRedeployAttempted bool TargetWritersStopped []dockerContainer + MigratedVolumeMounts map[string]migratedVolumeMount } type applyContext struct { @@ -317,7 +319,7 @@ func (c *Client) Apply(ctx context.Context, plan Plan) error { err := c.applyStep(ctx, actx, step) if err != nil { emitProgress(plan.OnProgress, StepProgress{Index: index, Total: total, Step: step, Status: StepStatusError, Err: err}) - c.bestEffortResume(ctx, actx, plan, total, pausedApps, coolifyProxyStopped) + c.bestEffortResume(ctx, actx, plan, total, pausedApps, coolifyProxyStopped, isUnsafeTargetResumeError(err)) return fmt.Errorf("dokploy step %s for %s (%s): %w", step.Kind, step.App, step.Ref, err) } emitProgress(plan.OnProgress, StepProgress{Index: index, Total: total, Step: step, Status: StepStatusOK}) @@ -341,6 +343,9 @@ func isPlatformAppRole(role string) bool { } func (c *Client) primeResumeState(ctx context.Context, actx *applyContext, completed []Step, next Step, pausedApps map[string]struct{}, coolifyProxyStopped *bool) error { + if err := actx.loadMigratedVolumeMounts(); err != nil { + return err + } pushedApps := map[string]struct{}{} resumedTargetApps := map[string]struct{}{} for _, step := range completed { @@ -401,11 +406,17 @@ func (c *Client) primeResumeState(ctx context.Context, actx *applyContext, compl // original failure; operators can still re-run apply or rollback. it // uses a fresh context so a Ctrl-C that cancelled apply can still // clean up — without that, source containers would stay stopped. -func (c *Client) bestEffortResume(_ context.Context, actx *applyContext, plan Plan, total int, pausedApps map[string]struct{}, coolifyProxyStopped bool) { - if len(pausedApps) == 0 && !coolifyProxyStopped && !actx.hasStoppedTargetWriters() { +func (c *Client) bestEffortResume(_ context.Context, actx *applyContext, plan Plan, total int, pausedApps map[string]struct{}, coolifyProxyStopped bool, skipTargetWriters bool) { + if actx == nil { + actx = &applyContext{} + } + actx.plan = plan + if len(pausedApps) == 0 && !coolifyProxyStopped && (skipTargetWriters || !actx.hasStoppedTargetWriters()) { return } - c.bestEffortResumeTargetWriters(actx, plan, total) + if !skipTargetWriters { + c.bestEffortResumeTargetWriters(actx, plan, total) + } for app := range pausedApps { cleanupCtx, cancel := context.WithTimeout(context.Background(), dockerStartTimeout) resumeStep := Step{Kind: StepResumeSource, App: app, Ref: app} @@ -451,18 +462,16 @@ func (c *Client) bestEffortResumeTargetWriters(actx *applyContext, plan Plan, to if actx == nil { return } - runner := c.dockerRunner() for app, entry := range actx.cache { if len(entry.TargetWritersStopped) == 0 { continue } resumeStep := Step{Kind: StepResumeTarget, App: app, Ref: app} emitProgress(plan.OnProgress, StepProgress{Index: total, Total: total, Step: resumeStep, Status: StepStatusStarted}) - if err := startStoppedTargetWriters(runner, entry.TargetWritersStopped); err != nil { + if err := c.applyResumeTarget(context.Background(), actx, resumeStep); err != nil { emitProgress(plan.OnProgress, StepProgress{Index: total, Total: total, Step: resumeStep, Status: StepStatusError, Err: err}) continue } - entry.TargetWritersStopped = nil emitProgress(plan.OnProgress, StepProgress{Index: total, Total: total, Step: resumeStep, Status: StepStatusOK}) } } @@ -474,6 +483,11 @@ func emitProgress(fn *func(StepProgress), progress StepProgress) { (*fn)(progress) } +func isUnsafeTargetResumeError(err error) bool { + var unsafe unsafeTargetResumeError + return errors.As(err, &unsafe) +} + func (c *Client) applyStep(ctx context.Context, actx *applyContext, step Step) error { switch step.Kind { case StepCreateProject: @@ -605,6 +619,12 @@ func (c *Client) applyPushImage(ctx context.Context, actx *applyContext, step St return err } if err := c.DeployCompose(ctx, entry.ComposeID, deployComposeTitle(actx.plan)); err != nil { + if safetyErr := c.validateMigratedVolumeMountsAfterDeploy(ctx, actx, step.App); safetyErr != nil && isUnsafeTargetResumeError(safetyErr) { + return fmt.Errorf("deploy dokploy compose: %w; post-deploy safety check: %v", err, safetyErr) + } + return err + } + if err := c.validateMigratedVolumeMountsAfterDeploy(ctx, actx, step.App); err != nil { return err } return c.pauseTargetWritersForState(ctx, c.dockerRunner(), actx, step.App) @@ -804,7 +824,13 @@ func (c *Client) applyActivateRoutes(ctx context.Context, actx *applyContext, st return err } } - return c.DeployCompose(ctx, entry.ComposeID, deployComposeTitle(actx.plan)) + if err := c.DeployCompose(ctx, entry.ComposeID, deployComposeTitle(actx.plan)); err != nil { + if safetyErr := c.validateMigratedVolumeMountsAfterDeploy(ctx, actx, step.App); safetyErr != nil && isUnsafeTargetResumeError(safetyErr) { + return fmt.Errorf("deploy dokploy compose: %w; post-deploy safety check: %v", err, safetyErr) + } + return err + } + return c.validateMigratedVolumeMountsAfterDeploy(ctx, actx, step.App) } func (c *Client) ensureRouteDomain(ctx context.Context, composeID string, route gateway.Route) error { @@ -1119,9 +1145,56 @@ func readComposeFile(plan Plan, appName string) (string, error) { if err != nil { return "", fmt.Errorf("prepare compose file %s for dokploy: %w", full, err) } + if shouldAttachDokployNetworkAliases(app) { + compose, err = attachDokployNetworkAliases(compose) + if err != nil { + return "", fmt.Errorf("prepare dokploy network aliases for %s: %w", full, err) + } + } return compose, nil } +func shouldAttachDokployNetworkAliases(app preparer.AppPlan) bool { + return strings.EqualFold(strings.TrimSpace(app.Role), "support") +} + +func attachDokployNetworkAliases(contents string) (string, error) { + var doc yaml.Node + if err := yaml.Unmarshal([]byte(contents), &doc); err != nil { + return "", err + } + root := &doc + if doc.Kind == yaml.DocumentNode && len(doc.Content) > 0 { + root = doc.Content[0] + } + if root.Kind != yaml.MappingNode { + return contents, nil + } + services := mappingValue(root, "services") + if services == nil || services.Kind != yaml.MappingNode { + return contents, nil + } + changed := ensureTopLevelDokployNetwork(root) + for i := 0; i+1 < len(services.Content); i += 2 { + serviceName := strings.TrimSpace(services.Content[i].Value) + service := services.Content[i+1] + if serviceName == "" || service.Kind != yaml.MappingNode { + continue + } + if ensureServiceDokployNetworkAlias(service, serviceName) { + changed = true + } + } + if !changed { + return contents, nil + } + out, err := yaml.Marshal(&doc) + if err != nil { + return "", err + } + return string(out), nil +} + func inlineComposeEnvFiles(contents, appDir string) (string, error) { return inlineComposeEnvFilesWithFallbacks(contents, appDir, nil) } @@ -1432,6 +1505,97 @@ func sanitizeComposeTopLevelResources(root *yaml.Node) bool { return changed } +func ensureTopLevelDokployNetwork(root *yaml.Node) bool { + networks := mappingValue(root, "networks") + changed := false + if networks == nil || networks.Kind != yaml.MappingNode { + networks = &yaml.Node{Kind: yaml.MappingNode} + setMappingNode(root, "networks", networks) + changed = true + } + network := mappingValue(networks, "dokploy-network") + if network == nil || network.Kind != yaml.MappingNode { + network = &yaml.Node{Kind: yaml.MappingNode} + setMappingNode(networks, "dokploy-network", network) + changed = true + } + if ensureMappingBool(network, "external", true) { + changed = true + } + return changed +} + +func ensureServiceDokployNetworkAlias(service *yaml.Node, serviceName string) bool { + networks := mappingValue(service, "networks") + changed := false + if networks == nil { + networks = &yaml.Node{Kind: yaml.MappingNode} + setMappingNode(service, "networks", networks) + setMappingNode(networks, "default", &yaml.Node{Kind: yaml.MappingNode}) + changed = true + } else if networks.Kind == yaml.SequenceNode { + networks = composeNetworkSequenceToMapping(networks) + setMappingNode(service, "networks", networks) + changed = true + } else if networks.Kind != yaml.MappingNode { + networks = &yaml.Node{Kind: yaml.MappingNode} + setMappingNode(service, "networks", networks) + changed = true + } + network := mappingValue(networks, "dokploy-network") + if network == nil || network.Kind != yaml.MappingNode { + network = &yaml.Node{Kind: yaml.MappingNode} + setMappingNode(networks, "dokploy-network", network) + changed = true + } + if ensureServiceNetworkAlias(network, serviceName) { + changed = true + } + return changed +} + +func composeNetworkSequenceToMapping(sequence *yaml.Node) *yaml.Node { + mapping := &yaml.Node{Kind: yaml.MappingNode} + for _, item := range sequence.Content { + if item.Kind != yaml.ScalarNode || strings.TrimSpace(item.Value) == "" { + continue + } + setMappingNode(mapping, strings.TrimSpace(item.Value), &yaml.Node{Kind: yaml.MappingNode}) + } + return mapping +} + +func ensureServiceNetworkAlias(network *yaml.Node, alias string) bool { + alias = strings.TrimSpace(alias) + if alias == "" { + return false + } + aliases := mappingValue(network, "aliases") + if aliases == nil { + setMappingNode(network, "aliases", &yaml.Node{Kind: yaml.SequenceNode, Content: []*yaml.Node{stringNode(alias)}}) + return true + } + if aliases.Kind != yaml.SequenceNode { + preserved := strings.TrimSpace(aliases.Value) + items := []*yaml.Node{} + if preserved != "" { + items = append(items, stringNode(preserved)) + } + if preserved != alias { + items = append(items, stringNode(alias)) + } + setMappingNode(network, "aliases", &yaml.Node{Kind: yaml.SequenceNode, Content: items}) + return true + } + for _, item := range aliases.Content { + if item.Kind == yaml.ScalarNode && strings.TrimSpace(item.Value) == alias { + return false + } + } + aliases.Content = append(aliases.Content, stringNode(alias)) + return true +} + func readComposeEnvFilePathListValues(appDir string, paths []string) (map[string]string, error) { values := map[string]string{} for _, envPath := range paths { @@ -1655,6 +1819,37 @@ func setMappingScalar(node *yaml.Node, key, value string) { ) } +func setMappingNode(node *yaml.Node, key string, value *yaml.Node) { + if node == nil || node.Kind != yaml.MappingNode { + return + } + for i := 0; i+1 < len(node.Content); i += 2 { + if node.Content[i].Value != key { + continue + } + node.Content[i+1] = value + return + } + node.Content = append(node.Content, stringNode(key), value) +} + +func ensureMappingBool(node *yaml.Node, key string, value bool) bool { + want := "false" + if value { + want = "true" + } + current := mappingValue(node, key) + if current != nil && current.Kind == yaml.ScalarNode && current.Value == want { + return false + } + setMappingNode(node, key, &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!bool", Value: want}) + return true +} + +func stringNode(value string) *yaml.Node { + return &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!str", Value: value} +} + func removeMappingKey(node *yaml.Node, key string) bool { if node == nil || node.Kind != yaml.MappingNode { return false diff --git a/internal/target/dokploy/apply_test.go b/internal/target/dokploy/apply_test.go index 9444535..b5d12df 100644 --- a/internal/target/dokploy/apply_test.go +++ b/internal/target/dokploy/apply_test.go @@ -665,3 +665,50 @@ func TestApplyActivateRoutesUpdatesDomainsAndRedeploys(t *testing.T) { t.Fatalf("expected one compose deploy after domain update, got %d", deploys) } } + +func TestApplyActivateRoutesDetectsMigratedVolumeDriftAfterDeploy(t *testing.T) { + bundleDir := t.TempDir() + appDir := filepath.Join(bundleDir, "api") + if err := os.MkdirAll(appDir, 0o700); err != nil { + t.Fatalf("mkdir app dir: %v", err) + } + if err := os.WriteFile(filepath.Join(appDir, "compose.yaml"), []byte("services:\n web:\n image: example/web\n"), 0o600); err != nil { + t.Fatalf("write compose: %v", err) + } + deploys := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost && r.URL.Path == "/api/compose.update": + w.WriteHeader(http.StatusOK) + case r.Method == http.MethodPost && r.URL.Path == "/api/compose.deploy": + deploys++ + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + runner := &latePostDeployTargetRunner{} + client := &Client{BaseURL: server.URL, Token: "secret", HTTPClient: server.Client(), Docker: runner} + app := preparer.AppPlan{Name: "api", Directory: "api"} + app.Resources.Volumes = []preparer.VolumeResource{{Service: "web", Type: "volume", Target: "/data"}} + app.TargetResources = &preparer.TargetResources{Dokploy: &preparer.DokployResources{ComposeApp: preparer.DokployComposeApp{ComposePath: "compose.yaml"}}} + actx := &applyContext{cache: map[string]*appCache{}, plan: Plan{Prepare: preparer.Result{BundleDir: bundleDir, Apps: []preparer.AppPlan{app}}}} + entry := actx.entry("api") + entry.ComposeID = "c1" + entry.ComposeAppName = "stack-1" + entry.MigratedVolumeMounts = map[string]migratedVolumeMount{ + migratedMountKey("web", "/data"): {Service: "web", Target: "/data", VolumeName: "migrated-vol"}, + } + + err := client.applyActivateRoutes(context.Background(), actx, Step{Kind: StepActivateRoutes, App: "api", Ref: "routes"}) + if err == nil || !strings.Contains(err.Error(), "changed from migrated volume migrated-vol to fresh-vol") { + t.Fatalf("expected migrated volume drift after deploy, got %v", err) + } + if deploys != 1 { + t.Fatalf("expected route activation deploy before validation, got %d", deploys) + } + if !fakeOutputCalled(&fakeDockerRunner{outputArgs: runner.outputArgs}, "stop", "web-id") { + t.Fatalf("expected drifted target container to stop, calls=%#v", runner.outputArgs) + } +} diff --git a/internal/target/dokploy/datamove.go b/internal/target/dokploy/datamove.go index 736bf59..a467430 100644 --- a/internal/target/dokploy/datamove.go +++ b/internal/target/dokploy/datamove.go @@ -3,6 +3,7 @@ package dokploy import ( "bytes" "context" + "encoding/json" "fmt" "os" "path/filepath" @@ -45,41 +46,176 @@ func (c *Client) applySyncVolume(ctx context.Context, actx *applyContext, step S if err != nil { return err } - return withTargetStopped(ctx, runner, target, copy) + if err := withTargetStopped(ctx, runner, target, copy.Run); err != nil { + return err + } + if copy.TargetVolumeName != "" { + if err := actx.recordMigratedVolumeMount(step.App, migratedVolumeMount{ + Service: volume.Service, + Target: volume.Target, + VolumeName: copy.TargetVolumeName, + }); err != nil { + return err + } + } + return nil +} + +type plannedVolumeCopy struct { + Run func() error + TargetVolumeName string +} + +type migratedVolumeMount struct { + Service string `json:"service"` + Target string `json:"target"` + VolumeName string `json:"volumeName"` } // planVolumeCopy resolves source/target locations and returns the copy // closure to invoke once the target container is stopped. -func (c *Client) planVolumeCopy(ctx context.Context, runner dockerRunner, volume preparer.VolumeResource, target dockerContainer) (func() error, error) { +func (c *Client) planVolumeCopy(ctx context.Context, runner dockerRunner, volume preparer.VolumeResource, target dockerContainer) (plannedVolumeCopy, error) { switch volume.Type { case "volume": mount, ok := findMountByTarget(target, volume.Target) if !ok || mount.Type != "volume" || mount.Name == "" { - return nil, fmt.Errorf("named volume for target %s not found on dokploy compose container", volume.Target) + return plannedVolumeCopy{}, fmt.Errorf("named volume for target %s not found on dokploy compose container", volume.Target) } srcVolName, err := resolveSourceVolume(ctx, runner, volume) if err != nil { - return nil, err + return plannedVolumeCopy{}, err } dstVolName := mount.Name - return func() error { return copyNamedVolume(ctx, runner, srcVolName, dstVolName) }, nil + return plannedVolumeCopy{ + Run: func() error { return copyNamedVolume(ctx, runner, srcVolName, dstVolName) }, + TargetVolumeName: dstVolName, + }, nil case "bind": mount, ok := findMountByTarget(target, volume.Target) if !ok || mount.Type != "bind" || mount.Source == "" { - return nil, fmt.Errorf("bind mount for target %s not found on dokploy compose container", volume.Target) + return plannedVolumeCopy{}, fmt.Errorf("bind mount for target %s not found on dokploy compose container", volume.Target) } if !mount.RW { - return nil, fmt.Errorf("dokploy bind mount %s -> %s is read-only; refusing to sync", mount.Source, volume.Target) + return plannedVolumeCopy{}, fmt.Errorf("dokploy bind mount %s -> %s is read-only; refusing to sync", mount.Source, volume.Target) } srcPath, err := resolveSourceBindPath(ctx, runner, volume) if err != nil { - return nil, err + return plannedVolumeCopy{}, err } dstPath := mount.Source - return func() error { return rsyncBindMount(ctx, runner, srcPath, dstPath) }, nil + return plannedVolumeCopy{Run: func() error { return rsyncBindMount(ctx, runner, srcPath, dstPath) }}, nil default: - return nil, fmt.Errorf("%w: volume type %q is not supported", ErrNotImplemented, volume.Type) + return plannedVolumeCopy{}, fmt.Errorf("%w: volume type %q is not supported", ErrNotImplemented, volume.Type) + } +} + +func (a *applyContext) recordMigratedVolumeMount(appName string, mount migratedVolumeMount) error { + if a == nil || strings.TrimSpace(mount.Service) == "" || strings.TrimSpace(mount.Target) == "" || strings.TrimSpace(mount.VolumeName) == "" { + return nil + } + entry := a.entry(appName) + if entry.MigratedVolumeMounts == nil { + entry.MigratedVolumeMounts = map[string]migratedVolumeMount{} + } + entry.MigratedVolumeMounts[migratedMountKey(mount.Service, mount.Target)] = mount + return a.persistMigratedVolumeMounts() +} + +func migratedMountKey(service, target string) string { + return strings.TrimSpace(service) + "\x00" + strings.TrimSpace(target) +} + +const migratedVolumeMountsArtifact = "migrated-volumes.json" + +type migratedVolumeMountsState struct { + APIVersion string `json:"apiVersion"` + Apps map[string][]migratedVolumeMount `json:"apps,omitempty"` +} + +const migratedVolumeMountsAPIVersion = "bort.migrated-volumes/v1alpha1" + +func (a *applyContext) persistMigratedVolumeMounts() error { + if a == nil || strings.TrimSpace(a.plan.RunDir) == "" { + return nil + } + path, err := migratedVolumeMountsPath(a.plan.RunDir) + if err != nil { + return err + } + state := migratedVolumeMountsState{ + APIVersion: migratedVolumeMountsAPIVersion, + Apps: map[string][]migratedVolumeMount{}, + } + for app, entry := range a.cache { + if len(entry.MigratedVolumeMounts) == 0 { + continue + } + mounts := make([]migratedVolumeMount, 0, len(entry.MigratedVolumeMounts)) + for _, mount := range entry.MigratedVolumeMounts { + mounts = append(mounts, mount) + } + sort.Slice(mounts, func(i, j int) bool { + return migratedMountKey(mounts[i].Service, mounts[i].Target) < migratedMountKey(mounts[j].Service, mounts[j].Target) + }) + state.Apps[app] = mounts + } + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return fmt.Errorf("prepare migrated volume state dir: %w", err) + } + contents, err := json.MarshalIndent(state, "", " ") + if err != nil { + return fmt.Errorf("encode migrated volume state: %w", err) + } + contents = append(contents, '\n') + if err := os.WriteFile(path, contents, 0o600); err != nil { + return fmt.Errorf("write migrated volume state: %w", err) + } + return nil +} + +func (a *applyContext) loadMigratedVolumeMounts() error { + if a == nil || strings.TrimSpace(a.plan.RunDir) == "" { + return nil + } + path, err := migratedVolumeMountsPath(a.plan.RunDir) + if err != nil { + return err + } + contents, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("read migrated volume state: %w", err) + } + var state migratedVolumeMountsState + if err := json.Unmarshal(contents, &state); err != nil { + return fmt.Errorf("decode migrated volume state: %w", err) + } + if state.APIVersion != "" && state.APIVersion != migratedVolumeMountsAPIVersion { + return fmt.Errorf("%s has unsupported apiVersion %q (want %q)", path, state.APIVersion, migratedVolumeMountsAPIVersion) + } + for app, mounts := range state.Apps { + entry := a.entry(app) + if entry.MigratedVolumeMounts == nil { + entry.MigratedVolumeMounts = map[string]migratedVolumeMount{} + } + for _, mount := range mounts { + if strings.TrimSpace(mount.Service) == "" || strings.TrimSpace(mount.Target) == "" || strings.TrimSpace(mount.VolumeName) == "" { + continue + } + entry.MigratedVolumeMounts[migratedMountKey(mount.Service, mount.Target)] = mount + } + } + return nil +} + +func migratedVolumeMountsPath(runDir string) (string, error) { + path := filepath.Join(runDir, migratedVolumeMountsArtifact) + if err := safepath.ContainedPath(runDir, path); err != nil { + return "", err } + return path, nil } // withTargetStopped runs op while the dokploy target container is @@ -311,16 +447,176 @@ func targetContainerSetSignature(containers []dockerContainer) string { func (c *Client) applyResumeTarget(ctx context.Context, actx *applyContext, step Step) error { entry := actx.entry(step.App) - if len(entry.TargetWritersStopped) == 0 { - return nil + if err := c.validateMigratedVolumeMounts(ctx, actx, step.App); err != nil { + if isUnsafeTargetResumeError(err) { + stopCtx, cancelStop := context.WithTimeout(context.Background(), targetDiscoveryTimeout) + defer cancelStop() + if stopErr := c.stopTargetComposeContainers(stopCtx, actx, step.App); stopErr != nil { + return fmt.Errorf("%w (also failed to stop unsafe target containers: %v)", err, stopErr) + } + } + return err } - if err := startStoppedTargetWriters(c.dockerRunner(), entry.TargetWritersStopped); err != nil { + if len(entry.TargetWritersStopped) > 0 { + if err := startStoppedTargetWriters(c.dockerRunner(), entry.TargetWritersStopped); err != nil { + return err + } + entry.TargetWritersStopped = nil + } + return nil +} + +func (c *Client) validateMigratedVolumeMountsAfterDeploy(_ context.Context, actx *applyContext, appName string) error { + validationCtx, cancelValidation := context.WithTimeout(context.Background(), targetDiscoveryTimeout) + defer cancelValidation() + if err := c.validateMigratedVolumeMountsStable(validationCtx, actx, appName); err != nil { + if isUnsafeTargetResumeError(err) { + stopCtx, cancelStop := context.WithTimeout(context.Background(), targetDiscoveryTimeout) + defer cancelStop() + if stopErr := c.stopTargetComposeContainers(stopCtx, actx, appName); stopErr != nil { + return fmt.Errorf("%w (also failed to stop unsafe target containers: %v)", err, stopErr) + } + } return err } - entry.TargetWritersStopped = nil return nil } +func (c *Client) validateMigratedVolumeMounts(ctx context.Context, actx *applyContext, appName string) error { + _, err := c.validateMigratedVolumeMountsSnapshot(ctx, actx, appName) + return err +} + +func (c *Client) validateMigratedVolumeMountsStable(ctx context.Context, actx *applyContext, appName string) error { + entry := actx.entry(appName) + if len(entry.MigratedVolumeMounts) == 0 { + return nil + } + lastSignature := "" + stableSince := time.Time{} + for { + signature, err := c.validateMigratedVolumeMountsSnapshot(ctx, actx, appName) + if err != nil { + return err + } + now := time.Now() + if signature != lastSignature { + lastSignature = signature + stableSince = now + } else if !stableSince.IsZero() && now.Sub(stableSince) >= targetPostDeployStableWindow { + return nil + } + select { + case <-ctx.Done(): + return unsafeTargetResumeError{err: ctx.Err()} + case <-time.After(targetWriterDiscoveryDelay): + } + } +} + +func (c *Client) validateMigratedVolumeMountsSnapshot(ctx context.Context, actx *applyContext, appName string) (string, error) { + entry := actx.entry(appName) + if len(entry.MigratedVolumeMounts) == 0 { + return "", nil + } + runner := c.dockerRunner() + parts := make([]string, 0, len(entry.MigratedVolumeMounts)) + for _, expected := range entry.MigratedVolumeMounts { + container, err := c.targetContainerForServiceNoRedeploy(ctx, runner, actx, appName, expected.Service) + if err != nil { + return "", unsafeTargetResumeError{err: err} + } + mount, ok := findMountByTarget(container, expected.Target) + if !ok || mount.Type != "volume" || mount.Name == "" { + return "", unsafeTargetResumeError{err: fmt.Errorf("target service %s no longer has migrated volume mounted at %s", expected.Service, expected.Target)} + } + if mount.Name != expected.VolumeName { + return "", unsafeTargetResumeError{err: fmt.Errorf("target service %s volume mount %s changed from migrated volume %s to %s; refusing to accept a deploy that may be using fresh state", expected.Service, expected.Target, expected.VolumeName, mount.Name)} + } + parts = append(parts, migratedMountKey(expected.Service, expected.Target)+"\x00"+container.ID+"\x00"+mount.Name) + } + sort.Strings(parts) + return strings.Join(parts, "\x1e"), nil +} + +type unsafeTargetResumeError struct { + err error +} + +func (e unsafeTargetResumeError) Error() string { + if e.err == nil { + return "target resume is unsafe" + } + return e.err.Error() +} + +func (e unsafeTargetResumeError) Unwrap() error { + return e.err +} + +func (c *Client) stopTargetComposeContainers(ctx context.Context, actx *applyContext, appName string) error { + entry := actx.entry(appName) + if entry.ComposeAppName == "" { + if err := c.refreshComposeAppName(ctx, entry); err != nil { + return err + } + } + runner := c.dockerRunner() + var result error + var quietSince time.Time + stoppedIDs := map[string]struct{}{} + for { + containers, err := listContainersByLabel(ctx, runner, "com.docker.compose.project="+entry.ComposeAppName) + if err != nil { + if result != nil { + return fmt.Errorf("%w; list target containers: %v", result, err) + } + return err + } + sawRunning := false + hadStopError := false + for _, container := range containers { + if !container.State.Running { + continue + } + if _, stopped := stoppedIDs[container.ID]; stopped { + continue + } + sawRunning = true + quietSince = time.Time{} + if err := stopContainer(ctx, runner, container.ID); err != nil { + hadStopError = true + if result != nil { + result = fmt.Errorf("%w; stop target container %s: %v", result, container.ID, err) + continue + } + result = fmt.Errorf("stop target container %s: %w", container.ID, err) + continue + } + stoppedIDs[container.ID] = struct{}{} + } + if hadStopError { + return result + } + if !sawRunning { + if quietSince.IsZero() { + quietSince = time.Now() + } + if time.Since(quietSince) >= targetStopQuietWindow { + return result + } + } + select { + case <-ctx.Done(): + if result != nil { + return result + } + return ctx.Err() + case <-time.After(targetWriterDiscoveryDelay): + } + } +} + func startStoppedTargetWriters(runner dockerRunner, containers []dockerContainer) error { var result error for _, container := range containers { @@ -410,7 +706,7 @@ func (c *Client) applyRestoreDataStore(ctx context.Context, actx *applyContext, } creds := postgresCredsFromEnv(envMap(dst.Config.Env)) - return c.withTargetWritersStopped(ctx, runner, actx, step.App, map[string]struct{}{store.Service: {}}, func() error { + if err := c.withTargetWritersStopped(ctx, runner, actx, step.App, map[string]struct{}{store.Service: {}}, func() error { if err := waitPostgresReady(ctx, runner, dst.ID, creds); err != nil { return err } @@ -445,7 +741,30 @@ func (c *Client) applyRestoreDataStore(ctx context.Context, actx *applyContext, args = append(args, "-L", restoreListPath) } return runner.Run(ctx, file, nil, args...) - }) + }); err != nil { + return err + } + return recordMigratedStoreMounts(actx, step.App, app, store.Service, dst) +} + +func recordMigratedStoreMounts(actx *applyContext, appName string, app preparer.AppPlan, service string, container dockerContainer) error { + for _, volume := range app.Resources.Volumes { + if volume.Service != service || volume.Type != "volume" || volume.Target == "" { + continue + } + mount, ok := findMountByTarget(container, volume.Target) + if !ok || mount.Type != "volume" || mount.Name == "" { + continue + } + if err := actx.recordMigratedVolumeMount(appName, migratedVolumeMount{ + Service: service, + Target: volume.Target, + VolumeName: mount.Name, + }); err != nil { + return err + } + } + return nil } func preparePgRestoreList(ctx context.Context, runner dockerRunner, containerID, dumpPath, appName, ref string) (string, func(), error) { @@ -935,15 +1254,25 @@ func resolveSourceBindPath(ctx context.Context, runner dockerRunner, volume prep } const ( - targetDiscoveryTimeout = 60 * time.Second - targetDiscoveryDelay = 2 * time.Second - targetWriterDiscoveryDelay = 250 * time.Millisecond + targetDiscoveryTimeout = 60 * time.Second + targetDiscoveryDelay = 2 * time.Second + targetWriterDiscoveryDelay = 250 * time.Millisecond + targetStopQuietWindow = 2 * targetWriterDiscoveryDelay + targetPostDeployStableWindow = targetDiscoveryDelay ) // targetContainerForService discovers the dokploy-managed container for a // compose service by docker label, polling briefly because compose.deploy // returns before the containers may finish starting. func (c *Client) targetContainerForService(ctx context.Context, runner dockerRunner, actx *applyContext, appName, service string) (dockerContainer, error) { + return c.targetContainerForServiceWithRedeploy(ctx, runner, actx, appName, service, true) +} + +func (c *Client) targetContainerForServiceNoRedeploy(ctx context.Context, runner dockerRunner, actx *applyContext, appName, service string) (dockerContainer, error) { + return c.targetContainerForServiceWithRedeploy(ctx, runner, actx, appName, service, false) +} + +func (c *Client) targetContainerForServiceWithRedeploy(ctx context.Context, runner dockerRunner, actx *applyContext, appName, service string, allowRedeploy bool) (dockerContainer, error) { entry := actx.entry(appName) if entry.ComposeAppName == "" { if err := c.refreshComposeAppName(ctx, entry); err != nil { @@ -962,7 +1291,7 @@ func (c *Client) targetContainerForService(ctx context.Context, runner dockerRun if err := c.composeDeploymentError(ctx, entry); err != nil { return dockerContainer{}, err } - if len(containers) == 0 && entry.ComposeID != "" && !entry.DiscoveryRedeployAttempted { + if allowRedeploy && len(containers) == 0 && entry.ComposeID != "" && !entry.DiscoveryRedeployAttempted { entry.DiscoveryRedeployAttempted = true if err := c.DeployCompose(ctx, entry.ComposeID, deployComposeTitle(actx.plan)); err != nil { return dockerContainer{}, fmt.Errorf("redeploy dokploy compose project %q for target discovery: %w", entry.ComposeAppName, err) diff --git a/internal/target/dokploy/datamove_test.go b/internal/target/dokploy/datamove_test.go index 22e82e8..17ee7d6 100644 --- a/internal/target/dokploy/datamove_test.go +++ b/internal/target/dokploy/datamove_test.go @@ -113,6 +113,73 @@ func (r *stagedTargetWriterRunner) Run(context.Context, io.Reader, io.Writer, .. return nil } +type latePostDeployTargetRunner struct { + psCalls int + outputArgs [][]string + stopped bool + emptyFirst bool +} + +func (r *latePostDeployTargetRunner) Output(_ context.Context, args ...string) ([]byte, error) { + r.outputArgs = append(r.outputArgs, append([]string{}, args...)) + key := strings.Join(args, " ") + switch key { + case "ps -a --filter label=com.docker.compose.project=stack-1 --format {{.ID}}": + r.psCalls++ + if r.emptyFirst && r.psCalls == 1 { + return []byte(""), nil + } + return []byte("web-id\n"), nil + case "inspect --type container web-id": + if r.stopped { + return []byte(`[{"Id":"web-id","Name":"/web","Config":{"Labels":{"com.docker.compose.service":"web","com.docker.compose.project":"stack-1"}},"State":{"Running":false,"Status":"exited"},"Mounts":[{"Type":"volume","Name":"fresh-vol","Destination":"/data","RW":true}]}]`), nil + } + return []byte(`[{"Id":"web-id","Name":"/web","Config":{"Labels":{"com.docker.compose.service":"web","com.docker.compose.project":"stack-1"}},"State":{"Running":true,"Status":"running"},"Mounts":[{"Type":"volume","Name":"fresh-vol","Destination":"/data","RW":true}]}]`), nil + case "stop web-id": + r.stopped = true + return []byte("web-id\n"), nil + default: + return nil, errors.New("docker output not stubbed: " + key) + } +} + +func (r *latePostDeployTargetRunner) Run(context.Context, io.Reader, io.Writer, ...string) error { + return nil +} + +type postDeployRecreateRunner struct { + inspectCalls int + outputArgs [][]string + stopped bool +} + +func (r *postDeployRecreateRunner) Output(_ context.Context, args ...string) ([]byte, error) { + r.outputArgs = append(r.outputArgs, append([]string{}, args...)) + key := strings.Join(args, " ") + switch key { + case "ps -a --filter label=com.docker.compose.project=stack-1 --format {{.ID}}": + return []byte("web-id\n"), nil + case "inspect --type container web-id": + r.inspectCalls++ + if r.stopped { + return []byte(`[{"Id":"web-id","Name":"/web","Config":{"Labels":{"com.docker.compose.service":"web","com.docker.compose.project":"stack-1"}},"State":{"Running":false,"Status":"exited"},"Mounts":[{"Type":"volume","Name":"fresh-vol","Destination":"/data","RW":true}]}]`), nil + } + if r.inspectCalls == 1 { + return []byte(`[{"Id":"web-id","Name":"/web","Config":{"Labels":{"com.docker.compose.service":"web","com.docker.compose.project":"stack-1"}},"State":{"Running":true,"Status":"running"},"Mounts":[{"Type":"volume","Name":"migrated-vol","Destination":"/data","RW":true}]}]`), nil + } + return []byte(`[{"Id":"web-id","Name":"/web","Config":{"Labels":{"com.docker.compose.service":"web","com.docker.compose.project":"stack-1"}},"State":{"Running":true,"Status":"running"},"Mounts":[{"Type":"volume","Name":"fresh-vol","Destination":"/data","RW":true}]}]`), nil + case "stop web-id": + r.stopped = true + return []byte("web-id\n"), nil + default: + return nil, errors.New("docker output not stubbed: " + key) + } +} + +func (r *postDeployRecreateRunner) Run(context.Context, io.Reader, io.Writer, ...string) error { + return nil +} + func TestStopContainerKillsAfterStopTimeout(t *testing.T) { runner := &timeoutStopRunner{} ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) @@ -306,6 +373,50 @@ func TestApplyPushImageTagsMissingComposeImageFromSourceContainer(t *testing.T) } } +func TestApplyPushImageDetectsMigratedVolumeDriftAfterDeploy(t *testing.T) { + bundleDir := t.TempDir() + appDir := filepath.Join(bundleDir, "api") + if err := os.MkdirAll(appDir, 0o700); err != nil { + t.Fatalf("mkdir app dir: %v", err) + } + if err := os.WriteFile(filepath.Join(appDir, "compose.yaml"), []byte("services:\n web:\n image: example/web\n"), 0o600); err != nil { + t.Fatalf("write compose: %v", err) + } + deploys := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && r.URL.Path == "/api/compose.deploy" { + deploys++ + w.WriteHeader(http.StatusOK) + return + } + http.NotFound(w, r) + })) + defer server.Close() + runner := &latePostDeployTargetRunner{} + client := &Client{BaseURL: server.URL, Token: "secret", HTTPClient: server.Client(), Docker: runner} + app := preparer.AppPlan{Name: "api", Directory: "api"} + app.Resources.Volumes = []preparer.VolumeResource{{Service: "web", Type: "volume", Target: "/data"}} + app.TargetResources = &preparer.TargetResources{Dokploy: &preparer.DokployResources{ComposeApp: preparer.DokployComposeApp{ComposePath: "compose.yaml"}}} + actx := &applyContext{cache: map[string]*appCache{}, plan: Plan{Prepare: preparer.Result{BundleDir: bundleDir, Apps: []preparer.AppPlan{app}}}} + entry := actx.entry("api") + entry.ComposeID = "c1" + entry.ComposeAppName = "stack-1" + entry.MigratedVolumeMounts = map[string]migratedVolumeMount{ + migratedMountKey("web", "/data"): {Service: "web", Target: "/data", VolumeName: "migrated-vol"}, + } + + err := client.applyPushImage(context.Background(), actx, Step{Kind: StepPushImage, App: "api", Ref: "api"}) + if err == nil || !strings.Contains(err.Error(), "changed from migrated volume migrated-vol to fresh-vol") { + t.Fatalf("expected migrated volume drift after push deploy, got %v", err) + } + if deploys != 1 { + t.Fatalf("expected push deploy before validation, got %d", deploys) + } + if !fakeOutputCalled(&fakeDockerRunner{outputArgs: runner.outputArgs}, "stop", "web-id") { + t.Fatalf("expected drifted target container to stop, calls=%#v", runner.outputArgs) + } +} + func TestComposeFileForApplyReusesSourceImageForBuildOnlyService(t *testing.T) { bundleDir := t.TempDir() appDir := filepath.Join(bundleDir, "api") @@ -712,6 +823,57 @@ func TestInlineComposeEnvFilesAddsSharedEnvFileForAppServices(t *testing.T) { } } +func TestReadComposeFileAddsDokployNetworkAliasForSupportCompose(t *testing.T) { + bundleDir := t.TempDir() + appDir := filepath.Join(bundleDir, "sts-db-instance") + if err := os.MkdirAll(appDir, 0o700); err != nil { + t.Fatalf("mkdir app dir: %v", err) + } + compose := `services: + p0sow4sccoo8cssoos8w0g0o: + image: postgres:17-alpine + expose: + - "5432" + volumes: + - postgres-data-p0sow4sccoo8cssoos8w0g0o:/var/lib/postgresql/data +volumes: + postgres-data-p0sow4sccoo8cssoos8w0g0o: {} +` + if err := os.WriteFile(filepath.Join(appDir, "compose.yaml"), []byte(compose), 0o600); err != nil { + t.Fatalf("write compose: %v", err) + } + app := preparer.AppPlan{Name: "sts-db-instance", Directory: "sts-db-instance", Role: "support"} + app.TargetResources = &preparer.TargetResources{Dokploy: &preparer.DokployResources{ComposeApp: preparer.DokployComposeApp{ComposePath: "compose.yaml"}}} + out, err := readComposeFile(Plan{Prepare: preparer.Result{BundleDir: bundleDir, Apps: []preparer.AppPlan{app}}}, "sts-db-instance") + if err != nil { + t.Fatalf("readComposeFile: %v", err) + } + var parsed struct { + Services map[string]struct { + Networks map[string]struct { + Aliases []string `yaml:"aliases"` + } `yaml:"networks"` + } `yaml:"services"` + Networks map[string]struct { + External bool `yaml:"external"` + } `yaml:"networks"` + } + if err := yaml.Unmarshal([]byte(out), &parsed); err != nil { + t.Fatalf("parse compose: %v\n%s", err, out) + } + service := parsed.Services["p0sow4sccoo8cssoos8w0g0o"] + if _, ok := service.Networks["default"]; !ok { + t.Fatalf("expected support service to keep default network, got:\n%s", out) + } + aliases := service.Networks["dokploy-network"].Aliases + if len(aliases) != 1 || aliases[0] != "p0sow4sccoo8cssoos8w0g0o" { + t.Fatalf("expected dokploy-network alias for source host, got %#v in:\n%s", aliases, out) + } + if !parsed.Networks["dokploy-network"].External { + t.Fatalf("expected top-level dokploy-network external=true, got:\n%s", out) + } +} + func TestInlineComposeEnvFilesSanitizesCoolifyRuntimeCompose(t *testing.T) { appDir := t.TempDir() compose := `services: @@ -947,6 +1109,250 @@ func TestApplySyncVolumeStopsRunningTargetForCopy(t *testing.T) { } } +func TestApplyResumeTargetDetectsMigratedVolumeDrift(t *testing.T) { + app := preparer.AppPlan{Name: "api"} + app.Resources.Volumes = []preparer.VolumeResource{{ + Service: "redis", + Type: "volume", + Name: "src-vol", + Target: "/data", + }} + runner := &fakeDockerRunner{ + outputs: map[string][]byte{ + "ps -a --filter label=com.docker.compose.project=stack-1 --format {{.ID}}": []byte("dst-id\n"), + "inspect --type container dst-id": []byte(`[{"Id":"dst-id","Name":"/dokploy-redis","Config":{"Labels":{"com.docker.compose.service":"redis","com.docker.compose.project":"stack-1"}},"State":{"Running":true,"Status":"running"},"Mounts":[{"Type":"volume","Name":"migrated-vol","Destination":"/data","RW":true}]}]`), + "volume inspect src-vol": []byte(`[{"Name":"src-vol"}]`), + "volume inspect migrated-vol": []byte(`[{"Name":"migrated-vol"}]`), + "stop dst-id": []byte("dst-id\n"), + "start dst-id": []byte("dst-id\n"), + }, + } + client := &Client{Docker: runner} + actx := &applyContext{cache: map[string]*appCache{}} + actx.entry("api").ComposeAppName = "stack-1" + actx.plan = Plan{Prepare: preparer.Result{Apps: []preparer.AppPlan{app}}} + + step := Step{Kind: StepSyncVolume, App: "api", Ref: "volume:redis -> /data"} + if err := client.applySyncVolume(context.Background(), actx, step); err != nil { + t.Fatalf("applySyncVolume: %v", err) + } + actx.entry("api").TargetWritersStopped = []dockerContainer{{ID: "writer-id"}} + runner.outputs["inspect --type container dst-id"] = []byte(`[{"Id":"dst-id","Name":"/dokploy-redis","Config":{"Labels":{"com.docker.compose.service":"redis","com.docker.compose.project":"stack-1"}},"State":{"Running":true,"Status":"running"},"Mounts":[{"Type":"volume","Name":"fresh-vol","Destination":"/data","RW":true}]}]`) + runner.outputArgs = nil + + err := client.applyResumeTarget(context.Background(), actx, Step{Kind: StepResumeTarget, App: "api", Ref: "api"}) + if err == nil || !strings.Contains(err.Error(), "changed from migrated volume migrated-vol to fresh-vol") { + t.Fatalf("expected migrated volume drift error, got %v", err) + } + if !fakeOutputCalled(runner, "stop", "dst-id") { + t.Fatalf("expected unsafe target container to stop, calls=%#v", runner.outputArgs) + } + if fakeOutputCalled(runner, "start", "writer-id") { + t.Fatalf("target writer started before volume drift validation, calls=%#v", runner.outputArgs) + } +} + +func TestApplyResumeTargetLoadsPersistedMigratedVolumeState(t *testing.T) { + runDir := t.TempDir() + app := preparer.AppPlan{Name: "api"} + app.Resources.Volumes = []preparer.VolumeResource{{ + Service: "redis", + Type: "volume", + Name: "src-vol", + Target: "/data", + }} + runner := &fakeDockerRunner{ + outputs: map[string][]byte{ + "ps -a --filter label=com.docker.compose.project=stack-1 --format {{.ID}}": []byte("dst-id\n"), + "inspect --type container dst-id": []byte(`[{"Id":"dst-id","Name":"/dokploy-redis","Config":{"Labels":{"com.docker.compose.service":"redis","com.docker.compose.project":"stack-1"}},"State":{"Running":true,"Status":"running"},"Mounts":[{"Type":"volume","Name":"migrated-vol","Destination":"/data","RW":true}]}]`), + "volume inspect src-vol": []byte(`[{"Name":"src-vol"}]`), + "volume inspect migrated-vol": []byte(`[{"Name":"migrated-vol"}]`), + "stop dst-id": []byte("dst-id\n"), + "start dst-id": []byte("dst-id\n"), + }, + } + client := &Client{Docker: runner} + plan := Plan{Prepare: preparer.Result{Apps: []preparer.AppPlan{app}}, RunDir: runDir} + actx := &applyContext{cache: map[string]*appCache{}, plan: plan} + actx.entry("api").ComposeAppName = "stack-1" + step := Step{Kind: StepSyncVolume, App: "api", Ref: "volume:redis -> /data"} + if err := client.applySyncVolume(context.Background(), actx, step); err != nil { + t.Fatalf("applySyncVolume: %v", err) + } + + resumed := &applyContext{cache: map[string]*appCache{}, plan: plan} + resumed.entry("api").ComposeAppName = "stack-1" + pausedApps := map[string]struct{}{} + coolifyProxyStopped := false + if err := client.primeResumeState(context.Background(), resumed, nil, Step{Kind: StepResumeTarget, App: "api", Ref: "api"}, pausedApps, &coolifyProxyStopped); err != nil { + t.Fatalf("primeResumeState: %v", err) + } + runner.outputs["inspect --type container dst-id"] = []byte(`[{"Id":"dst-id","Name":"/dokploy-redis","Config":{"Labels":{"com.docker.compose.service":"redis","com.docker.compose.project":"stack-1"}},"State":{"Running":true,"Status":"running"},"Mounts":[{"Type":"volume","Name":"fresh-vol","Destination":"/data","RW":true}]}]`) + + err := client.applyResumeTarget(context.Background(), resumed, Step{Kind: StepResumeTarget, App: "api", Ref: "api"}) + if err == nil || !strings.Contains(err.Error(), "changed from migrated volume migrated-vol to fresh-vol") { + t.Fatalf("expected persisted migrated volume drift error, got %v", err) + } +} + +func TestRecordMigratedStoreMountsFeedsResumeValidation(t *testing.T) { + app := preparer.AppPlan{Name: "api"} + app.Resources.Volumes = []preparer.VolumeResource{{ + Service: "db", + Type: "volume", + Target: "/var/lib/postgresql/data", + }} + runner := &fakeDockerRunner{outputs: map[string][]byte{ + "ps -a --filter label=com.docker.compose.project=stack-1 --format {{.ID}}": []byte("db-id\n"), + "inspect --type container db-id": []byte(`[{"Id":"db-id","Name":"/db","Config":{"Labels":{"com.docker.compose.service":"db","com.docker.compose.project":"stack-1"}},"State":{"Running":true,"Status":"running"},"Mounts":[{"Type":"volume","Name":"fresh-db","Destination":"/var/lib/postgresql/data","RW":true}]}]`), + }} + client := &Client{Docker: runner} + actx := &applyContext{cache: map[string]*appCache{}, plan: Plan{Prepare: preparer.Result{Apps: []preparer.AppPlan{app}}}} + actx.entry("api").ComposeAppName = "stack-1" + if err := recordMigratedStoreMounts(actx, "api", app, "db", dockerContainer{Mounts: []dockerMount{{Type: "volume", Name: "restored-db", Destination: "/var/lib/postgresql/data"}}}); err != nil { + t.Fatalf("recordMigratedStoreMounts: %v", err) + } + + err := client.applyResumeTarget(context.Background(), actx, Step{Kind: StepResumeTarget, App: "api", Ref: "api"}) + if err == nil || !strings.Contains(err.Error(), "changed from migrated volume restored-db to fresh-db") { + t.Fatalf("expected restored store volume drift error, got %v", err) + } +} + +func TestBestEffortResumeSkipsTargetWritersAfterUnsafeResumeError(t *testing.T) { + runner := &fakeDockerRunner{outputs: map[string][]byte{ + "inspect --type container source-id": []byte(`[{"Id":"source-id","Name":"/source","State":{"Running":false,"Status":"exited"}}]`), + "start source-id": []byte("source-id\n"), + }} + client := &Client{Docker: runner} + actx := &applyContext{cache: map[string]*appCache{}} + actx.entry("api").TargetWritersStopped = []dockerContainer{{ID: "target-id"}} + plan := Plan{Prepare: preparer.Result{Apps: []preparer.AppPlan{{Name: "source-app", Resources: preparer.ResourceSpecs{Volumes: []preparer.VolumeResource{{Service: "web", Type: "volume", Target: "/data", SourceContainerID: "source-id"}}}}}}} + client.bestEffortResume(context.Background(), actx, plan, 1, map[string]struct{}{"source-app": {}}, false, true) + + if fakeOutputCalled(runner, "start", "target-id") { + t.Fatalf("target writer restarted after unsafe resume error, calls=%#v", runner.outputArgs) + } + if !fakeOutputCalled(runner, "start", "source-id") { + t.Fatalf("expected source app to resume, calls=%#v", runner.outputArgs) + } +} + +func TestBestEffortResumeValidatesTargetWritersBeforeRestart(t *testing.T) { + runner := &fakeDockerRunner{outputs: map[string][]byte{ + "ps -a --filter label=com.docker.compose.project=stack-1 --format {{.ID}}": []byte("web-id\n"), + "inspect --type container web-id": []byte(`[{"Id":"web-id","Name":"/web","Config":{"Labels":{"com.docker.compose.service":"web","com.docker.compose.project":"stack-1"}},"State":{"Running":false,"Status":"exited"},"Mounts":[{"Type":"volume","Name":"fresh-vol","Destination":"/data","RW":true}]}]`), + }} + client := &Client{Docker: runner} + actx := &applyContext{cache: map[string]*appCache{}, plan: Plan{Prepare: preparer.Result{Apps: []preparer.AppPlan{{Name: "api"}}}}} + entry := actx.entry("api") + entry.ComposeAppName = "stack-1" + entry.TargetWritersStopped = []dockerContainer{{ID: "target-id"}} + entry.MigratedVolumeMounts = map[string]migratedVolumeMount{ + migratedMountKey("web", "/data"): {Service: "web", Target: "/data", VolumeName: "migrated-vol"}, + } + + client.bestEffortResume(context.Background(), actx, actx.plan, 1, nil, false, false) + + if fakeOutputCalled(runner, "start", "target-id") { + t.Fatalf("target writer restarted despite migrated volume drift, calls=%#v", runner.outputArgs) + } +} + +func TestValidateMigratedVolumeMountsDoesNotRedeployForDiscovery(t *testing.T) { + deploys := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/compose.one": + _ = json.NewEncoder(w).Encode(Compose{ComposeID: "c1", AppName: "stack-1", ComposeStatus: "done"}) + case r.Method == http.MethodPost && r.URL.Path == "/api/compose.deploy": + deploys++ + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + runner := &fakeDockerRunner{outputs: map[string][]byte{ + "ps -a --filter label=com.docker.compose.project=stack-1 --format {{.ID}}": []byte(""), + }} + client := &Client{BaseURL: server.URL, Token: "secret", HTTPClient: server.Client(), Docker: runner} + actx := &applyContext{cache: map[string]*appCache{}, plan: Plan{}} + entry := actx.entry("api") + entry.ComposeID = "c1" + entry.ComposeAppName = "stack-1" + entry.MigratedVolumeMounts = map[string]migratedVolumeMount{ + migratedMountKey("web", "/data"): {Service: "web", Target: "/data", VolumeName: "migrated-vol"}, + } + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + err := client.validateMigratedVolumeMounts(ctx, actx, "api") + if err == nil || !isUnsafeTargetResumeError(err) { + t.Fatalf("expected unsafe validation error while target service is missing, got %v", err) + } + if deploys != 0 { + t.Fatalf("validation redeployed compose during discovery, deploys=%d", deploys) + } +} + +func TestStopTargetComposeContainersWaitsForLatePostDeployTarget(t *testing.T) { + runner := &latePostDeployTargetRunner{emptyFirst: true} + client := &Client{Docker: runner} + actx := &applyContext{cache: map[string]*appCache{}, plan: Plan{}} + entry := actx.entry("api") + entry.ComposeAppName = "stack-1" + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + if err := client.stopTargetComposeContainers(ctx, actx, "api"); err != nil { + t.Fatalf("stopTargetComposeContainers: %v", err) + } + if !fakeOutputCalled(&fakeDockerRunner{outputArgs: runner.outputArgs}, "stop", "web-id") { + t.Fatalf("expected late post-deploy target to stop, calls=%#v", runner.outputArgs) + } +} + +func TestPostDeployValidationIgnoresCanceledApplyContextAndStopsLateTarget(t *testing.T) { + runner := &latePostDeployTargetRunner{emptyFirst: true} + client := &Client{Docker: runner} + actx := &applyContext{cache: map[string]*appCache{}, plan: Plan{}} + entry := actx.entry("api") + entry.ComposeAppName = "stack-1" + entry.MigratedVolumeMounts = map[string]migratedVolumeMount{ + migratedMountKey("web", "/data"): {Service: "web", Target: "/data", VolumeName: "migrated-vol"}, + } + canceledCtx, cancel := context.WithCancel(context.Background()) + cancel() + + err := client.validateMigratedVolumeMountsAfterDeploy(canceledCtx, actx, "api") + if err == nil || !strings.Contains(err.Error(), "changed from migrated volume migrated-vol to fresh-vol") { + t.Fatalf("expected fresh safety context to detect late migrated volume drift, got %v", err) + } + if !fakeOutputCalled(&fakeDockerRunner{outputArgs: runner.outputArgs}, "stop", "web-id") { + t.Fatalf("expected late post-deploy target to stop, calls=%#v", runner.outputArgs) + } +} + +func TestPostDeployValidationDoesNotPassOnPreDeployContainer(t *testing.T) { + runner := &postDeployRecreateRunner{} + client := &Client{Docker: runner} + actx := &applyContext{cache: map[string]*appCache{}, plan: Plan{}} + entry := actx.entry("api") + entry.ComposeAppName = "stack-1" + entry.MigratedVolumeMounts = map[string]migratedVolumeMount{ + migratedMountKey("web", "/data"): {Service: "web", Target: "/data", VolumeName: "migrated-vol"}, + } + + err := client.validateMigratedVolumeMountsAfterDeploy(context.Background(), actx, "api") + if err == nil || !strings.Contains(err.Error(), "changed from migrated volume migrated-vol to fresh-vol") { + t.Fatalf("expected post-deploy validation to keep watching past the pre-deploy container, got %v", err) + } + if !fakeOutputCalled(&fakeDockerRunner{outputArgs: runner.outputArgs}, "stop", "web-id") { + t.Fatalf("expected recreated target container to stop, calls=%#v", runner.outputArgs) + } +} + func TestTargetWriterKeepServicesKeepsOnlyLogicalStores(t *testing.T) { app := preparer.AppPlan{Name: "api"} app.Resources.DataStores = []preparer.DataStoreResource{