diff --git a/test/integration/bbr/harness_test.go b/test/integration/bbr/harness_test.go new file mode 100644 index 000000000..468ce9a42 --- /dev/null +++ b/test/integration/bbr/harness_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bbr + +import ( + "context" + "fmt" + "strings" + "testing" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + "sigs.k8s.io/gateway-api-inference-extension/test/integration" +) + +var logger = logutil.NewTestLogger().V(logutil.VERBOSE) + +// BBRHarness encapsulates the environment for a single isolated BBR test run. +type BBRHarness struct { + t *testing.T + ctx context.Context + Client extProcPb.ExternalProcessor_ProcessClient + + // Internal handles for cleanup + server *runserver.ExtProcServerRunner + grpcConn *grpc.ClientConn +} + +// NewBBRHarness boots up an isolated BBR server on a random port. +// streaming: determines if the BBR server runs in streaming mode or unary/buffered mode. +func NewBBRHarness(t *testing.T, ctx context.Context, streaming bool) *BBRHarness { + t.Helper() + + // 1. Allocate Free Port + tcpAddr, err := integration.GetFreePort() + require.NoError(t, err, "failed to acquire free port for BBR server") + port := tcpAddr.Port + + // 2. Configure BBR Server + // BBR is simpler than EPP; it doesn't need a K8s Manager. + runner := runserver.NewDefaultExtProcServerRunner(port, false) + runner.SecureServing = false + runner.Streaming = streaming + + // 3. Start Server in Background + serverCtx, serverCancel := context.WithCancel(ctx) + go func() { + logger.Info("Starting BBR server", "port", port, "streaming", streaming) + if err := runner.AsRunnable(logger.WithName("bbr-server")).Start(serverCtx); err != nil { + // Context cancellation is expected during teardown. + if !strings.Contains(err.Error(), "context canceled") { + logger.Error(err, "BBR server stopped unexpectedly") + } + } + }() + + // 4. Connect Client + // Blocking dial ensures the server is reachable before the test logic begins. + addr := fmt.Sprintf("127.0.0.1:%d", port) + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err, "failed to create grpc connection to BBR server") + + extProcClient, err := extProcPb.NewExternalProcessorClient(conn).Process(ctx) + require.NoError(t, err, "failed to initialize ext_proc stream client") + + h := &BBRHarness{ + t: t, + ctx: ctx, + Client: extProcClient, + server: runner, + grpcConn: conn, + } + + // 5. Register Cleanup + t.Cleanup(func() { + logger.Info("Tearing down BBR server", "port", port) + serverCancel() + if err := h.grpcConn.Close(); err != nil { + t.Logf("Warning: failed to close grpc connection: %v", err) + } + }) + + return h +} diff --git a/test/integration/bbr/hermetic_test.go b/test/integration/bbr/hermetic_test.go index e1c25a78f..3edcb70f4 100644 --- a/test/integration/bbr/hermetic_test.go +++ b/test/integration/bbr/hermetic_test.go @@ -19,87 +19,67 @@ package bbr import ( "context" - "fmt" "testing" - "time" - configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/google/go-cmp/cmp" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" - - runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server" - logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" - integrationutils "sigs.k8s.io/gateway-api-inference-extension/test/integration" + "sigs.k8s.io/gateway-api-inference-extension/test/integration" ) -var logger = logutil.NewTestLogger().V(logutil.VERBOSE) - +// TestBodyBasedRouting validates the "Unary" (Non-Streaming) behavior of BBR. +// This simulates scenarios where Envoy buffers the body before sending it to ext_proc. func TestBodyBasedRouting(t *testing.T) { + t.Parallel() + tests := []struct { - name string - req *extProcPb.ProcessingRequest - wantHeaders []*configPb.HeaderValueOption - wantErr bool + name string + req *extProcPb.ProcessingRequest + wantResponse *extProcPb.ProcessingResponse + wantErr bool }{ { - name: "success adding model parameter to header", - req: integrationutils.GenerateRequest(logger, "test", "llama", nil), - wantHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "X-Gateway-Model-Name", - RawValue: []byte("llama"), - }, - }, - }, - wantErr: false, + name: "success: extracts model and sets header", + req: integration.ReqLLMUnary(logger, "test", "llama"), + wantResponse: ExpectBBRUnaryResponse("llama"), + wantErr: false, }, { - name: "no model parameter", - req: integrationutils.GenerateRequest(logger, "test1", "", nil), - wantHeaders: []*configPb.HeaderValueOption{}, - wantErr: false, + name: "noop: no model parameter in body", + req: integration.ReqLLMUnary(logger, "test1", ""), + wantResponse: ExpectBBRUnaryResponse(""), // Expect no headers. + wantErr: false, }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpHermeticServer(false) - t.Cleanup(cleanup) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := NewBBRHarness(t, ctx, false) + + res, err := integration.SendRequest(t, h.Client, tc.req) - want := &extProcPb.ProcessingResponse{} - if len(test.wantHeaders) > 0 { - want.Response = &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - HeaderMutation: &extProcPb.HeaderMutation{ - SetHeaders: test.wantHeaders, - }, - ClearRouteCache: true, - }, - }, - } + if tc.wantErr { + require.Error(t, err, "expected error during request processing") } else { - want.Response = &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{}, - } + require.NoError(t, err, "unexpected error during request processing") } - res, err := integrationutils.SendRequest(t, client, test.req) - if err != nil && !test.wantErr { - t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr) - } - if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" { - t.Errorf("Unexpected response, (-want +got): %v", diff) + if diff := cmp.Diff(tc.wantResponse, res, protocmp.Transform()); diff != "" { + t.Errorf("Response mismatch (-want +got): %v", diff) } }) } } +// TestFullDuplexStreamed_BodyBasedRouting validates the "Streaming" behavior of BBR. +// This validates that BBR correctly buffers streamed chunks, inspects the body, and injects the header. func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) { + t.Parallel() + tests := []struct { name string reqs []*extProcPb.ProcessingRequest @@ -107,188 +87,53 @@ func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) { wantErr bool }{ { - name: "success adding model parameter to header", - reqs: integrationutils.GenerateStreamedRequestSet(logger, "test", "foo", "foo", nil), + name: "success: adds model header from simple body", + reqs: integration.ReqLLM(logger, "test", "foo", "bar"), wantResponses: []*extProcPb.ProcessingResponse{ - { - Response: &extProcPb.ProcessingResponse_RequestHeaders{ - RequestHeaders: &extProcPb.HeadersResponse{ - Response: &extProcPb.CommonResponse{ - ClearRouteCache: true, - HeaderMutation: &extProcPb.HeaderMutation{ - SetHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "X-Gateway-Model-Name", - RawValue: []byte("foo"), - }, - }, - }}, - }, - }, - }, - }, - { - Response: &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - BodyMutation: &extProcPb.BodyMutation{ - Mutation: &extProcPb.BodyMutation_StreamedResponse{ - StreamedResponse: &extProcPb.StreamedBodyResponse{ - Body: []byte("{\"max_tokens\":100,\"model\":\"foo\",\"prompt\":\"test\",\"temperature\":0}"), - EndOfStream: true, - }, - }, - }, - }, - }, - }, - }, + ExpectBBRHeader("foo"), + ExpectBBRBodyPassThrough("test", "foo"), }, }, { - name: "success adding model parameter to header with multiple body chunks", - reqs: []*extProcPb.ProcessingRequest{ - { - Request: &extProcPb.ProcessingRequest_RequestHeaders{ - RequestHeaders: &extProcPb.HttpHeaders{ - Headers: &configPb.HeaderMap{ - Headers: []*configPb.HeaderValue{ - { - Key: "hi", - Value: "mom", - }, - }, - }, - }, - }, - }, - { - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: []byte("{\"max_tokens\":100,\"model\":\"sql-lo"), EndOfStream: false}, - }, - }, - { - Request: &extProcPb.ProcessingRequest_RequestBody{ - RequestBody: &extProcPb.HttpBody{Body: []byte("ra-sheddable\",\"prompt\":\"test\",\"temperature\":0}"), EndOfStream: true}, - }, - }, - }, + name: "success: buffers split chunks and extracts model", + reqs: integration.ReqRaw( + map[string]string{"hi": "mom"}, + `{"max_tokens":100,"model":"sql-lo`, + `ra-sheddable","prompt":"test","temperature":0}`, + ), wantResponses: []*extProcPb.ProcessingResponse{ - { - Response: &extProcPb.ProcessingResponse_RequestHeaders{ - RequestHeaders: &extProcPb.HeadersResponse{ - Response: &extProcPb.CommonResponse{ - ClearRouteCache: true, - HeaderMutation: &extProcPb.HeaderMutation{ - SetHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "X-Gateway-Model-Name", - RawValue: []byte("sql-lora-sheddable"), - }, - }, - }}, - }, - }, - }, - }, - { - Response: &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - BodyMutation: &extProcPb.BodyMutation{ - Mutation: &extProcPb.BodyMutation_StreamedResponse{ - StreamedResponse: &extProcPb.StreamedBodyResponse{ - Body: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-sheddable\",\"prompt\":\"test\",\"temperature\":0}"), - EndOfStream: true, - }, - }, - }, - }, - }, - }, - }, + ExpectBBRHeader("sql-lora-sheddable"), + ExpectBBRBodyPassThrough("test", "sql-lora-sheddable"), }, }, { - name: "no model parameter", - reqs: integrationutils.GenerateStreamedRequestSet(logger, "test", "", "", nil), + name: "noop: handles missing model field gracefully", + reqs: integration.ReqLLM(logger, "test", "", ""), wantResponses: []*extProcPb.ProcessingResponse{ - { - Response: &extProcPb.ProcessingResponse_RequestHeaders{ - RequestHeaders: &extProcPb.HeadersResponse{}, - }, - }, - { - Response: &extProcPb.ProcessingResponse_RequestBody{ - RequestBody: &extProcPb.BodyResponse{ - Response: &extProcPb.CommonResponse{ - BodyMutation: &extProcPb.BodyMutation{ - Mutation: &extProcPb.BodyMutation_StreamedResponse{ - StreamedResponse: &extProcPb.StreamedBodyResponse{ - Body: []byte("{\"max_tokens\":100,\"prompt\":\"test\",\"temperature\":0}"), - EndOfStream: true, - }, - }, - }, - }, - }, - }, - }, + ExpectBBRNoOpHeader(), + ExpectBBRBodyPassThrough("test", ""), }, }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - client, cleanup := setUpHermeticServer(true) - t.Cleanup(cleanup) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := NewBBRHarness(t, ctx, true) + + responses, err := integration.StreamedRequest(t, h.Client, tc.reqs, len(tc.wantResponses)) - responses, err := integrationutils.StreamedRequest(t, client, test.reqs, len(test.wantResponses)) - if err != nil && !test.wantErr { - t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr) + if tc.wantErr { + require.Error(t, err, "expected stream error") + } else { + require.NoError(t, err, "unexpected stream error") } - if diff := cmp.Diff(test.wantResponses, responses, protocmp.Transform()); diff != "" { - t.Errorf("Unexpected response, (-want +got): %v", diff) + if diff := cmp.Diff(tc.wantResponses, responses, protocmp.Transform()); diff != "" { + t.Errorf("Response mismatch (-want +got): %v", diff) } }) } } - -func setUpHermeticServer(streaming bool) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) { - port := 9004 - - serverCtx, stopServer := context.WithCancel(context.Background()) - serverRunner := runserver.NewDefaultExtProcServerRunner(port, false) - serverRunner.SecureServing = false - serverRunner.Streaming = streaming - - go func() { - if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil { - logutil.Fatal(logger, err, "Failed to start ext-proc server") - } - }() - - address := fmt.Sprintf("localhost:%v", port) - // Create a grpc connection - conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - logutil.Fatal(logger, err, "Failed to connect", "address", address) - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx) - if err != nil { - logutil.Fatal(logger, err, "Failed to create client") - } - return client, func() { - cancel() - conn.Close() - stopServer() - - // wait a little until the goroutines actually exit - time.Sleep(5 * time.Second) - } -} diff --git a/test/integration/bbr/util_test.go b/test/integration/bbr/util_test.go new file mode 100644 index 000000000..72fc6a574 --- /dev/null +++ b/test/integration/bbr/util_test.go @@ -0,0 +1,121 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bbr + +import ( + "encoding/json" + + envoyCorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" +) + +// --- Response Expectations (Streaming) --- + +// ExpectBBRHeader asserts that BBR set the specific model header and cleared the route cache. +func ExpectBBRHeader(modelName string) *extProcPb.ProcessingResponse { + return &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{ + Response: &extProcPb.CommonResponse{ + ClearRouteCache: true, + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: []*envoyCorev3.HeaderValueOption{ + { + Header: &envoyCorev3.HeaderValue{ + Key: "X-Gateway-Model-Name", + RawValue: []byte(modelName), + }, + }, + }, + }, + }, + }, + }, + } +} + +// ExpectBBRBodyPassThrough asserts that BBR reconstructs and passes the body through. +// BBR buffers the body to inspect it, then sends it downstream as a single chunk (usually). +func ExpectBBRBodyPassThrough(prompt, model string) *extProcPb.ProcessingResponse { + j := map[string]any{ + "max_tokens": 100, "prompt": prompt, "temperature": 0, + } + if model != "" { + j["model"] = model + } + b, _ := json.Marshal(j) + + return &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{ + Response: &extProcPb.CommonResponse{ + BodyMutation: &extProcPb.BodyMutation{ + Mutation: &extProcPb.BodyMutation_StreamedResponse{ + StreamedResponse: &extProcPb.StreamedBodyResponse{ + Body: b, + EndOfStream: true, + }, + }, + }, + }, + }, + }, + } +} + +// ExpectBBRNoOpHeader asserts that BBR did nothing to the headers (e.g., when no model is found). +func ExpectBBRNoOpHeader() *extProcPb.ProcessingResponse { + return &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{}, + }, + } +} + +// --- Response Expectations (Unary) --- + +// ExpectBBRUnaryResponse creates expected response for unary tests where the body is mutated directly. +func ExpectBBRUnaryResponse(modelName string) *extProcPb.ProcessingResponse { + resp := &extProcPb.ProcessingResponse{} + + // If modelName is present, we expect header mutations. + if modelName != "" { + resp.Response = &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{ + Response: &extProcPb.CommonResponse{ + ClearRouteCache: true, + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: []*envoyCorev3.HeaderValueOption{ + { + Header: &envoyCorev3.HeaderValue{ + Key: "X-Gateway-Model-Name", + RawValue: []byte(modelName), + }, + }, + }, + }, + }, + }, + } + } else { + // Otherwise, expect a No-Op on the body. + resp.Response = &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{}, + } + } + return resp +} diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index bfb7f99f5..2bc7ba55d 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -228,7 +228,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) { wantErr: false, wantResponses: integrationutils.NewImmediateErrorResponse( envoyTypePb.StatusCode_BadRequest, - "inference gateway: BadRequest - Error unmarshaling request body: no healthy upstream", + "inference gateway: BadRequest - Error unmarshaling request body", ), }, { @@ -804,7 +804,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) { }, }, }, - wantResponses: []*extProcPb.ProcessingResponse{}, + wantResponses: nil, pods: newPodStates( podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}}, ), diff --git a/test/integration/util.go b/test/integration/util.go index 9f0dcde7b..8b17f38e4 100644 --- a/test/integration/util.go +++ b/test/integration/util.go @@ -14,11 +14,21 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package integration provides shared utilities, request builders, and assertions for the hermetic integration test +// suites of the Gateway API Inference Extension. +// +// It encapsulates the complexity of constructing Envoy ext_proc Protobuf messages and managing gRPC streams, allowing +// individual test suites (e.g., test/integration/epp, test/integration/bbr) to focus on behavioral assertions rather +// than protocol boilerplate. package integration import ( + "context" "encoding/json" + "errors" + "fmt" "io" + "net" "strconv" "testing" "time" @@ -30,77 +40,89 @@ import ( "google.golang.org/protobuf/types/known/structpb" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" - logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" ) -const ( - headerKeyContentLength = "Content-Length" -) +// --- Request Builders (High-Level DSL) --- -func SendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { - t.Logf("Sending request: %v", req) - if err := client.Send(req); err != nil { - t.Logf("Failed to send request %+v: %v", req, err) - return nil, err - } +// ReqLLM creates a sequence of gRPC messages representing a standard, streamed LLM inference request. +// It generates: +// 1. A RequestHeaders message containing standard inference headers (Objective, Model Rewrite, Request ID). +// 2. A RequestBody message containing the JSON payload with EndOfStream=true. +// +// Use this for the majority of "Happy Path" EPP and BBR streaming tests. +func ReqLLM(logger logr.Logger, prompt, model, targetModel string) []*extProcPb.ProcessingRequest { + return GenerateStreamedRequestSet(logger, prompt, model, targetModel, nil) +} - res, err := client.Recv() - if err != nil { - t.Logf("Failed to receive: %v", err) - return nil, err - } - t.Logf("Received response %+v", res) - return res, err +// ReqLLMUnary creates a single `ProcessingRequest` containing a complete JSON body. +// This simulates a scenario where Envoy has buffered the request body before sending it to the external processor +// (unary mode). +// +// Use this for tests where `streaming: false` or when testing legacy buffered behavior. +func ReqLLMUnary(logger logr.Logger, prompt, model string) *extProcPb.ProcessingRequest { + return GenerateRequest(logger, prompt, model, nil) } -// StreamedRequest sends a series of requests and collects the specified number of responses. -func StreamedRequest( - t *testing.T, - client extProcPb.ExternalProcessor_ProcessClient, - requests []*extProcPb.ProcessingRequest, - expectedResponses int, -) ([]*extProcPb.ProcessingResponse, error) { - for _, req := range requests { - t.Logf("Sending request: %v", req) - if err := client.Send(req); err != nil { - t.Logf("Failed to send request %+v: %v", req, err) - return nil, err - } - } +// ReqRaw creates a custom sequence of gRPC messages with specific headers and arbitrary body chunks. +// This is a lower-level helper useful for testing edge cases, such as: +// - Invalid JSON bodies (to test error handling). +// - Fragmentation (split bodies) to ensure the processor handles accumulation correctly. +// - Protocol attacks (e.g., missing headers). +func ReqRaw(headers map[string]string, bodyChunks ...string) []*extProcPb.ProcessingRequest { + reqs := []*extProcPb.ProcessingRequest{} - responses := []*extProcPb.ProcessingResponse{} - for i := range expectedResponses { - type recvResult struct { - res *extProcPb.ProcessingResponse - err error - } - recvChan := make(chan recvResult, 1) + // 1. Headers Phase + hList := []*envoyCorev3.HeaderValue{} + for k, v := range headers { + hList = append(hList, &envoyCorev3.HeaderValue{Key: k, Value: v}) + } + reqs = append(reqs, &extProcPb.ProcessingRequest{ + Request: &extProcPb.ProcessingRequest_RequestHeaders{ + RequestHeaders: &extProcPb.HttpHeaders{ + Headers: &envoyCorev3.HeaderMap{Headers: hList}, + }, + }, + }) - go func() { - res, err := client.Recv() - recvChan <- recvResult{res, err} - }() + // 2. Body Phase (Chunks) + for i, chunk := range bodyChunks { + reqs = append(reqs, &extProcPb.ProcessingRequest{ + Request: &extProcPb.ProcessingRequest_RequestBody{ + RequestBody: &extProcPb.HttpBody{ + Body: []byte(chunk), + EndOfStream: i == len(bodyChunks)-1, + }, + }, + }) + } + return reqs +} - select { - case <-time.After(10 * time.Second): - t.Logf("Timeout waiting for response %d of %d", i+1, expectedResponses) - return responses, nil - case result := <-recvChan: - if result.err != nil { - if result.err == io.EOF { - return responses, nil - } - t.Logf("Failed to receive: %v", result.err) - return nil, result.err - } - t.Logf("Received response %+v", result.res) - responses = append(responses, result.res) - } +// ReqHeaderOnly creates a request sequence consisting solely of headers, with no body. +// It sets `EndOfStream: true` on the headers frame. +// +// Use this for testing non-inference traffic, such as GET requests, health checks, or requests that should bypass the +// inference processor logic. +func ReqHeaderOnly(headers map[string]string) []*extProcPb.ProcessingRequest { + hList := []*envoyCorev3.HeaderValue{} + for k, v := range headers { + hList = append(hList, &envoyCorev3.HeaderValue{Key: k, Value: v}) } - return responses, nil + return []*extProcPb.ProcessingRequest{{ + Request: &extProcPb.ProcessingRequest_RequestHeaders{ + RequestHeaders: &extProcPb.HttpHeaders{ + Headers: &envoyCorev3.HeaderMap{Headers: hList}, + EndOfStream: true, + }, + }, + }} } +// --- Request Builders (Low-Level Generators) --- + +// GenerateRequest constructs a `ProcessingRequest` containing a JSON-formatted LLM payload. +// It accepts a filterMetadata slice to inject Envoy Dynamic Metadata (used for subset load balancing). func GenerateRequest(logger logr.Logger, prompt, model string, filterMetadata []string) *extProcPb.ProcessingRequest { j := map[string]any{ "prompt": prompt, @@ -111,11 +133,13 @@ func GenerateRequest(logger logr.Logger, prompt, model string, filterMetadata [] j["model"] = model } + // Panic on marshal failure is acceptable in test helpers as it implies a bug in the test code itself. llmReq, err := json.Marshal(j) if err != nil { - logutil.Fatal(logger, err, "Failed to unmarshal LLM request") + panic(fmt.Errorf("failed to marshal LLM request: %w", err)) } - req := &extProcPb.ProcessingRequest{ + + return &extProcPb.ProcessingRequest{ Request: &extProcPb.ProcessingRequest_RequestBody{ RequestBody: &extProcPb.HttpBody{Body: llmReq, EndOfStream: true}, }, @@ -123,47 +147,49 @@ func GenerateRequest(logger logr.Logger, prompt, model string, filterMetadata [] FilterMetadata: GenerateRequestMetadata(filterMetadata), }, } - return req } -func GenerateStreamedRequestSet(logger logr.Logger, prompt, model, targetModel string, filterMetadata []string) []*extProcPb.ProcessingRequest { +// GenerateStreamedRequestSet creates a slice of requests simulating an Envoy stream: +// 1. A Headers frame with standard Inference Extension headers. +// 2. A Body frame with the JSON payload. +func GenerateStreamedRequestSet( + logger logr.Logger, + prompt, model, targetModel string, + filterMetadata []string, +) []*extProcPb.ProcessingRequest { requests := []*extProcPb.ProcessingRequest{} + + // Headers + headers := []*envoyCorev3.HeaderValue{ + {Key: "hi", Value: "mom"}, + {Key: requtil.RequestIdHeaderKey, Value: "test-request-id"}, + } + if model != "" { + headers = append(headers, &envoyCorev3.HeaderValue{Key: metadata.ObjectiveKey, Value: model}) + } + if targetModel != "" { + headers = append(headers, &envoyCorev3.HeaderValue{Key: metadata.ModelNameRewriteKey, Value: targetModel}) + } + headerReq := &extProcPb.ProcessingRequest{ Request: &extProcPb.ProcessingRequest_RequestHeaders{ RequestHeaders: &extProcPb.HttpHeaders{ - Headers: &envoyCorev3.HeaderMap{ - Headers: []*envoyCorev3.HeaderValue{ - { - Key: "hi", - Value: "mom", - }, - { - Key: metadata.ObjectiveKey, - Value: model, - }, - { - Key: metadata.ModelNameRewriteKey, - Value: targetModel, - }, - { - Key: requtil.RequestIdHeaderKey, - Value: "test-request-id", - }, - }, - }, + Headers: &envoyCorev3.HeaderMap{Headers: headers}, }, }, + MetadataContext: &envoyCorev3.Metadata{ + FilterMetadata: GenerateRequestMetadata(filterMetadata), + }, } - - headerReq.MetadataContext = &envoyCorev3.Metadata{ - FilterMetadata: GenerateRequestMetadata(filterMetadata), - } - requests = append(requests, headerReq) + + // Body requests = append(requests, GenerateRequest(logger, prompt, model, filterMetadata)) return requests } +// GenerateRequestMetadata constructs the Envoy Dynamic Metadata structure. +// This is primarily used to inject "envoy.lb" subset keys for testing logic that depends on specific backend subsets. func GenerateRequestMetadata(filterMetadata []string) map[string]*structpb.Struct { requestMetadata := make(map[string]*structpb.Struct) interfaceList := make([]any, len(filterMetadata)) @@ -179,10 +205,19 @@ func GenerateRequestMetadata(filterMetadata []string) map[string]*structpb.Struc return requestMetadata } -// NewRequestBufferedResponse creates a complete set of responses for the request phase. -// It modifies request headers (e.g., for routing) and replaces the entire request body. -// It returns a slice of two messages, representing the complete buffered action. -func NewRequestBufferedResponse(destinationEndpoint string, rewrittenBody string, otherHeaders ...*envoyCorev3.HeaderValueOption) []*extProcPb.ProcessingResponse { +// --- Response Builders --- + +// NewRequestBufferedResponse creates a complete set of responses for the Request phase. +// It simulates the EPP deciding to: +// 1. Modify headers (e.g., set destination endpoint). +// 2. Replace the entire request body (e.g., rewriting the model name). +// +// It returns two messages: one for the Header response and one for the Body response. +func NewRequestBufferedResponse( + destinationEndpoint string, + rewrittenBody string, + otherHeaders ...*envoyCorev3.HeaderValueOption, +) []*extProcPb.ProcessingResponse { setHeaders := []*envoyCorev3.HeaderValueOption{ { Header: &envoyCorev3.HeaderValue{ @@ -210,7 +245,7 @@ func NewRequestBufferedResponse(destinationEndpoint string, rewrittenBody string }, }, }, - DynamicMetadata: makeMetadata(destinationEndpoint), + DynamicMetadata: makeDestinationMetadata(destinationEndpoint), } bodyResponse := &extProcPb.ProcessingResponse{ @@ -233,18 +268,21 @@ func NewRequestBufferedResponse(destinationEndpoint string, rewrittenBody string return []*extProcPb.ProcessingResponse{headerResponse, bodyResponse} } -// NewResponseBufferedResponse creates a complete set of responses for the response phase. -// It modifies response headers and replaces the entire response body. -// It is used when the processor buffers the upstream response before sending its own. -func NewResponseBufferedResponse(rewrittenBody string, headersToSet ...*envoyCorev3.HeaderValueOption) []*extProcPb.ProcessingResponse { +// NewResponseBufferedResponse creates a complete set of responses for the Response phase. +// It simulates the EPP modifying the upstream response before sending it to the client. +// It returns a Header mutation message followed by a Body replacement message. +func NewResponseBufferedResponse( + rewrittenBody string, + headersToSet ...*envoyCorev3.HeaderValueOption, +) []*extProcPb.ProcessingResponse { return []*extProcPb.ProcessingResponse{ NewResponseHeaders(headersToSet...), NewResponseStreamChunk(rewrittenBody, true), } } -// NewResponseHeaders creates a single response message to modify the response headers. -// This is the first step in either a buffered or streaming response modification. +// NewResponseHeaders creates a single response message to modify response headers. +// Use this when testing header mutations without body changes, or as the first step in a streamed response test. func NewResponseHeaders(headersToSet ...*envoyCorev3.HeaderValueOption) *extProcPb.ProcessingResponse { return &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ResponseHeaders{ @@ -259,8 +297,8 @@ func NewResponseHeaders(headersToSet ...*envoyCorev3.HeaderValueOption) *extProc } } -// NewResponseStreamChunk creates a single response for one body chunk in a stream. -// This is used to test streaming behaviors like text/event-stream pass-through. +// NewResponseStreamChunk creates a single gRPC message representing one chunk of a streaming response. +// Use this to verify that EPP correctly passes through chunks (e.g., SSE events) or injects specific chunks. func NewResponseStreamChunk(body string, endOfStream bool) *extProcPb.ProcessingResponse { return &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_ResponseBody{ @@ -280,24 +318,148 @@ func NewResponseStreamChunk(body string, endOfStream bool) *extProcPb.Processing } } -// NewImmediateErrorResponse creates an immediate response to terminate processing. -// This is used for errors like load shedding or bad requests. +// NewImmediateErrorResponse creates a response that immediately terminates the request with a specific HTTP status code +// and body. +// Use this for testing Load Shedding (503), Rate Limiting (429), or Bad Request (400) logic. func NewImmediateErrorResponse(code envoyTypePb.StatusCode, body string) []*extProcPb.ProcessingResponse { - response := &extProcPb.ProcessingResponse{ + return []*extProcPb.ProcessingResponse{{ Response: &extProcPb.ProcessingResponse_ImmediateResponse{ ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: code, - }, - Body: []byte(body), + Status: &envoyTypePb.HttpStatus{Code: code}, + Body: []byte(body), }, }, + }} +} + +// --- Execution Helpers --- + +// SendRequest is a helper for Unary (One-Shot) test scenarios. +// It sends a single request message and waits for exactly one response. +func SendRequest( + t *testing.T, + client extProcPb.ExternalProcessor_ProcessClient, + req *extProcPb.ProcessingRequest, +) (*extProcPb.ProcessingResponse, error) { + t.Helper() + t.Logf("Sending request: %v", req) + + if err := client.Send(req); err != nil { + t.Logf("Failed to send request: %v", err) + return nil, err + } + + res, err := client.Recv() + if err != nil { + t.Logf("Failed to receive response: %v", err) + return nil, err + } + t.Logf("Received response: %+v", res) + return res, err +} + +// StreamedRequest is a helper for Full-Duplex Streaming test scenarios. +// It performs the following actions: +// 1. Sends all requests in the provided slice to the server. +// 2. Listens for responses on the stream until 'expectedResponses' count is reached. +// 3. Enforces a 10-second timeout to prevent deadlocks if the server hangs. +// 4. Handles io.EOF gracefully (server closed stream). +func StreamedRequest( + t *testing.T, + client extProcPb.ExternalProcessor_ProcessClient, + requests []*extProcPb.ProcessingRequest, + expectedResponses int, +) ([]*extProcPb.ProcessingResponse, error) { + t.Helper() + + // 1. Send Phase + for _, req := range requests { + t.Logf("Sending request: %v", req) + if err := client.Send(req); err != nil { + t.Logf("Failed to send request: %v", err) + return nil, err + } + } + + // 2. Receive Phase + // We use a channel and a separate goroutine for receiving to allow for a strict timeout via select{}. + type recvResult struct { + res *extProcPb.ProcessingResponse + err error + } + + // Buffered channel avoids blocking the goroutine on the last read. + recvChan := make(chan recvResult, expectedResponses+1) + + // Start reading in background. + go func() { + for range expectedResponses { + res, err := client.Recv() + recvChan <- recvResult{res, err} + if err != nil { + return // Stop reading on error or EOF. + } + } + }() + + var responses []*extProcPb.ProcessingResponse + + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + // Collect results with timeout. + for i := range expectedResponses { + select { + case <-ctx.Done(): + t.Logf("Timeout waiting for response %d of %d: %v", i+1, expectedResponses, ctx.Err()) + return responses, fmt.Errorf("timeout waiting for responses: %w", ctx.Err()) + + case result := <-recvChan: + if result.err != nil { + // io.EOF is a valid termination from the server side (e.g. rejection). + if result.err == io.EOF { + return responses, nil + } + t.Logf("Failed to receive: %v", result.err) + return nil, result.err + } + t.Logf("Received response: %+v", result.res) + responses = append(responses, result.res) + } + } + + return responses, nil +} + +// --- System Utilities --- + +// GetFreePort finds an available IPv4 TCP port on localhost. +// It works by asking the OS to allocate a port by listening on port 0, capturing the assigned address, and then +// immediately closing the listener. +// +// Note: There is a theoretical race condition where another process grabs the port between the Close() call and the +// subsequent usage, but this is generally acceptable in hermetic test environments. +func GetFreePort() (*net.TCPAddr, error) { + // Force IPv4 to prevent flakes on dual-stack CI environments + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("failed to listen on a free port: %w", err) } - return []*extProcPb.ProcessingResponse{response} + + // Critical: Close the listener immediately so the caller can bind to it. + defer listener.Close() + + addr, ok := listener.Addr().(*net.TCPAddr) + if !ok { + return nil, errors.New("failed to cast listener address to TCPAddr") + } + return addr, nil } -// makeMetadata creates the dynamic metadata struct that Envoy uses for routing hints. -func makeMetadata(endpoint string) *structpb.Struct { +// --- Internal Helpers --- + +// makeDestinationMetadata helper to construct the Envoy dynamic metadata for routing. +func makeDestinationMetadata(endpoint string) *structpb.Struct { return &structpb.Struct{ Fields: map[string]*structpb.Value{ metadata.DestinationEndpointNamespace: { @@ -316,3 +478,7 @@ func makeMetadata(endpoint string) *structpb.Struct { }, } } + +const ( + headerKeyContentLength = "Content-Length" +)