@@ -17,7 +17,6 @@ import (
1717 "google.golang.org/protobuf/types/known/timestamppb"
1818 appsv1 "k8s.io/api/apps/v1"
1919 v1 "k8s.io/api/core/v1"
20- "k8s.io/apimachinery/pkg/fields"
2120 "k8s.io/apimachinery/pkg/types"
2221 "k8s.io/apimachinery/pkg/util/wait"
2322 "sigs.k8s.io/controller-runtime/pkg/client"
@@ -26,6 +25,7 @@ import (
2625 agentgrpc "github.com/nginx/nginx-gateway-fabric/v2/internal/controller/nginx/agent/grpc"
2726 grpcContext "github.com/nginx/nginx-gateway-fabric/v2/internal/controller/nginx/agent/grpc/context"
2827 "github.com/nginx/nginx-gateway-fabric/v2/internal/controller/nginx/agent/grpc/messenger"
28+ nginxTypes "github.com/nginx/nginx-gateway-fabric/v2/internal/controller/nginx/types"
2929 "github.com/nginx/nginx-gateway-fabric/v2/internal/controller/status"
3030)
3131
@@ -86,8 +86,9 @@ func (cs *commandService) CreateConnection(
8686 podName := resource .GetContainerInfo ().GetHostname ()
8787 cs .logger .Info (fmt .Sprintf ("Creating connection for nginx pod: %s" , podName ))
8888
89- owner , _ , err := cs .getPodOwner (podName )
90- if err != nil {
89+ name , depType := getAgentDeploymentNameAndType (resource .GetInstances ())
90+ if name == (types.NamespacedName {}) || depType == "" {
91+ err := errors .New ("agent labels missing" )
9192 response := & pb.CreateConnectionResponse {
9293 Response : & pb.CommandResponse {
9394 Status : pb .CommandResponse_COMMAND_STATUS_ERROR ,
@@ -96,12 +97,12 @@ func (cs *commandService) CreateConnection(
9697 },
9798 }
9899 cs .logger .Error (err , "error getting pod owner" )
99- return response , grpcStatus .Errorf (codes .Internal , "error getting pod owner %s" , err .Error ())
100+ return response , grpcStatus .Errorf (codes .InvalidArgument , "error getting pod owner: %s" , err .Error ())
100101 }
101102
102103 conn := agentgrpc.Connection {
103- Parent : owner ,
104- PodName : podName ,
104+ ParentName : name ,
105+ ParentType : depType ,
105106 InstanceID : getNginxInstanceID (resource .GetInstances ()),
106107 }
107108 cs .connTracker .Track (gi .UUID , conn )
@@ -138,15 +139,19 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
138139 cs .logger .Error (err , "error waiting for connection" )
139140 return err
140141 }
141- defer deployment .RemovePodStatus (conn . PodName )
142+ defer deployment .RemovePodStatus (gi . UUID )
142143
143- cs .logger .Info (fmt .Sprintf ("Successfully connected to nginx agent %s" , conn .PodName ))
144+ cs .logger .Info (
145+ "Successfully connected to nginx agent" ,
146+ conn .ParentType , conn .ParentName ,
147+ "uuid" , gi .UUID ,
148+ )
144149
145150 msgr := messenger .New (in )
146151 go msgr .Run (ctx )
147152
148153 // apply current config before starting event loop
149- if err := cs .setInitialConfig (ctx , deployment , conn , msgr ); err != nil {
154+ if err := cs .setInitialConfig (ctx , & gi , deployment , conn , msgr ); err != nil {
150155 return err
151156 }
152157
@@ -188,7 +193,7 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
188193 cs .logger .V (1 ).Info ("Sending configuration to agent" , "requestType" , msg .Type )
189194 if err := msgr .Send (ctx , req ); err != nil {
190195 cs .logger .Error (err , "error sending request to agent" )
191- deployment .SetPodErrorStatus (conn . PodName , err )
196+ deployment .SetPodErrorStatus (gi . UUID , err )
192197 channels .ResponseCh <- struct {}{}
193198
194199 return grpcStatus .Error (codes .Internal , err .Error ())
@@ -198,8 +203,8 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
198203 // Only broadcast operations should signal ResponseCh for coordination.
199204 pendingBroadcastRequest = & msg
200205 case err = <- msgr .Errors ():
201- cs .logger .Error (err , "connection error" , "pod " , conn .PodName )
202- deployment .SetPodErrorStatus (conn . PodName , err )
206+ cs .logger .Error (err , "connection error" , "deployment " , conn .ParentName , "uuid" , gi . UUID )
207+ deployment .SetPodErrorStatus (gi . UUID , err )
203208 select {
204209 case channels .ResponseCh <- struct {}{}:
205210 default :
@@ -220,9 +225,9 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
220225 continue
221226 }
222227 err := fmt .Errorf ("msg: %s; error: %s" , res .GetMessage (), res .GetError ())
223- deployment .SetPodErrorStatus (conn . PodName , err )
228+ deployment .SetPodErrorStatus (gi . UUID , err )
224229 } else {
225- deployment .SetPodErrorStatus (conn . PodName , nil )
230+ deployment .SetPodErrorStatus (gi . UUID , nil )
226231 }
227232
228233 // Signal broadcast completion only for tracked broadcast operations.
@@ -231,7 +236,11 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
231236 pendingBroadcastRequest = nil
232237 channels .ResponseCh <- struct {}{}
233238 } else {
234- cs .logger .V (1 ).Info ("Received response for non-broadcast request (likely initial config)" , "pod" , conn .PodName )
239+ cs .logger .V (1 ).Info (
240+ "Received response for non-broadcast request (likely initial config)" ,
241+ conn .ParentType , conn .ParentName ,
242+ "uuid" , gi .UUID ,
243+ )
235244 }
236245 }
237246 }
@@ -260,7 +269,7 @@ func (cs *commandService) waitForConnection(
260269 case <- ticker .C :
261270 if conn := cs .connTracker .GetConnection (gi .UUID ); conn .Ready () {
262271 // connection has been established, now ensure that the deployment exists in the store
263- if deployment := cs .nginxDeployments .Get (conn .Parent ); deployment != nil {
272+ if deployment := cs .nginxDeployments .Get (conn .ParentName ); deployment != nil {
264273 return & conn , deployment , nil
265274 }
266275 err = deploymentStoreErr
@@ -274,30 +283,30 @@ func (cs *commandService) waitForConnection(
274283// setInitialConfig gets the initial configuration for this connection and applies it.
275284func (cs * commandService ) setInitialConfig (
276285 ctx context.Context ,
286+ gi * grpcContext.GrpcInfo ,
277287 deployment * Deployment ,
278288 conn * agentgrpc.Connection ,
279289 msgr messenger.Messenger ,
280290) error {
281291 deployment .FileLock .Lock ()
282292 defer deployment .FileLock .Unlock ()
283293
284- _ , pod , err := cs .getPodOwner (conn .PodName )
285- if err != nil {
286- cs .logAndSendErrorStatus (deployment , conn , err )
287-
288- return grpcStatus .Error (codes .Internal , err .Error ())
289- }
290- if err := cs .validatePodImageVersion (pod , deployment .imageVersion ); err != nil {
291- cs .logAndSendErrorStatus (deployment , conn , err )
294+ if err := cs .validatePodImageVersion (conn .ParentName , conn .ParentType , deployment .imageVersion ); err != nil {
295+ cs .logAndSendErrorStatus (gi , deployment , conn , err )
292296 return grpcStatus .Errorf (codes .FailedPrecondition , "nginx image version validation failed: %s" , err .Error ())
293297 }
294298
295299 fileOverviews , configVersion := deployment .GetFileOverviews ()
296300
297- cs .logger .Info ("Sending initial configuration to agent" , "pod" , conn .PodName , "configVersion" , configVersion )
301+ cs .logger .Info (
302+ "Sending initial configuration to agent" ,
303+ conn .ParentType , conn .ParentName ,
304+ "uuid" , gi .UUID ,
305+ "configVersion" , configVersion ,
306+ )
298307
299308 if err := msgr .Send (ctx , buildRequest (fileOverviews , conn .InstanceID , configVersion )); err != nil {
300- cs .logAndSendErrorStatus (deployment , conn , err )
309+ cs .logAndSendErrorStatus (gi , deployment , conn , err )
301310
302311 return grpcStatus .Error (codes .Internal , err .Error ())
303312 }
@@ -321,7 +330,7 @@ func (cs *commandService) setInitialConfig(
321330 true , // poll immediately
322331 func (ctx context.Context ) (bool , error ) {
323332 if err := msgr .Send (ctx , buildPlusAPIRequest (action , conn .InstanceID )); err != nil {
324- cs .logAndSendErrorStatus (deployment , conn , err )
333+ cs .logAndSendErrorStatus (gi , deployment , conn , err )
325334
326335 return false , grpcStatus .Error (codes .Internal , err .Error ())
327336 }
@@ -350,7 +359,7 @@ func (cs *commandService) setInitialConfig(
350359 cancel ()
351360 }
352361 // send the status (error or nil) to the status queue
353- cs .logAndSendErrorStatus (deployment , conn , errors .Join (errs ... ))
362+ cs .logAndSendErrorStatus (gi , deployment , conn , errors .Join (errs ... ))
354363
355364 return nil
356365}
@@ -393,16 +402,25 @@ func (cs *commandService) waitForInitialConfigApply(
393402// the full Deployment error status to the status queue. This ensures that any other Pod errors that already
394403// exist on the Deployment are not overwritten.
395404// If the error is nil, then we just enqueue the nil value and don't log it, which indicates success.
396- func (cs * commandService ) logAndSendErrorStatus (deployment * Deployment , conn * agentgrpc.Connection , err error ) {
405+ func (cs * commandService ) logAndSendErrorStatus (
406+ gi * grpcContext.GrpcInfo ,
407+ deployment * Deployment ,
408+ conn * agentgrpc.Connection ,
409+ err error ,
410+ ) {
397411 if err != nil {
398412 cs .logger .Error (err , "error sending request to agent" )
399413 } else {
400- cs .logger .Info ("Successfully configured nginx for new subscription" , "pod" , conn .PodName )
414+ cs .logger .Info (
415+ "Successfully configured nginx for new subscription" ,
416+ conn .ParentType , conn .ParentName ,
417+ "uuid" , gi .UUID ,
418+ )
401419 }
402- deployment .SetPodErrorStatus (conn . PodName , err )
420+ deployment .SetPodErrorStatus (gi . UUID , err )
403421
404422 queueObj := & status.QueueObject {
405- Deployment : conn .Parent ,
423+ Deployment : conn .ParentName ,
406424 Error : deployment .GetConfigurationStatus (),
407425 UpdateType : status .UpdateAll ,
408426 }
@@ -454,99 +472,56 @@ func buildPlusAPIRequest(action *pb.NGINXPlusAction, instanceID string) *pb.Mana
454472 }
455473}
456474
457- func (cs * commandService ) getPodOwner (podName string ) (types.NamespacedName , * v1.Pod , error ) {
458- ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
459- defer cancel ()
460-
461- var pods v1.PodList
462- listOpts := & client.ListOptions {
463- FieldSelector : fields .SelectorFromSet (fields.Set {"metadata.name" : podName }),
464- }
465- if err := cs .k8sReader .List (ctx , & pods , listOpts ); err != nil {
466- return types.NamespacedName {}, nil , fmt .Errorf ("error listing pods: %w" , err )
467- }
468-
469- if len (pods .Items ) == 0 {
470- return types.NamespacedName {}, nil , fmt .Errorf ("no pods found with name %q" , podName )
471- }
472-
473- if len (pods .Items ) > 1 {
474- return types.NamespacedName {}, nil , fmt .Errorf ("should only be one pod with name %q" , podName )
475- }
476- pod := & pods .Items [0 ]
477-
478- podOwnerRefs := pod .GetOwnerReferences ()
479- if len (podOwnerRefs ) != 1 {
480- tooManyOwnersError := "expected one owner reference of the nginx Pod, got %d"
481- return types.NamespacedName {}, nil , fmt .Errorf (tooManyOwnersError , len (podOwnerRefs ))
482- }
483-
484- if podOwnerRefs [0 ].Kind != "ReplicaSet" && podOwnerRefs [0 ].Kind != "DaemonSet" {
485- err := fmt .Errorf ("expected pod owner reference to be ReplicaSet or DaemonSet, got %s" , podOwnerRefs [0 ].Kind )
486- return types.NamespacedName {}, nil , err
487- }
488-
489- if podOwnerRefs [0 ].Kind == "DaemonSet" {
490- return types.NamespacedName {Namespace : pod .Namespace , Name : podOwnerRefs [0 ].Name }, pod , nil
491- }
492-
493- var replicaSet appsv1.ReplicaSet
494- var replicaSetErr error
495- if err := wait .PollUntilContextCancel (
496- ctx ,
497- 500 * time .Millisecond ,
498- true , /* poll immediately */
499- func (ctx context.Context ) (bool , error ) {
500- if err := cs .k8sReader .Get (
501- ctx ,
502- types.NamespacedName {Namespace : pod .Namespace , Name : podOwnerRefs [0 ].Name },
503- & replicaSet ,
504- ); err != nil {
505- replicaSetErr = err
506- return false , nil //nolint:nilerr // error is returned at the end
507- }
508-
509- return true , nil
510- },
511- ); err != nil {
512- return types.NamespacedName {}, nil , fmt .Errorf ("failed to get nginx Pod's ReplicaSet: %w" , replicaSetErr )
513- }
514-
515- replicaOwnerRefs := replicaSet .GetOwnerReferences ()
516- if len (replicaOwnerRefs ) != 1 {
517- err := fmt .Errorf ("expected one owner reference of the nginx ReplicaSet, got %d" , len (replicaOwnerRefs ))
518- return types.NamespacedName {}, nil , err
519- }
520-
521- return types.NamespacedName {Namespace : pod .Namespace , Name : replicaOwnerRefs [0 ].Name }, pod , nil
522- }
523-
524475// validatePodImageVersion checks if the pod's nginx container image version matches the expected version
525476// from its deployment. Returns an error if versions don't match.
526477func (cs * commandService ) validatePodImageVersion (
527- pod * v1.Pod ,
478+ parent types.NamespacedName ,
479+ parentType string ,
528480 expectedImage string ,
529481) error {
530- var podNginxImage string
482+ var nginxImage string
483+ var found bool
484+
485+ getNginxContainerImage := func (containers []v1.Container ) (string , bool ) {
486+ for _ , c := range containers {
487+ if c .Name == "nginx" {
488+ return c .Image , true
489+ }
490+ }
491+ return "" , false
492+ }
493+
494+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
495+ defer cancel ()
531496
532- for _ , container := range pod .Spec .Containers {
533- if container .Name == "nginx" {
534- podNginxImage = container .Image
535- break
497+ switch parentType {
498+ case nginxTypes .DaemonSetType :
499+ ds := & appsv1.DaemonSet {}
500+ if err := cs .k8sReader .Get (ctx , parent , ds ); err != nil {
501+ return fmt .Errorf ("failed to get DaemonSet %s: %w" , parent .String (), err )
536502 }
503+ nginxImage , found = getNginxContainerImage (ds .Spec .Template .Spec .Containers )
504+ case nginxTypes .DeploymentType :
505+ deploy := & appsv1.Deployment {}
506+ if err := cs .k8sReader .Get (ctx , parent , deploy ); err != nil {
507+ return fmt .Errorf ("failed to get Deployment %s: %w" , parent .String (), err )
508+ }
509+ nginxImage , found = getNginxContainerImage (deploy .Spec .Template .Spec .Containers )
510+ default :
511+ return fmt .Errorf ("unknown parentType: %s" , parentType )
537512 }
538- if podNginxImage == "" {
539- return fmt .Errorf ("nginx container not found in pod %q" , pod .Name )
513+
514+ if ! found {
515+ return fmt .Errorf ("nginx container not found in %s %q" , parentType , parent .Name )
540516 }
541517
542- // Compare images
543- if podNginxImage != expectedImage {
544- return fmt .Errorf ("nginx image version mismatch: pod has %q but expected %q" , podNginxImage , expectedImage )
518+ if nginxImage != expectedImage {
519+ return fmt .Errorf ("nginx image version mismatch: has %q but expected %q" , nginxImage , expectedImage )
545520 }
546521
547- cs .logger .V (1 ).Info ("Pod nginx image version validated successfully" ,
548- "podName " , pod . Name ,
549- "image" , podNginxImage )
522+ cs .logger .V (1 ).Info ("nginx image version validated successfully" ,
523+ "parent " , parent . String () ,
524+ "image" , nginxImage )
550525
551526 return nil
552527}
@@ -589,6 +564,35 @@ func getNginxInstanceID(instances []*pb.Instance) string {
589564 return ""
590565}
591566
567+ func getAgentDeploymentNameAndType (instances []* pb.Instance ) (types.NamespacedName , string ) {
568+ var name types.NamespacedName
569+ var depType string
570+
571+ for _ , instance := range instances {
572+ instanceType := instance .GetInstanceMeta ().GetInstanceType ()
573+ if instanceType == pb .InstanceMeta_INSTANCE_TYPE_AGENT {
574+ labels := instance .GetInstanceConfig ().GetAgentConfig ().GetLabels ()
575+
576+ for _ , label := range labels {
577+ fields := label .GetFields ()
578+
579+ if val , ok := fields [nginxTypes .AgentOwnerNameLabel ]; ok {
580+ fullName := val .GetStringValue ()
581+ parts := strings .SplitN (fullName , "_" , 2 )
582+ if len (parts ) == 2 {
583+ name = types.NamespacedName {Namespace : parts [0 ], Name : parts [1 ]}
584+ }
585+ }
586+ if val , ok := fields [nginxTypes .AgentOwnerTypeLabel ]; ok {
587+ depType = val .GetStringValue ()
588+ }
589+ }
590+ }
591+ }
592+
593+ return name , depType
594+ }
595+
592596// UpdateDataPlaneHealth includes full health information about the data plane as reported by the agent.
593597func (* commandService ) UpdateDataPlaneHealth (
594598 context.Context ,
0 commit comments