Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .mockery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,11 @@ packages:
config:
all: true
interfaces:
github.com/codesphere-cloud/cs-go/pkg/deploy:
config:
all: true
interfaces:
github.com/codesphere-cloud/cs-go/pkg/pipeline:
config:
all: true
interfaces:
14 changes: 8 additions & 6 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
)

type Client struct {
ctx context.Context
api *openapi_client.APIClient
time Time
ctx context.Context
api *openapi_client.APIClient
time Time
baseUrl *url.URL
}

type Configuration struct {
Expand Down Expand Up @@ -43,9 +44,10 @@ func (c Configuration) GetApiUrl() *url.URL {
// For use in tests
func NewClientWithCustomDeps(ctx context.Context, opts Configuration, api *openapi_client.APIClient, time Time) *Client {
return &Client{
ctx: context.WithValue(ctx, openapi_client.ContextAccessToken, opts.Token),
api: api,
time: time,
ctx: context.WithValue(ctx, openapi_client.ContextAccessToken, opts.Token),
api: api,
time: time,
baseUrl: opts.GetApiUrl(),
}
}

Expand Down
101 changes: 101 additions & 0 deletions api/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
package api

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"

"github.com/codesphere-cloud/cs-go/api/errors"
"github.com/codesphere-cloud/cs-go/api/openapi_client"
Expand Down Expand Up @@ -219,3 +226,97 @@ func (c Client) GitPull(workspaceId int, remote string, branch string) error {
r, err := req.Execute()
return errors.FormatAPIError(r, err)
}

// logEntry represents a single log line from the SSE stream.
type logEntry struct {
Timestamp string `json:"timestamp"`
Kind string `json:"kind"`
Data string `json:"data"`
}

// StreamLogs connects to the Codesphere SSE log endpoint and writes parsed
// log entries to the provided writer until the context is cancelled or the
// stream ends. This is used during pipeline execution to provide real-time
// log output.
func (c *Client) StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error {
endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", c.baseUrl.String(), wsId, stage, step)

req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
if err != nil {
return fmt.Errorf("failed to construct log stream request: %w", err)
}

req.Header.Set("Accept", "text/event-stream")

// Set auth from the client's context token
if token, ok := ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" {
req.Header.Set("Authorization", "Bearer "+token)
} else if token, ok := c.ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
// Context cancellation is expected when the stage finishes
if ctx.Err() != nil {
return nil
}
return fmt.Errorf("failed to connect to log stream: %w", err)
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("log stream responded with status %d", resp.StatusCode)
}

reader := bufio.NewReader(resp.Body)

for {
// Check if context is done
select {
case <-ctx.Done():
return nil
default:
}

// Parse one SSE event
var eventData string
for {
line, err := reader.ReadString('\n')
if err != nil {
if ctx.Err() != nil || err == io.EOF {
return nil
}
return fmt.Errorf("failed to read log stream: %w", err)
}

line = strings.TrimSpace(line)

if strings.HasPrefix(line, "data:") {
data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if eventData != "" {
eventData += "\n" + data
} else {
eventData = data
}
} else if line == "" && eventData != "" {
// Empty line marks end of SSE event
break
}
}

// Parse and print log entries
var entries []logEntry
if err := json.Unmarshal([]byte(eventData), &entries); err != nil {
// Skip unparseable events (e.g. error responses)
log.Printf("⚠ log stream: %s", eventData)
eventData = ""
continue
}

for _, entry := range entries {
_, _ = fmt.Fprintf(w, "%s | %s\n", entry.Timestamp, entry.Data)
}
eventData = ""
}
}
1 change: 1 addition & 0 deletions cli/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Client interface {
GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error)
GitPull(wsId int, remote string, branch string) error
DeployLandscape(wsId int, profile string) error
StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error
}

// CommandExecutor abstracts command execution for testing
Expand Down
75 changes: 75 additions & 0 deletions cli/cmd/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

