mirror of
https://github.com/fluxcd/flux2.git
synced 2026-02-11 02:07:30 +00:00
Merge pull request #4311 from fluxcd/kstatus-readiness
Check readiness of Flux kinds using kstatus
This commit is contained in:
commit
6135c326d8
23 changed files with 464 additions and 420 deletions
|
|
@ -132,7 +132,7 @@ func (names apiType) upsertAndWait(object upsertWaitable, mutate func() error) e
|
|||
|
||||
logger.Waitingf("waiting for %s reconciliation", names.kind)
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isReady(kubeClient, namespacedName, object)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, object.asClientObject())); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("%s reconciliation completed", names.kind)
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
|
@ -133,7 +132,7 @@ func createAlertCmdRun(cmd *cobra.Command, args []string) error {
|
|||
|
||||
logger.Waitingf("waiting for Alert reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isAlertReady(kubeClient, namespacedName, &alert)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, &alert)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Alert %s is ready", name)
|
||||
|
|
@ -170,22 +169,3 @@ func upsertAlert(ctx context.Context, kubeClient client.Client,
|
|||
logger.Successf("Alert updated")
|
||||
return namespacedName, nil
|
||||
}
|
||||
|
||||
func isAlertReady(kubeClient client.Client, namespacedName types.NamespacedName, alert *notificationv1b2.Alert) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, alert)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(alert.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
|
@ -128,7 +127,7 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error {
|
|||
|
||||
logger.Waitingf("waiting for Provider reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isAlertProviderReady(kubeClient, namespacedName, &provider)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, &provider)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -167,22 +166,3 @@ func upsertAlertProvider(ctx context.Context, kubeClient client.Client,
|
|||
logger.Successf("Provider updated")
|
||||
return namespacedName, nil
|
||||
}
|
||||
|
||||
func isAlertProviderReady(kubeClient client.Client, namespacedName types.NamespacedName, provider *notificationv1.Provider) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, provider)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(provider.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,6 @@ import (
|
|||
"github.com/spf13/cobra"
|
||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
|
@ -304,7 +303,7 @@ func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error {
|
|||
|
||||
logger.Waitingf("waiting for HelmRelease reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isHelmReleaseReady(kubeClient, namespacedName, &helmRelease)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, &helmRelease)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("HelmRelease %s is ready", name)
|
||||
|
|
@ -344,22 +343,6 @@ func upsertHelmRelease(ctx context.Context, kubeClient client.Client,
|
|||
return namespacedName, nil
|
||||
}
|
||||
|
||||
func isHelmReleaseReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, helmRelease)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Confirm the state we are observing is for the current generation
|
||||
if helmRelease.Generation != helmRelease.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return apimeta.IsStatusConditionTrue(helmRelease.Status.Conditions, meta.ReadyCondition), nil
|
||||
}
|
||||
}
|
||||
|
||||
func validateStrategy(input string) bool {
|
||||
allowedStrategy := []string{"Revision", "ChartVersion"}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import (
|
|||
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
|
@ -264,7 +263,7 @@ func createKsCmdRun(cmd *cobra.Command, args []string) error {
|
|||
|
||||
logger.Waitingf("waiting for Kustomization reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isKustomizationReady(kubeClient, namespacedName, &kustomization)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, &kustomization)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Kustomization %s is ready", name)
|
||||
|
|
@ -303,27 +302,3 @@ func upsertKustomization(ctx context.Context, kubeClient client.Client,
|
|||
logger.Successf("Kustomization updated")
|
||||
return namespacedName, nil
|
||||
}
|
||||
|
||||
func isKustomizationReady(kubeClient client.Client, namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, kustomization)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Confirm the state we are observing is for the current generation
|
||||
if kustomization.Generation != kustomization.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
|
@ -140,7 +139,7 @@ func createReceiverCmdRun(cmd *cobra.Command, args []string) error {
|
|||
|
||||
logger.Waitingf("waiting for Receiver reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isReceiverReady(kubeClient, namespacedName, &receiver)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, &receiver)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Receiver %s is ready", name)
|
||||
|
|
@ -179,22 +178,3 @@ func upsertReceiver(ctx context.Context, kubeClient client.Client,
|
|||
logger.Successf("Receiver updated")
|
||||
return namespacedName, nil
|
||||
}
|
||||
|
||||
func isReceiverReady(kubeClient client.Client, namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, receiver)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(receiver.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/conditions"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
|
||||
|
|
@ -205,7 +204,7 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error {
|
|||
|
||||
logger.Waitingf("waiting for Bucket source reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isBucketReady(kubeClient, namespacedName, bucket)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, bucket)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Bucket source reconciliation completed")
|
||||
|
|
@ -247,29 +246,3 @@ func upsertBucket(ctx context.Context, kubeClient client.Client,
|
|||
logger.Successf("Bucket source updated")
|
||||
return namespacedName, nil
|
||||
}
|
||||
|
||||
func isBucketReady(kubeClient client.Client, namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, bucket)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if c := conditions.Get(bucket, meta.ReadyCondition); c != nil {
|
||||
// Confirm the Ready condition we are observing is for the
|
||||
// current generation
|
||||
if c.ObservedGeneration != bucket.GetGeneration() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Further check the Status
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,7 +35,6 @@ import (
|
|||
"sigs.k8s.io/yaml"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/conditions"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1"
|
||||
|
||||
|
|
@ -326,7 +325,7 @@ func createSourceGitCmdRun(cmd *cobra.Command, args []string) error {
|
|||
|
||||
logger.Waitingf("waiting for GitRepository source reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isGitRepositoryReady(kubeClient, namespacedName, &gitRepository)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, &gitRepository)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("GitRepository source reconciliation completed")
|
||||
|
|
@ -368,29 +367,3 @@ func upsertGitRepository(ctx context.Context, kubeClient client.Client,
|
|||
logger.Successf("GitRepository source updated")
|
||||
return namespacedName, nil
|
||||
}
|
||||
|
||||
func isGitRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, gitRepository *sourcev1.GitRepository) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, gitRepository)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if c := conditions.Get(gitRepository, meta.ReadyCondition); c != nil {
|
||||
// Confirm the Ready condition we are observing is for the
|
||||
// current generation
|
||||
if c.ObservedGeneration != gitRepository.GetGeneration() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Further check the Status
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -181,12 +181,21 @@ func TestCreateSourceGit(t *testing.T) {
|
|||
Time: time.Now(),
|
||||
},
|
||||
}
|
||||
repo.Status.ObservedGeneration = repo.GetGeneration()
|
||||
},
|
||||
}, {
|
||||
"Failed",
|
||||
command,
|
||||
assertError("failed message"),
|
||||
func(repo *sourcev1.GitRepository) {
|
||||
stalledCondition := metav1.Condition{
|
||||
Type: meta.StalledCondition,
|
||||
Status: metav1.ConditionTrue,
|
||||
Reason: sourcev1.URLInvalidReason,
|
||||
Message: "failed message",
|
||||
ObservedGeneration: repo.GetGeneration(),
|
||||
}
|
||||
apimeta.SetStatusCondition(&repo.Status.Conditions, stalledCondition)
|
||||
newCondition := metav1.Condition{
|
||||
Type: meta.ReadyCondition,
|
||||
Status: metav1.ConditionFalse,
|
||||
|
|
@ -195,6 +204,7 @@ func TestCreateSourceGit(t *testing.T) {
|
|||
ObservedGeneration: repo.GetGeneration(),
|
||||
}
|
||||
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
|
||||
repo.Status.ObservedGeneration = repo.GetGeneration()
|
||||
},
|
||||
}, {
|
||||
"NoArtifact",
|
||||
|
|
@ -210,6 +220,7 @@ func TestCreateSourceGit(t *testing.T) {
|
|||
ObservedGeneration: repo.GetGeneration(),
|
||||
}
|
||||
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
|
||||
repo.Status.ObservedGeneration = repo.GetGeneration()
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ import (
|
|||
"os"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/conditions"
|
||||
"github.com/spf13/cobra"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
|
|
@ -232,7 +231,7 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error {
|
|||
|
||||
logger.Waitingf("waiting for HelmRepository source reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isHelmRepositoryReady(kubeClient, namespacedName, helmRepository)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, helmRepository)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("HelmRepository source reconciliation completed")
|
||||
|
|
@ -279,29 +278,3 @@ func upsertHelmRepository(ctx context.Context, kubeClient client.Client,
|
|||
logger.Successf("source updated")
|
||||
return namespacedName, nil
|
||||
}
|
||||
|
||||
func isHelmRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, helmRepository)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if c := conditions.Get(helmRepository, meta.ReadyCondition); c != nil {
|
||||
// Confirm the Ready condition we are observing is for the
|
||||
// current generation
|
||||
if c.ObservedGeneration != helmRepository.GetGeneration() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Further check the Status
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/conditions"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
|
||||
|
|
@ -193,7 +192,7 @@ func createSourceOCIRepositoryCmdRun(cmd *cobra.Command, args []string) error {
|
|||
|
||||
logger.Waitingf("waiting for OCIRepository reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isOCIRepositoryReady(kubeClient, namespacedName, repository)); err != nil {
|
||||
isObjectReadyConditionFunc(kubeClient, namespacedName, repository)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("OCIRepository reconciliation completed")
|
||||
|
|
@ -235,29 +234,3 @@ func upsertOCIRepository(ctx context.Context, kubeClient client.Client,
|
|||
logger.Successf("OCIRepository updated")
|
||||
return namespacedName, nil
|
||||
}
|
||||
|
||||
func isOCIRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, ociRepository *sourcev1.OCIRepository) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, ociRepository)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if c := conditions.Get(ociRepository, meta.ReadyCondition); c != nil {
|
||||
// Confirm the Ready condition we are observing is for the
|
||||
// current generation
|
||||
if c.ObservedGeneration != ociRepository.GetGeneration() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Further check the Status
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,10 +25,15 @@ import (
|
|||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
|
||||
"github.com/fluxcd/flux2/v2/internal/utils"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
log.SetLogger(logr.New(log.NullLogSink{}))
|
||||
|
||||
// Ensure tests print consistent timestamps regardless of timezone
|
||||
os.Setenv("TZ", "UTC")
|
||||
|
||||
|
|
|
|||
|
|
@ -22,10 +22,13 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
)
|
||||
|
||||
// The test environment is long running process shared between tests, initialized
|
||||
|
|
@ -34,6 +37,8 @@ import (
|
|||
var testEnv *testEnvKubeManager
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
log.SetLogger(logr.New(log.NullLogSink{}))
|
||||
|
||||
// Ensure tests print consistent timestamps regardless of timezone
|
||||
os.Setenv("TZ", "UTC")
|
||||
|
||||
|
|
|
|||
149
cmd/flux/readiness.go
Normal file
149
cmd/flux/readiness.go
Normal file
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
Copyright 2023 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/object"
|
||||
"github.com/fluxcd/pkg/runtime/patch"
|
||||
)
|
||||
|
||||
// objectStatusType is the type of object in terms of status when computing the
|
||||
// readiness of an object. Readiness check method depends on the type of object.
|
||||
// For a dynamic object, Ready status condition is considered only for the
|
||||
// latest generation of the object. For a static object that don't have any
|
||||
// condition, the object generation is not considered.
|
||||
type objectStatusType int
|
||||
|
||||
const (
|
||||
objectStatusDynamic objectStatusType = iota
|
||||
objectStatusStatic
|
||||
)
|
||||
|
||||
// isObjectReady determines if an object is ready using the kstatus.Compute()
|
||||
// result. statusType helps differenciate between static and dynamic objects to
|
||||
// accurately check the object's readiness. A dynamic object may have some extra
|
||||
// considerations depending on the object.
|
||||
func isObjectReady(obj client.Object, statusType objectStatusType) (bool, error) {
|
||||
observedGen, err := object.GetStatusObservedGeneration(obj)
|
||||
if err != nil && err != object.ErrObservedGenerationNotFound {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if statusType == objectStatusDynamic {
|
||||
// Object not reconciled yet.
|
||||
if observedGen < 1 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
cobj, ok := obj.(meta.ObjectWithConditions)
|
||||
if !ok {
|
||||
return false, fmt.Errorf("unable to get conditions from object")
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(cobj.GetConditions(), meta.ReadyCondition); c != nil {
|
||||
// Ensure that the ready condition is for the latest generation of
|
||||
// the object.
|
||||
// NOTE: Some APIs like ImageUpdateAutomation and HelmRelease don't
|
||||
// support per condition observed generation yet. Per condition
|
||||
// observed generation for them are always zero.
|
||||
// There are two strategies used across different object kinds to
|
||||
// check the latest ready condition:
|
||||
// - check that the ready condition's generation matches the
|
||||
// object's generation.
|
||||
// - check that the observed generation of the object in the
|
||||
// status matches the object's generation.
|
||||
//
|
||||
// TODO: Once ImageUpdateAutomation and HelmRelease APIs have per
|
||||
// condition observed generation, remove the object's observed
|
||||
// generation and object's generation check (the second condition
|
||||
// below). Also, try replacing this readiness check function with
|
||||
// fluxcd/pkg/ssa's ResourceManager.Wait(), which uses kstatus
|
||||
// internally to check readiness of the objects.
|
||||
if c.ObservedGeneration != 0 && c.ObservedGeneration != obj.GetGeneration() {
|
||||
return false, nil
|
||||
}
|
||||
if c.ObservedGeneration == 0 && observedGen != obj.GetGeneration() {
|
||||
return false, nil
|
||||
}
|
||||
} else {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
u, err := patch.ToUnstructured(obj)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
result, err := kstatus.Compute(u)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
switch result.Status {
|
||||
case kstatus.CurrentStatus:
|
||||
return true, nil
|
||||
case kstatus.InProgressStatus:
|
||||
return false, nil
|
||||
default:
|
||||
return false, fmt.Errorf(result.Message)
|
||||
}
|
||||
}
|
||||
|
||||
// isObjectReadyConditionFunc returns a wait.ConditionFunc to be used with
|
||||
// wait.Poll* while polling for an object with dynamic status to be ready.
|
||||
func isObjectReadyConditionFunc(kubeClient client.Client, namespaceName types.NamespacedName, obj client.Object) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespaceName, obj)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return isObjectReady(obj, objectStatusDynamic)
|
||||
}
|
||||
}
|
||||
|
||||
// isStaticObjectReadyConditionFunc returns a wait.ConditionFunc to be used with
|
||||
// wait.Poll* while polling for an object with static or no status to be
|
||||
// ready.
|
||||
func isStaticObjectReadyConditionFunc(kubeClient client.Client, namespaceName types.NamespacedName, obj client.Object) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespaceName, obj)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return isObjectReady(obj, objectStatusStatic)
|
||||
}
|
||||
}
|
||||
|
||||
// kstatusCompute returns the kstatus computed result of a given object.
|
||||
func kstatusCompute(obj client.Object) (result *kstatus.Result, err error) {
|
||||
u, err := patch.ToUnstructured(obj)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
return kstatus.Compute(u)
|
||||
}
|
||||
139
cmd/flux/readiness_test.go
Normal file
139
cmd/flux/readiness_test.go
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
Copyright 2023 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
notificationv1 "github.com/fluxcd/notification-controller/api/v1beta3"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/conditions"
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1"
|
||||
)
|
||||
|
||||
func Test_isObjectReady(t *testing.T) {
|
||||
// Ready object.
|
||||
readyObj := &sourcev1.GitRepository{}
|
||||
readyObj.Generation = 1
|
||||
readyObj.Status.ObservedGeneration = 1
|
||||
conditions.MarkTrue(readyObj, meta.ReadyCondition, "foo1", "bar1")
|
||||
|
||||
// Not ready object.
|
||||
notReadyObj := readyObj.DeepCopy()
|
||||
conditions.MarkFalse(notReadyObj, meta.ReadyCondition, "foo2", "bar2")
|
||||
|
||||
// Not reconciled object.
|
||||
notReconciledObj := readyObj.DeepCopy()
|
||||
notReconciledObj.Status = sourcev1.GitRepositoryStatus{ObservedGeneration: -1}
|
||||
|
||||
// No condition.
|
||||
noConditionObj := readyObj.DeepCopy()
|
||||
noConditionObj.Status = sourcev1.GitRepositoryStatus{ObservedGeneration: 1}
|
||||
|
||||
// Outdated condition.
|
||||
readyObjOutdated := readyObj.DeepCopy()
|
||||
readyObjOutdated.Generation = 2
|
||||
|
||||
// Object without per condition observed generation.
|
||||
oldObj := readyObj.DeepCopy()
|
||||
readyTrueCondn := conditions.TrueCondition(meta.ReadyCondition, "foo3", "bar3")
|
||||
oldObj.Status.Conditions = []metav1.Condition{*readyTrueCondn}
|
||||
|
||||
// Outdated object without per condition observed generation.
|
||||
oldObjOutdated := oldObj.DeepCopy()
|
||||
oldObjOutdated.Generation = 2
|
||||
|
||||
// Empty status object.
|
||||
staticObj := readyObj.DeepCopy()
|
||||
staticObj.Status = sourcev1.GitRepositoryStatus{}
|
||||
|
||||
// No status object.
|
||||
noStatusObj := ¬ificationv1.Provider{}
|
||||
noStatusObj.Generation = 1
|
||||
|
||||
type args struct {
|
||||
obj client.Object
|
||||
statusType objectStatusType
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want bool
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "dynamic ready",
|
||||
args: args{obj: readyObj, statusType: objectStatusDynamic},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "dynamic not ready",
|
||||
args: args{obj: notReadyObj, statusType: objectStatusDynamic},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "dynamic not reconciled",
|
||||
args: args{obj: notReconciledObj, statusType: objectStatusDynamic},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "dynamic not condition",
|
||||
args: args{obj: noConditionObj, statusType: objectStatusDynamic},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "dynamic ready outdated",
|
||||
args: args{obj: readyObjOutdated, statusType: objectStatusDynamic},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "dynamic ready without per condition gen",
|
||||
args: args{obj: oldObj, statusType: objectStatusDynamic},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "dynamic outdated ready status without per condition gen",
|
||||
args: args{obj: oldObjOutdated, statusType: objectStatusDynamic},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "static empty status",
|
||||
args: args{obj: staticObj, statusType: objectStatusStatic},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "static no status",
|
||||
args: args{obj: noStatusObj, statusType: objectStatusStatic},
|
||||
want: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := isObjectReady(tt.args.obj, tt.args.statusType)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("isObjectReady() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if got != tt.want {
|
||||
t.Errorf("isObjectReady() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status"
|
||||
"github.com/spf13/cobra"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
|
@ -30,8 +31,6 @@ import (
|
|||
"k8s.io/client-go/util/retry"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
notificationv1 "github.com/fluxcd/notification-controller/api/v1"
|
||||
notificationv1b2 "github.com/fluxcd/notification-controller/api/v1beta2"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
|
||||
"github.com/fluxcd/flux2/v2/internal/utils"
|
||||
|
|
@ -112,16 +111,6 @@ func (reconcile reconcileCommand) run(cmd *cobra.Command, args []string) error {
|
|||
}
|
||||
logger.Successf("%s annotated", reconcile.kind)
|
||||
|
||||
if reconcile.kind == notificationv1b2.AlertKind || reconcile.kind == notificationv1.ReceiverKind {
|
||||
if err = wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isReconcileReady(kubeClient, namespacedName, reconcile.object)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Successf(reconcile.object.successMessage())
|
||||
return nil
|
||||
}
|
||||
|
||||
lastHandledReconcileAt := reconcile.object.lastHandledReconcileRequest()
|
||||
logger.Waitingf("waiting for %s reconciliation", reconcile.kind)
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
|
|
@ -146,9 +135,17 @@ func reconciliationHandled(kubeClient client.Client, namespacedName types.Namesp
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
isProgressing := apimeta.IsStatusConditionPresentAndEqual(reconcilableConditions(obj),
|
||||
meta.ReadyCondition, metav1.ConditionUnknown)
|
||||
return obj.lastHandledReconcileRequest() != lastHandledReconcileAt && !isProgressing, nil
|
||||
|
||||
if obj.lastHandledReconcileRequest() == lastHandledReconcileAt {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
result, err := kstatusCompute(obj.asClientObject())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return result.Status == kstatus.CurrentStatus, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -174,22 +171,3 @@ func requestReconciliation(ctx context.Context, kubeClient client.Client,
|
|||
return kubeClient.Patch(ctx, object, patch)
|
||||
})
|
||||
}
|
||||
|
||||
func isReconcileReady(kubeClient client.Client, namespacedName types.NamespacedName, obj reconcilable) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, obj.asClientObject())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(reconcilableConditions(obj), meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,5 +40,5 @@ func init() {
|
|||
}
|
||||
|
||||
func (obj alertAdapter) lastHandledReconcileRequest() string {
|
||||
return ""
|
||||
return obj.Status.GetLastHandledReconcileRequest()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,18 +17,9 @@ limitations under the License.
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
||||
notificationv1 "github.com/fluxcd/notification-controller/api/v1beta2"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
|
||||
"github.com/fluxcd/flux2/v2/internal/utils"
|
||||
)
|
||||
|
||||
var reconcileAlertProviderCmd = &cobra.Command{
|
||||
|
|
@ -38,56 +29,16 @@ var reconcileAlertProviderCmd = &cobra.Command{
|
|||
Example: ` # Trigger a reconciliation for an existing provider
|
||||
flux reconcile alert-provider slack`,
|
||||
ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ProviderKind)),
|
||||
RunE: reconcileAlertProviderCmdRun,
|
||||
RunE: reconcileCommand{
|
||||
apiType: alertProviderType,
|
||||
object: alertProviderAdapter{¬ificationv1.Provider{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
reconcileCmd.AddCommand(reconcileAlertProviderCmd)
|
||||
}
|
||||
|
||||
func reconcileAlertProviderCmdRun(cmd *cobra.Command, args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("Provider name is required")
|
||||
}
|
||||
name := args[0]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
|
||||
defer cancel()
|
||||
|
||||
kubeClient, err := utils.KubeClient(kubeconfigArgs, kubeclientOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
namespacedName := types.NamespacedName{
|
||||
Namespace: *kubeconfigArgs.Namespace,
|
||||
Name: name,
|
||||
}
|
||||
|
||||
logger.Actionf("annotating Provider %s in %s namespace", name, *kubeconfigArgs.Namespace)
|
||||
var alertProvider notificationv1.Provider
|
||||
err = kubeClient.Get(ctx, namespacedName, &alertProvider)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if alertProvider.Annotations == nil {
|
||||
alertProvider.Annotations = map[string]string{
|
||||
meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano),
|
||||
}
|
||||
} else {
|
||||
alertProvider.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano)
|
||||
}
|
||||
if err := kubeClient.Update(ctx, &alertProvider); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Provider annotated")
|
||||
|
||||
logger.Waitingf("waiting for reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isAlertProviderReady(kubeClient, namespacedName, &alertProvider)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Provider reconciliation completed")
|
||||
return nil
|
||||
func (obj alertProviderAdapter) lastHandledReconcileRequest() string {
|
||||
return obj.Status.GetLastHandledReconcileRequest()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,18 +17,9 @@ limitations under the License.
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
||||
notificationv1 "github.com/fluxcd/notification-controller/api/v1"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
|
||||
"github.com/fluxcd/flux2/v2/internal/utils"
|
||||
)
|
||||
|
||||
var reconcileReceiverCmd = &cobra.Command{
|
||||
|
|
@ -38,62 +29,16 @@ var reconcileReceiverCmd = &cobra.Command{
|
|||
Example: ` # Trigger a reconciliation for an existing receiver
|
||||
flux reconcile receiver main`,
|
||||
ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ReceiverKind)),
|
||||
RunE: reconcileReceiverCmdRun,
|
||||
RunE: reconcileCommand{
|
||||
apiType: receiverType,
|
||||
object: receiverAdapter{¬ificationv1.Receiver{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
reconcileCmd.AddCommand(reconcileReceiverCmd)
|
||||
}
|
||||
|
||||
func reconcileReceiverCmdRun(cmd *cobra.Command, args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("receiver name is required")
|
||||
}
|
||||
name := args[0]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
|
||||
defer cancel()
|
||||
|
||||
kubeClient, err := utils.KubeClient(kubeconfigArgs, kubeclientOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
namespacedName := types.NamespacedName{
|
||||
Namespace: *kubeconfigArgs.Namespace,
|
||||
Name: name,
|
||||
}
|
||||
|
||||
var receiver notificationv1.Receiver
|
||||
err = kubeClient.Get(ctx, namespacedName, &receiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if receiver.Spec.Suspend {
|
||||
return fmt.Errorf("resource is suspended")
|
||||
}
|
||||
|
||||
logger.Actionf("annotating Receiver %s in %s namespace", name, *kubeconfigArgs.Namespace)
|
||||
if receiver.Annotations == nil {
|
||||
receiver.Annotations = map[string]string{
|
||||
meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano),
|
||||
}
|
||||
} else {
|
||||
receiver.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano)
|
||||
}
|
||||
if err := kubeClient.Update(ctx, &receiver); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Receiver annotated")
|
||||
|
||||
logger.Waitingf("waiting for Receiver reconciliation")
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isReceiverReady(kubeClient, namespacedName, &receiver)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Successf("Receiver reconciliation completed")
|
||||
|
||||
return nil
|
||||
func (obj receiverAdapter) lastHandledReconcileRequest() string {
|
||||
return obj.Status.GetLastHandledReconcileRequest()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -213,7 +213,7 @@ func (resume resumeCommand) reconcile(ctx context.Context, res resumable) reconc
|
|||
logger.Waitingf("waiting for %s reconciliation", resume.kind)
|
||||
|
||||
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
|
||||
isReady(resume.client, namespacedName, res)); err != nil {
|
||||
isObjectReadyConditionFunc(resume.client, namespacedName, res.asClientObject())); err != nil {
|
||||
return reconcileResponse{
|
||||
resumable: res,
|
||||
err: err,
|
||||
|
|
|
|||
60
cmd/flux/resume_alertprovider.go
Normal file
60
cmd/flux/resume_alertprovider.go
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
Copyright 2023 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
notificationv1 "github.com/fluxcd/notification-controller/api/v1beta2"
|
||||
)
|
||||
|
||||
var resumeAlertProviderCmd = &cobra.Command{
|
||||
Use: "alert-provider [name]",
|
||||
Short: "Resume a suspended Provider",
|
||||
Long: `The resume command marks a previously suspended Provider resource for reconciliation and waits for it to
|
||||
finish the apply.`,
|
||||
Example: ` # Resume reconciliation for an existing Provider
|
||||
flux resume alert-provider main
|
||||
|
||||
# Resume reconciliation for multiple Providers
|
||||
flux resume alert-provider main-1 main-2`,
|
||||
ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ProviderKind)),
|
||||
RunE: resumeCommand{
|
||||
apiType: alertProviderType,
|
||||
list: &alertProviderListAdapter{¬ificationv1.ProviderList{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
resumeCmd.AddCommand(resumeAlertProviderCmd)
|
||||
}
|
||||
|
||||
func (obj alertProviderAdapter) getObservedGeneration() int64 {
|
||||
return obj.Provider.Status.ObservedGeneration
|
||||
}
|
||||
|
||||
func (obj alertProviderAdapter) setUnsuspended() {
|
||||
obj.Provider.Spec.Suspend = false
|
||||
}
|
||||
|
||||
func (obj alertProviderAdapter) successMessage() string {
|
||||
return "Provider reconciliation completed"
|
||||
}
|
||||
|
||||
func (a alertProviderListAdapter) resumeItem(i int) resumable {
|
||||
return &alertProviderAdapter{&a.ProviderList.Items[i]}
|
||||
}
|
||||
|
|
@ -17,18 +17,9 @@ limitations under the License.
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
"github.com/fluxcd/cli-utils/pkg/object"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/fluxcd/cli-utils/pkg/object"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
)
|
||||
|
||||
// statusable is used to see if a resource is considered ready in the usual way
|
||||
|
|
@ -45,42 +36,6 @@ type oldConditions interface {
|
|||
GetStatusConditions() *[]metav1.Condition
|
||||
}
|
||||
|
||||
func statusableConditions(object statusable) []metav1.Condition {
|
||||
if s, ok := object.(meta.ObjectWithConditions); ok {
|
||||
return s.GetConditions()
|
||||
}
|
||||
|
||||
if s, ok := object.(oldConditions); ok {
|
||||
return *s.GetStatusConditions()
|
||||
}
|
||||
|
||||
return []metav1.Condition{}
|
||||
}
|
||||
|
||||
func isReady(kubeClient client.Client, namespacedName types.NamespacedName, object statusable) wait.ConditionWithContextFunc {
|
||||
return func(ctx context.Context) (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, object.asClientObject())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Confirm the state we are observing is for the current generation
|
||||
if object.GetGeneration() != object.getObservedGeneration() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(statusableConditions(object), meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func buildComponentObjectRefs(components ...string) ([]object.ObjMetadata, error) {
|
||||
var objRefs []object.ObjMetadata
|
||||
for _, deployment := range components {
|
||||
|
|
|
|||
56
cmd/flux/suspend_alertprovider.go
Normal file
56
cmd/flux/suspend_alertprovider.go
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
Copyright 2023 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
notificationv1 "github.com/fluxcd/notification-controller/api/v1beta2"
|
||||
)
|
||||
|
||||
var suspendAlertProviderCmd = &cobra.Command{
|
||||
Use: "alert-provider [name]",
|
||||
Short: "Suspend reconciliation of Provider",
|
||||
Long: `The suspend command disables the reconciliation of a Provider resource.`,
|
||||
Example: ` # Suspend reconciliation for an existing Provider
|
||||
flux suspend alert-provider main
|
||||
|
||||
# Suspend reconciliation for multiple Providers
|
||||
flux suspend alert-providers main-1 main-2`,
|
||||
ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ProviderKind)),
|
||||
RunE: suspendCommand{
|
||||
apiType: alertProviderType,
|
||||
object: &alertProviderAdapter{¬ificationv1.Provider{}},
|
||||
list: &alertProviderListAdapter{¬ificationv1.ProviderList{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
suspendCmd.AddCommand(suspendAlertProviderCmd)
|
||||
}
|
||||
|
||||
func (obj alertProviderAdapter) isSuspended() bool {
|
||||
return obj.Provider.Spec.Suspend
|
||||
}
|
||||
|
||||
func (obj alertProviderAdapter) setSuspended() {
|
||||
obj.Provider.Spec.Suspend = true
|
||||
}
|
||||
|
||||
func (a alertProviderListAdapter) item(i int) suspendable {
|
||||
return &alertProviderAdapter{&a.ProviderList.Items[i]}
|
||||
}
|
||||
Loading…
Reference in a new issue