Implementing a periodic Kubernetes controller
Recently I was looking for a way to implement a Kubernetes controller which is triggered on a regular basis, e.g. one which reconciles whatever objects it is concerned about every N seconds.
Controllers in Kubernetes work by watching resources of a given kind and reacting to events whenever something happens to those objects. The job of the controller is to monitor the object and potentially perform changes to it, in order to bring the object to the desired state based on the specification of the object itself. This process is known as reconciling.
The events which would trigger the controller to reconcile an object are Create, Update or Delete.
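To make the terminology concrete, the whole reconciliation contract in controller-runtime boils down to a single interface. The snippet below is a simplified excerpt of sigs.k8s.io/controller-runtime/pkg/reconcile.Reconciler (comments are mine), not something we need to type in ourselves.
// Simplified excerpt from sigs.k8s.io/controller-runtime/pkg/reconcile.
// A controller implements Reconcile, which receives a Request (the namespace
// and name of an object) and drives that object towards its desired state.
type Reconciler interface {
    Reconcile(ctx context.Context, req Request) (Result, error)
}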
But, what if you don’t want to reconcile on Create, Update or Delete events? What if you want to reconcile your objects on periodic intervals instead?
An example use case for a periodic controller would be something along the lines of a custom autoscaler, which periodically checks some metrics and takes appropriate actions on your custom resources.
If that’s what you are looking for, then read on. Also, you can find the code from this post in the dnaeon/kubernetes-periodic-controller repo.
This post documents my learning process while implementing a periodic reconciler bootstrapped with kubebuilder. It will most likely serve my future self only, but hopefully it helps others as well.
These days it seems to be a common practice that new controllers are bootstrapped using either kubebuilder or operator-sdk. Both projects are great and come with really good documentation, but after looking through it I was not able to find what I was looking for.
The rest of this post talks about how to develop a periodic Kubernetes controller, which will be bootstrapped using kubebuilder. The steps taken to bootstrap the controller should work fine with operator-sdk as well, since it is based on kubebuilder anyway, and the commands are pretty much the same.
First, let’s create a new kubebuilder project.
kubebuilder init --domain dnaeon.github.io --repo github.com/dnaeon/kubernetes-periodic-controller
You should see output similar to the one below.
INFO Writing kustomize manifests for you to edit...
INFO Writing scaffold for you to edit...
INFO Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.17.0
go: downloading sigs.k8s.io/controller-runtime v0.17.0
go: downloading k8s.io/apimachinery v0.29.0
go: downloading k8s.io/utils v0.0.0-20230726121419-3b25d923346b
go: downloading k8s.io/client-go v0.29.0
go: downloading github.com/go-logr/logr v1.4.1
go: downloading k8s.io/api v0.29.0
go: downloading k8s.io/component-base v0.29.0
go: downloading github.com/evanphx/json-patch/v5 v5.8.0
go: downloading k8s.io/apiextensions-apiserver v0.29.0
go: downloading github.com/prometheus/client_golang v1.18.0
go: downloading k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00
go: downloading github.com/fsnotify/fsnotify v1.7.0
go: downloading golang.org/x/oauth2 v0.12.0
go: downloading golang.org/x/sys v0.16.0
go: downloading github.com/prometheus/client_model v0.5.0
go: downloading github.com/prometheus/common v0.45.0
go: downloading github.com/prometheus/procfs v0.12.0
go: downloading github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0
INFO Update dependencies:
$ go mod tidy
go: downloading github.com/onsi/gomega v1.30.0
go: downloading github.com/onsi/ginkgo/v2 v2.14.0
go: downloading github.com/go-logr/zapr v1.3.0
go: downloading go.uber.org/goleak v1.3.0
go: downloading golang.org/x/tools v0.16.1
Next: define a resource with:
$ kubebuilder create api
Now, we will generate the boilerplate code for our controller. In order to keep things simple we will not be introducing a custom resource here, but instead our controller will be reconciling Pod objects.
kubebuilder create api --controller --resource=false --group core --version v1 --kind Pod
You should see similar output.
INFO Writing kustomize manifests for you to edit...
INFO Writing scaffold for you to edit...
INFO internal/controller/suite_test.go
INFO internal/controller/pod_controller.go
INFO internal/controller/pod_controller_test.go
INFO Update dependencies:
$ go mod tidy
And with that we have successfully bootstrapped our controller. Now it’s time to get to the interesting stuff.
The first thing we are going to do is to add a new command-line flag, which will be used to specify the interval at which our controller should be reconciling. In order to do that, open cmd/main.go. At the beginning of main() you will see a bunch of already existing flags, which kubebuilder generated for us. We will be adding our new flag next to the existing ones. The diff below shows the changes we need to make in cmd/main.go.
diff --git a/cmd/main.go b/cmd/main.go
index a790ba1..114fec8 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -20,6 +20,7 @@ import (
"crypto/tls"
"flag"
"os"
+ "time"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
@@ -55,6 +56,8 @@ func main() {
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
+ var interval time.Duration
+
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
@@ -64,6 +67,7 @@ func main() {
"If set the metrics endpoint is served securely")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
+ flag.DurationVar(&interval, "interval", 30*time.Second, "The interval at which to run the periodic check")
opts := zap.Options{
Development: true,
}
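With the flag in place, the interval can be overridden when starting the controller locally. For example, assuming you run the manager directly via the entrypoint which kubebuilder scaffolds at cmd/main.go:
go run ./cmd/main.go --interval=1m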
Before we move on it’s worth saying a few more words about the periodic controller implementation. As mentioned previously, controllers usually reconcile objects on Create, Update and Delete events. If you look in the internal/controller/pod_controller.go file you will see this method for our reconciler.
// SetupWithManager sets up the controller with the Manager.
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}).
        Complete(r)
}
What this method does is register the controller with the manager and configure it, so that it reacts to events on the corev1.Pod{} object. This is how watching resources is configured. If our controller owned other resources, it would also register them here, so that the reconcile loop is executed whenever we have an event for an owned resource. Registering owned resources is done using the Builder.Owns() method, while the resource being reconciled is registered (once only) using the Builder.For() method. Now, if you check the documentation for these methods you will see that they can be replaced with calls to Builder.Watches() instead.
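To make that equivalence concrete, here is a rough sketch (an illustration only, not a change we will be applying) of what the scaffolded For(&corev1.Pod{}) call corresponds to when expressed through Builder.Watches(). Note that Watches() does not derive a controller name for us, so we have to provide one explicitly, and we also need the sigs.k8s.io/controller-runtime/pkg/handler import.
// Roughly equivalent to For(&corev1.Pod{}): watch Pod objects and enqueue a
// reconcile request for the object itself whenever an event occurs for it.
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        Named("pod_controller").
        Watches(&corev1.Pod{}, &handler.EnqueueRequestForObject{}).
        Complete(r)
}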
There is however another way to configure watching for resources, and that is by using Builder.WatchesRawSource(), which takes a source.Source implementation. source.Source is what allows us to hook into the Kubernetes event stream, so that we can generate events, which in turn cause our controller to start the reconciliation process.
If you check the sigs.k8s.io/controller-runtime/pkg/source package you will notice that there are multiple implementations of source.Source, and the one in particular which we’ll be using in this post is source.Channel. This is what we are going to use in order to implement a periodic runner, which will check every N seconds (or minutes, hours, days, etc.) whether we have some objects to reconcile, and if so, it will emit events for our controller.
Another thing worth mentioning here, this one related to efficiency and overall performance, is that we probably do not want to reconcile every single Pod in our cluster. Instead, we will reconcile only select pods, which carry our own special annotation.
So, let’s create a new package for our annotations. Create the file internal/annotation/annotation.go with the following contents.
package annotation
// ReconcileMe is an annotation which tells the controller to reconcile the pod,
// if it is annotated with it.
const ReconcileMe = "dnaeon.github.io/reconcile-me"
The next thing we are going to do is to add an index, so that we can easily look up the Pods which contain our custom annotation. This is done using a FieldIndexer.
Create the internal/index/index.go file with the following contents.
package index

import (
    "github.com/dnaeon/kubernetes-periodic-controller/internal/annotation"
    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// Key is the index key we add to the Pods.
const Key = "dnaeon.github.io/pod_idx"

// IndexerFunc is a [sigs.k8s.io/controller-runtime/pkg/client.IndexerFunc],
// which knows how to extract values for index [Key].
func IndexerFunc(rawObj client.Object) []string {
    obj, ok := rawObj.(*corev1.Pod)
    if !ok {
        return []string{}
    }

    value, exists := obj.Annotations[annotation.ReconcileMe]
    if !exists {
        return []string{}
    }

    return []string{value}
}
Notice the IndexerFunc from the code above. This function will be used to extract the values for the index key from our Pods, and it is essentially a sigs.k8s.io/controller-runtime/pkg/client.IndexerFunc.
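If you want to convince yourself of what the indexer extracts, a minimal test sketch along these lines (a hypothetical internal/index/index_test.go, assuming the module path used throughout this post) could be added as well.
package index_test

import (
    "testing"

    "github.com/dnaeon/kubernetes-periodic-controller/internal/annotation"
    "github.com/dnaeon/kubernetes-periodic-controller/internal/index"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestIndexerFunc checks that annotated pods produce an index value and
// non-annotated pods produce none.
func TestIndexerFunc(t *testing.T) {
    annotated := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:        "busybox",
            Annotations: map[string]string{annotation.ReconcileMe: "true"},
        },
    }

    if got := index.IndexerFunc(annotated); len(got) != 1 || got[0] != "true" {
        t.Fatalf("expected [true], got %v", got)
    }

    if got := index.IndexerFunc(&corev1.Pod{}); len(got) != 0 {
        t.Fatalf("expected no index values, got %v", got)
    }
}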
Okay, time to create our index. Open cmd/main.go and register the index with our manager. The diff below shows the changes we need to make.
diff --git a/cmd/main.go b/cmd/main.go
index 114fec8..292af04 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -26,6 +26,7 @@ import (
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
+ corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -36,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"
"github.com/dnaeon/kubernetes-periodic-controller/internal/controller"
+ "github.com/dnaeon/kubernetes-periodic-controller/internal/index"
//+kubebuilder:scaffold:imports
)
@@ -124,6 +126,14 @@ func main() {
os.Exit(1)
}
+ ctx := ctrl.SetupSignalHandler()
+
+ // Create our index
+ if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, index.Key, index.IndexerFunc); err != nil {
+ setupLog.Error(err, "unable to create index", "controller", "Pod")
+ os.Exit(1)
+ }
+
if err = (&controller.PodReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
@@ -143,7 +153,7 @@ func main() {
}
setupLog.Info("starting manager")
- if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+ if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
It’s time to implement our periodic runner. Remember that our periodic runner will be using a sigs.k8s.io/controller-runtime/pkg/source.Channel to hook into the Kubernetes event stream? A source.Channel uses a channel over which sigs.k8s.io/controller-runtime/pkg/event.GenericEvent events are sent. When coupled with a proper EventHandler, these generic events get transformed into a sigs.k8s.io/controller-runtime/pkg/reconcile.Request, which is what our controller operates on.
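To illustrate that last transformation, the sketch below shows in a simplified way (this is not the actual library code) what handler.EnqueueRequestForObject effectively does with each event it receives.
package sketch

import (
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// requestFromEvent is a simplified illustration of handler.EnqueueRequestForObject:
// every event is turned into a reconcile.Request keyed by the object's
// namespace and name, which is what Reconcile() ultimately receives.
func requestFromEvent(e event.GenericEvent) reconcile.Request {
    return reconcile.Request{
        NamespacedName: types.NamespacedName{
            Namespace: e.Object.GetNamespace(),
            Name:      e.Object.GetName(),
        },
    }
}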
Create the internal/periodic/periodic.go file with the following contents.
package periodic

import (
    "context"
    "time"

    "github.com/dnaeon/kubernetes-periodic-controller/internal/index"
    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/log"
)

// Runner is a periodic runner which enqueues Pods for reconciliation on regular
// intervals.
type Runner struct {
    client   client.Client
    interval time.Duration
    eventCh  chan event.GenericEvent
}

// Option is a function which configures the [Runner].
type Option func(c *Runner) error

// New creates a new periodic runner and configures it using the provided
// options.
func New(opts ...Option) (*Runner, error) {
    r := &Runner{}
    for _, opt := range opts {
        if err := opt(r); err != nil {
            return nil, err
        }
    }

    return r, nil
}

// WithClient configures the [Runner] with the given client.
func WithClient(c client.Client) Option {
    opt := func(r *Runner) error {
        r.client = c
        return nil
    }

    return opt
}

// WithInterval configures the [Runner] with the given interval.
func WithInterval(interval time.Duration) Option {
    opt := func(r *Runner) error {
        r.interval = interval
        return nil
    }

    return opt
}

// WithEventChannel configures the [Runner] to use the given channel for
// enqueuing.
func WithEventChannel(ch chan event.GenericEvent) Option {
    opt := func(r *Runner) error {
        r.eventCh = ch
        return nil
    }

    return opt
}

// Start implements the
// [sigs.k8s.io/controller-runtime/pkg/manager.Runnable] interface.
func (r *Runner) Start(ctx context.Context) error {
    ticker := time.NewTicker(r.interval)
    logger := log.FromContext(ctx)
    defer ticker.Stop()
    defer close(r.eventCh)

    for {
        select {
        case <-ticker.C:
            if err := r.enqueuePods(ctx); err != nil {
                logger.Error(err, "failed to enqueue pods")
            }
        case <-ctx.Done():
            return nil
        }
    }
}
// enqueuePods enqueues the Pods which are properly annotated
func (r *Runner) enqueuePods(ctx context.Context) error {
    var items corev1.PodList
    opts := client.MatchingFields{index.Key: "true"}
    if err := r.client.List(ctx, &items, opts); err != nil {
        return err
    }

    for i := range items.Items {
        // Take the address of the slice element rather than of a loop
        // variable, so that each event carries a pointer to a distinct Pod.
        event := event.GenericEvent{
            Object: &items.Items[i],
        }
        r.eventCh <- event
    }

    return nil
}
Notice how in the enqueuePods method we are filtering the pods by the index key we’ve previously created. Also worth mentioning here is that our Runner satisfies the sigs.k8s.io/controller-runtime/pkg/manager.Runnable interface.
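If you’d like the compiler to verify that, an optional compile-time assertion (my own addition, not part of the scaffolding) could be dropped into internal/periodic/periodic.go, along with an extra import of sigs.k8s.io/controller-runtime/pkg/manager.
// Compile-time check that *Runner satisfies manager.Runnable.
var _ manager.Runnable = (*Runner)(nil)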
We will now register this periodic runner with our manager. Open cmd/main.go and apply the following changes.
diff --git a/cmd/main.go b/cmd/main.go
index 292af04..18d2b38 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -31,6 +31,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -38,6 +39,7 @@ import (
"github.com/dnaeon/kubernetes-periodic-controller/internal/controller"
"github.com/dnaeon/kubernetes-periodic-controller/internal/index"
+ "github.com/dnaeon/kubernetes-periodic-controller/internal/periodic"
//+kubebuilder:scaffold:imports
)
@@ -127,6 +129,7 @@ func main() {
}
ctx := ctrl.SetupSignalHandler()
+ eventCh := make(chan event.GenericEvent)
// Create our index
if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, index.Key, index.IndexerFunc); err != nil {
@@ -134,6 +137,20 @@ func main() {
os.Exit(1)
}
+ // Add our periodic runner
+ runner, err := periodic.New(
+ periodic.WithClient(mgr.GetClient()),
+ periodic.WithInterval(interval),
+ periodic.WithEventChannel(eventCh),
+ )
+ if err != nil {
+ setupLog.Error(err, "unable to create periodic runner", "controller", "Pod")
+ os.Exit(1)
+ }
+ if err := mgr.Add(runner); err != nil {
+ setupLog.Error(err, "unable to add periodic runner", "controller", "Pod")
+ }
+
if err = (&controller.PodReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
We are almost there. A few more things remain to be done. We will now configure our controller to react to events originating from the events channel. In order to do that, open internal/controller/pod_controller.go and apply the following changes.
diff --git a/internal/controller/pod_controller.go b/internal/controller/pod_controller.go
index 8f4ee7b..6532bd3 100644
--- a/internal/controller/pod_controller.go
+++ b/internal/controller/pod_controller.go
@@ -19,17 +19,20 @@ package controller
import (
"context"
- corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/event"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
+ "sigs.k8s.io/controller-runtime/pkg/source"
)
// PodReconciler reconciles a Pod object
type PodReconciler struct {
client.Client
- Scheme *runtime.Scheme
+ Scheme *runtime.Scheme
+ EventCh chan event.GenericEvent
}
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
@@ -46,7 +49,8 @@ type PodReconciler struct {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.0/pkg/reconcile
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
- _ = log.FromContext(ctx)
+ logger := log.FromContext(ctx)
+ logger.Info("reconcile")
// TODO(user): your logic here
@@ -55,7 +59,13 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
// SetupWithManager sets up the controller with the Manager.
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
+ src := &source.Channel{
+ Source: r.EventCh,
+ }
+ handler := &handler.EnqueueRequestForObject{}
+
return ctrl.NewControllerManagedBy(mgr).
- For(&corev1.Pod{}).
+ Named("pod_controller").
+ WatchesRawSource(src, handler).
Complete(r)
}
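The scaffolded Reconcile body above only logs a message. As a hedged sketch of where the actual logic could go (a hypothetical example, which would also re-add the corev1 import that the diff above removed), you could fetch the Pod from the request and act on it, ignoring the case where it was deleted in the meantime.
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // Fetch the Pod which was enqueued by the periodic runner.
    var pod corev1.Pod
    if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
        // The Pod may have been deleted between being enqueued and now.
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // Hypothetical periodic logic goes here, e.g. checking metrics and
    // annotating or scaling the workload behind the Pod.
    logger.Info("reconcile", "phase", pod.Status.Phase)

    return ctrl.Result{}, nil
}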
And finally, open cmd/main.go and apply the following changes, so that our PodReconciler is properly instantiated with the events channel.
diff --git a/cmd/main.go b/cmd/main.go
index 18d2b38..22e1376 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -152,8 +152,9 @@ func main() {
}
if err = (&controller.PodReconciler{
- Client: mgr.GetClient(),
- Scheme: mgr.GetScheme(),
+ Client: mgr.GetClient(),
+ Scheme: mgr.GetScheme(),
+ EventCh: eventCh,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Pod")
os.Exit(1)
We are now ready to build and test our controller. Build the controller to make sure it compiles.
make
In order to test things out we are going to create a local Kubernetes cluster using kind.
kind create cluster
Once the kind cluster is up and running we can start up our controller. Start the controller using this command.
make run
You should see similar output.
2024-03-23T21:22:36+02:00 INFO setup starting manager
2024-03-23T21:22:36+02:00 INFO controller-runtime.metrics Starting metrics server
2024-03-23T21:22:36+02:00 INFO starting server {"kind": "health probe", "addr": "[::]:8081"}
2024-03-23T21:22:36+02:00 INFO controller-runtime.metrics Serving metrics server {"bindAddress": ":8080", "secure": false}
2024-03-23T21:22:36+02:00 INFO Starting EventSource {"controller": "pod_controller", "source": "channel source: 0xc00046d600"}
2024-03-23T21:22:36+02:00 INFO Starting Controller {"controller": "pod_controller"}
2024-03-23T21:22:36+02:00 INFO Starting workers {"controller": "pod_controller", "worker count": 1}
For now, that’s pretty much what we are going to see from our controller until we have some pods to work with.
Let’s create a sample pod to test things out with. This is what our sample pod manifest looks like.
---
apiVersion: v1
kind: Pod
metadata:
  name: busybox
  namespace: default
spec:
  containers:
    - name: busybox
      image: busybox
      command:
        - sleep
        - infinity
      imagePullPolicy: IfNotPresent
  restartPolicy: Always
Create the pod.
kubectl apply -f /path/to/sample-pod.yaml
Check that we have the pod up and running.
kubectl get pods
NAME READY STATUS RESTARTS AGE
busybox 1/1 Running 0 98s
And now, let’s annotate our pod properly, so that it is considered by our controller.
kubectl annotate pod busybox dnaeon.github.io/reconcile-me=true
If we switch back to the logs of our controller we should see messages that the pod is being picked up by the controller every 30 seconds, which is the default interval we’ve configured.
2024-03-23T21:28:06+02:00 INFO reconcile {"controller": "pod_controller", "namespace": "default", "name": "busybox", "reconcileID": "fa88e341-bb90-448e-a206-9f0cb8e52778"}
2024-03-23T21:28:36+02:00 INFO reconcile {"controller": "pod_controller", "namespace": "default", "name": "busybox", "reconcileID": "1676b1fd-3d0e-4f43-8542-109b0ac88fd2"}
2024-03-23T21:29:06+02:00 INFO reconcile {"controller": "pod_controller", "namespace": "default", "name": "busybox", "reconcileID": "a2588b69-416c-435d-bab3-06f88f26843e"}
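And if at some point you want the controller to stop picking up the pod, removing the annotation again is enough (the trailing dash tells kubectl to delete it).
kubectl annotate pod busybox dnaeon.github.io/reconcile-me-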
And that’s pretty much it, we now have a periodic reconciler!
Another thing to mention before we wrap up is that depending on what your controller will be doing you might want to have a look at the existing kubebuilder RBAC markers and make any required changes there. Also, make sure to add your test cases and don’t forget to check the excellent Kubebuilder Book!