Skip to content

API Server - Watch

https://github.com/kubernetes/kubernetes/issues/10475

https://github.com/kubernetes/kubernetes/pull/10679/commits/dd6ba6a07fb75adfc86845bedafc9f73acf67615?short_path=761c110#diff-761c110bfd0f08200286dea321c01ddee6f9ddf5b362c674598530ed8880204a

Overview

stateDiagram-v2
    state client {
        [*] --> watch_apiserver
    }
    state apiserver {
        [*] --> handler
        handler-->cache
        watch_etcd
    }
    state etcd {
        watch
    }
    watch_apiserver-->long_polling
    watch_apiserver-->web_socket
    long_polling-->apiserver
    web_socket-->apiserver
    cache-->handler
    watch_etcd-->watch
    watch_etcd-->cache

API Server Source Code

How API Server Serves Watch Request and Watches Cache

Source code: k8s.io/apiserver/pkg/storage/cacher

While one can directly access etcd's watch API, it is preferable to go through the api-server instead, because the api-server handles object conversions automatically.

The api-server implements a cacher that wraps the storage interface as a decorator. If enabled, the cacher works as an interceptor to avoid opening a massive number of connections to etcd.

The Cacher is responsible for serving requests and implements the storage interface (GET, LIST, DELETE, WATCH, etc.). It plays the same role as etcd and exposes the same interfaces.

When a client sends a watch request, the Cacher runs a cacheWatcher against the watchCache, and the Cacher registers a handler with the watchCache.

When the watchCache modifies an object, it sends the event to the Cacher's handler. The Cacher then sends the event to all the related watchers.

watchCache works as the storage and implements ADD, UPDATE, DELETE, LIST, and GET. When changes are made to etcd, the cache is updated and the cacheWatchers are notified.

stateDiagram-v2
    state api_server {
        [*] --> StorageWith/WithoutCacher
        cacher
        state cache {
            watchCache
        }
    }
    request
    etcd
    [*] --> request
    request --> etcd
    request --> api_server
    StorageWith/WithoutCacher --> cacher
    StorageWith/WithoutCacher --> etcd
    cacher --> cache
    cache --> etcd

How API Server Handles Client's Watch Request

k8s.io/apiserver/pkg/endpoints/handlers/watch.go

HTTP

stateDiagram-v2
    [*] --> serveWatch()
    state serveWatch() {
        server
    }
    state WatchServer {
        state ServeHTTP() {
            IsWebSocket
            IsHTTP
            object
            s.Fixup(event.Object)
            s.EmbeddedEncoder.Encode(obj,buf)
            metav1.InternalEvent(event)
            metav1.Convert_v1_InternalEvent_To_v1_WatchEvent
            e.Encode(outEvent)
            flusher.Flush()
        }
        channel
        event
    }
    server --> WatchServer
    channel --> event
    event --> ServeHTTP()
    IsHTTP --> s.Fixup(event.Object)
    s.Fixup(event.Object)--> object
    object --> s.EmbeddedEncoder.Encode(obj,buf)
    s.EmbeddedEncoder.Encode(obj,buf) --> metav1.InternalEvent(event)
    metav1.InternalEvent(event) --> metav1.Convert_v1_InternalEvent_To_v1_WatchEvent
    metav1.Convert_v1_InternalEvent_To_v1_WatchEvent --> e.Encode(outEvent)
    e.Encode(outEvent) --> flusher.Flush()

WebSocket

stateDiagram-v2
    state serveWatch() {
        server
    }
    state WatchServer {
        state ServeHTTP() {
            IsWebSocket
            IsHTTP
        }
        channel
        event
        [*] --> channel
    }
    state HandleWS() {
        server.Encoder.Encode()
        streamBuf
        websocket.Message.Send()
    }
    [*] --> serveWatch()
    server --> WatchServer
    IsWebSocket --> HandleWS()
    channel --> event
    event --> ServeHTTP()
    server.Encoder.Encode() --> streamBuf
    streamBuf --> websocket.Message.Send()

How a Cacher works

Set up a Cacher with watchCache

stateDiagram-v2
    state cacher.go {
        [*] --> NewCacherFromConfig()

        state Cacher {
            processEvent(*watchCacheEvent)
        }
    }
    state watch_cache.go {
        newWatchCache(..,cacher.processEvent)
        state watchCache {
            eventHandler
            processEvent()
            updateCache()
            [*] --> processEvent()
        }
    }
    NewCacherFromConfig() --> Cacher
    Cacher --> newWatchCache(..,cacher.processEvent)
    newWatchCache(..,cacher.processEvent) --> watchCache

    processEvent() --> updateCache()
    updateCache() --> eventHandler
    processEvent(*watchCacheEvent) --> eventHandler

Cacher.processEvent()

stateDiagram-v2
    state cacher.go {
        state Cacher {
            [*] --> processEvent(*watchCacheEvent)
            dispatchEvent(*watchCacheEvent)
            dispatchEvents()
            cacheWatcher.add(event,timer)
        }
        state cacheWatcher {
            [*] --> add(*watchCacheEvent,*time.Timer)
            process()
            sendWatchCacheEvent(*watchCacheEvent)
        }
    }
    processEvent(*watchCacheEvent) --> dispatchEvents()
    dispatchEvents() --> dispatchEvent(*watchCacheEvent)
    dispatchEvent(*watchCacheEvent) --> cacheWatcher.add(event,timer)
    Cacher --> cacheWatcher
    add(*watchCacheEvent,*time.Timer) --> process()
    process() --> sendWatchCacheEvent(*watchCacheEvent)

How to watch etcd resources

k8s.io/apiserver/pkg/storage/etcd3

stateDiagram-v2
    state etcd {
        clientv3.Client
    }
    state watcher.go {
        [*] --> newWatcher()
        newWatcher()
        state type_watcher {
            client
            Watch()
            createWatchChan()
            [*] --> Watch()
        }
        state type_watchChan {
            watcher
            incomingEventChan
            resultChan
            run()
            processEvent()
            sendEvent(parsedEvent)
            transform(e)
            startWatching() --> Event
            [*] --> run()
        }
    }
    state event.go {
        type_event
        func_parseEvent()
        func_parseEvent() --> type_event
    }
    client --> etcd
    newWatcher() --> type_watcher
    Watch() --> createWatchChan()
    createWatchChan() --> type_watchChan
    run() --> startWatching()
    run() --> processEvent()
    Event --> func_parseEvent()
    type_event --> sendEvent(parsedEvent)
    sendEvent(parsedEvent) --> incomingEventChan
    incomingEventChan --> processEvent()
    processEvent() --> transform(e)
    transform(e) --> resultChan
    watcher --> type_watcher
    resultChan --> [*]