Implementing a periodic Kubernetes controller

Recently I was looking for a way to implement a Kubernetes controller that is triggered on a regular basis, e.g. one that reconciles the objects it is concerned with every N seconds.

Controllers in Kubernetes work by watching resources of a given kind and reacting to events when something happens with these objects. The job of the controller is to monitor the object and potentially perform changes to it, in order to get the object to the desired state based on the specification of the object itself. This process is known as reconciling.
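
To make the idea of reconciling a bit more concrete, here is a minimal sketch in plain Go. It is not how controller-runtime is implemented; the State type and the reconcile function are hypothetical names made up for this illustration. It only shows the core idea: compare the desired state with the observed state and decide what needs to change.

```go
package main

import "fmt"

// State is a hypothetical, simplified stand-in for the spec (desired) and
// status (observed) of an object.
type State struct {
	Replicas int
}

// reconcile compares the desired state with the observed state and returns
// a description of the action needed to converge them.
func reconcile(desired, observed State) string {
	switch {
	case observed.Replicas < desired.Replicas:
		return fmt.Sprintf("scale up by %d", desired.Replicas-observed.Replicas)
	case observed.Replicas > desired.Replicas:
		return fmt.Sprintf("scale down by %d", observed.Replicas-desired.Replicas)
	default:
		return "nothing to do"
	}
}

func main() {
	fmt.Println(reconcile(State{Replicas: 3}, State{Replicas: 1})) // prints "scale up by 2"
	fmt.Println(reconcile(State{Replicas: 3}, State{Replicas: 3})) // prints "nothing to do"
}
```

Note that the function only looks at the current state, not at what kind of event triggered it. That is why the same reconcile logic can be driven by a timer just as well as by Create, Update or Delete events.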

The events that trigger a controller to reconcile an object are typically Create, Update and Delete.

But what if you don’t want to reconcile on Create, Update or Delete events? What if you want to reconcile your objects at regular intervals instead?

An example use case for a periodic controller would be something along the lines of a custom autoscaler, which periodically checks some metrics and takes appropriate actions for your custom resources.

If that’s what you are looking for, then read on. Also, you can find the code from this post in the dnaeon/kubernetes-periodic-controller repo.

This post documents my learning process while implementing a periodic reconciler bootstrapped with kubebuilder. It will most likely serve my future self only, but hopefully it helps others as well.

These days it seems to be a common practice that new controllers are bootstrapped using either kubebuilder or operator-sdk. Both projects are great and come with really good documentation, but after looking through the documentation I was not able to find what I was looking for.

The rest of this post talks about how to develop a periodic Kubernetes controller, which will be bootstrapped using kubebuilder. The steps taken to bootstrap the controller should work fine with operator-sdk as well, since it is based on kubebuilder anyway, and the commands are pretty much the same.

First, let’s create a new kubebuilder project.

kubebuilder init --domain dnaeon.github.io --repo github.com/dnaeon/kubernetes-periodic-controller

You should see output similar to the one below.

INFO Writing kustomize manifests for you to edit...
INFO Writing scaffold for you to edit...
INFO Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.17.0
go: downloading sigs.k8s.io/controller-runtime v0.17.0
go: downloading k8s.io/apimachinery v0.29.0
go: downloading k8s.io/utils v0.0.0-20230726121419-3b25d923346b
go: downloading k8s.io/client-go v0.29.0
go: downloading github.com/go-logr/logr v1.4.1
go: downloading k8s.io/api v0.29.0
go: downloading k8s.io/component-base v0.29.0
go: downloading github.com/evanphx/json-patch/v5 v5.8.0
go: downloading k8s.io/apiextensions-apiserver v0.29.0
go: downloading github.com/prometheus/client_golang v1.18.0
go: downloading k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00
go: downloading github.com/fsnotify/fsnotify v1.7.0
go: downloading golang.org/x/oauth2 v0.12.0
go: downloading golang.org/x/sys v0.16.0
go: downloading github.com/prometheus/client_model v0.5.0
go: downloading github.com/prometheus/common v0.45.0
go: downloading github.com/prometheus/procfs v0.12.0
go: downloading github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0
INFO Update dependencies:
$ go mod tidy
go: downloading github.com/onsi/gomega v1.30.0
go: downloading github.com/onsi/ginkgo/v2 v2.14.0
go: downloading github.com/go-logr/zapr v1.3.0
go: downloading go.uber.org/goleak v1.3.0
go: downloading golang.org/x/tools v0.16.1
Next: define a resource with:
$ kubebuilder create api

Now, we will generate the boilerplate code for our controller. In order to keep things simple we will not be introducing a custom resource here, but instead our controller will be reconciling Pod objects.

kubebuilder create api --controller --resource=false --group core --version v1 --kind Pod

You should see similar output.

INFO Writing kustomize manifests for you to edit...
INFO Writing scaffold for you to edit...
INFO internal/controller/suite_test.go
INFO internal/controller/pod_controller.go
INFO internal/controller/pod_controller_test.go
INFO Update dependencies:
$ go mod tidy

And with that we have successfully bootstrapped our controller. Now it’s time to get to the interesting stuff.

The first thing we are going to do is add a new command-line flag, which will be used to specify the interval at which our controller should reconcile. To do that, open cmd/main.go. At the beginning of main() you will see a bunch of existing flags, which kubebuilder generated for us. We will add our new flag next to the existing ones. The diff below shows the changes we need to make in cmd/main.go.

diff --git a/cmd/main.go b/cmd/main.go
index a790ba1..114fec8 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -20,6 +20,7 @@ import (
        "crypto/tls"
        "flag"
        "os"
+       "time"

        // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
        // to ensure that exec-entrypoint and run can make use of them.
@@ -55,6 +56,8 @@ func main() {
        var probeAddr string
        var secureMetrics bool
        var enableHTTP2 bool
+       var interval time.Duration
+
        flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
        flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
        flag.BoolVar(&enableLeaderElection, "leader-elect", false,
@@ -64,6 +67,7 @@ func main() {
                "If set the metrics endpoint is served securely")
        flag.BoolVar(&enableHTTP2, "enable-http2", false,
                "If set, HTTP/2 will be enabled for the metrics and webhook servers")
+       flag.DurationVar(&interval, "interval", 30*time.Second, "The interval at which to run the periodic check")
        opts := zap.Options{
                Development: true,
        }
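
If you have not used flag.DurationVar before, the sketch below shows how such a flag behaves. It is a standalone program rather than part of the controller. The parseInterval helper and the FlagSet name are made up for this example, and it uses its own FlagSet instead of the global one that cmd/main.go uses.

```go
package main

import (
	"flag"
	"fmt"
	"io"
	"time"
)

// parseInterval is a hypothetical helper, which parses the given arguments
// using a dedicated FlagSet and returns the value of the --interval flag.
func parseInterval(args []string) (time.Duration, error) {
	fs := flag.NewFlagSet("manager", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage messages out of the example output

	var interval time.Duration
	fs.DurationVar(&interval, "interval", 30*time.Second, "The interval at which to run the periodic check")
	if err := fs.Parse(args); err != nil {
		return 0, err
	}

	return interval, nil
}

func main() {
	d, err := parseInterval([]string{"--interval=1m30s"})
	fmt.Println(d, err) // prints "1m30s <nil>"

	d, err = parseInterval(nil)
	fmt.Println(d, err) // prints "30s <nil>"
}
```

The flag accepts anything time.ParseDuration understands, such as 45s, 5m or 1h30m, and falls back to 30 seconds when it is not specified.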

Before we move on, it’s worth saying a few more words about the periodic controller implementation. As mentioned previously, controllers usually reconcile objects on Create, Update and Delete events. If you look in the internal/controller/pod_controller.go file you will see this method for our reconciler.

// SetupWithManager sets up the controller with the Manager.
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Pod{}).
		Complete(r)
}

What this method does is register the controller with the manager and configure it so that it reacts to events for corev1.Pod objects. This is how watching resources is configured. If our controller owned other resources, it would also register them here, so that the reconcile loop is executed whenever there is an event for an owned resource.

Registering owned resources is done using the Builder.Owns() method, and the controlled resource is registered (once only) using the Builder.For() method. If you check the documentation for these methods, you will see that both can be expressed as calls to Builder.Watches() instead.

There is, however, another way to configure watches: Builder.WatchesRawSource(), which takes a source.Source implementation. A source.Source is what allows us to hook into the event stream that feeds our controller and generate our own events, which in turn cause the controller to start the reconciliation process. The code in this post targets controller-runtime v0.17.0, and the signatures of these methods may differ in other releases.

If you check the sigs.k8s.io/controller-runtime/pkg/source package you will notice that there are multiple implementations of source.Source, and the one in particular which we’ll be using in this post is source.Channel.

This is what we are going to use to implement a periodic runner. At every interval (N seconds, minutes, hours, and so on) it will check whether there are objects to reconcile and, if so, emit events for our controller.

Another thing worth mentioning here, related to efficiency and overall performance, is that we probably do not want to reconcile every single Pod in our cluster. Instead, we will reconcile only selected Pods, which carry our own special annotation.
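
Before getting into the controller-runtime specifics, the pattern itself can be sketched with nothing but the standard library. The Event type, emitEvery and collect below are hypothetical names for this illustration only. Event stands in for the generic events we will send later, and the list of names stands in for the Pods we will look up.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Event is a hypothetical stand-in for the events sent to the controller.
type Event struct {
	Name string
}

// emitEvery sends one Event per name on every tick, until the context is
// cancelled. It closes the channel when it returns.
func emitEvery(ctx context.Context, interval time.Duration, names []string, ch chan<- Event) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	defer close(ch)

	for {
		select {
		case <-ticker.C:
			for _, name := range names {
				// Give up on the send if we are asked to stop.
				select {
				case ch <- Event{Name: name}:
				case <-ctx.Done():
					return
				}
			}
		case <-ctx.Done():
			return
		}
	}
}

// collect runs the emitter for the given duration and returns all events
// which were received during that time.
func collect(interval, runFor time.Duration, names []string) []Event {
	ctx, cancel := context.WithTimeout(context.Background(), runFor)
	defer cancel()

	ch := make(chan Event)
	go emitEvery(ctx, interval, names, ch)

	var got []Event
	for evt := range ch {
		got = append(got, evt)
	}

	return got
}

func main() {
	events := collect(10*time.Millisecond, 55*time.Millisecond, []string{"busybox"})
	fmt.Println("received at least one event:", len(events) > 0)
}
```

The consumer side in this sketch is a simple range over the channel. In the real controller that role is played by controller-runtime, which reads from the channel and turns each event into a reconcile request.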

So, let’s create a new package for our annotations. Create the file internal/annotation/annotation.go with the following contents.

package annotation

// ReconcileMe is an annotation which tells the controller to reconcile the pod,
// if it is annotated with it.
const ReconcileMe = "dnaeon.github.io/reconcile-me"

The next thing we are going to do is add an index, so that we can easily look up the Pods which carry our custom annotation. This is done using a FieldIndexer.

Create the internal/index/index.go file with the following contents.

package index

import (
	"github.com/dnaeon/kubernetes-periodic-controller/internal/annotation"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Key is the index key we add to the Pods.
const Key = "dnaeon.github.io/pod_idx"

// IndexerFunc is a [sigs.k8s.io/controller-runtime/pkg/client.IndexerFunc],
// which knows how to extract values for index [Key].
func IndexerFunc(rawObj client.Object) []string {
	obj, ok := rawObj.(*corev1.Pod)
	if !ok {
		return []string{}
	}

	value, exists := obj.Annotations[annotation.ReconcileMe]
	if !exists {
		return []string{}
	}

	return []string{value}
}

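Notice the IndexerFunc in the code above. It has the signature of a sigs.k8s.io/controller-runtime/pkg/client.IndexerFunc and is used to extract the values for the index key from our Pods: for a Pod that carries the annotation it returns the annotation’s value, and for any other object it returns an empty list, so that object is not indexed under any value.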
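
Conceptually, a field index is a lookup table from index values to the objects that produced them. The sketch below illustrates that idea in plain Go. It is not how the controller-runtime cache is implemented; the Pod type here is a hypothetical, stripped-down stand-in for corev1.Pod, and buildIndex is made up for this example.

```go
package main

import "fmt"

// reconcileMe is the same annotation key used by the controller.
const reconcileMe = "dnaeon.github.io/reconcile-me"

// Pod is a hypothetical, minimal stand-in for corev1.Pod.
type Pod struct {
	Name        string
	Annotations map[string]string
}

// indexerFunc mirrors the IndexerFunc from the post: it returns the values
// under which the given pod should be indexed.
func indexerFunc(p Pod) []string {
	value, exists := p.Annotations[reconcileMe]
	if !exists {
		return nil
	}

	return []string{value}
}

// buildIndex maps each index value to the names of the pods that produced it.
func buildIndex(pods []Pod) map[string][]string {
	idx := make(map[string][]string)
	for _, p := range pods {
		for _, value := range indexerFunc(p) {
			idx[value] = append(idx[value], p.Name)
		}
	}

	return idx
}

func main() {
	pods := []Pod{
		{Name: "busybox", Annotations: map[string]string{reconcileMe: "true"}},
		{Name: "nginx"},
		{Name: "redis", Annotations: map[string]string{reconcileMe: "false"}},
	}

	idx := buildIndex(pods)
	fmt.Println(idx["true"]) // prints "[busybox]"
}
```

Looking up the value "true" returns only the Pods annotated with reconcile-me=true, without having to scan every Pod each time.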

Okay, time to create our index. Open cmd/main.go and register the index with our manager. The diff below shows the changes we need to make.

diff --git a/cmd/main.go b/cmd/main.go
index 114fec8..292af04 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -26,6 +26,7 @@ import (
        // to ensure that exec-entrypoint and run can make use of them.
        _ "k8s.io/client-go/plugin/pkg/client/auth"

+       corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/runtime"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -36,6 +37,7 @@ import (
        "sigs.k8s.io/controller-runtime/pkg/webhook"

        "github.com/dnaeon/kubernetes-periodic-controller/internal/controller"
+       "github.com/dnaeon/kubernetes-periodic-controller/internal/index"
        //+kubebuilder:scaffold:imports
 )

@@ -124,6 +126,14 @@ func main() {
                os.Exit(1)
        }

+       ctx := ctrl.SetupSignalHandler()
+
+       // Create our index
+       if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, index.Key, index.IndexerFunc); err != nil {
+               setupLog.Error(err, "unable to create index", "controller", "Pod")
+               os.Exit(1)
+       }
+
        if err = (&controller.PodReconciler{
                Client: mgr.GetClient(),
                Scheme: mgr.GetScheme(),
@@ -143,7 +153,7 @@ func main() {
        }

        setupLog.Info("starting manager")
-       if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+       if err := mgr.Start(ctx); err != nil {
                setupLog.Error(err, "problem running manager")
                os.Exit(1)
        }

It’s time to implement our periodic runner. Recall that it will use a sigs.k8s.io/controller-runtime/pkg/source.Channel to hook into the event stream that feeds our controller. A source.Channel wraps a Go channel over which sigs.k8s.io/controller-runtime/pkg/event.GenericEvent events are sent. When coupled with a suitable EventHandler, these generic events are transformed into sigs.k8s.io/controller-runtime/pkg/reconcile.Request values, which is what our controller operates on.
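
The transformation from an event into a request is simpler than it may sound. The sketch below shows the idea in plain Go, with hypothetical Object, GenericEvent and Request types standing in for the controller-runtime ones. The toRequest function is made up for this example and only approximates what an object-based event handler does: it takes the namespace and name of the object carried by the event and uses them as the request.

```go
package main

import "fmt"

// Object is a hypothetical stand-in for a Kubernetes object.
type Object struct {
	Namespace string
	Name      string
}

// GenericEvent is a hypothetical stand-in for event.GenericEvent.
type GenericEvent struct {
	Object *Object
}

// Request is a hypothetical stand-in for reconcile.Request.
type Request struct {
	Namespace string
	Name      string
}

// toRequest converts an event into a request for the object it carries.
// Events without an object are skipped.
func toRequest(evt GenericEvent) (Request, bool) {
	if evt.Object == nil {
		return Request{}, false
	}

	return Request{Namespace: evt.Object.Namespace, Name: evt.Object.Name}, true
}

func main() {
	evt := GenericEvent{Object: &Object{Namespace: "default", Name: "busybox"}}
	if req, ok := toRequest(evt); ok {
		fmt.Printf("%s/%s\n", req.Namespace, req.Name) // prints "default/busybox"
	}
}
```

Note that the request carries only the namespace and the name. The reconciler is expected to fetch the current state of the object itself when it needs it.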

Create the internal/periodic/periodic.go file with the following contents.

package periodic

import (
	"context"
	"time"

	"github.com/dnaeon/kubernetes-periodic-controller/internal/index"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

// Runner is a periodic runner which enqueues Pods for reconciliation on regular
// intervals.
type Runner struct {
	client   client.Client
	interval time.Duration
	eventCh  chan event.GenericEvent
}

// Option is a function which configures the [Runner].
type Option func(c *Runner) error

// New creates a new periodic runner and configures it using the provided
// options.
func New(opts ...Option) (*Runner, error) {
	r := &Runner{}
	for _, opt := range opts {
		if err := opt(r); err != nil {
			return nil, err
		}
	}

	return r, nil
}

// WithClient configures the [Runner] with the given client.
func WithClient(c client.Client) Option {
	opt := func(r *Runner) error {
		r.client = c
		return nil
	}

	return opt
}

// WithInterval configures the [Runner] with the given interval.
func WithInterval(interval time.Duration) Option {
	opt := func(r *Runner) error {
		r.interval = interval
		return nil
	}

	return opt
}

// WithEventChannel configures the [Runner] to use the given channel for
// enqueuing.
func WithEventChannel(ch chan event.GenericEvent) Option {
	opt := func(r *Runner) error {
		r.eventCh = ch
		return nil
	}

	return opt
}

// Start implements the
// [sigs.k8s.io/controller-runtime/pkg/manager.Runnable] interface.
func (r *Runner) Start(ctx context.Context) error {
	ticker := time.NewTicker(r.interval)
	logger := log.FromContext(ctx)
	defer ticker.Stop()
	defer close(r.eventCh)

	for {
		select {
		case <-ticker.C:
			if err := r.enqueuePods(ctx); err != nil {
				logger.Error(err, "failed to enqueue pods")
			}
		case <-ctx.Done():
			return nil
		}
	}
}

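// enqueuePods enqueues the Pods which are properly annotated
func (r *Runner) enqueuePods(ctx context.Context) error {
	var items corev1.PodList
	opts := client.MatchingFields{index.Key: "true"}
	if err := r.client.List(ctx, &items, opts); err != nil {
		return err
	}

	// Index into the slice rather than taking the address of the loop
	// variable, which is shared between iterations before Go 1.22.
	for i := range items.Items {
		evt := event.GenericEvent{
			Object: &items.Items[i],
		}
		// Stop waiting on the send if the manager is shutting down.
		select {
		case r.eventCh <- evt:
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	return nil
}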

Notice how the enqueuePods method uses the index we created earlier to list only the Pods whose annotation value is "true". Also worth mentioning is that Runner implements the sigs.k8s.io/controller-runtime/pkg/manager.Runnable interface, which is what lets us register it with the manager.
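
One thing the runner does not guard against is a non-positive interval, and time.NewTicker panics when given one. The sketch below shows one possible way to catch that in the constructor, using the same functional options pattern. It is a standalone, simplified version with lowercase hypothetical names (runner, option, newRunner, withInterval), not the code from the repo.

```go
package main

import (
	"fmt"
	"time"
)

// runner is a simplified, hypothetical version of the periodic Runner.
type runner struct {
	interval time.Duration
}

// option is a function which configures the runner.
type option func(r *runner) error

// withInterval configures the runner with the given interval.
func withInterval(interval time.Duration) option {
	return func(r *runner) error {
		r.interval = interval
		return nil
	}
}

// newRunner applies the options and then validates the result, so that a
// missing or invalid interval is reported as an error at construction time.
func newRunner(opts ...option) (*runner, error) {
	r := &runner{}
	for _, opt := range opts {
		if err := opt(r); err != nil {
			return nil, err
		}
	}

	if r.interval <= 0 {
		return nil, fmt.Errorf("interval must be positive, got %s", r.interval)
	}

	return r, nil
}

func main() {
	if _, err := newRunner(); err != nil {
		fmt.Println("error:", err)
	}

	r, err := newRunner(withInterval(30 * time.Second))
	fmt.Println(r.interval, err) // prints "30s <nil>"
}
```

Validating after all options have been applied, rather than inside withInterval, also catches the case where the option was never passed at all.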

We will now register this periodic runner with our manager. Open cmd/main.go and apply the following changes.

diff --git a/cmd/main.go b/cmd/main.go
index 292af04..18d2b38 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -31,6 +31,7 @@ import (
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        clientgoscheme "k8s.io/client-go/kubernetes/scheme"
        ctrl "sigs.k8s.io/controller-runtime"
+       "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/healthz"
        "sigs.k8s.io/controller-runtime/pkg/log/zap"
        metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -38,6 +39,7 @@ import (

        "github.com/dnaeon/kubernetes-periodic-controller/internal/controller"
        "github.com/dnaeon/kubernetes-periodic-controller/internal/index"
+       "github.com/dnaeon/kubernetes-periodic-controller/internal/periodic"
        //+kubebuilder:scaffold:imports
 )

@@ -127,6 +129,7 @@ func main() {
        }

        ctx := ctrl.SetupSignalHandler()
+       eventCh := make(chan event.GenericEvent)

        // Create our index
        if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, index.Key, index.IndexerFunc); err != nil {
@@ -134,6 +137,21 @@ func main() {
                os.Exit(1)
        }

+       // Add our periodic runner
+       runner, err := periodic.New(
+               periodic.WithClient(mgr.GetClient()),
+               periodic.WithInterval(interval),
+               periodic.WithEventChannel(eventCh),
+       )
+       if err != nil {
+               setupLog.Error(err, "unable to create periodic runner", "controller", "Pod")
+               os.Exit(1)
+       }
+       if err := mgr.Add(runner); err != nil {
+               setupLog.Error(err, "unable to add periodic runner", "controller", "Pod")
+               os.Exit(1)
+       }
+
        if err = (&controller.PodReconciler{
                Client: mgr.GetClient(),
                Scheme: mgr.GetScheme(),

We are almost there. A few more things remain to be done. We will now configure our controller to react to events originating from the events channel.

To do that, open internal/controller/pod_controller.go and apply the following changes.

diff --git a/internal/controller/pod_controller.go b/internal/controller/pod_controller.go
index 8f4ee7b..6532bd3 100644
--- a/internal/controller/pod_controller.go
+++ b/internal/controller/pod_controller.go
@@ -19,17 +19,20 @@ package controller
 import (
        "context"

-       corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/runtime"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
+       "sigs.k8s.io/controller-runtime/pkg/event"
+       "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/log"
+       "sigs.k8s.io/controller-runtime/pkg/source"
 )

 // PodReconciler reconciles a Pod object
 type PodReconciler struct {
        client.Client
-       Scheme *runtime.Scheme
+       Scheme  *runtime.Scheme
+       EventCh chan event.GenericEvent
 }

 //+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
@@ -46,7 +49,8 @@ type PodReconciler struct {
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.0/pkg/reconcile
 func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-       _ = log.FromContext(ctx)
+       logger := log.FromContext(ctx)
+       logger.Info("reconcile")

        // TODO(user): your logic here

@@ -55,7 +59,13 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

 // SetupWithManager sets up the controller with the Manager.
 func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
+       src := &source.Channel{
+               Source: r.EventCh,
+       }
+       handler := &handler.EnqueueRequestForObject{}
+
        return ctrl.NewControllerManagedBy(mgr).
-               For(&corev1.Pod{}).
+               Named("pod_controller").
+               WatchesRawSource(src, handler).
                Complete(r)
 }

And finally, open cmd/main.go and apply the following changes, so that our PodReconciler is properly instantiated with the events channel.

diff --git a/cmd/main.go b/cmd/main.go
index 18d2b38..22e1376 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -152,8 +152,9 @@ func main() {
        }

        if err = (&controller.PodReconciler{
-               Client: mgr.GetClient(),
-               Scheme: mgr.GetScheme(),
+               Client:  mgr.GetClient(),
+               Scheme:  mgr.GetScheme(),
+               EventCh: eventCh,
        }).SetupWithManager(mgr); err != nil {
                setupLog.Error(err, "unable to create controller", "controller", "Pod")
                os.Exit(1)

We are now ready to build and test our controller. Build the controller to make sure it compiles.

make

In order to test things out we are going to create a local Kubernetes cluster using kind.

kind create cluster

Once the kind cluster is up and running we can start up our controller. Start the controller using this command.

make run

You should see similar output.

2024-03-23T21:22:36+02:00       INFO    setup   starting manager
2024-03-23T21:22:36+02:00       INFO    controller-runtime.metrics      Starting metrics server
2024-03-23T21:22:36+02:00       INFO    starting server {"kind": "health probe", "addr": "[::]:8081"}
2024-03-23T21:22:36+02:00       INFO    controller-runtime.metrics      Serving metrics server  {"bindAddress": ":8080", "secure": false}
2024-03-23T21:22:36+02:00       INFO    Starting EventSource    {"controller": "pod_controller", "source": "channel source: 0xc00046d600"}
2024-03-23T21:22:36+02:00       INFO    Starting Controller     {"controller": "pod_controller"}
2024-03-23T21:22:36+02:00       INFO    Starting workers        {"controller": "pod_controller", "worker count": 1}

For now, that’s pretty much what we are going to see from our controller until we have some pods to work with.

Let’s create a sample pod to test things out with. This is what our sample pod manifest looks like.

---
apiVersion: v1
kind: Pod
metadata:
  name: busybox
  namespace: default
spec:
  containers:
  - name: busybox
    image: busybox
    command:
      - sleep
      - infinity
    imagePullPolicy: IfNotPresent
  restartPolicy: Always

Create the pod.

kubectl apply -f /path/to/sample-pod.yaml

Check that we have the pod up and running.

kubectl get pods
NAME      READY   STATUS    RESTARTS   AGE
busybox   1/1     Running   0          98s

And now, let’s annotate our pod properly, so that it is considered by our controller.

kubectl annotate pod busybox dnaeon.github.io/reconcile-me=true

If we switch back to the logs of our controller we should see messages that the pod is being picked up by the controller every 30 seconds, which is the default interval we’ve configured.

2024-03-23T21:28:06+02:00       INFO    reconcile       {"controller": "pod_controller", "namespace": "default", "name": "busybox", "reconcileID": "fa88e341-bb90-448e-a206-9f0cb8e52778"}
2024-03-23T21:28:36+02:00       INFO    reconcile       {"controller": "pod_controller", "namespace": "default", "name": "busybox", "reconcileID": "1676b1fd-3d0e-4f43-8542-109b0ac88fd2"}
2024-03-23T21:29:06+02:00       INFO    reconcile       {"controller": "pod_controller", "namespace": "default", "name": "busybox", "reconcileID": "a2588b69-416c-435d-bab3-06f88f26843e"}

And that’s pretty much it, we now have a periodic reconciler!

One more thing before we wrap up: depending on what your controller will be doing, you might want to review the kubebuilder RBAC markers that were generated and adjust them. The scaffold grants create, update, patch and delete on Pods, which this controller, as written, does not use. Also, make sure to add your test cases and don’t forget to check the excellent Kubebuilder Book!

Written on March 23, 2024