diff --git a/internal/experimental/k8s/ate/client.go b/internal/experimental/k8s/ate/client.go index 8f36a17..4237285 100644 --- a/internal/experimental/k8s/ate/client.go +++ b/internal/experimental/k8s/ate/client.go @@ -69,6 +69,19 @@ func (c *Client) CreateActor(ctx context.Context, id string) (*ateapipb.CreateAc return resp, nil } +// ResumeActor resumes the actor, scheduling it onto a worker. The returned +// actor carries the worker IP once it is running. +func (c *Client) ResumeActor(ctx context.Context, id string) (*ateapipb.ResumeActorResponse, error) { + client := ateapipb.NewControlClient(c.conn) + resp, err := client.ResumeActor(ctx, &ateapipb.ResumeActorRequest{ + ActorId: id, + }) + if err != nil { + return nil, fmt.Errorf("error when calling Control.ResumeActor: %w", err) + } + return resp, nil +} + // SuspendActor suspends the actor. func (c *Client) SuspendActor(ctx context.Context, id string) (*ateapipb.SuspendActorResponse, error) { client := ateapipb.NewControlClient(c.conn) diff --git a/internal/harness/substrate.go b/internal/harness/substrate.go index 9dafb9c..9126ea4 100644 --- a/internal/harness/substrate.go +++ b/internal/harness/substrate.go @@ -16,6 +16,7 @@ package harness import ( "context" + "crypto/tls" "errors" "fmt" "io" @@ -24,7 +25,10 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "github.com/google/ax/internal/experimental/k8s/ate" "github.com/google/ax/proto" @@ -49,7 +53,8 @@ func NewSubstrateHarness(endpoint string, namespace string, template string, por if template == "" { template = "ax-harness-template" } - client, err := ate.NewClient(namespace, template, endpoint) + controlCreds := grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})) + client, err := ate.NewClient(namespace, template, endpoint, controlCreds) if err != nil { return nil, fmt.Errorf("failed to create ATE client: %w", err) } @@ -69,11 +74,18 @@ func (h *SubstrateHarness) Start(ctx context.Context, conversationID string) (Ex return nil, errors.New("SubstrateHarness needs valid conversationID") } - resp, err := h.ateClient.CreateActor(ctx, conversationID) - if err != nil { + // CreateActor is idempotent here: on follow-up turns the actor was created + // (and suspended) on a previous turn, so AlreadyExists is expected and fine. + if _, err := h.ateClient.CreateActor(ctx, conversationID); err != nil && status.Code(err) != codes.AlreadyExists { return nil, fmt.Errorf("failed to create substrate actor %s: %w", conversationID, err) } - actor := resp.Actor + + // Resume the actor so it is scheduled onto a worker and gets a routable IP. + resumeResp, err := h.ateClient.ResumeActor(ctx, conversationID) + if err != nil { + return nil, fmt.Errorf("failed to resume substrate actor %s: %w", conversationID, err) + } + actor := resumeResp.Actor if actor == nil { return nil, fmt.Errorf("received nil actor in response for %s", conversationID) } diff --git a/internal/manifests/README.md b/internal/manifests/README.md index d44ed46..acb6cb2 100644 --- a/internal/manifests/README.md +++ b/internal/manifests/README.md @@ -41,6 +41,12 @@ This command will: - Create the `ax` namespace. - Create the `WorkerPool` and `ActorTemplate` for the AX harness. - Create the `ax-server` `ReplicaSet` (the controller front-end). +- Create the `ax-server-config` `ConfigMap` that tells the `ax-server` which + harnesses to serve (mounted at `/etc/ax/ax.yaml`). + +The harness registry lives in that `ConfigMap`. By default it registers a +substrate harness (`hello-world`) backed by the `ax-harness-template`, marked as +the default via `harnesses.default`. Wait until the template is ready: ```bash @@ -70,9 +76,9 @@ Conversation: fb344a18-3720-4c4f-8a6e-2ce34db975b3 ⏺ hello -Hello world +hello world ``` -*The request is served directly by the `ax-server`.* +*The request is served by the harness actor running on Substrate.* ## 🧹 How to Uninstall