114 changes: 6 additions & 108 deletions cli/cmd/start_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ package cmd

import (
"fmt"
"log"
"slices"
"time"

"github.com/codesphere-cloud/cs-go/api"
"github.com/codesphere-cloud/cs-go/pkg/io"
"github.com/codesphere-cloud/cs-go/pkg/pipeline"

"github.com/spf13/cobra"
)
Expand All @@ -27,8 +26,6 @@ type StartPipelineOpts struct {
Timeout *time.Duration
}

const IdeServer string = "codesphere-ide"

func (c *StartPipelineCmd) RunE(_ *cobra.Command, args []string) error {

workspaceId, err := c.Opts.GetWorkspaceId()
Expand Down Expand Up @@ -80,108 +77,9 @@ func AddStartPipelineCmd(start *cobra.Command, opts *GlobalOptions) {
}

func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string) error {
for _, stage := range stages {
if !isValidStage(stage) {
return fmt.Errorf("invalid pipeline stage: %s", stage)
}
}
for _, stage := range stages {
err := c.startStage(client, wsId, stage)
if err != nil {
return err
}
}
return nil
}

func isValidStage(stage string) bool {
return slices.Contains([]string{"prepare", "test", "run"}, stage)
}

func (c *StartPipelineCmd) startStage(client Client, wsId int, stage string) error {
log.Printf("starting %s stage on workspace %d...", stage, wsId)

err := client.StartPipelineStage(wsId, *c.Opts.Profile, stage)
if err != nil {
log.Println()
return fmt.Errorf("failed to start pipeline stage %s: %w", stage, err)
}

err = c.waitForPipelineStage(client, wsId, stage)
if err != nil {
return fmt.Errorf("failed waiting for stage %s to finish: %w", stage, err)

}
return nil
}

func (c *StartPipelineCmd) waitForPipelineStage(client Client, wsId int, stage string) error {
delay := 5 * time.Second

maxWaitTime := c.Time.Now().Add(*c.Opts.Timeout)
for {
status, err := client.GetPipelineState(wsId, stage)
if err != nil {
log.Printf("\nError getting pipeline status: %s, trying again...", err.Error())
c.Time.Sleep(delay)
continue
}

if c.allFinished(status) {
log.Println("(finished)")
break
}

if allRunning(status) && stage == "run" {
log.Println("(running)")
break
}

err = shouldAbort(status)
if err != nil {
log.Println("(failed)")
return fmt.Errorf("stage %s failed: %w", stage, err)
}

log.Print(".")
if c.Time.Now().After(maxWaitTime) {
log.Println()
return fmt.Errorf("timed out waiting for pipeline stage %s to be complete", stage)
}
c.Time.Sleep(delay)
}
return nil
}

func allRunning(status []api.PipelineStatus) bool {
for _, s := range status {
// Run stage is only running customer servers, ignore IDE server
if s.Server != IdeServer && s.State != "running" {
return false
}
}
return true
}

func (c *StartPipelineCmd) allFinished(status []api.PipelineStatus) bool {
io.Verboseln(c.Opts.Verbose, "====")
for _, s := range status {
io.Verbosef(c.Opts.Verbose, "Server: %s, State: %s, Replica: %s\n", s.Server, s.State, s.Replica)
}
for _, s := range status {
// Prepare and Test stage is only running in the IDE server, ignore customer servers
if s.Server == IdeServer && s.State != "success" {
return false
}
}
return true
}

func shouldAbort(status []api.PipelineStatus) error {
for _, s := range status {
if slices.Contains([]string{"failure", "aborted"}, s.State) {
return fmt.Errorf("server %s, replica %s reached unexpected state %s", s.Server, s.Replica, s.State)
}
}
return nil
runner := pipeline.NewRunner(client, c.Time)
return runner.RunStages(wsId, stages, pipeline.Config{
Profile: *c.Opts.Profile,
Timeout: *c.Opts.Timeout,
})
}
Loading
Loading