@@ -30,6 +30,8 @@ import (
3030 "google.golang.org/grpc/status"
3131 "k8s.io/apimachinery/pkg/util/sets"
3232 "k8s.io/apimachinery/pkg/util/uuid"
33+ "k8s.io/apimachinery/pkg/util/wait"
34+ "k8s.io/client-go/util/workqueue"
3335 "k8s.io/klog"
3436
3537 "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -47,6 +49,22 @@ type GCEControllerServer struct {
4749 // operations for that same volume (as defined by Volume Key) return an
4850 // Aborted error
4951 volumeLocks * common.VolumeLocks
52+
53+ // queue is a rate limited work queue for Controller Publish/Unpublish
54+ // Volume calls
55+ queue workqueue.RateLimitingInterface
56+
57+ // publishErrorsSeenOnNode is a list of nodes with attach/detach
58+ // operation failures so those nodes shall be rate limited for all
59+ // the attach/detach operations until there is an attach / detach
60+ // operation succeeds
61+ publishErrorsSeenOnNode map [string ]bool
62+ }
63+
64+ type workItem struct {
65+ ctx context.Context
66+ publishReq * csi.ControllerPublishVolumeRequest
67+ unpublishReq * csi.ControllerUnpublishVolumeRequest
5068}
5169
5270var _ csi.ControllerServer = & GCEControllerServer {}
@@ -280,25 +298,113 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
280298 return & csi.DeleteVolumeResponse {}, nil
281299}
282300
301+ // Run starts the GCEControllerServer.
302+ func (gceCS * GCEControllerServer ) Run () {
303+ go wait .Until (gceCS .worker , 1 * time .Second , wait .NeverStop )
304+ }
305+
306+ func (gceCS * GCEControllerServer ) worker () {
307+ // Runs until workqueue is shut down
308+ for gceCS .processNextWorkItem () {
309+ }
310+ }
311+
312+ func (gceCS * GCEControllerServer ) processNextWorkItem () bool {
313+ item , quit := gceCS .queue .Get ()
314+ if quit {
315+ return false
316+ }
317+ defer gceCS .queue .Done (item )
318+
319+ workItem , ok := item .(* workItem )
320+ if ! ok {
321+ gceCS .queue .AddRateLimited (item )
322+ return true
323+ }
324+
325+ if workItem .publishReq != nil {
326+ _ , err := gceCS .executeControllerPublishVolume (workItem .ctx , workItem .publishReq )
327+
328+ if err != nil {
329+ klog .Errorf ("ControllerPublishVolume failed with error: %v" , err )
330+ }
331+ }
332+
333+ if workItem .unpublishReq != nil {
334+ _ , err := gceCS .executeControllerUnpublishVolume (workItem .ctx , workItem .unpublishReq )
335+
336+ if err != nil {
337+ klog .Errorf ("ControllerUnpublishVolume failed with error: %v" , err )
338+ }
339+ }
340+
341+ gceCS .queue .Forget (item )
342+ return true
343+ }
344+
283345func (gceCS * GCEControllerServer ) ControllerPublishVolume (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (* csi.ControllerPublishVolumeResponse , error ) {
346+ // Only valid requests will be queued
347+ _ , _ , err := gceCS .validateControllerPublishVolumeRequest (ctx , req )
348+
349+ if err != nil {
350+ return nil , err
351+ }
352+
353+ // If the node is not marked, proceed the request
354+ if _ , found := gceCS .publishErrorsSeenOnNode [req .NodeId ]; ! found {
355+ return gceCS .executeControllerPublishVolume (ctx , req )
356+ }
357+
358+ // Node is marked so queue up the request
359+ gceCS .queue .AddRateLimited (& workItem {
360+ ctx : ctx ,
361+ publishReq : req ,
362+ })
363+ return & csi.ControllerPublishVolumeResponse {}, nil
364+ }
365+
366+ func (gceCS * GCEControllerServer ) validateControllerPublishVolumeRequest (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (string , * meta.Key , error ) {
284367 // Validate arguments
285368 volumeID := req .GetVolumeId ()
286- readOnly := req .GetReadonly ()
287369 nodeID := req .GetNodeId ()
288370 volumeCapability := req .GetVolumeCapability ()
289371 if len (volumeID ) == 0 {
290- return nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Volume ID must be provided" )
372+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Volume ID must be provided" )
291373 }
292374 if len (nodeID ) == 0 {
293- return nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Node ID must be provided" )
375+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Node ID must be provided" )
294376 }
295377 if volumeCapability == nil {
296- return nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Volume capability must be provided" )
378+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerPublishVolume Volume capability must be provided" )
297379 }
298380
299381 project , volKey , err := common .VolumeIDToKey (volumeID )
300382 if err != nil {
301- return nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("ControllerPublishVolume volume ID is invalid: %v" , err ))
383+ return "" , nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("ControllerPublishVolume volume ID is invalid: %v" , err ))
384+ }
385+
386+ // TODO(#253): Check volume capability matches for ALREADY_EXISTS
387+ if err = validateVolumeCapability (volumeCapability ); err != nil {
388+ return "" , nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("VolumeCapabilities is invalid: %v" , err ))
389+ }
390+
391+ return project , volKey , nil
392+ }
393+
394+ func (gceCS * GCEControllerServer ) executeControllerPublishVolume (ctx context.Context , req * csi.ControllerPublishVolumeRequest ) (* csi.ControllerPublishVolumeResponse , error ) {
395+ project , volKey , err := gceCS .validateControllerPublishVolumeRequest (ctx , req )
396+
397+ if err != nil {
398+ return nil , err
399+ }
400+
401+ volumeID := req .GetVolumeId ()
402+ readOnly := req .GetReadonly ()
403+ nodeID := req .GetNodeId ()
404+ volumeCapability := req .GetVolumeCapability ()
405+
406+ pubVolResp := & csi.ControllerPublishVolumeResponse {
407+ PublishContext : nil ,
302408 }
303409
304410 project , volKey , err = gceCS .CloudProvider .RepairUnderspecifiedVolumeKey (ctx , project , volKey )
@@ -317,15 +423,6 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
317423 }
318424 defer gceCS .volumeLocks .Release (lockingVolumeID )
319425
320- // TODO(#253): Check volume capability matches for ALREADY_EXISTS
321- if err = validateVolumeCapability (volumeCapability ); err != nil {
322- return nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("VolumeCapabilities is invalid: %v" , err ))
323- }
324-
325- pubVolResp := & csi.ControllerPublishVolumeResponse {
326- PublishContext : nil ,
327- }
328-
329426 _ , err = gceCS .CloudProvider .GetDisk (ctx , project , volKey , gce .GCEAPIVersionV1 )
330427 if err != nil {
331428 if gce .IsGCENotFoundError (err ) {
@@ -375,29 +472,69 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
375472
376473 err = gceCS .CloudProvider .WaitForAttach (ctx , project , volKey , instanceZone , instanceName )
377474 if err != nil {
475+ // Mark the node and rate limit all the following attach/detach
476+ // operations for this node
477+ gceCS .publishErrorsSeenOnNode [nodeID ] = true
378478 return nil , status .Error (codes .Internal , fmt .Sprintf ("unknown WaitForAttach error: %v" , err ))
379479 }
380480
481+ // Attach succeeds so unmark the node
482+ delete (gceCS .publishErrorsSeenOnNode , nodeID )
483+
381484 klog .V (4 ).Infof ("ControllerPublishVolume succeeded for disk %v to instance %v" , volKey , nodeID )
382485 return pubVolResp , nil
383486}
384487
385488func (gceCS * GCEControllerServer ) ControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (* csi.ControllerUnpublishVolumeResponse , error ) {
489+ // Only valid requests will be queued
490+ _ , _ , err := gceCS .validateControllerUnpublishVolumeRequest (ctx , req )
491+
492+ if err != nil {
493+ return nil , err
494+ }
495+
496+ // If the node is not marked, proceed the request
497+ if _ , found := gceCS .publishErrorsSeenOnNode [req .NodeId ]; ! found {
498+ return gceCS .executeControllerUnpublishVolume (ctx , req )
499+ }
500+
501+ // Node is marked so queue up the request
502+ gceCS .queue .AddRateLimited (& workItem {
503+ ctx : ctx ,
504+ unpublishReq : req ,
505+ })
506+
507+ return & csi.ControllerUnpublishVolumeResponse {}, nil
508+ }
509+
510+ func (gceCS * GCEControllerServer ) validateControllerUnpublishVolumeRequest (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (string , * meta.Key , error ) {
386511 // Validate arguments
387512 volumeID := req .GetVolumeId ()
388513 nodeID := req .GetNodeId ()
389514 if len (volumeID ) == 0 {
390- return nil , status .Error (codes .InvalidArgument , "ControllerUnpublishVolume Volume ID must be provided" )
515+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerUnpublishVolume Volume ID must be provided" )
391516 }
392517 if len (nodeID ) == 0 {
393- return nil , status .Error (codes .InvalidArgument , "ControllerUnpublishVolume Node ID must be provided" )
518+ return "" , nil , status .Error (codes .InvalidArgument , "ControllerUnpublishVolume Node ID must be provided" )
394519 }
395520
396521 project , volKey , err := common .VolumeIDToKey (volumeID )
397522 if err != nil {
398- return nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("ControllerUnpublishVolume Volume ID is invalid: %v" , err ))
523+ return "" , nil , status .Error (codes .InvalidArgument , fmt .Sprintf ("ControllerUnpublishVolume Volume ID is invalid: %v" , err ))
399524 }
400525
526+ return project , volKey , nil
527+ }
528+
529+ func (gceCS * GCEControllerServer ) executeControllerUnpublishVolume (ctx context.Context , req * csi.ControllerUnpublishVolumeRequest ) (* csi.ControllerUnpublishVolumeResponse , error ) {
530+ project , volKey , err := gceCS .validateControllerUnpublishVolumeRequest (ctx , req )
531+
532+ if err != nil {
533+ return nil , err
534+ }
535+
536+ volumeID := req .GetVolumeId ()
537+ nodeID := req .GetNodeId ()
401538 project , volKey , err = gceCS .CloudProvider .RepairUnderspecifiedVolumeKey (ctx , project , volKey )
402539 if err != nil {
403540 if gce .IsGCENotFoundError (err ) {
@@ -443,9 +580,15 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
443580
444581 err = gceCS .CloudProvider .DetachDisk (ctx , project , deviceName , instanceZone , instanceName )
445582 if err != nil {
583+ // Mark the node and rate limit all the following attach/detach
584+ // operations for this node
585+ gceCS .publishErrorsSeenOnNode [nodeID ] = true
446586 return nil , status .Error (codes .Internal , fmt .Sprintf ("unknown detach error: %v" , err ))
447587 }
448588
589+ // Detach succeeds so unmark the node
590+ delete (gceCS .publishErrorsSeenOnNode , nodeID )
591+
449592 klog .V (4 ).Infof ("ControllerUnpublishVolume succeeded for disk %v from node %v" , volKey , nodeID )
450593 return & csi.ControllerUnpublishVolumeResponse {}, nil
451594}
0 commit comments