From 73719c311adeaeb832236a30fc4aa30c8c177513 Mon Sep 17 00:00:00 2001 From: Alex <132889147+alexvcodesphere@users.noreply.github.com> Date: Fri, 20 Feb 2026 19:27:21 +0100 Subject: [PATCH 1/2] feat: refactor pipeline into shared package with step-aware log streaming Extract pipeline stage execution from cli/cmd/start_pipeline.go into a reusable pkg/pipeline package with a Runner that orchestrates stage execution, discovers sub-steps from the IDE server's pipeline status API, and streams logs per-step via SSE. - Add pkg/pipeline with Runner, Client interface, and step discovery - Add StreamLogs SSE method to api/workspace.go for real-time log output - Add StreamLogs to cli/cmd Client interface - Refactor start_pipeline.go to delegate to pipeline.Runner - Add pipeline streaming unit tests (single/multi-step, no-stream cases) - Update .mockery.yml for pipeline mock generation Signed-off-by: Alex <132889147+alexvcodesphere@users.noreply.github.com> --- .mockery.yml | 8 + api/workspace.go | 101 +++++++++ cli/cmd/client.go | 1 + cli/cmd/mocks.go | 81 ++++++++ cli/cmd/start_pipeline.go | 114 +--------- cli/cmd/start_pipeline_test.go | 10 +- pkg/pipeline/mocks.go | 308 ++++++++++++++++++++++++++++ pkg/pipeline/pipeline.go | 233 +++++++++++++++++++++ pkg/pipeline/pipeline_suite_test.go | 16 ++ pkg/pipeline/pipeline_test.go | 177 ++++++++++++++++ 10 files changed, 937 insertions(+), 112 deletions(-) create mode 100644 pkg/pipeline/mocks.go create mode 100644 pkg/pipeline/pipeline.go create mode 100644 pkg/pipeline/pipeline_suite_test.go create mode 100644 pkg/pipeline/pipeline_test.go diff --git a/.mockery.yml b/.mockery.yml index c1fe4a6..4e084af 100644 --- a/.mockery.yml +++ b/.mockery.yml @@ -38,3 +38,11 @@ packages: config: all: true interfaces: + github.com/codesphere-cloud/cs-go/pkg/deploy: + config: + all: true + interfaces: + github.com/codesphere-cloud/cs-go/pkg/pipeline: + config: + all: true + interfaces: diff --git a/api/workspace.go b/api/workspace.go index 56977d6..7f79c8d 100644 --- a/api/workspace.go +++ b/api/workspace.go @@ -4,7 +4,14 @@ package api import ( + "bufio" + "context" + "encoding/json" "fmt" + "io" + "log" + "net/http" + "strings" "github.com/codesphere-cloud/cs-go/api/errors" "github.com/codesphere-cloud/cs-go/api/openapi_client" @@ -219,3 +226,97 @@ func (c Client) GitPull(workspaceId int, remote string, branch string) error { r, err := req.Execute() return errors.FormatAPIError(r, err) } + +// logEntry represents a single log line from the SSE stream. +type logEntry struct { + Timestamp string `json:"timestamp"` + Kind string `json:"kind"` + Data string `json:"data"` +} + +// StreamLogs connects to the Codesphere SSE log endpoint and writes parsed +// log entries to the provided writer until the context is cancelled or the +// stream ends. This is used during pipeline execution to provide real-time +// log output. +func (c *Client) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { + endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", apiUrl, wsId, stage, step) + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + if err != nil { + return fmt.Errorf("failed to construct log stream request: %w", err) + } + + req.Header.Set("Accept", "text/event-stream") + + // Set auth from the client's context token + if token, ok := ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } else if token, ok := c.ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + // Context cancellation is expected when the stage finishes + if ctx.Err() != nil { + return nil + } + return fmt.Errorf("failed to connect to log stream: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("log stream responded with status %d", resp.StatusCode) + } + + reader := bufio.NewReader(resp.Body) + + for { + // Check if context is done + select { + case <-ctx.Done(): + return nil + default: + } + + // Parse one SSE event + var eventData string + for { + line, err := reader.ReadString('\n') + if err != nil { + if ctx.Err() != nil || err == io.EOF { + return nil + } + return fmt.Errorf("failed to read log stream: %w", err) + } + + line = strings.TrimSpace(line) + + if strings.HasPrefix(line, "data:") { + data := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + if eventData != "" { + eventData += "\n" + data + } else { + eventData = data + } + } else if line == "" && eventData != "" { + // Empty line marks end of SSE event + break + } + } + + // Parse and print log entries + var entries []logEntry + if err := json.Unmarshal([]byte(eventData), &entries); err != nil { + // Skip unparseable events (e.g. error responses) + log.Printf("⚠ log stream: %s", eventData) + eventData = "" + continue + } + + for _, entry := range entries { + _, _ = fmt.Fprintf(w, "%s | %s\n", entry.Timestamp, entry.Data) + } + eventData = "" + } +} diff --git a/cli/cmd/client.go b/cli/cmd/client.go index c0a3440..f54e64b 100644 --- a/cli/cmd/client.go +++ b/cli/cmd/client.go @@ -33,6 +33,7 @@ type Client interface { GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error) GitPull(wsId int, remote string, branch string) error DeployLandscape(wsId int, profile string) error + StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error } // CommandExecutor abstracts command execution for testing diff --git a/cli/cmd/mocks.go b/cli/cmd/mocks.go index 42246f9..29e1e4f 100644 --- a/cli/cmd/mocks.go +++ b/cli/cmd/mocks.go @@ -945,6 +945,87 @@ func (_c *MockClient_StartPipelineStage_Call) RunAndReturn(run func(wsId int, pr return _c } +// StreamLogs provides a mock function for the type MockClient +func (_mock *MockClient) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { + ret := _mock.Called(ctx, apiUrl, wsId, stage, step, w) + + if len(ret) == 0 { + panic("no return value specified for StreamLogs") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string, int, string, int, io.Writer) error); ok { + r0 = returnFunc(ctx, apiUrl, wsId, stage, step, w) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockClient_StreamLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamLogs' +type MockClient_StreamLogs_Call struct { + *mock.Call +} + +// StreamLogs is a helper method to define mock.On call +// - ctx context.Context +// - apiUrl string +// - wsId int +// - stage string +// - step int +// - w io.Writer +func (_e *MockClient_Expecter) StreamLogs(ctx interface{}, apiUrl interface{}, wsId interface{}, stage interface{}, step interface{}, w interface{}) *MockClient_StreamLogs_Call { + return &MockClient_StreamLogs_Call{Call: _e.mock.On("StreamLogs", ctx, apiUrl, wsId, stage, step, w)} +} + +func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer)) *MockClient_StreamLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + var arg2 int + if args[2] != nil { + arg2 = args[2].(int) + } + var arg3 string + if args[3] != nil { + arg3 = args[3].(string) + } + var arg4 int + if args[4] != nil { + arg4 = args[4].(int) + } + var arg5 io.Writer + if args[5] != nil { + arg5 = args[5].(io.Writer) + } + run( + arg0, + arg1, + arg2, + arg3, + arg4, + arg5, + ) + }) + return _c +} + +func (_c *MockClient_StreamLogs_Call) Return(err error) *MockClient_StreamLogs_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockClient_StreamLogs_Call) RunAndReturn(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error) *MockClient_StreamLogs_Call { + _c.Call.Return(run) + return _c +} + // WaitForWorkspaceRunning provides a mock function for the type MockClient func (_mock *MockClient) WaitForWorkspaceRunning(workspace *api.Workspace, timeout time.Duration) error { ret := _mock.Called(workspace, timeout) diff --git a/cli/cmd/start_pipeline.go b/cli/cmd/start_pipeline.go index 5f3824f..e85676c 100644 --- a/cli/cmd/start_pipeline.go +++ b/cli/cmd/start_pipeline.go @@ -5,12 +5,11 @@ package cmd import ( "fmt" - "log" - "slices" "time" "github.com/codesphere-cloud/cs-go/api" "github.com/codesphere-cloud/cs-go/pkg/io" + "github.com/codesphere-cloud/cs-go/pkg/pipeline" "github.com/spf13/cobra" ) @@ -27,8 +26,6 @@ type StartPipelineOpts struct { Timeout *time.Duration } -const IdeServer string = "codesphere-ide" - func (c *StartPipelineCmd) RunE(_ *cobra.Command, args []string) error { workspaceId, err := c.Opts.GetWorkspaceId() @@ -80,108 +77,9 @@ func AddStartPipelineCmd(start *cobra.Command, opts *GlobalOptions) { } func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string) error { - for _, stage := range stages { - if !isValidStage(stage) { - return fmt.Errorf("invalid pipeline stage: %s", stage) - } - } - for _, stage := range stages { - err := c.startStage(client, wsId, stage) - if err != nil { - return err - } - } - return nil -} - -func isValidStage(stage string) bool { - return slices.Contains([]string{"prepare", "test", "run"}, stage) -} - -func (c *StartPipelineCmd) startStage(client Client, wsId int, stage string) error { - log.Printf("starting %s stage on workspace %d...", stage, wsId) - - err := client.StartPipelineStage(wsId, *c.Opts.Profile, stage) - if err != nil { - log.Println() - return fmt.Errorf("failed to start pipeline stage %s: %w", stage, err) - } - - err = c.waitForPipelineStage(client, wsId, stage) - if err != nil { - return fmt.Errorf("failed waiting for stage %s to finish: %w", stage, err) - - } - return nil -} - -func (c *StartPipelineCmd) waitForPipelineStage(client Client, wsId int, stage string) error { - delay := 5 * time.Second - - maxWaitTime := c.Time.Now().Add(*c.Opts.Timeout) - for { - status, err := client.GetPipelineState(wsId, stage) - if err != nil { - log.Printf("\nError getting pipeline status: %s, trying again...", err.Error()) - c.Time.Sleep(delay) - continue - } - - if c.allFinished(status) { - log.Println("(finished)") - break - } - - if allRunning(status) && stage == "run" { - log.Println("(running)") - break - } - - err = shouldAbort(status) - if err != nil { - log.Println("(failed)") - return fmt.Errorf("stage %s failed: %w", stage, err) - } - - log.Print(".") - if c.Time.Now().After(maxWaitTime) { - log.Println() - return fmt.Errorf("timed out waiting for pipeline stage %s to be complete", stage) - } - c.Time.Sleep(delay) - } - return nil -} - -func allRunning(status []api.PipelineStatus) bool { - for _, s := range status { - // Run stage is only running customer servers, ignore IDE server - if s.Server != IdeServer && s.State != "running" { - return false - } - } - return true -} - -func (c *StartPipelineCmd) allFinished(status []api.PipelineStatus) bool { - io.Verboseln(c.Opts.Verbose, "====") - for _, s := range status { - io.Verbosef(c.Opts.Verbose, "Server: %s, State: %s, Replica: %s\n", s.Server, s.State, s.Replica) - } - for _, s := range status { - // Prepare and Test stage is only running in the IDE server, ignore customer servers - if s.Server == IdeServer && s.State != "success" { - return false - } - } - return true -} - -func shouldAbort(status []api.PipelineStatus) error { - for _, s := range status { - if slices.Contains([]string{"failure", "aborted"}, s.State) { - return fmt.Errorf("server %s, replica %s reached unexpected state %s", s.Server, s.Replica, s.State) - } - } - return nil + runner := pipeline.NewRunner(client, c.Time) + return runner.RunStages(wsId, stages, pipeline.Config{ + Profile: *c.Opts.Profile, + Timeout: *c.Opts.Timeout, + }) } diff --git a/cli/cmd/start_pipeline_test.go b/cli/cmd/start_pipeline_test.go index 9a1b0c8..4f6d857 100644 --- a/cli/cmd/start_pipeline_test.go +++ b/cli/cmd/start_pipeline_test.go @@ -95,7 +95,8 @@ var _ = Describe("StartPipeline", func() { testStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[1]).Return(nil).NotBefore(prepareStatusCall) testStatusCall := mockClient.EXPECT().GetPipelineState(wsId, stages[1]).Return(reportedStatusSuccess, nil).NotBefore(testStartCall) - runStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[2]).Return(nil).NotBefore(testStatusCall) + syncCall := mockClient.EXPECT().DeployLandscape(wsId, profile).Return(nil).NotBefore(testStatusCall) + runStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[2]).Return(nil).NotBefore(syncCall) mockClient.EXPECT().GetPipelineState(wsId, stages[2]).Return(reportedStatusRunning, nil).NotBefore(runStartCall) }) @@ -125,7 +126,8 @@ var _ = Describe("StartPipeline", func() { testStatusCall := mockClient.EXPECT().GetPipelineState(wsId, stages[1]).Return(reportedStatusRunning, nil).Times(2).NotBefore(testStartCall) testStatusCallSuccess := mockClient.EXPECT().GetPipelineState(wsId, stages[1]).Return(reportedStatusSuccess, nil).NotBefore(testStatusCall) - runStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[2]).Return(nil).NotBefore(testStatusCallSuccess) + syncCall := mockClient.EXPECT().DeployLandscape(wsId, profile).Return(nil).NotBefore(testStatusCallSuccess) + runStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[2]).Return(nil).NotBefore(syncCall) mockClient.EXPECT().GetPipelineState(wsId, stages[2]).Return(reportedStatusWaiting, nil).Times(2).NotBefore(runStartCall) mockClient.EXPECT().GetPipelineState(wsId, stages[2]).Return(reportedStatusRunning, nil).NotBefore(runStartCall) @@ -145,7 +147,7 @@ var _ = Describe("StartPipeline", func() { mockClient.EXPECT().GetPipelineState(wsId, stages[1]).Return(reportedStatusRunning, nil).Times(8).NotBefore(testStartCall) err := c.StartPipelineStages(mockClient, wsId, stages) - Expect(err).To(MatchError("failed waiting for stage test to finish: timed out waiting for pipeline stage test to be complete")) + Expect(err).To(MatchError("timed out waiting for pipeline stage test to be complete")) }) }) @@ -155,7 +157,7 @@ var _ = Describe("StartPipeline", func() { mockClient.EXPECT().GetPipelineState(wsId, stages[0]).Return(reportedStatusFailure, nil).NotBefore(prepareStartCall) err := c.StartPipelineStages(mockClient, wsId, stages) - Expect(err).To(MatchError("failed waiting for stage prepare to finish: stage prepare failed: server A, replica 0 reached unexpected state failure")) + Expect(err).To(MatchError("stage prepare failed: server A, replica 0 reached unexpected state failure")) }) }) }) diff --git a/pkg/pipeline/mocks.go b/pkg/pipeline/mocks.go new file mode 100644 index 0000000..dc0aeb6 --- /dev/null +++ b/pkg/pipeline/mocks.go @@ -0,0 +1,308 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package pipeline + +import ( + "context" + "github.com/codesphere-cloud/cs-go/api" + mock "github.com/stretchr/testify/mock" + "io" +) + +// NewMockClient creates a new instance of MockClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockClient(t interface { + mock.TestingT + Cleanup(func()) +}) *MockClient { + mock := &MockClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockClient is an autogenerated mock type for the Client type +type MockClient struct { + mock.Mock +} + +type MockClient_Expecter struct { + mock *mock.Mock +} + +func (_m *MockClient) EXPECT() *MockClient_Expecter { + return &MockClient_Expecter{mock: &_m.Mock} +} + +// DeployLandscape provides a mock function for the type MockClient +func (_mock *MockClient) DeployLandscape(wsId int, profile string) error { + ret := _mock.Called(wsId, profile) + + if len(ret) == 0 { + panic("no return value specified for DeployLandscape") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(int, string) error); ok { + r0 = returnFunc(wsId, profile) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockClient_DeployLandscape_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeployLandscape' +type MockClient_DeployLandscape_Call struct { + *mock.Call +} + +// DeployLandscape is a helper method to define mock.On call +// - wsId int +// - profile string +func (_e *MockClient_Expecter) DeployLandscape(wsId interface{}, profile interface{}) *MockClient_DeployLandscape_Call { + return &MockClient_DeployLandscape_Call{Call: _e.mock.On("DeployLandscape", wsId, profile)} +} + +func (_c *MockClient_DeployLandscape_Call) Run(run func(wsId int, profile string)) *MockClient_DeployLandscape_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 int + if args[0] != nil { + arg0 = args[0].(int) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockClient_DeployLandscape_Call) Return(err error) *MockClient_DeployLandscape_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockClient_DeployLandscape_Call) RunAndReturn(run func(wsId int, profile string) error) *MockClient_DeployLandscape_Call { + _c.Call.Return(run) + return _c +} + +// GetPipelineState provides a mock function for the type MockClient +func (_mock *MockClient) GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error) { + ret := _mock.Called(wsId, stage) + + if len(ret) == 0 { + panic("no return value specified for GetPipelineState") + } + + var r0 []api.PipelineStatus + var r1 error + if returnFunc, ok := ret.Get(0).(func(int, string) ([]api.PipelineStatus, error)); ok { + return returnFunc(wsId, stage) + } + if returnFunc, ok := ret.Get(0).(func(int, string) []api.PipelineStatus); ok { + r0 = returnFunc(wsId, stage) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]api.PipelineStatus) + } + } + if returnFunc, ok := ret.Get(1).(func(int, string) error); ok { + r1 = returnFunc(wsId, stage) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_GetPipelineState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPipelineState' +type MockClient_GetPipelineState_Call struct { + *mock.Call +} + +// GetPipelineState is a helper method to define mock.On call +// - wsId int +// - stage string +func (_e *MockClient_Expecter) GetPipelineState(wsId interface{}, stage interface{}) *MockClient_GetPipelineState_Call { + return &MockClient_GetPipelineState_Call{Call: _e.mock.On("GetPipelineState", wsId, stage)} +} + +func (_c *MockClient_GetPipelineState_Call) Run(run func(wsId int, stage string)) *MockClient_GetPipelineState_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 int + if args[0] != nil { + arg0 = args[0].(int) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockClient_GetPipelineState_Call) Return(vs []api.PipelineStatus, err error) *MockClient_GetPipelineState_Call { + _c.Call.Return(vs, err) + return _c +} + +func (_c *MockClient_GetPipelineState_Call) RunAndReturn(run func(wsId int, stage string) ([]api.PipelineStatus, error)) *MockClient_GetPipelineState_Call { + _c.Call.Return(run) + return _c +} + +// StartPipelineStage provides a mock function for the type MockClient +func (_mock *MockClient) StartPipelineStage(wsId int, profile string, stage string) error { + ret := _mock.Called(wsId, profile, stage) + + if len(ret) == 0 { + panic("no return value specified for StartPipelineStage") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(int, string, string) error); ok { + r0 = returnFunc(wsId, profile, stage) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockClient_StartPipelineStage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartPipelineStage' +type MockClient_StartPipelineStage_Call struct { + *mock.Call +} + +// StartPipelineStage is a helper method to define mock.On call +// - wsId int +// - profile string +// - stage string +func (_e *MockClient_Expecter) StartPipelineStage(wsId interface{}, profile interface{}, stage interface{}) *MockClient_StartPipelineStage_Call { + return &MockClient_StartPipelineStage_Call{Call: _e.mock.On("StartPipelineStage", wsId, profile, stage)} +} + +func (_c *MockClient_StartPipelineStage_Call) Run(run func(wsId int, profile string, stage string)) *MockClient_StartPipelineStage_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 int + if args[0] != nil { + arg0 = args[0].(int) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + var arg2 string + if args[2] != nil { + arg2 = args[2].(string) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockClient_StartPipelineStage_Call) Return(err error) *MockClient_StartPipelineStage_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockClient_StartPipelineStage_Call) RunAndReturn(run func(wsId int, profile string, stage string) error) *MockClient_StartPipelineStage_Call { + _c.Call.Return(run) + return _c +} + +// StreamLogs provides a mock function for the type MockClient +func (_mock *MockClient) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { + ret := _mock.Called(ctx, apiUrl, wsId, stage, step, w) + + if len(ret) == 0 { + panic("no return value specified for StreamLogs") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string, int, string, int, io.Writer) error); ok { + r0 = returnFunc(ctx, apiUrl, wsId, stage, step, w) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockClient_StreamLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamLogs' +type MockClient_StreamLogs_Call struct { + *mock.Call +} + +// StreamLogs is a helper method to define mock.On call +// - ctx context.Context +// - apiUrl string +// - wsId int +// - stage string +// - step int +// - w io.Writer +func (_e *MockClient_Expecter) StreamLogs(ctx interface{}, apiUrl interface{}, wsId interface{}, stage interface{}, step interface{}, w interface{}) *MockClient_StreamLogs_Call { + return &MockClient_StreamLogs_Call{Call: _e.mock.On("StreamLogs", ctx, apiUrl, wsId, stage, step, w)} +} + +func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer)) *MockClient_StreamLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + var arg2 int + if args[2] != nil { + arg2 = args[2].(int) + } + var arg3 string + if args[3] != nil { + arg3 = args[3].(string) + } + var arg4 int + if args[4] != nil { + arg4 = args[4].(int) + } + var arg5 io.Writer + if args[5] != nil { + arg5 = args[5].(io.Writer) + } + run( + arg0, + arg1, + arg2, + arg3, + arg4, + arg5, + ) + }) + return _c +} + +func (_c *MockClient_StreamLogs_Call) Return(err error) *MockClient_StreamLogs_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockClient_StreamLogs_Call) RunAndReturn(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error) *MockClient_StreamLogs_Call { + _c.Call.Return(run) + return _c +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go new file mode 100644 index 0000000..249ed2b --- /dev/null +++ b/pkg/pipeline/pipeline.go @@ -0,0 +1,233 @@ +// Copyright (c) Codesphere Inc. +// SPDX-License-Identifier: Apache-2.0 + +package pipeline + +//go:generate go tool mockery + +import ( + "context" + "fmt" + "io" + "log" + "os" + "slices" + "sync" + "time" + + "github.com/codesphere-cloud/cs-go/api" +) + +const IdeServer string = "codesphere-ide" + +// Client defines the API operations needed for pipeline execution. +type Client interface { + StartPipelineStage(wsId int, profile string, stage string) error + GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error) + DeployLandscape(wsId int, profile string) error + StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error +} + +// Config holds parameters for pipeline execution. +type Config struct { + Profile string + Timeout time.Duration + ApiUrl string +} + +// Runner orchestrates pipeline stage execution. +type Runner struct { + Client Client + Time api.Time +} + +// NewRunner creates a new pipeline runner with the given API client. +func NewRunner(client Client, clock api.Time) *Runner { + if clock == nil { + clock = &api.RealTime{} + } + return &Runner{Client: client, Time: clock} +} + +// RunStages runs pipeline stages sequentially: prepare and test are awaited, +// the run stage is preceded by a landscape sync and then fire-and-forget. +func (r *Runner) RunStages(wsId int, stages []string, cfg Config) error { + for _, stage := range stages { + if !IsValidStage(stage) { + return fmt.Errorf("invalid pipeline stage: %s", stage) + } + } + + for _, stage := range stages { + // Sync the landscape before the run stage + if stage == "run" { + fmt.Println(" 🔄 Syncing landscape...") + if err := r.Client.DeployLandscape(wsId, cfg.Profile); err != nil { + return fmt.Errorf("syncing landscape: %w", err) + } + fmt.Println(" ✅ Landscape synced.") + } + + if err := r.runStage(wsId, stage, cfg); err != nil { + return err + } + } + return nil +} + +func (r *Runner) runStage(wsId int, stage string, cfg Config) error { + log.Printf("starting %s stage on workspace %d...", stage, wsId) + + if err := r.Client.StartPipelineStage(wsId, cfg.Profile, stage); err != nil { + log.Println() + return fmt.Errorf("failed to start pipeline stage %s: %w", stage, err) + } + + // Step-aware log streaming for non-run stages. + // Each step gets its own context; when a new step is discovered the + // previous step's stream is cancelled and drained before moving on. + streamEnabled := stage != "run" && cfg.ApiUrl != "" + streamingStep := -1 + var stepCancel context.CancelFunc + var stepWg sync.WaitGroup + + // drainStream waits for the current stream to deliver logs, then cancels. + drainStream := func() { + if stepCancel == nil { + return + } + done := make(chan struct{}) + go func() { + stepWg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(3 * time.Second): + stepCancel() + stepWg.Wait() + } + } + + startStreamForStep := func(step int, totalSteps int) { + if !streamEnabled || step <= streamingStep { + return + } + + // Drain previous step before starting next + drainStream() + + streamingStep = step + fmt.Printf("\n 📋 Step %d/%d\n", step+1, totalSteps) + + ctx, cancel := context.WithCancel(context.Background()) + stepCancel = cancel + stepWg.Add(1) + go func() { + defer stepWg.Done() + if err := r.Client.StreamLogs(ctx, cfg.ApiUrl, wsId, stage, step, os.Stdout); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "⚠ log stream error (step %d): %v\n", step, err) + } + }() + } + + err := r.waitForStageWithStepCallback(wsId, stage, cfg, startStreamForStep) + + // Drain final step's logs + drainStream() + + return err +} + +func (r *Runner) waitForStageWithStepCallback(wsId int, stage string, cfg Config, onStep func(step int, total int)) error { + delay := 5 * time.Second + timeout := cfg.Timeout + if timeout == 0 { + timeout = 30 * time.Minute + } + + maxWaitTime := r.Time.Now().Add(timeout) + for { + status, err := r.Client.GetPipelineState(wsId, stage) + if err != nil { + log.Printf("\nError getting pipeline status: %s, trying again...", err.Error()) + r.Time.Sleep(delay) + continue + } + + // Discover active step from IDE server's Steps array + if onStep != nil { + for _, s := range status { + if s.Server == IdeServer { + total := len(s.Steps) + for i, step := range s.Steps { + if step.State == "running" || step.State == "success" { + onStep(i, total) + } + } + break + } + } + } + + if AllFinished(status) { + log.Println("(finished)") + break + } + + if AllRunning(status) && stage == "run" { + log.Println("(running)") + break + } + + if err = ShouldAbort(status); err != nil { + log.Println("(failed)") + return fmt.Errorf("stage %s failed: %w", stage, err) + } + + log.Print(".") + if r.Time.Now().After(maxWaitTime) { + log.Println() + return fmt.Errorf("timed out waiting for pipeline stage %s to be complete", stage) + } + r.Time.Sleep(delay) + } + return nil +} + +// IsValidStage returns true if the given stage name is valid. +func IsValidStage(stage string) bool { + return slices.Contains([]string{"prepare", "test", "run"}, stage) +} + +// AllFinished returns true when all IDE server replicas have succeeded. +// Prepare and test stages only run in the IDE server; customer servers are ignored. +func AllFinished(status []api.PipelineStatus) bool { + for _, s := range status { + if s.Server == IdeServer && s.State != "success" { + return false + } + } + return true +} + +// AllRunning returns true when all customer server replicas are running. +// The IDE server is ignored since the run stage only applies to customer servers. +func AllRunning(status []api.PipelineStatus) bool { + for _, s := range status { + if s.Server != IdeServer && s.State != "running" { + return false + } + } + return true +} + +// ShouldAbort returns an error if any replica has reached a terminal failure state. +func ShouldAbort(status []api.PipelineStatus) error { + for _, s := range status { + if slices.Contains([]string{"failure", "aborted"}, s.State) { + return fmt.Errorf("server %s, replica %s reached unexpected state %s", s.Server, s.Replica, s.State) + } + } + return nil +} diff --git a/pkg/pipeline/pipeline_suite_test.go b/pkg/pipeline/pipeline_suite_test.go new file mode 100644 index 0000000..e353b48 --- /dev/null +++ b/pkg/pipeline/pipeline_suite_test.go @@ -0,0 +1,16 @@ +// Copyright (c) Codesphere Inc. +// SPDX-License-Identifier: Apache-2.0 + +package pipeline_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestPipeline(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Pipeline Suite") +} diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go new file mode 100644 index 0000000..fc6e458 --- /dev/null +++ b/pkg/pipeline/pipeline_test.go @@ -0,0 +1,177 @@ +// Copyright (c) Codesphere Inc. +// SPDX-License-Identifier: Apache-2.0 + +package pipeline_test + +import ( + "context" + "fmt" + "io" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/stretchr/testify/mock" + + "github.com/codesphere-cloud/cs-go/api" + openapi "github.com/codesphere-cloud/cs-go/api/openapi_client" + "github.com/codesphere-cloud/cs-go/pkg/pipeline" +) + +func statusWithSteps(server string, state string, stepStates ...string) api.PipelineStatus { + steps := make([]openapi.WorkspacesPipelineStatus200ResponseInnerStepsInner, len(stepStates)) + for i, s := range stepStates { + steps[i] = openapi.WorkspacesPipelineStatus200ResponseInnerStepsInner{State: s} + } + return api.PipelineStatus{ + State: state, + Replica: "0", + Server: server, + Steps: steps, + } +} + +var _ = Describe("Runner", func() { + var ( + mockClient *pipeline.MockClient + mockTime *api.MockTime + runner *pipeline.Runner + wsId int + cfg pipeline.Config + ) + + BeforeEach(func() { + wsId = 42 + cfg = pipeline.Config{ + Profile: "", + Timeout: 30 * time.Second, + ApiUrl: "https://codesphere.com/api", + } + }) + + JustBeforeEach(func() { + mockClient = pipeline.NewMockClient(GinkgoT()) + mockTime = api.NewMockTime(GinkgoT()) + runner = pipeline.NewRunner(mockClient, mockTime) + + currentTime := time.Unix(1746190963, 0) + mockTime.EXPECT().Now().RunAndReturn(func() time.Time { + return currentTime + }).Maybe() + mockTime.EXPECT().Sleep(mock.Anything).Run(func(t time.Duration) { + currentTime = currentTime.Add(t) + }).Maybe() + }) + + Describe("log streaming during prepare stage", func() { + Context("with a single step", func() { + It("calls StreamLogs with step 0", func() { + mockClient.EXPECT().StartPipelineStage(wsId, cfg.Profile, "prepare").Return(nil) + + pollCount := 0 + mockClient.EXPECT().GetPipelineState(wsId, "prepare").RunAndReturn( + func(_ int, _ string) ([]api.PipelineStatus, error) { + pollCount++ + if pollCount == 1 { + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "running", "running"), + }, nil + } + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "success", "success"), + }, nil + }, + ) + + mockClient.EXPECT().StreamLogs( + mock.Anything, cfg.ApiUrl, wsId, "prepare", 0, mock.Anything, + ).Return(nil) + + err := runner.RunStages(wsId, []string{"prepare"}, cfg) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("with multiple steps", func() { + It("streams each step sequentially", func() { + mockClient.EXPECT().StartPipelineStage(wsId, cfg.Profile, "prepare").Return(nil) + + pollCount := 0 + mockClient.EXPECT().GetPipelineState(wsId, "prepare").RunAndReturn( + func(_ int, _ string) ([]api.PipelineStatus, error) { + pollCount++ + switch pollCount { + case 1: + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "running", "running", "waiting"), + }, nil + case 2: + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "running", "success", "running"), + }, nil + default: + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "success", "success", "success"), + }, nil + } + }, + ) + + // Step 0 stream + step0Called := make(chan struct{}) + mockClient.EXPECT().StreamLogs( + mock.Anything, cfg.ApiUrl, wsId, "prepare", 0, mock.Anything, + ).RunAndReturn(func(_ context.Context, _ string, _ int, _ string, _ int, _ io.Writer) error { + close(step0Called) + return nil + }) + + // Step 1 stream — only called after step 0 + mockClient.EXPECT().StreamLogs( + mock.Anything, cfg.ApiUrl, wsId, "prepare", 1, mock.Anything, + ).RunAndReturn(func(_ context.Context, _ string, _ int, _ string, _ int, _ io.Writer) error { + select { + case <-step0Called: + // good — step 0 was called first + default: + return fmt.Errorf("step 1 started before step 0") + } + return nil + }) + + err := runner.RunStages(wsId, []string{"prepare"}, cfg) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("when ApiUrl is empty", func() { + It("does not call StreamLogs", func() { + cfg.ApiUrl = "" + + startCall := mockClient.EXPECT().StartPipelineStage(wsId, cfg.Profile, "prepare").Return(nil).Call + mockClient.EXPECT().GetPipelineState(wsId, "prepare").Return([]api.PipelineStatus{ + statusWithSteps("codesphere-ide", "success", "success"), + }, nil).NotBefore(startCall) + // StreamLogs should NOT be called — mockery will fail if it is + + err := runner.RunStages(wsId, []string{"prepare"}, cfg) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("for the run stage", func() { + It("does not stream logs", func() { + syncCall := mockClient.EXPECT().DeployLandscape(wsId, cfg.Profile).Return(nil).Call + startCall := mockClient.EXPECT().StartPipelineStage(wsId, cfg.Profile, "run").Return(nil).NotBefore(syncCall) + mockClient.EXPECT().GetPipelineState(wsId, "run").Return([]api.PipelineStatus{ + {State: "running", Replica: "0", Server: "A"}, + {State: "waiting", Replica: "0", Server: "codesphere-ide"}, + }, nil).NotBefore(startCall) + // StreamLogs should NOT be called + + err := runner.RunStages(wsId, []string{"run"}, cfg) + Expect(err).ToNot(HaveOccurred()) + }) + }) + }) +}) From 322050a6ad17abe2432615e0d5cdb8cee49ea483 Mon Sep 17 00:00:00 2001 From: Alex <132889147+alexvcodesphere@users.noreply.github.com> Date: Wed, 18 Mar 2026 16:26:37 +0100 Subject: [PATCH 2/2] =?UTF-8?q?refactor:=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20simplify=20streaming,=20remove=20apiUrl,=20extract?= =?UTF-8?q?=20stepStreamer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove apiUrl from StreamLogs interface/Config — client uses internal baseUrl - Extract inline closures into stepStreamer struct (step_streamer.go) - Simplify drain: cancel() + wg.Wait() instead of timeout race - Use defer streamer.drain() for cleanup safety - Rename waitForStageWithStepCallback → waitForStage (no callback indirection) - Replace fmt with log framework in pipeline.go - Only stream steps with state 'running' (not already-completed ones) - Regenerate mocks Signed-off-by: Alex <132889147+alexvcodesphere@users.noreply.github.com> --- api/client.go | 14 +++--- api/workspace.go | 4 +- cli/cmd/client.go | 2 +- cli/cmd/mocks.go | 38 +++++++--------- pkg/pipeline/mocks.go | 38 +++++++--------- pkg/pipeline/pipeline.go | 82 ++++++----------------------------- pkg/pipeline/pipeline_test.go | 17 +++----- pkg/pipeline/step_streamer.go | 74 +++++++++++++++++++++++++++++++ 8 files changed, 137 insertions(+), 132 deletions(-) create mode 100644 pkg/pipeline/step_streamer.go diff --git a/api/client.go b/api/client.go index 51d794a..e2d459f 100644 --- a/api/client.go +++ b/api/client.go @@ -13,9 +13,10 @@ import ( ) type Client struct { - ctx context.Context - api *openapi_client.APIClient - time Time + ctx context.Context + api *openapi_client.APIClient + time Time + baseUrl *url.URL } type Configuration struct { @@ -43,9 +44,10 @@ func (c Configuration) GetApiUrl() *url.URL { // For use in tests func NewClientWithCustomDeps(ctx context.Context, opts Configuration, api *openapi_client.APIClient, time Time) *Client { return &Client{ - ctx: context.WithValue(ctx, openapi_client.ContextAccessToken, opts.Token), - api: api, - time: time, + ctx: context.WithValue(ctx, openapi_client.ContextAccessToken, opts.Token), + api: api, + time: time, + baseUrl: opts.GetApiUrl(), } } diff --git a/api/workspace.go b/api/workspace.go index 7f79c8d..48e22d5 100644 --- a/api/workspace.go +++ b/api/workspace.go @@ -238,8 +238,8 @@ type logEntry struct { // log entries to the provided writer until the context is cancelled or the // stream ends. This is used during pipeline execution to provide real-time // log output. -func (c *Client) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { - endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", apiUrl, wsId, stage, step) +func (c *Client) StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error { + endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", c.baseUrl.String(), wsId, stage, step) req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) if err != nil { diff --git a/cli/cmd/client.go b/cli/cmd/client.go index f54e64b..6f1a5b4 100644 --- a/cli/cmd/client.go +++ b/cli/cmd/client.go @@ -33,7 +33,7 @@ type Client interface { GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error) GitPull(wsId int, remote string, branch string) error DeployLandscape(wsId int, profile string) error - StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error + StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error } // CommandExecutor abstracts command execution for testing diff --git a/cli/cmd/mocks.go b/cli/cmd/mocks.go index 29e1e4f..03e76f4 100644 --- a/cli/cmd/mocks.go +++ b/cli/cmd/mocks.go @@ -946,16 +946,16 @@ func (_c *MockClient_StartPipelineStage_Call) RunAndReturn(run func(wsId int, pr } // StreamLogs provides a mock function for the type MockClient -func (_mock *MockClient) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { - ret := _mock.Called(ctx, apiUrl, wsId, stage, step, w) +func (_mock *MockClient) StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error { + ret := _mock.Called(ctx, wsId, stage, step, w) if len(ret) == 0 { panic("no return value specified for StreamLogs") } var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, string, int, string, int, io.Writer) error); ok { - r0 = returnFunc(ctx, apiUrl, wsId, stage, step, w) + if returnFunc, ok := ret.Get(0).(func(context.Context, int, string, int, io.Writer) error); ok { + r0 = returnFunc(ctx, wsId, stage, step, w) } else { r0 = ret.Error(0) } @@ -969,40 +969,35 @@ type MockClient_StreamLogs_Call struct { // StreamLogs is a helper method to define mock.On call // - ctx context.Context -// - apiUrl string // - wsId int // - stage string // - step int // - w io.Writer -func (_e *MockClient_Expecter) StreamLogs(ctx interface{}, apiUrl interface{}, wsId interface{}, stage interface{}, step interface{}, w interface{}) *MockClient_StreamLogs_Call { - return &MockClient_StreamLogs_Call{Call: _e.mock.On("StreamLogs", ctx, apiUrl, wsId, stage, step, w)} +func (_e *MockClient_Expecter) StreamLogs(ctx interface{}, wsId interface{}, stage interface{}, step interface{}, w interface{}) *MockClient_StreamLogs_Call { + return &MockClient_StreamLogs_Call{Call: _e.mock.On("StreamLogs", ctx, wsId, stage, step, w)} } -func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer)) *MockClient_StreamLogs_Call { +func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, wsId int, stage string, step int, w io.Writer)) *MockClient_StreamLogs_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 string + var arg1 int if args[1] != nil { - arg1 = args[1].(string) + arg1 = args[1].(int) } - var arg2 int + var arg2 string if args[2] != nil { - arg2 = args[2].(int) + arg2 = args[2].(string) } - var arg3 string + var arg3 int if args[3] != nil { - arg3 = args[3].(string) + arg3 = args[3].(int) } - var arg4 int + var arg4 io.Writer if args[4] != nil { - arg4 = args[4].(int) - } - var arg5 io.Writer - if args[5] != nil { - arg5 = args[5].(io.Writer) + arg4 = args[4].(io.Writer) } run( arg0, @@ -1010,7 +1005,6 @@ func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, apiUrl s arg2, arg3, arg4, - arg5, ) }) return _c @@ -1021,7 +1015,7 @@ func (_c *MockClient_StreamLogs_Call) Return(err error) *MockClient_StreamLogs_C return _c } -func (_c *MockClient_StreamLogs_Call) RunAndReturn(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error) *MockClient_StreamLogs_Call { +func (_c *MockClient_StreamLogs_Call) RunAndReturn(run func(ctx context.Context, wsId int, stage string, step int, w io.Writer) error) *MockClient_StreamLogs_Call { _c.Call.Return(run) return _c } diff --git a/pkg/pipeline/mocks.go b/pkg/pipeline/mocks.go index dc0aeb6..d3ee76a 100644 --- a/pkg/pipeline/mocks.go +++ b/pkg/pipeline/mocks.go @@ -227,16 +227,16 @@ func (_c *MockClient_StartPipelineStage_Call) RunAndReturn(run func(wsId int, pr } // StreamLogs provides a mock function for the type MockClient -func (_mock *MockClient) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { - ret := _mock.Called(ctx, apiUrl, wsId, stage, step, w) +func (_mock *MockClient) StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error { + ret := _mock.Called(ctx, wsId, stage, step, w) if len(ret) == 0 { panic("no return value specified for StreamLogs") } var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, string, int, string, int, io.Writer) error); ok { - r0 = returnFunc(ctx, apiUrl, wsId, stage, step, w) + if returnFunc, ok := ret.Get(0).(func(context.Context, int, string, int, io.Writer) error); ok { + r0 = returnFunc(ctx, wsId, stage, step, w) } else { r0 = ret.Error(0) } @@ -250,40 +250,35 @@ type MockClient_StreamLogs_Call struct { // StreamLogs is a helper method to define mock.On call // - ctx context.Context -// - apiUrl string // - wsId int // - stage string // - step int // - w io.Writer -func (_e *MockClient_Expecter) StreamLogs(ctx interface{}, apiUrl interface{}, wsId interface{}, stage interface{}, step interface{}, w interface{}) *MockClient_StreamLogs_Call { - return &MockClient_StreamLogs_Call{Call: _e.mock.On("StreamLogs", ctx, apiUrl, wsId, stage, step, w)} +func (_e *MockClient_Expecter) StreamLogs(ctx interface{}, wsId interface{}, stage interface{}, step interface{}, w interface{}) *MockClient_StreamLogs_Call { + return &MockClient_StreamLogs_Call{Call: _e.mock.On("StreamLogs", ctx, wsId, stage, step, w)} } -func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer)) *MockClient_StreamLogs_Call { +func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, wsId int, stage string, step int, w io.Writer)) *MockClient_StreamLogs_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 string + var arg1 int if args[1] != nil { - arg1 = args[1].(string) + arg1 = args[1].(int) } - var arg2 int + var arg2 string if args[2] != nil { - arg2 = args[2].(int) + arg2 = args[2].(string) } - var arg3 string + var arg3 int if args[3] != nil { - arg3 = args[3].(string) + arg3 = args[3].(int) } - var arg4 int + var arg4 io.Writer if args[4] != nil { - arg4 = args[4].(int) - } - var arg5 io.Writer - if args[5] != nil { - arg5 = args[5].(io.Writer) + arg4 = args[4].(io.Writer) } run( arg0, @@ -291,7 +286,6 @@ func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, apiUrl s arg2, arg3, arg4, - arg5, ) }) return _c @@ -302,7 +296,7 @@ func (_c *MockClient_StreamLogs_Call) Return(err error) *MockClient_StreamLogs_C return _c } -func (_c *MockClient_StreamLogs_Call) RunAndReturn(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error) *MockClient_StreamLogs_Call { +func (_c *MockClient_StreamLogs_Call) RunAndReturn(run func(ctx context.Context, wsId int, stage string, step int, w io.Writer) error) *MockClient_StreamLogs_Call { _c.Call.Return(run) return _c } diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 249ed2b..48c727a 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -10,9 +10,7 @@ import ( "fmt" "io" "log" - "os" "slices" - "sync" "time" "github.com/codesphere-cloud/cs-go/api" @@ -25,14 +23,13 @@ type Client interface { StartPipelineStage(wsId int, profile string, stage string) error GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error) DeployLandscape(wsId int, profile string) error - StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error + StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error } // Config holds parameters for pipeline execution. type Config struct { Profile string Timeout time.Duration - ApiUrl string } // Runner orchestrates pipeline stage execution. @@ -61,11 +58,11 @@ func (r *Runner) RunStages(wsId int, stages []string, cfg Config) error { for _, stage := range stages { // Sync the landscape before the run stage if stage == "run" { - fmt.Println(" 🔄 Syncing landscape...") + log.Println(" 🔄 Syncing landscape...") if err := r.Client.DeployLandscape(wsId, cfg.Profile); err != nil { return fmt.Errorf("syncing landscape: %w", err) } - fmt.Println(" ✅ Landscape synced.") + log.Println(" ✅ Landscape synced.") } if err := r.runStage(wsId, stage, cfg); err != nil { @@ -83,63 +80,13 @@ func (r *Runner) runStage(wsId int, stage string, cfg Config) error { return fmt.Errorf("failed to start pipeline stage %s: %w", stage, err) } - // Step-aware log streaming for non-run stages. - // Each step gets its own context; when a new step is discovered the - // previous step's stream is cancelled and drained before moving on. - streamEnabled := stage != "run" && cfg.ApiUrl != "" - streamingStep := -1 - var stepCancel context.CancelFunc - var stepWg sync.WaitGroup - - // drainStream waits for the current stream to deliver logs, then cancels. - drainStream := func() { - if stepCancel == nil { - return - } - done := make(chan struct{}) - go func() { - stepWg.Wait() - close(done) - }() - select { - case <-done: - case <-time.After(3 * time.Second): - stepCancel() - stepWg.Wait() - } - } - - startStreamForStep := func(step int, totalSteps int) { - if !streamEnabled || step <= streamingStep { - return - } - - // Drain previous step before starting next - drainStream() - - streamingStep = step - fmt.Printf("\n 📋 Step %d/%d\n", step+1, totalSteps) - - ctx, cancel := context.WithCancel(context.Background()) - stepCancel = cancel - stepWg.Add(1) - go func() { - defer stepWg.Done() - if err := r.Client.StreamLogs(ctx, cfg.ApiUrl, wsId, stage, step, os.Stdout); err != nil { - _, _ = fmt.Fprintf(os.Stderr, "⚠ log stream error (step %d): %v\n", step, err) - } - }() - } - - err := r.waitForStageWithStepCallback(wsId, stage, cfg, startStreamForStep) - - // Drain final step's logs - drainStream() + streamer := newStepStreamer(r.Client, wsId, stage) + defer streamer.drain() - return err + return r.waitForStage(wsId, stage, cfg, streamer) } -func (r *Runner) waitForStageWithStepCallback(wsId int, stage string, cfg Config, onStep func(step int, total int)) error { +func (r *Runner) waitForStage(wsId int, stage string, cfg Config, streamer *stepStreamer) error { delay := 5 * time.Second timeout := cfg.Timeout if timeout == 0 { @@ -156,17 +103,14 @@ func (r *Runner) waitForStageWithStepCallback(wsId int, stage string, cfg Config } // Discover active step from IDE server's Steps array - if onStep != nil { - for _, s := range status { - if s.Server == IdeServer { - total := len(s.Steps) - for i, step := range s.Steps { - if step.State == "running" || step.State == "success" { - onStep(i, total) - } + for _, s := range status { + if s.Server == IdeServer { + for i, step := range s.Steps { + if step.State == "running" { + streamer.startStep(i, len(s.Steps)) } - break } + break } } diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index fc6e458..c4cffad 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -45,7 +45,6 @@ var _ = Describe("Runner", func() { cfg = pipeline.Config{ Profile: "", Timeout: 30 * time.Second, - ApiUrl: "https://codesphere.com/api", } }) @@ -84,7 +83,7 @@ var _ = Describe("Runner", func() { ) mockClient.EXPECT().StreamLogs( - mock.Anything, cfg.ApiUrl, wsId, "prepare", 0, mock.Anything, + mock.Anything, wsId, "prepare", 0, mock.Anything, ).Return(nil) err := runner.RunStages(wsId, []string{"prepare"}, cfg) @@ -120,16 +119,16 @@ var _ = Describe("Runner", func() { // Step 0 stream step0Called := make(chan struct{}) mockClient.EXPECT().StreamLogs( - mock.Anything, cfg.ApiUrl, wsId, "prepare", 0, mock.Anything, - ).RunAndReturn(func(_ context.Context, _ string, _ int, _ string, _ int, _ io.Writer) error { + mock.Anything, wsId, "prepare", 0, mock.Anything, + ).RunAndReturn(func(_ context.Context, _ int, _ string, _ int, _ io.Writer) error { close(step0Called) return nil }) // Step 1 stream — only called after step 0 mockClient.EXPECT().StreamLogs( - mock.Anything, cfg.ApiUrl, wsId, "prepare", 1, mock.Anything, - ).RunAndReturn(func(_ context.Context, _ string, _ int, _ string, _ int, _ io.Writer) error { + mock.Anything, wsId, "prepare", 1, mock.Anything, + ).RunAndReturn(func(_ context.Context, _ int, _ string, _ int, _ io.Writer) error { select { case <-step0Called: // good — step 0 was called first @@ -144,15 +143,13 @@ var _ = Describe("Runner", func() { }) }) - Context("when ApiUrl is empty", func() { + Context("when stage finishes immediately", func() { It("does not call StreamLogs", func() { - cfg.ApiUrl = "" - startCall := mockClient.EXPECT().StartPipelineStage(wsId, cfg.Profile, "prepare").Return(nil).Call mockClient.EXPECT().GetPipelineState(wsId, "prepare").Return([]api.PipelineStatus{ statusWithSteps("codesphere-ide", "success", "success"), }, nil).NotBefore(startCall) - // StreamLogs should NOT be called — mockery will fail if it is + // StreamLogs should NOT be called err := runner.RunStages(wsId, []string{"prepare"}, cfg) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/pipeline/step_streamer.go b/pkg/pipeline/step_streamer.go new file mode 100644 index 0000000..ef74f55 --- /dev/null +++ b/pkg/pipeline/step_streamer.go @@ -0,0 +1,74 @@ +// Copyright (c) Codesphere Inc. +// SPDX-License-Identifier: Apache-2.0 + +package pipeline + +import ( + "context" + "log" + "os" + "sync" +) + +// stepStreamer manages log streaming for individual pipeline steps. +// It ensures only one step streams at a time and drains the previous +// step's stream before starting the next one. +type stepStreamer struct { + client Client + wsId int + stage string + enabled bool + + currentStep int + cancel context.CancelFunc + wg sync.WaitGroup +} + +// newStepStreamer creates a streamer for the given stage. +// Streaming is disabled for the "run" stage. +func newStepStreamer(client Client, wsId int, stage string) *stepStreamer { + return &stepStreamer{ + client: client, + wsId: wsId, + stage: stage, + enabled: stage != "run", + currentStep: -1, + } +} + +// startStep begins streaming logs for a new step, draining any +// previous step first. It is safe to call multiple times with the +// same step number. +func (s *stepStreamer) startStep(step int, totalSteps int) { + if !s.enabled || step <= s.currentStep { + return + } + + // Drain previous step before starting next + s.drain() + + s.currentStep = step + log.Printf(" 📋 Step %d/%d", step+1, totalSteps) + + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + s.wg.Add(1) + go func() { + defer s.wg.Done() + if err := s.client.StreamLogs(ctx, s.wsId, s.stage, step, os.Stdout); err != nil { + if ctx.Err() == nil { + log.Printf("⚠ log stream error (step %d): %v", step, err) + } + } + }() +} + +// drain cancels the active stream and waits for it to finish. +func (s *stepStreamer) drain() { + if s.cancel == nil { + return + } + s.cancel() + s.wg.Wait() + s.cancel = nil +}