@@ -31,14 +31,18 @@ import (
3131 "google.golang.org/grpc/status"
3232 "k8s.io/apimachinery/pkg/util/sets"
3333 "k8s.io/apimachinery/pkg/util/uuid"
34- "k8s.io/apimachinery/pkg/util/wait"
35- "k8s.io/client-go/util/workqueue"
34+ "k8s.io/client-go/util/flowcontrol"
3635 "k8s.io/klog"
3736
3837 "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
3938 gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
4039)
4140
41+ const (
42+ nodeBackoffInitialDuration = 200 * time .Millisecond
43+ nodeBackoffMaxDuration = 5 * time .Minute
44+ )
45+
4246type GCEControllerServer struct {
4347 Driver * GCEDriver
4448 CloudProvider gce.GCECompute
@@ -54,15 +58,9 @@ type GCEControllerServer struct {
5458 // Aborted error
5559 volumeLocks * common.VolumeLocks
5660
57- // queue is a rate limited work queue for Controller Publish/Unpublish
58- // Volume calls
59- queue workqueue.RateLimitingInterface
60-
61- // publishErrorsSeenOnNode is a list of nodes with attach/detach
62- // operation failures so those nodes shall be rate limited for all
63- // the attach/detach operations until there is an attach / detach
64- // operation succeeds
65- publishErrorsSeenOnNode map [string ]bool
61+ // When the attacher sidecar issues controller publish/unpublish for multiple disks for a given node, the per-instance operation queue in GCE fills up causing attach/detach disk requests to immediately return with an error until the queue drains. nodeBackoff keeps track of any active backoff condition on a given node, and the time when retry of controller publish/unpublish is permissible. A node is marked with backoff when any error is encountered by the driver during controller publish/unpublish calls.
62+ // If the controller eventually allows controller publish/publish requests for volumes (because the backoff time expired), and those requests fail, the next backoff retry time will be updated on every failure and capped at 'nodeBackoffMaxDuration'. Also, any successful controller publish/unpublish call will clear the backoff condition for the node.
63+ nodeBackoff * flowcontrol.Backoff
6664}
6765
6866type workItem struct {
@@ -364,73 +362,26 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
364362 return & csi.DeleteVolumeResponse {}, nil
365363}
366364
367- // Run starts the GCEControllerServer.
368- func (gceCS * GCEControllerServer ) Run () {
369- go wait .Until (gceCS .worker , 1 * time .Second , wait .NeverStop )
370- }
371-
372- func (gceCS * GCEControllerServer ) worker () {
373- // Runs until workqueue is shut down
374- for gceCS .processNextWorkItem () {
375- }
376- }
377-
378- func (gceCS * GCEControllerServer ) processNextWorkItem () bool {
379- item , quit := gceCS .queue .Get ()
380- if quit {
381- return false
382- }
383- defer gceCS .queue .Done (item )
384-
385- workItem , ok := item .(* workItem )
386- if ! ok {
387- gceCS .queue .AddRateLimited (item )
388- return true
389- }
390-
391- if workItem .publishReq != nil {
392- _ , err := gceCS .executeControllerPublishVolume (workItem .ctx , workItem .publishReq )
393-
394- if err != nil {
395- klog .Errorf ("ControllerPublishVolume failed with error: %v" , err )
396- }
397- }
398-
399- if workItem .unpublishReq != nil {
400- _ , err := gceCS .executeControllerUnpublishVolume (workItem .ctx , workItem .unpublishReq )
401-
402- if err != nil {
403- klog .Errorf ("ControllerUnpublishVolume failed with error: %v" , err )
404- }
405- }
406-
407- gceCS .queue .Forget (item )
408- return true
409- }
410-
411365func (gceCS * GCEControllerServer ) ControllerPublishVolume (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (* csi.ControllerPublishVolumeResponse , error ) {
412- // Only valid requests will be queued
366+ // Only valid requests will be accepted
413367 _ , _ , err := gceCS .validateControllerPublishVolumeRequest (ctx , req )
414-
415368 if err != nil {
416369 return nil , err
417370 }
418371
419- // If the node is not marked, proceed the request
420- if _ , found := gceCS .publishErrorsSeenOnNode [req .NodeId ]; ! found {
421- return gceCS .executeControllerPublishVolume (ctx , req )
372+ if gceCS .nodeBackoff .IsInBackOffSinceUpdate (req .NodeId , gceCS .nodeBackoff .Clock .Now ()) {
373+ return nil , status .Errorf (codes .Unavailable , "ControllerPublish not permitted on node %q due to backoff condition" , req .NodeId )
422374 }
423375
424- // Node is marked so queue up the request. Note the original gRPC context may get canceled,
425- // so a new one is created here.
426- //
427- // Note that the original context probably has a timeout (see csiAttach in external-attacher),
428- // which is ignored.
429- gceCS .queue .AddRateLimited (& workItem {
430- ctx : context .Background (),
431- publishReq : req ,
432- })
433- return nil , status .Error (codes .Unavailable , "Request queued due to error condition on node" )
376+ resp , err := gceCS .executeControllerPublishVolume (ctx , req )
377+ if err != nil {
378+ klog .Infof ("For node %s adding backoff due to error for volume %s" , req .NodeId , req .VolumeId )
379+ gceCS .nodeBackoff .Next (req .NodeId , gceCS .nodeBackoff .Clock .Now ())
380+ } else {
381+ klog .Infof ("For node %s clear backoff due to successful publish of volume %v" , req .NodeId , req .VolumeId )
382+ gceCS .nodeBackoff .Reset (req .NodeId )
383+ }
384+ return resp , err
434385}
435386
436387func (gceCS * GCEControllerServer ) validateControllerPublishVolumeRequest (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (string , * meta.Key , error ) {
@@ -542,39 +493,32 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con
542493
543494 err = gceCS .CloudProvider .WaitForAttach (ctx , project , volKey , instanceZone , instanceName )
544495 if err != nil {
545- // Mark the node and rate limit all the following attach/detach
546- // operations for this node
547- gceCS .publishErrorsSeenOnNode [nodeID ] = true
548496 return nil , status .Error (codes .Internal , fmt .Sprintf ("unknown WaitForAttach error: %v" , err ))
549497 }
550-
551- // Attach succeeds so unmark the node
552- delete (gceCS .publishErrorsSeenOnNode , nodeID )
553-
554498 klog .V (4 ).Infof ("ControllerPublishVolume succeeded for disk %v to instance %v" , volKey , nodeID )
555499 return pubVolResp , nil
556500}
557501
558502func (gceCS * GCEControllerServer ) ControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (* csi.ControllerUnpublishVolumeResponse , error ) {
559503 // Only valid requests will be queued
560504 _ , _ , err := gceCS .validateControllerUnpublishVolumeRequest (ctx , req )
561-
562505 if err != nil {
563506 return nil , err
564507 }
565508
566- // If the node is not marked, proceed the request
567- if _ , found := gceCS .publishErrorsSeenOnNode [req .NodeId ]; ! found {
568- return gceCS .executeControllerUnpublishVolume (ctx , req )
509+ if gceCS .nodeBackoff .IsInBackOffSinceUpdate (req .NodeId , gceCS .nodeBackoff .Clock .Now ()) {
510+ return nil , status .Errorf (codes .Unavailable , "ControllerUnpublish not permitted on node %q due to backoff condition" , req .NodeId )
569511 }
570512
571- // Node is marked so queue up the request
572- gceCS .queue .AddRateLimited (& workItem {
573- ctx : context .Background (),
574- unpublishReq : req ,
575- })
576-
577- return nil , status .Error (codes .Unavailable , "Request queued due to error condition on node" )
513+ resp , err := gceCS .executeControllerUnpublishVolume (ctx , req )
514+ if err != nil {
515+ klog .Infof ("For node %s adding backoff due to error for volume %s" , req .NodeId , req .VolumeId )
516+ gceCS .nodeBackoff .Next (req .NodeId , gceCS .nodeBackoff .Clock .Now ())
517+ } else {
518+ klog .Infof ("For node %s clear backoff due to successful unpublish of volume %v" , req .NodeId , req .VolumeId )
519+ gceCS .nodeBackoff .Reset (req .NodeId )
520+ }
521+ return resp , err
578522}
579523
580524func (gceCS * GCEControllerServer ) validateControllerUnpublishVolumeRequest (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (string , * meta.Key , error ) {
@@ -650,15 +594,9 @@ func (gceCS *GCEControllerServer) executeControllerUnpublishVolume(ctx context.C
650594
651595 err = gceCS .CloudProvider .DetachDisk (ctx , project , deviceName , instanceZone , instanceName )
652596 if err != nil {
653- // Mark the node and rate limit all the following attach/detach
654- // operations for this node
655- gceCS .publishErrorsSeenOnNode [nodeID ] = true
656597 return nil , status .Error (codes .Internal , fmt .Sprintf ("unknown detach error: %v" , err ))
657598 }
658599
659- // Detach succeeds so unmark the node
660- delete (gceCS .publishErrorsSeenOnNode , nodeID )
661-
662600 klog .V (4 ).Infof ("ControllerUnpublishVolume succeeded for disk %v from node %v" , volKey , nodeID )
663601 return & csi.ControllerUnpublishVolumeResponse {}, nil
664602}
0 commit comments