Skip to content

Commit 2b49026

Browse files
authored
move AllPodsPredicate to datastore package (#1939)
1 parent cf06da1 commit 2b49026

File tree

10 files changed

+21
-24
lines changed

10 files changed

+21
-24
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.End
479479
// Return a function that can be used in the EPP Handle to list pod names.
480480
func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {
481481
return func() []types.NamespacedName {
482-
pods := ds.PodList(backendmetrics.AllPodsPredicate)
482+
pods := ds.PodList(datastore.AllPodsPredicate)
483483
names := make([]types.NamespacedName, 0, len(pods))
484484

485485
for _, p := range pods {

pkg/epp/backend/metrics/types.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,6 @@ import (
2727
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2828
)
2929

30-
var (
31-
AllPodsPredicate = func(PodMetrics) bool { return true }
32-
)
33-
3430
func PodsWithFreshMetrics(stalenessThreshold time.Duration) func(PodMetrics) bool {
3531
return func(pm PodMetrics) bool {
3632
if pm == nil {

pkg/epp/controller/inferencepool_reconciler_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ type diffStoreParams struct {
182182
wantObjectives []*v1alpha2.InferenceObjective
183183
}
184184

185-
func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
186-
gotPool, _ := datastore.PoolGet()
185+
func diffStore(store datastore.Datastore, params diffStoreParams) string {
186+
gotPool, _ := store.PoolGet()
187187
if diff := cmp.Diff(params.wantPool, gotPool); diff != "" {
188188
return "inferencePool:" + diff
189189
}
@@ -193,7 +193,7 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
193193
params.wantPods = []string{}
194194
}
195195
gotPods := []string{}
196-
for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) {
196+
for _, pm := range store.PodList(datastore.AllPodsPredicate) {
197197
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
198198
}
199199
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
@@ -205,7 +205,7 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
205205
params.wantObjectives = []*v1alpha2.InferenceObjective{}
206206
}
207207

208-
if diff := cmp.Diff(params.wantObjectives, datastore.ObjectiveGetAll(), cmpopts.SortSlices(func(a, b *v1alpha2.InferenceObjective) bool {
208+
if diff := cmp.Diff(params.wantObjectives, store.ObjectiveGetAll(), cmpopts.SortSlices(func(a, b *v1alpha2.InferenceObjective) bool {
209209
return a.Name < b.Name
210210
})); diff != "" {
211211
return "models:" + diff
@@ -328,8 +328,8 @@ type xDiffStoreParams struct {
328328
wantObjectives []*v1alpha2.InferenceObjective
329329
}
330330

331-
func xDiffStore(datastore datastore.Datastore, params xDiffStoreParams) string {
332-
gotPool, _ := datastore.PoolGet()
331+
func xDiffStore(store datastore.Datastore, params xDiffStoreParams) string {
332+
gotPool, _ := store.PoolGet()
333333
if gotPool == nil && params.wantPool == nil {
334334
return ""
335335
}
@@ -343,7 +343,7 @@ func xDiffStore(datastore datastore.Datastore, params xDiffStoreParams) string {
343343
params.wantPods = []string{}
344344
}
345345
gotPods := []string{}
346-
for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) {
346+
for _, pm := range store.PodList(datastore.AllPodsPredicate) {
347347
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
348348
}
349349
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
@@ -355,7 +355,7 @@ func xDiffStore(datastore datastore.Datastore, params xDiffStoreParams) string {
355355
params.wantObjectives = []*v1alpha2.InferenceObjective{}
356356
}
357357

358-
if diff := cmp.Diff(params.wantObjectives, datastore.ObjectiveGetAll(), cmpopts.SortSlices(func(a, b *v1alpha2.InferenceObjective) bool {
358+
if diff := cmp.Diff(params.wantObjectives, store.ObjectiveGetAll(), cmpopts.SortSlices(func(a, b *v1alpha2.InferenceObjective) bool {
359359
return a.Name < b.Name
360360
})); diff != "" {
361361
return "models:" + diff

pkg/epp/controller/pod_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func TestPodReconciler(t *testing.T) {
213213
}
214214

215215
var gotPods []*corev1.Pod
216-
for _, pm := range store.PodList(backendmetrics.AllPodsPredicate) {
216+
for _, pm := range store.PodList(datastore.AllPodsPredicate) {
217217
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}}
218218
gotPods = append(gotPods, pod)
219219
}

pkg/epp/datastore/datastore.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040

4141
var (
4242
errPoolNotSynced = errors.New("InferencePool is not initialized in data store")
43+
AllPodsPredicate = func(_ datalayer.Endpoint) bool { return true }
4344
)
4445

4546
// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)

pkg/epp/datastore/datastore_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func TestMetrics(t *testing.T) {
335335
}
336336
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
337337
if test.predict == nil {
338-
test.predict = backendmetrics.AllPodsPredicate
338+
test.predict = AllPodsPredicate
339339
}
340340
assert.EventuallyWithT(t, func(t *assert.CollectT) {
341341
got := ds.PodList(test.predict)
@@ -407,7 +407,7 @@ func TestPods(t *testing.T) {
407407

408408
test.op(ctx, ds)
409409
var gotPods []*corev1.Pod
410-
for _, pm := range ds.PodList(backendmetrics.AllPodsPredicate) {
410+
for _, pm := range ds.PodList(AllPodsPredicate) {
411411
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}}
412412
gotPods = append(gotPods, pod)
413413
}
@@ -591,7 +591,7 @@ func TestPodInfo(t *testing.T) {
591591

592592
test.op(ctx, ds)
593593
var gotPodInfos []*datalayer.PodInfo
594-
for _, pm := range ds.PodList(backendmetrics.AllPodsPredicate) {
594+
for _, pm := range ds.PodList(AllPodsPredicate) {
595595
gotPodInfos = append(gotPodInfos, pm.GetPod())
596596
}
597597
if diff := cmp.Diff(test.wantPodInfos, gotPodInfos, cmpopts.SortSlices(func(a, b *datalayer.PodInfo) bool { return a.NamespacedName.Name < b.NamespacedName.Name })); diff != "" {

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/prometheus/client_golang/prometheus"
2121
compbasemetrics "k8s.io/component-base/metrics"
2222

23-
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2423
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
2524
metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/metrics"
2625
)
@@ -63,7 +62,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
6362
return
6463
}
6564

66-
podMetrics := c.ds.PodList(backendmetrics.AllPodsPredicate)
65+
podMetrics := c.ds.PodList(datastore.AllPodsPredicate)
6766
if len(podMetrics) == 0 {
6867
return
6968
}

pkg/epp/requestcontrol/director.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
3232
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3333
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
34+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3435
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
3536
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
3637
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -240,13 +241,13 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet
240241

241242
subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any)
242243
if !found {
243-
return d.datastore.PodList(backendmetrics.AllPodsPredicate)
244+
return d.datastore.PodList(datastore.AllPodsPredicate)
244245
}
245246

246247
// Check if endpoint key is present in the subset map and ensure there is at least one value
247248
endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any)
248249
if !found {
249-
return d.datastore.PodList(backendmetrics.AllPodsPredicate)
250+
return d.datastore.PodList(datastore.AllPodsPredicate)
250251
} else if len(endpointSubsetList) == 0 {
251252
loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
252253
return []backendmetrics.PodMetrics{}
@@ -362,7 +363,7 @@ func (d *Director) HandleResponseBodyComplete(ctx context.Context, reqCtx *handl
362363
}
363364

364365
func (d *Director) GetRandomPod() *backend.Pod {
365-
pods := d.datastore.PodList(backendmetrics.AllPodsPredicate)
366+
pods := d.datastore.PodList(datastore.AllPodsPredicate)
366367
if len(pods) == 0 {
367368
return nil
368369
}

pkg/epp/requestcontrol/director_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ func TestDirector_HandleRequest(t *testing.T) {
661661
config = config.WithAdmissionPlugins(newMockAdmissionPlugin("test-admit-plugin", test.admitRequestDenialError))
662662
director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, config)
663663
if test.name == "successful request with model rewrite" {
664-
director.datastore = &mockDatastore{pods: ds.PodList(backendmetrics.AllPodsPredicate), rewrites: []*v1alpha2.InferenceModelRewrite{rewrite}}
664+
director.datastore = &mockDatastore{pods: ds.PodList(datastore.AllPodsPredicate), rewrites: []*v1alpha2.InferenceModelRewrite{rewrite}}
665665
}
666666

667667
reqCtx := &handlers.RequestContext{

test/integration/epp/hermetic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1096,7 +1096,7 @@ func setUpHermeticServer(t *testing.T, podAndMetrics map[*backend.Pod]*backendme
10961096

10971097
// check if all pods are synced to datastore
10981098
assert.EventuallyWithT(t, func(t *assert.CollectT) {
1099-
assert.Len(t, serverRunner.Datastore.PodList(backendmetrics.AllPodsPredicate), len(podAndMetrics), "Datastore not synced")
1099+
assert.Len(t, serverRunner.Datastore.PodList(datastore.AllPodsPredicate), len(podAndMetrics), "Datastore not synced")
11001100
}, 10*time.Second, time.Second)
11011101

11021102
// Create a grpc connection

0 commit comments

Comments
 (0)