From 25db0ce9a7843b984e38c8cdb266dbad31d08fe9 Mon Sep 17 00:00:00 2001 From: Paramjit Singh Sandhu Date: Tue, 2 Jun 2026 09:18:48 -0700 Subject: [PATCH 1/4] feat: implement interactive gRPC harness client and support stream termination via go_away command --- cmd/ax/harness.go | 23 ++++++-- cmd/ax/harnessclient.go | 128 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 5 deletions(-) create mode 100644 cmd/ax/harnessclient.go diff --git a/cmd/ax/harness.go b/cmd/ax/harness.go index 51c53eb..00dce1f 100644 --- a/cmd/ax/harness.go +++ b/cmd/ax/harness.go @@ -82,12 +82,10 @@ func NewHarnessServiceServer() *HarnessServiceServer { } // Connect implements the bidirectional gRPC streaming capability. -// It receives client inputs and responds only with "Hello world". +// It receives client inputs and responds with "hello world" unless the input message text is "go_away". func (s *HarnessServiceServer) Connect(stream proto.HarnessService_ConnectServer) error { - // TODO: Connect will be implemented to serve the built in harnesses - // as an isolated actor. for { - _, err := stream.Recv() + req, err := stream.Recv() if err == io.EOF { return nil } @@ -95,6 +93,21 @@ func (s *HarnessServiceServer) Connect(stream proto.HarnessService_ConnectServer return err } + shouldGoAway := false + for _, m := range req.Messages { + if textBlock, ok := m.Content.Type.(*proto.Content_Text); ok { + if textBlock.Text.Text == "go_away" { + shouldGoAway = true + break + } + } + } + + if shouldGoAway { + log.Println("Received 'go_away' message, closing stream...") + return nil + } + err = stream.Send(&proto.HarnessMessage{ Messages: []*proto.Message{ { @@ -102,7 +115,7 @@ func (s *HarnessServiceServer) Connect(stream proto.HarnessService_ConnectServer Content: &proto.Content{ Type: &proto.Content_Text{ Text: &proto.TextContent{ - Text: "Hello world", + Text: "hello world", }, }, }, diff --git a/cmd/ax/harnessclient.go b/cmd/ax/harnessclient.go new file mode 100644 index 0000000..c72ba14 --- /dev/null +++ b/cmd/ax/harnessclient.go @@ -0,0 +1,128 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package main implements a simple client for the fake HarnessService. +package main + +import ( + "bufio" + "context" + "fmt" + "io" + "log" + "os" + + "github.com/google/ax/proto" + "github.com/spf13/cobra" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var ( + harnessServerAddr string +) + +var harnessClientCmd = &cobra.Command{ + Use: "harnessclient", + Short: "Run the harness client to connect to the server", + Hidden: true, + RunE: runHarnessClient, +} + +func init() { + harnessClientCmd.Flags().StringVar(&harnessServerAddr, "server", "localhost:50053", "The server address for the gRPC HarnessService.") + rootCmd.AddCommand(harnessClientCmd) +} + +func runHarnessClient(cmd *cobra.Command, args []string) error { + ctx := context.Background() + + log.Printf("Connecting to HarnessService at %s...", harnessServerAddr) + conn, err := grpc.NewClient(harnessServerAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("failed to connect to server: %v", err) + } + defer conn.Close() + + client := proto.NewHarnessServiceClient(conn) + + stream, err := client.Connect(ctx) + if err != nil { + return fmt.Errorf("Failed to open connection stream: %v", err) + } + + scanner := bufio.NewScanner(os.Stdin) + fmt.Println("Interactive client started. Type your messages below.") + fmt.Println("Type 'go_away' to close the stream and exit.") + for { + fmt.Print("\nClient > ") + if !scanner.Scan() { + break + } + text := scanner.Text() + if text == "" { + continue + } + + msg := &proto.HarnessMessage{ + Messages: []*proto.Message{ + { + Role: "user", + Content: &proto.Content{ + Type: &proto.Content_Text{ + Text: &proto.TextContent{ + Text: text, + }, + }, + }, + }, + }, + } + + if err := stream.Send(msg); err != nil { + return fmt.Errorf("Failed to send message: %v", err) + } + + if text == "go_away" { + log.Println("Sending 'go_away' to close the stream...") + break + } + + resp, err := stream.Recv() + if err != nil { + return fmt.Errorf("Failed to receive response: %v", err) + } + + for i, m := range resp.Messages { + var textContent string + if textBlock, ok := m.Content.Type.(*proto.Content_Text); ok { + textContent = textBlock.Text.Text + } + fmt.Printf("Server > message[%d] (%s): %s\n", i, m.Role, textContent) + } + } + + // Close send side to signal request completion + if err := stream.CloseSend(); err != nil { + return fmt.Errorf("Failed to close send side of stream: %v", err) + } + + log.Println("Waiting for final stream EOF...") + _, err = stream.Recv() + if err != io.EOF { + return fmt.Errorf("Expected EOF from server, got: %v", err) + } + log.Println("Stream closed successfully by server.") + return nil +} From a01b921f97a371bc63a729959fe7912ee2450f01 Mon Sep 17 00:00:00 2001 From: Paramjit Singh Sandhu Date: Tue, 2 Jun 2026 09:26:09 -0700 Subject: [PATCH 2/4] Adding a todo comment. --- cmd/ax/harness.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/ax/harness.go b/cmd/ax/harness.go index 00dce1f..f6af3e4 100644 --- a/cmd/ax/harness.go +++ b/cmd/ax/harness.go @@ -83,6 +83,7 @@ func NewHarnessServiceServer() *HarnessServiceServer { // Connect implements the bidirectional gRPC streaming capability. // It receives client inputs and responds with "hello world" unless the input message text is "go_away". +// TODO(params): Update the implementation to be a proper one. func (s *HarnessServiceServer) Connect(stream proto.HarnessService_ConnectServer) error { for { req, err := stream.Recv() From c1a296ca0ece1ae32e703166a23caa00fc6f35c6 Mon Sep 17 00:00:00 2001 From: Paramjit Singh Sandhu Date: Tue, 2 Jun 2026 11:49:47 -0700 Subject: [PATCH 3/4] Adding a todo for goaway --- cmd/ax/harness.go | 1 + cmd/ax/harnessclient.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/ax/harness.go b/cmd/ax/harness.go index f6af3e4..71275dc 100644 --- a/cmd/ax/harness.go +++ b/cmd/ax/harness.go @@ -97,6 +97,7 @@ func (s *HarnessServiceServer) Connect(stream proto.HarnessService_ConnectServer shouldGoAway := false for _, m := range req.Messages { if textBlock, ok := m.Content.Type.(*proto.Content_Text); ok { + // TODO(params): Replace this with a proper protocol for go away. if textBlock.Text.Text == "go_away" { shouldGoAway = true break diff --git a/cmd/ax/harnessclient.go b/cmd/ax/harnessclient.go index c72ba14..8c438f2 100644 --- a/cmd/ax/harnessclient.go +++ b/cmd/ax/harnessclient.go @@ -93,7 +93,7 @@ func runHarnessClient(cmd *cobra.Command, args []string) error { if err := stream.Send(msg); err != nil { return fmt.Errorf("Failed to send message: %v", err) } - + // TODO(params): Replace this with a proper protocol for go away. if text == "go_away" { log.Println("Sending 'go_away' to close the stream...") break From e543a20432bb45654a5ef436ad36f15135e0683c Mon Sep 17 00:00:00 2001 From: Paramjit Singh Sandhu Date: Tue, 2 Jun 2026 11:50:57 -0700 Subject: [PATCH 4/4] formatting --- cmd/ax/harnessclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/ax/harnessclient.go b/cmd/ax/harnessclient.go index 8c438f2..e02c361 100644 --- a/cmd/ax/harnessclient.go +++ b/cmd/ax/harnessclient.go @@ -93,7 +93,7 @@ func runHarnessClient(cmd *cobra.Command, args []string) error { if err := stream.Send(msg); err != nil { return fmt.Errorf("Failed to send message: %v", err) } - // TODO(params): Replace this with a proper protocol for go away. + // TODO(params): Replace this with a proper protocol for go away. if text == "go_away" { log.Println("Sending 'go_away' to close the stream...") break