API Server - Watch¶
Related Issue and Pull Request¶
Overview¶
stateDiagram-v2
state client {
[*] --> watch_apiserver
}
state apiserver {
[*] --> handler
handler-->cache
watch_etcd
}
state etcd {
watch
}
watch_apiserver-->long_polling
watch_apiserver-->web_socket
long_polling-->apiserver
web_socket-->apiserver
cache-->handler
watch_etcd-->watch
watch_etcd-->cache
API Server Source Code¶
How API Server Serves Watch Request and Watches Cache¶
Source code: k8s.io/apiserver/pkg/storage/cacher
While one can access etcd's watch API directly, it is preferable to go through the api-server instead. In that case, object conversions are handled by the api-server automatically.
The api-server implements a Cacher that wraps the storage interface as a decorator. If enabled, the Cacher works as an interceptor to avoid opening a massive number of connections to etcd.
The Cacher is responsible for serving requests and implements the storage interface (GET, LIST, DELETE, WATCH, etc.). It plays the same role as etcd and exposes the same interface.
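The decorator idea can be sketched in a few lines of Go. This is only an illustration under stated assumptions: `Store`, `backendStore`, and `cachedStore` are hypothetical names, the interface is cut down to a single `Get`, and the toy cache fills itself on first read, whereas the text here describes a cache that is updated from changes in etcd.

```go
package main

import (
	"fmt"
	"sync"
)

// Store is a hypothetical, heavily reduced stand-in for the storage interface.
type Store interface {
	Get(key string) (string, bool)
}

// backendStore plays the role of etcd and counts how often it is hit.
type backendStore struct {
	mu    sync.Mutex
	data  map[string]string
	reads int
}

func (b *backendStore) Get(key string) (string, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.reads++
	v, ok := b.data[key]
	return v, ok
}

// cachedStore decorates another Store: it exposes the same interface,
// but repeated reads are answered from memory instead of the backend.
type cachedStore struct {
	mu      sync.Mutex
	backend Store
	cache   map[string]string
}

func newCachedStore(backend Store) *cachedStore {
	return &cachedStore{backend: backend, cache: map[string]string{}}
}

func (c *cachedStore) Get(key string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.cache[key]; ok {
		return v, true
	}
	v, ok := c.backend.Get(key)
	if ok {
		c.cache[key] = v
	}
	return v, ok
}

func main() {
	backend := &backendStore{data: map[string]string{"pods/a": "v1"}}
	var store Store = newCachedStore(backend)
	for i := 0; i < 3; i++ {
		v, _ := store.Get("pods/a")
		fmt.Println(v)
	}
	fmt.Println("backend reads:", backend.reads)
}
```

Because callers only depend on `Store`, the cached and uncached variants are interchangeable, which is the property that lets the caching layer be switched on or off.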
When a client sends a watch request, the Cacher runs a cacheWatcher against the watchCache, and the Cacher registers a handler with the watchCache.
When the watchCache modifies objects, it sends the event to the Cacher's handler. The Cacher then sends the event to all related watchers.
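The fan-out from the handler to the interested watchers might look roughly like the following sketch. The types `event`, `watcher`, and `cacher` are simplified, hypothetical stand-ins, and matching watchers by key prefix is an assumption made for the example rather than the actual selection logic.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a hypothetical, simplified watch event.
type event struct {
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Key  string
}

// watcher stands in for a cacheWatcher: one per client watch request.
type watcher struct {
	prefix string
	result chan event
}

// cacher keeps the registered watchers and fans events out to them.
type cacher struct {
	watchers []*watcher
}

// Watch registers a new watcher interested in keys under prefix.
func (c *cacher) Watch(prefix string) *watcher {
	w := &watcher{prefix: prefix, result: make(chan event, 10)}
	c.watchers = append(c.watchers, w)
	return w
}

// processEvent is the handler the cache would call for every change.
// Only watchers whose prefix matches the event key receive it.
func (c *cacher) processEvent(e event) {
	for _, w := range c.watchers {
		if strings.HasPrefix(e.Key, w.prefix) {
			w.result <- e
		}
	}
}

func main() {
	c := &cacher{}
	pods := c.Watch("/pods/")
	nodes := c.Watch("/nodes/")

	c.processEvent(event{Type: "ADDED", Key: "/pods/default/web"})
	c.processEvent(event{Type: "MODIFIED", Key: "/nodes/node-1"})

	fmt.Println(<-pods.result)
	fmt.Println(<-nodes.result)
}
```

The buffered channels keep the sketch from blocking; a real dispatcher also has to deal with watchers that stop reading.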
The watchCache works as the storage and implements ADD, UPDATE, DELETE, LIST, and GET. When changes are made to etcd, the cache is updated and the cacheWatchers are notified.
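As a rough illustration of "update the cache, then notify", the sketch below keeps a map as the store and calls a handler after every change. All names (`cacheEvent`, `newWatchCache`, `apply`) and the string-valued objects are assumptions chosen for brevity, not the real types.

```go
package main

import (
	"fmt"
	"sort"
)

// cacheEvent is a hypothetical, simplified version of a watch cache event.
type cacheEvent struct {
	Type  string
	Key   string
	Value string
}

// watchCache holds the latest state per key and reports every change
// to the handler it was constructed with.
type watchCache struct {
	store        map[string]string
	eventHandler func(cacheEvent)
}

func newWatchCache(handler func(cacheEvent)) *watchCache {
	return &watchCache{store: map[string]string{}, eventHandler: handler}
}

func (w *watchCache) Add(key, value string)    { w.apply("ADDED", key, value) }
func (w *watchCache) Update(key, value string) { w.apply("MODIFIED", key, value) }
func (w *watchCache) Delete(key string)        { w.apply("DELETED", key, "") }

func (w *watchCache) Get(key string) (string, bool) {
	v, ok := w.store[key]
	return v, ok
}

// List returns all known keys in sorted order.
func (w *watchCache) List() []string {
	keys := make([]string, 0, len(w.store))
	for k := range w.store {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// apply updates the store first and only then notifies the handler,
// so the handler always observes a cache that includes the change.
func (w *watchCache) apply(typ, key, value string) {
	if typ == "DELETED" {
		delete(w.store, key)
	} else {
		w.store[key] = value
	}
	if w.eventHandler != nil {
		w.eventHandler(cacheEvent{Type: typ, Key: key, Value: value})
	}
}

func main() {
	var seen []string
	wc := newWatchCache(func(e cacheEvent) {
		seen = append(seen, e.Type+" "+e.Key)
	})
	wc.Add("/pods/a", "v1")
	wc.Update("/pods/a", "v2")
	wc.Add("/pods/b", "v1")
	wc.Delete("/pods/a")
	fmt.Println(wc.List())
	fmt.Println(seen)
}
```

Passing the handler into the constructor is what decouples the cache from whoever consumes its events.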
stateDiagram-v2
state api_server {
[*] --> StorageWith/WithoutCacher
cacher
state cache {
watchCache
}
}
request
etcd
[*] --> request
request --> etcd
request --> api_server
StorageWith/WithoutCacher --> cacher
StorageWith/WithoutCacher --> etcd
cacher --> cache
cache --> etcd
How API Server Handles Client's Watch Request¶
Source code: k8s.io/apiserver/pkg/endpoints/handlers/watch.go
HTTP¶
stateDiagram-v2
[*] --> serveWatch()
state serveWatch() {
server
}
state WatchServer {
state ServeHTTP() {
IsWebSocket
IsHTTP
object
s.Fixup(event.Object)
s.EmbeddedEncoder.Encode(obj,buf)
metav1.InternalEvent(event)
metav1.Convert_v1_InternalEvent_To_v1_WatchEvent
e.Encode(outEvent)
flusher.Flush()
}
channel
event
}
server --> WatchServer
channel --> event
event --> ServeHTTP()
IsHTTP --> s.Fixup(event.Object)
s.Fixup(event.Object)--> object
object --> s.EmbeddedEncoder.Encode(obj,buf)
s.EmbeddedEncoder.Encode(obj,buf) --> metav1.InternalEvent(event)
metav1.InternalEvent(event) --> metav1.Convert_v1_InternalEvent_To_v1_WatchEvent
metav1.Convert_v1_InternalEvent_To_v1_WatchEvent --> e.Encode(outEvent)
e.Encode(outEvent) --> flusher.Flush()
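The encode-then-flush loop at the end of the HTTP path can be approximated with the Go standard library alone. The sketch below is not the api-server's code: `watchEvent`, `serveWatch`, and `streamToRecorder` are hypothetical names, and plain JSON stands in for the encoders in the diagram. It only shows why each event is followed by a flush on a long-lived response.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// watchEvent is a hypothetical wire format: one JSON document per line.
type watchEvent struct {
	Type   string `json:"type"`
	Object string `json:"object"`
}

// serveWatch streams every event from the channel to the client and
// flushes after each one so the client sees it without buffering delay.
func serveWatch(events <-chan watchEvent) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming not supported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		flusher.Flush()

		enc := json.NewEncoder(w)
		for {
			select {
			case <-r.Context().Done():
				return // client went away
			case ev, open := <-events:
				if !open {
					return // event source closed
				}
				if err := enc.Encode(ev); err != nil {
					return
				}
				flusher.Flush()
			}
		}
	}
}

// streamToRecorder runs the handler against an in-memory recorder and
// returns the streamed body and whether a flush happened.
func streamToRecorder(events []watchEvent) (string, bool) {
	ch := make(chan watchEvent, len(events))
	for _, ev := range events {
		ch <- ev
	}
	close(ch)

	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/watch", nil)
	serveWatch(ch)(rec, req)
	return rec.Body.String(), rec.Flushed
}

func main() {
	body, flushed := streamToRecorder([]watchEvent{
		{Type: "ADDED", Object: "pod-a"},
		{Type: "DELETED", Object: "pod-a"},
	})
	fmt.Print(body)
	fmt.Println("flushed:", flushed)
}
```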
WebSocket¶
stateDiagram-v2
state serveWatch() {
server
}
state WatchServer {
state ServeHTTP() {
IsWebSocket
IsHTTP
}
channel
event
[*] --> channel
}
state HandleWS() {
server.Encoder.Encode()
streamBuf
websocket.Message.Send()
}
[*] --> serveWatch()
server --> WatchServer
IsWebSocket --> HandleWS()
channel --> event
event --> ServeHTTP()
server.Encoder.Encode() --> streamBuf
streamBuf --> websocket.Message.Send()
How a Cacher Works¶
Set Up a Cacher with watchCache¶
stateDiagram-v2
state cacher.go {
[*] --> NewCacherFromConfig()
state Cacher {
processEvent(*watchCacheEvent)
}
}
state watch-cache.go {
newWatchCache(..,cacher.processEvent)
state watchCache {
eventHandler
processEvent()
updateCache()
[*] --> processEvent()
}
}
NewCacherFromConfig() --> Cacher
Cacher --> newWatchCache(..,cacher.processEvent)
newWatchCache(..,cacher.processEvent) --> watchCache
processEvent() --> updateCache()
updateCache() --> eventHandler
processEvent(*watchCacheEvent) --> eventHandler
Cacher.processEvent()¶
stateDiagram-v2
state cacher.go {
state Cacher {
[*] --> processEvent(*watchCacheEvent)
dispatchEvent(*watchCacheEvent)
dispatchEvents()
cacheWatcher.add(event,timer)
}
state cacheWatcher {
[*] --> add(*watchCacheEvent,*time.Timer)
process()
sendWatchCacheEvent(*watchCacheEvent)
}
}
processEvent(*watchCacheEvent) --> dispatchEvents()
dispatchEvents() --> dispatchEvent(*watchCacheEvent)
dispatchEvent(*watchCacheEvent) --> cacheWatcher.add(event,timer)
Cacher --> cacheWatcher
add(*watchCacheEvent,*time.Timer) --> process()
process() --> sendWatchCacheEvent(*watchCacheEvent)
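The diagram shows `add` receiving a timer alongside the event. One plausible reading, sketched below as an assumption rather than a description of the actual implementation, is that the dispatcher first tries to queue the event without blocking and then waits only until the timer fires, so a single slow watcher cannot stall the others. The struct layout and the boolean return value are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

type event struct{ Key string }

// cacheWatcher is a hypothetical stand-in for the real type.
type cacheWatcher struct {
	input  chan event // filled by add
	result chan event // read by the client
}

func newCacheWatcher(buffer int) *cacheWatcher {
	w := &cacheWatcher{
		input:  make(chan event, buffer),
		result: make(chan event),
	}
	go w.process()
	return w
}

// add queues the event. It first tries without blocking, then waits
// until the timer fires. It reports whether the event was accepted.
func (w *cacheWatcher) add(e event, timer *time.Timer) bool {
	select {
	case w.input <- e:
		return true
	default:
	}
	select {
	case w.input <- e:
		return true
	case <-timer.C:
		return false
	}
}

// process forwards queued events to the client-facing channel.
func (w *cacheWatcher) process() {
	defer close(w.result)
	for e := range w.input {
		w.result <- e // blocks while the client is not reading
	}
}

// demo adds three events to a watcher whose client never reads during
// the adds, then drains the watcher so its goroutine can exit.
func demo() []bool {
	w := newCacheWatcher(1)
	var accepted []bool
	for _, key := range []string{"a", "b", "c"} {
		timer := time.NewTimer(50 * time.Millisecond)
		accepted = append(accepted, w.add(event{Key: key}, timer))
		timer.Stop()
	}
	close(w.input)
	for range w.result {
	}
	return accepted
}

func main() {
	fmt.Println(demo())
}
```

In the demo the first event is picked up by `process`, the second sits in the buffer, and the third times out because nothing is reading `result`.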
How to watch etcd resources¶
Source code: k8s.io/apiserver/pkg/storage/etcd3
stateDiagram-v2
state etcd {
clientv3.Client
}
state watcher.go {
[*] --> newWatcher()
newWatcher()
state type_watcher {
client
Watch()
createWatchChan()
[*] --> Watch()
}
state type_watchChan {
watcher
incomingEventChan
resultChan
run()
processEvent()
sendEvent(parsedEvent)
transform(e)
startWatching() --> Event
[*] --> run()
}
}
state event.go {
type_event
func_parseEvent()
func_parseEvent() --> type_event
}
client --> etcd
newWatcher() --> type_watcher
Watch() --> createWatchChan()
createWatchChan() --> type_watchChan
run() --> startWatching()
run() --> processEvent()
Event --> func_parseEvent()
type_event --> sendEvent(parsedEvent)
sendEvent(parsedEvent) --> incomingEventChan
incomingEventChan --> processEvent()
processEvent() --> transform(e)
transform(e) --> resultChan
watcher --> type_watcher
resultChan --> [*]
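The two-stage flow in the diagram (raw events are parsed into `incomingEventChan`, then transformed into `resultChan`) can be modelled with two goroutines. In this sketch a plain Go channel stands in for the etcd client, and `rawEvent`, `parsedEvent`, and `watchEvent` are hypothetical, simplified types. It illustrates the channel pipeline only, not the etcd3 storage code.

```go
package main

import "fmt"

// rawEvent mimics what an etcd watch might deliver (hypothetical shape).
type rawEvent struct {
	Key       string
	Value     string
	IsCreate  bool
	IsDeleted bool
}

// parsedEvent is the internal form produced by parseEvent.
type parsedEvent struct {
	key, value string
	isCreated  bool
	isDeleted  bool
}

// watchEvent is what the consumer of resultChan receives.
type watchEvent struct {
	Type, Key, Object string
}

type watchChan struct {
	source            <-chan rawEvent // stand-in for the etcd watch stream
	incomingEventChan chan parsedEvent
	resultChan        chan watchEvent
}

func parseEvent(r rawEvent) parsedEvent {
	return parsedEvent{key: r.Key, value: r.Value, isCreated: r.IsCreate, isDeleted: r.IsDeleted}
}

// transform maps an internal event to the client-facing event type.
func transform(e parsedEvent) watchEvent {
	switch {
	case e.isDeleted:
		return watchEvent{Type: "DELETED", Key: e.key}
	case e.isCreated:
		return watchEvent{Type: "ADDED", Key: e.key, Object: e.value}
	default:
		return watchEvent{Type: "MODIFIED", Key: e.key, Object: e.value}
	}
}

// startWatching reads the source and feeds parsed events downstream.
func (wc *watchChan) startWatching() {
	defer close(wc.incomingEventChan)
	for r := range wc.source {
		wc.incomingEventChan <- parseEvent(r)
	}
}

// processEvent transforms parsed events and publishes the results.
func (wc *watchChan) processEvent() {
	defer close(wc.resultChan)
	for e := range wc.incomingEventChan {
		wc.resultChan <- transform(e)
	}
}

// run starts both stages concurrently.
func (wc *watchChan) run() {
	go wc.startWatching()
	go wc.processEvent()
}

// demo pushes three raw events through the pipeline and collects the output.
func demo() []string {
	source := make(chan rawEvent, 3)
	source <- rawEvent{Key: "/pods/a", Value: "v1", IsCreate: true}
	source <- rawEvent{Key: "/pods/a", Value: "v2"}
	source <- rawEvent{Key: "/pods/a", IsDeleted: true}
	close(source)

	wc := &watchChan{
		source:            source,
		incomingEventChan: make(chan parsedEvent, 10),
		resultChan:        make(chan watchEvent, 10),
	}
	wc.run()

	var out []string
	for ev := range wc.resultChan {
		out = append(out, ev.Type+" "+ev.Key)
	}
	return out
}

func main() {
	fmt.Println(demo())
}
```

Separating reading from transforming means a slow transformation does not stop the reader from draining the upstream watch, up to the capacity of the intermediate buffer.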