Skip to content

Write a Minimal Mutating Webhook

In this article, let's write a minial mutating webhook that add a new label to the targeted pods.

Folder Structure

.
├── deploy
   ├── test.yml
   └── webhook.yml
├── requirements.txt
├── run-server.sh
└── webhook
    ├── app.py
    ├── __init__.py
    ├── k8s.py
    └── models.py

Dependencies

Install Requirements

=== requirements.txt

```yaml
fastapi==0.70.1
uvicorn==0.16.0
loguru==0.5.3
jsonpatch==1.32
kubernetes==20.13.0
```

Webhook Codes

We use fastapi to develop the webhook.

Webhook Codes

from typing import Optional

from pydantic import BaseModel

class WebhookRequestDetail(BaseModel):
    uid: str
    kind: dict
    resource: dict
    name: Optional[str]
    namespace: str
    operation: str
    object: dict

class WebhookRequest(BaseModel):
    apiVersion: str
    kind: str
    request: WebhookRequestDetail

class WebhookResponseDetail(BaseModel):
    allowed: bool
    uid: str
    patch: Optional[str]
    patchType: Optional[str]

class WebhookResponse(BaseModel):
    apiVersion: str
    kind: str
    response: WebhookResponseDetail
from kubernetes.client.api_client import ApiClient

class Converter(ApiClient):
    def from_dict(self, data, kind):
        return self._ApiClient__deserialize(data, kind)
import base64
import logging
from typing import Optional

import jsonpatch
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from loguru import logger

from .k8s import Converter
from .models import WebhookRequest, WebhookResponse, WebhookResponseDetail

logging.root.setLevel(logging.DEBUG)

app = FastAPI()

@app.post(
    "/mutate-pods", response_model=WebhookResponse, response_model_exclude_unset=True
)
async def mutate_pods(webhook_request: WebhookRequest, timeout: Optional[str] = "10s"):
    resp = WebhookResponse(
        apiVersion=webhook_request.apiVersion,
        kind=webhook_request.kind,
        response=WebhookResponseDetail(allowed=True, uid=webhook_request.request.uid),
    )
    pod = Converter().from_dict(webhook_request.request.object, "V1Pod")
    original = pod.to_dict()
    try:
        pod.metadata.labels["is_patched"] = "true"
    except KeyError as exc:
        logger.error("Patch failed: key not found")
        logger.error(exc)
        logger.error(webhook_request.json())
        return resp
    # patch = jsonpatch.JsonPatch.from_diff(webhook_request.request.object, patched_pod)
    patch = jsonpatch.JsonPatch.from_diff(original, pod.to_dict())
    resp.response.patchType = "JSONPatch"
    resp.response.patch = base64.b64encode(str(patch).encode()).decode()
    logger.info("Response.apiVersion: {}", resp.apiVersion)
    logger.info("Response.kind: {}", resp.kind)
    logger.info("Response.response.allowed: {}", resp.response.allowed)
    logger.info("Response.response.uid: {}", resp.response.uid)
    logger.info("Response.response.patch: {}", resp.response.patch)
    logger.info("Response.response.patchType: {}", resp.response.patchType)
    return resp

Run the Server

$ uvicorn webhook.app:app --reload --host=0.0.0.0 --port=8000 --debug
INFO:     Will watch for changes in these directories: ['/home/test/webhook']
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [31408] using watchgod
INFO:     Started server process [31417]
INFO:     Waiting for application startup.
INFO:     Application startup complete.