@@ -47,6 +47,7 @@ import (
4747 helper "github.com/fluxcd/pkg/runtime/controller"
4848 "github.com/fluxcd/pkg/runtime/patch"
4949 "github.com/fluxcd/pkg/runtime/predicates"
50+ rreconcile "github.com/fluxcd/pkg/runtime/reconcile"
5051
5152 eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
5253 "github.com/fluxcd/pkg/sourceignore"
@@ -119,6 +120,8 @@ type BucketReconciler struct {
119120
120121 Storage * Storage
121122 ControllerName string
123+
124+ patchOptions []patch.Option
122125}
123126
124127type BucketReconcilerOptions struct {
@@ -151,7 +154,7 @@ type BucketProvider interface {
151154// bucketReconcileFunc is the function type for all the v1beta2.Bucket
152155// (sub)reconcile functions. The type implementations are grouped and
153156// executed serially to perform the complete reconcile of the object.
154- type bucketReconcileFunc func (ctx context.Context , obj * sourcev1.Bucket , index * etagIndex , dir string ) (sreconcile.Result , error )
157+ type bucketReconcileFunc func (ctx context.Context , sp * patch. SerialPatcher , obj * sourcev1.Bucket , index * etagIndex , dir string ) (sreconcile.Result , error )
155158
156159// etagIndex is an index of storage object keys and their Etag values.
157160type etagIndex struct {
@@ -234,6 +237,8 @@ func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
234237}
235238
236239func (r * BucketReconciler ) SetupWithManagerAndOptions (mgr ctrl.Manager , opts BucketReconcilerOptions ) error {
240+ r .patchOptions = getPatchOptions (bucketReadyCondition .Owned , r .ControllerName )
241+
237242 return ctrl .NewControllerManagedBy (mgr ).
238243 For (& sourcev1.Bucket {}).
239244 WithEventFilter (predicate .Or (predicate.GenerationChangedPredicate {}, predicates.ReconcileRequestedPredicate {})).
@@ -259,18 +264,15 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
259264 r .RecordSuspend (ctx , obj , obj .Spec .Suspend )
260265
261266 // Initialize the patch helper with the current version of the object.
262- patchHelper , err := patch .NewHelper (obj , r .Client )
263- if err != nil {
264- return ctrl.Result {}, err
265- }
267+ serialPatcher := patch .NewSerialPatcher (obj , r .Client )
266268
267269 // recResult stores the abstracted reconcile result.
268270 var recResult sreconcile.Result
269271
270272 // Always attempt to patch the object and status after each reconciliation
271273 // NOTE: The final runtime result and error are set in this block.
272274 defer func () {
273- summarizeHelper := summarize .NewHelper (r .EventRecorder , patchHelper )
275+ summarizeHelper := summarize .NewHelper (r .EventRecorder , serialPatcher )
274276 summarizeOpts := []summarize.Option {
275277 summarize .WithConditions (bucketReadyCondition ),
276278 summarize .WithReconcileResult (recResult ),
@@ -316,19 +318,35 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
316318 r .reconcileSource ,
317319 r .reconcileArtifact ,
318320 }
319- recResult , retErr = r .reconcile (ctx , obj , reconcilers )
321+ recResult , retErr = r .reconcile (ctx , serialPatcher , obj , reconcilers )
320322 return
321323}
322324
323325// reconcile iterates through the bucketReconcileFunc tasks for the
324326// object. It returns early on the first call that returns
325327// reconcile.ResultRequeue, or produces an error.
326- func (r * BucketReconciler ) reconcile (ctx context.Context , obj * sourcev1.Bucket , reconcilers []bucketReconcileFunc ) (sreconcile.Result , error ) {
328+ func (r * BucketReconciler ) reconcile (ctx context.Context , sp * patch. SerialPatcher , obj * sourcev1.Bucket , reconcilers []bucketReconcileFunc ) (sreconcile.Result , error ) {
327329 oldObj := obj .DeepCopy ()
328330
329- // Mark as reconciling if generation differs.
330- if obj .Generation != obj .Status .ObservedGeneration {
331- conditions .MarkReconciling (obj , "NewGeneration" , "reconciling new object generation (%d)" , obj .Generation )
331+ rreconcile .ProgressiveStatus (false , obj , meta .ProgressingReason , "reconciliation in progress" )
332+
333+ var recAtVal string
334+ if v , ok := meta .ReconcileAnnotationValue (obj .GetAnnotations ()); ok {
335+ recAtVal = v
336+ }
337+
338+ // Persist reconciling if generation differs or reconciliation is requested.
339+ switch {
340+ case obj .Generation != obj .Status .ObservedGeneration :
341+ rreconcile .ProgressiveStatus (false , obj , meta .ProgressingReason ,
342+ "processing object: new generation %d -> %d" , obj .Status .ObservedGeneration , obj .Generation )
343+ if err := sp .Patch (ctx , obj , r .patchOptions ... ); err != nil {
344+ return sreconcile .ResultEmpty , err
345+ }
346+ case recAtVal != obj .Status .GetLastHandledReconcileRequest ():
347+ if err := sp .Patch (ctx , obj , r .patchOptions ... ); err != nil {
348+ return sreconcile .ResultEmpty , err
349+ }
332350 }
333351
334352 // Create temp working dir
@@ -356,7 +374,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
356374 )
357375
358376 for _ , rec := range reconcilers {
359- recResult , err := rec (ctx , obj , index , tmpDir )
377+ recResult , err := rec (ctx , sp , obj , index , tmpDir )
360378 // Exit immediately on ResultRequeue.
361379 if recResult == sreconcile .ResultRequeue {
362380 return sreconcile .ResultRequeue , nil
@@ -421,22 +439,31 @@ func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.
421439// condition is added.
422440// The hostname of any URL in the Status of the object are updated, to ensure
423441// they match the Storage server hostname of current runtime.
424- func (r * BucketReconciler ) reconcileStorage (ctx context.Context , obj * sourcev1.Bucket , _ * etagIndex , _ string ) (sreconcile.Result , error ) {
442+ func (r * BucketReconciler ) reconcileStorage (ctx context.Context , sp * patch. SerialPatcher , obj * sourcev1.Bucket , _ * etagIndex , _ string ) (sreconcile.Result , error ) {
425443 // Garbage collect previous advertised artifact(s) from storage
426444 _ = r .garbageCollect (ctx , obj )
427445
428446 // Determine if the advertised artifact is still in storage
447+ var artifactMissing bool
429448 if artifact := obj .GetArtifact (); artifact != nil && ! r .Storage .ArtifactExist (* artifact ) {
430449 obj .Status .Artifact = nil
431450 obj .Status .URL = ""
451+ artifactMissing = true
432452 // Remove the condition as the artifact doesn't exist.
433453 conditions .Delete (obj , sourcev1 .ArtifactInStorageCondition )
434454 }
435455
436456 // Record that we do not have an artifact
437457 if obj .GetArtifact () == nil {
438- conditions .MarkReconciling (obj , "NoArtifact" , "no artifact for resource in storage" )
458+ msg := "building artifact"
459+ if artifactMissing {
460+ msg += ": disappeared from storage"
461+ }
462+ rreconcile .ProgressiveStatus (true , obj , meta .ProgressingReason , msg )
439463 conditions .Delete (obj , sourcev1 .ArtifactInStorageCondition )
464+ if err := sp .Patch (ctx , obj , r .patchOptions ... ); err != nil {
465+ return sreconcile .ResultEmpty , err
466+ }
440467 return sreconcile .ResultSuccess , nil
441468 }
442469
@@ -453,7 +480,7 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.B
453480// When a SecretRef is defined, it attempts to fetch the Secret before calling
454481// the provider. If this fails, it records v1beta2.FetchFailedCondition=True on
455482// the object and returns early.
456- func (r * BucketReconciler ) reconcileSource (ctx context.Context , obj * sourcev1.Bucket , index * etagIndex , dir string ) (sreconcile.Result , error ) {
483+ func (r * BucketReconciler ) reconcileSource (ctx context.Context , sp * patch. SerialPatcher , obj * sourcev1.Bucket , index * etagIndex , dir string ) (sreconcile.Result , error ) {
457484 secret , err := r .getBucketSecret (ctx , obj )
458485 if err != nil {
459486 e := & serror.Event {Err : err , Reason : sourcev1 .AuthenticationFailedReason }
@@ -528,8 +555,14 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
528555
529556 if ! obj .GetArtifact ().HasRevision (revision ) {
530557 message := fmt .Sprintf ("new upstream revision '%s'" , revision )
531- conditions .MarkTrue (obj , sourcev1 .ArtifactOutdatedCondition , "NewRevision" , message )
532- conditions .MarkReconciling (obj , "NewRevision" , message )
558+ if obj .GetArtifact () != nil {
559+ conditions .MarkTrue (obj , sourcev1 .ArtifactOutdatedCondition , "NewRevision" , message )
560+ }
561+ rreconcile .ProgressiveStatus (true , obj , meta .ProgressingReason , "building artifact: %s" , message )
562+ if err := sp .Patch (ctx , obj , r .patchOptions ... ); err != nil {
563+ ctrl .LoggerFrom (ctx ).Error (err , "failed to patch" )
564+ return
565+ }
533566 }
534567 }()
535568
@@ -554,7 +587,7 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
554587// early.
555588// On a successful archive, the Artifact in the Status of the object is set,
556589// and the symlink in the Storage is updated to its path.
557- func (r * BucketReconciler ) reconcileArtifact (ctx context.Context , obj * sourcev1.Bucket , index * etagIndex , dir string ) (sreconcile.Result , error ) {
590+ func (r * BucketReconciler ) reconcileArtifact (ctx context.Context , sp * patch. SerialPatcher , obj * sourcev1.Bucket , index * etagIndex , dir string ) (sreconcile.Result , error ) {
558591 // Calculate revision
559592 revision , err := index .Revision ()
560593 if err != nil {
@@ -572,7 +605,7 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
572605 if obj .GetArtifact ().HasRevision (artifact .Revision ) {
573606 conditions .Delete (obj , sourcev1 .ArtifactOutdatedCondition )
574607 conditions .MarkTrue (obj , sourcev1 .ArtifactInStorageCondition , meta .SucceededReason ,
575- "stored artifact for revision '%s'" , artifact .Revision )
608+ "stored artifact: revision '%s'" , artifact .Revision )
576609 }
577610 }()
578611
0 commit comments