@@ -30,6 +30,8 @@ import (
3030 "google.golang.org/grpc/status"
3131 "k8s.io/apimachinery/pkg/util/sets"
3232 "k8s.io/apimachinery/pkg/util/uuid"
33+ "k8s.io/apimachinery/pkg/util/wait"
34+ "k8s.io/client-go/util/workqueue"
3335 "k8s.io/klog"
3436
3537 "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -47,6 +49,22 @@ type GCEControllerServer struct {
4749 // operations for that same volume (as defined by Volume Key) return an
4850 // Aborted error
4951 volumeLocks * common.VolumeLocks
52+
53+ // queue is a rate limited work queue for Controller Publish/Unpublish
54+ // Volume calls
55+ queue workqueue.RateLimitingInterface
56+
57+ // publishErrorsSeenOnNode is a list of nodes with attach/detach
58+ // operation failures so those nodes shall be rate limited for all
59+ // the attach/detach operations until there is an attach / detach
60+ // operation succeeds
61+ publishErrorsSeenOnNode map [string ]bool
62+ }
63+
64+ type workItem struct {
65+ ctx context.Context
66+ publishReq * csi.ControllerPublishVolumeRequest
67+ unpublishReq * csi.ControllerUnpublishVolumeRequest
5068}
5169
5270var _ csi.ControllerServer = & GCEControllerServer {}
@@ -309,25 +327,113 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
309327 return & csi.DeleteVolumeResponse {}, nil
310328}
311329
330+ // Run starts the GCEControllerServer.
331+ func (gceCS * GCEControllerServer ) Run () {
332+ go wait .Until (gceCS .worker , 1 * time .Second , wait .NeverStop )
333+ }
334+
335+ func (gceCS * GCEControllerServer ) worker () {
336+ // Runs until workqueue is shut down
337+ for gceCS .processNextWorkItem () {
338+ }
339+ }
340+
341+ func (gceCS * GCEControllerServer ) processNextWorkItem () bool {
342+ item , quit := gceCS .queue .Get ()
343+ if quit {
344+ return false
345+ }
346+ defer gceCS .queue .Done (item )
347+
348+ workItem , ok := item .(* workItem )
349+ if ! ok {
350+ gceCS .queue .AddRateLimited (item )
351+ return true
352+ }
353+
354+ if workItem .publishReq != nil {
355+ _ , err := gceCS .executeControllerPublishVolume (workItem .ctx , workItem .publishReq )
356+
357+ if err != nil {
358+ klog .Errorf ("ControllerPublishVolume failed with error: %v" , err )
359+ }
360+ }
361+
362+ if workItem .unpublishReq != nil {
363+ _ , err := gceCS .executeControllerUnpublishVolume (workItem .ctx , workItem .unpublishReq )
364+
365+ if err != nil {
366+ klog .Errorf ("ControllerUnpublishVolume failed with error: %v" , err )
367+ }
368+ }
369+
370+ gceCS .queue .Forget (item )
371+ return true
372+ }
373+
312374func (gceCS * GCEControllerServer ) ControllerPublishVolume (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (* csi.ControllerPublishVolumeResponse , error ) {
375+ // Only valid requests will be queued
376+ _ , _ , err := gceCS .validateControllerPublishVolumeRequest (ctx , req )
377+
378+ if err != nil {
379+ return nil , err
380+ }
381+
382+ // If the node is not marked, proceed the request
383+ if _ , found := gceCS .publishErrorsSeenOnNode [req .NodeId ]; ! found {
384+ return gceCS .executeControllerPublishVolume (ctx , req )
385+ }
386+
387+ // Node is marked so queue up the request
388+ gceCS .queue .AddRateLimited (& workItem {
389+ ctx : ctx ,
390+ publishReq : req ,
391+ })
392+ return & csi.ControllerPublishVolumeResponse {}, nil
393+ }
394+
395+ func (gceCS * GCEControllerServer ) validateControllerPublishVolumeRequest (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (string , * meta.Key , error ) {
313396 // Validate arguments
314397 volumeID := req .GetVolumeId ()
315- readOnly := req .GetReadonly ()
316398 nodeID := req .GetNodeId ()
317399 volumeCapability := req .GetVolumeCapability ()
318400 if len (volumeID ) == 0 {
319- return nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Volume ID must be provided" )
401+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Volume ID must be provided" )
320402 }
321403 if len (nodeID ) == 0 {
322- return nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Node ID must be provided" )
404+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Node ID must be provided" )
323405 }
324406 if volumeCapability == nil {
325- return nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Volume capability must be provided" )
407+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Volume capability must be provided" )
326408 }
327409
328410 project , volKey , err := common .VolumeIDToKey (volumeID )
329411 if err != nil {
330- return nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("ControllerPublishVolume volume ID is invalid: %v" , err ))
412+ return "" , nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("ControllerPublishVolume volume ID is invalid: %v" , err ))
413+ }
414+
415+ // TODO(#253): Check volume capability matches for ALREADY_EXISTS
416+ if err = validateVolumeCapability (volumeCapability ); err != nil {
417+ return "" , nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("VolumeCapabilities is invalid: %v" , err ))
418+ }
419+
420+ return project , volKey , nil
421+ }
422+
423+ func (gceCS * GCEControllerServer ) executeControllerPublishVolume (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (* csi.ControllerPublishVolumeResponse , error ) {
424+ project , volKey , err := gceCS .validateControllerPublishVolumeRequest (ctx , req )
425+
426+ if err != nil {
427+ return nil , err
428+ }
429+
430+ volumeID := req .GetVolumeId ()
431+ readOnly := req .GetReadonly ()
432+ nodeID := req .GetNodeId ()
433+ volumeCapability := req .GetVolumeCapability ()
434+
435+ pubVolResp := & csi.ControllerPublishVolumeResponse {
436+ PublishContext : nil ,
331437 }
332438
333439 project , volKey , err = gceCS .CloudProvider .RepairUnderspecifiedVolumeKey (ctx , project , volKey )
@@ -346,15 +452,6 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
346452 }
347453 defer gceCS .volumeLocks .Release (lockingVolumeID )
348454
349- // TODO(#253): Check volume capability matches for ALREADY_EXISTS
350- if err = validateVolumeCapability (volumeCapability ); err != nil {
351- return nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("VolumeCapabilities is invalid: %v" , err ))
352- }
353-
354- pubVolResp := & csi.ControllerPublishVolumeResponse {
355- PublishContext : nil ,
356- }
357-
358455 _ , err = gceCS .CloudProvider .GetDisk (ctx , project , volKey , gce .GCEAPIVersionV1 )
359456 if err != nil {
360457 if gce .IsGCENotFoundError (err ) {
@@ -404,29 +501,69 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
404501
405502 err = gceCS .CloudProvider .WaitForAttach (ctx , project , volKey , instanceZone , instanceName )
406503 if err != nil {
504+ // Mark the node and rate limit all the following attach/detach
505+ // operations for this node
506+ gceCS .publishErrorsSeenOnNode [nodeID ] = true
407507 return nil , status .Error (codes .Internal , fmt .Sprintf ("unknown WaitForAttach error: %v" , err ))
408508 }
409509
510+ // Attach succeeds so unmark the node
511+ delete (gceCS .publishErrorsSeenOnNode , nodeID )
512+
410513 klog .V (4 ).Infof ("ControllerPublishVolume succeeded for disk %v to instance %v" , volKey , nodeID )
411514 return pubVolResp , nil
412515}
413516
414517func (gceCS * GCEControllerServer ) ControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (* csi.ControllerUnpublishVolumeResponse , error ) {
518+ // Only valid requests will be queued
519+ _ , _ , err := gceCS .validateControllerUnpublishVolumeRequest (ctx , req )
520+
521+ if err != nil {
522+ return nil , err
523+ }
524+
525+ // If the node is not marked, proceed the request
526+ if _ , found := gceCS .publishErrorsSeenOnNode [req .NodeId ]; ! found {
527+ return gceCS .executeControllerUnpublishVolume (ctx , req )
528+ }
529+
530+ // Node is marked so queue up the request
531+ gceCS .queue .AddRateLimited (& workItem {
532+ ctx : ctx ,
533+ unpublishReq : req ,
534+ })
535+
536+ return & csi.ControllerUnpublishVolumeResponse {}, nil
537+ }
538+
539+ func (gceCS * GCEControllerServer ) validateControllerUnpublishVolumeRequest (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (string , * meta.Key , error ) {
415540 // Validate arguments
416541 volumeID := req .GetVolumeId ()
417542 nodeID := req .GetNodeId ()
418543 if len (volumeID ) == 0 {
419- return nil , status .Error (codes .InvalidArgument , "ControllerUnpublishVolume Volume ID must be provided" )
544+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerUnpublishVolume Volume ID must be provided" )
420545 }
421546 if len (nodeID ) == 0 {
422- return nil , status .Error (codes .InvalidArgument , "ControllerUnpublishVolume Node ID must be provided" )
547+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerUnpublishVolume Node ID must be provided" )
423548 }
424549
425550 project , volKey , err := common .VolumeIDToKey (volumeID )
426551 if err != nil {
427- return nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("ControllerUnpublishVolume Volume ID is invalid: %v" , err ))
552+ return "" , nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("ControllerUnpublishVolume Volume ID is invalid: %v" , err ))
428553 }
429554
555+ return project , volKey , nil
556+ }
557+
558+ func (gceCS * GCEControllerServer ) executeControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (* csi.ControllerUnpublishVolumeResponse , error ) {
559+ project , volKey , err := gceCS .validateControllerUnpublishVolumeRequest (ctx , req )
560+
561+ if err != nil {
562+ return nil , err
563+ }
564+
565+ volumeID := req .GetVolumeId ()
566+ nodeID := req .GetNodeId ()
430567 project , volKey , err = gceCS .CloudProvider .RepairUnderspecifiedVolumeKey (ctx , project , volKey )
431568 if err != nil {
432569 if gce .IsGCENotFoundError (err ) {
@@ -472,9 +609,15 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
472609
473610 err = gceCS .CloudProvider .DetachDisk (ctx , project , deviceName , instanceZone , instanceName )
474611 if err != nil {
612+ // Mark the node and rate limit all the following attach/detach
613+ // operations for this node
614+ gceCS .publishErrorsSeenOnNode [nodeID ] = true
475615 return nil , status .Error (codes .Internal , fmt .Sprintf ("unknown detach error: %v" , err ))
476616 }
477617
618+ // Detach succeeds so unmark the node
619+ delete (gceCS .publishErrorsSeenOnNode , nodeID )
620+
478621 klog .V (4 ).Infof ("ControllerUnpublishVolume succeeded for disk %v from node %v" , volKey , nodeID )
479622 return & csi.ControllerUnpublishVolumeResponse {}, nil
480623}
0 commit comments