kubernetes liveness probe 流程

1 概述

kubernetes提供了的Probe可以进行健康检查。

https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/

对pod中的每个容器通过配置liveness或者readiness。

当liveness probe failed后,该container会杀掉,并重新创建;而readinessProbe失败,则该pod ip 会从service的endpoints列表中删除,即隔离到该后端的请求。

如liveness 配置如下:

1livenessProbe:
2  httpGet:
3    port: 10252
4    path: “/healthz”
5  initialDelaySeconds: 15
6  timeoutSeconds: 15

文中尝试端到端的看下整个过程有哪些组件参与进来,怎么配合工作的。

2 配置

pkg/api/types.go#Probe结构描述了Probe的一个定义。其中Handler是执行的动作,initialDelaySeconds表示容器启动后延迟多少秒初始化probe,考虑一般应用启动需要一定时间。periodSeconds 表示周期检查的间隔,默认10秒,最小1秒。timeoutSeconds会告诉健康检查等待多长时间,默认1秒,最小1秒。successThreshold表示连续探测多少次成功才算成功。failureThreshold表示连续探测多少次失败才算失败。默认是3.

1type Probe struct {
2Handler json:",inline"
3InitialDelaySeconds int32 json:"initialDelaySeconds,omitempty"
4TimeoutSeconds int32 json:"timeoutSeconds,omitempty"
5PeriodSeconds int32 json:"periodSeconds,omitempty"
6SuccessThreshold int32 json:"successThreshold,omitempty"
7FailureThreshold int32 json:"failureThreshold,omitempty"
8}

探测动作Handler支持httpget tcd 和exec三种动作。

httpGet对应一个http请求,主要是配置http端口和path;TCPSocket对应一个TCP请求,主要是配置一个TCP端口,EXEC表示执行一个命令。各个handler详细的定义不看了。

 1type Handler struct {
 2Exec *ExecAction json:"exec,omitempty"
 3// HTTPGet specifies the http request to perform.
 4// +optional
 5HTTPGet *HTTPGetAction json:"httpGet,omitempty"
 6// TCPSocket specifies an action involving a TCP port.
 7// TODO: implement a realistic TCP lifecycle hook
 8// +optional
 9TCPSocket *TCPSocketAction json:"tcpSocket,omitempty"
10}

下面看下probe的探测是怎么被执行的,探测结果又是如何处理的。

在kubelet启动的时候根据配置定期执行配置的Probe,写状态。

pkg/kubelet/kubelet.go# NewMainKubelet

1klet.probeManager = prober.NewManager(
2klet.statusManager,
3klet.livenessManager,
4klet.runner,
5containerRefManager,
6kubeDeps.Recorder)

kubelete创建时会创建一个prober_manager. 负责pod 的probe。主要工作是:对每个容器创建一个worker,worker周期的执行探测并保存探测结果。

pkg/kubelet/prober/prober_manager.go

kubelet的addpod的操作HandlePodAdditions会触发prober_manager的AddPod,对pod中每个容器检查是否配置了readiness或者liveness的probe,并创建启动对应的worker。

kubelete创建时会创建一个prober_manager. 负责pod 的probe。主要工作是:对每个容器创建一个worker,worker周期的执行探测并保存探测结果。

pkg/kubelet/prober/prober_manager.go

 1func (m *manager) AddPod(pod *api.Pod) {
 2for _, c := range pod.Spec.Containers {
 3if c.ReadinessProbe != nil {
 4key.probeType = readiness
 5w := newWorker(m, readiness, pod, c)
 6m.workers[key] = w
 7go w.run()
 8}
 9if c.LivenessProbe != nil {
10key.probeType = liveness
11w := newWorker(m, liveness, pod, c)
12m.workers[key] = w
13go w.run()
14}

根据probe中配置的PeriodSeconds周期执行probe动作。

pkg/kubelet/prober/worker.go # doProbe

1func (w *worker) doProbe() (keepGoing bool) {
2result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID, prev)
3w.resultsManager.Set(w.containerID, result, w.pod)

执行prober中的探测动作,并将探测结果保持到resultsManager中。

忽略细节,调用过程是; pkg/kubelet/prober/prober.go#probe –> pkg/kubelet/prober/prober.go#runProbeWithRetries –> pkg/kubelet/prober/prober.go#runProbe

根据配置的是HTTPGET、Exec还是TCPSocket执行不同的探测。

 1func (pb *prober) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
 2if p.Exec != nil {
 3return pb.exec.Probe(pb.newExecInContainer(container, containerID, p.Exec.Command, timeout))
 4}
 5if p.HTTPGet != nil {
 6return pb.http.Probe(url, headers, timeout)
 7}
 8if p.TCPSocket != nil {
 9return pb.tcp.Probe(status.PodIP, port, timeout)
10}
11}<

分别对应pkg/probe/http/http.go,pkg/probe/tcp/tcp.go,pkg/probe/exec/exec.go 三种探测方式。各种方式也就是检查执行结果是否有错。

Tcp探测看连接后是否error

1func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
2conn, err := net.DialTimeout(tcp, addr, timeout)
3if err != nil {
4// Convert errors to failures to handle timeouts.
5return probe.Failure, err.Error(), nil
6}

http探测看StatusCode

 1func DoHTTPProbe(url *url.URL, headers http.Header, client HTTPGetInterface) (probe.Result, string, error) {
 2req, err := http.NewRequest(GET, url.String(), nil)
 3if err != nil {
 4// Convert errors into failures to catch timeouts.
 5return probe.Failure, err.Error(), nil
 6}
 7res, err := client.Do(req)
 8if res.StatusCode &gt;= http.StatusOK && res.StatusCode &lt; http.StatusBadRequest {
 9return probe.Success, body, nil
10}

exec探测看执行返回码

 1func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
 2data, err := e.CombinedOutput()
 3if err != nil {
 4exit, ok := err.(exec.ExitError)
 5if ok {
 6if exit.ExitStatus() == 0 {
 7return probe.Success, string(data), nil
 8} else {
 9return probe.Failure, string(data), nil
10}
11}
12return probe.Unknown, “”, err

至此探测流程很简单的看完了,关注下探测的结果的保存。

在prober_manager中,根据不同的探测类型,构造不同的worker,结果会保存到对应的prober_manager的readinessManager或者livenessManger中。

 4 根据状态 action

pkg/kubelet/kubelet.go#NewMainKubelet

kubelet启动时候会构造podWorker

1klet.podWorkers = newPodWorkers(klet.syncPod, klet.syncPodWithProcess, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache, klet.processCache)

syncPod是kubelet一个比较核心的方法,做了包括更新为static pod创建mirrorpod,为pod创建数据目录等操作,还有就是调用容器runtime的SyncPod的回调。

1result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
2kl.reasonCache.Update(pod.UID, result)

pkg/kubelet/container/runtime.go

定义了对pod的各种操作。是容器运行时需要实现的接口。

 1type Runtime interface {
 2Type() string
 3Status() (*RuntimeStatus, error)
 4GetPods(all bool) ([]*Pod, error)
 5GarbageCollect(gcPolicy ContainerGCPolicy, allSourcesReady bool) error
 6SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
 7KillPod(pod *api.Pod, runningPod Pod, gracePeriodOverride *int64) error
 8GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
 9GetNetNS(containerID ContainerID) (string, error)
10GetPodContainerID(*Pod) (ContainerID, error)
11GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
12DeleteContainer(containerID ContainerID) error
13UpdatePodCIDR(podCIDR string) error
14}

两种实现pkg/kubelet/dockertools/docker_manager.go 中的DockerManager 和

pkg/kubelet/rkt/rkt.go中Runtime

如pkg/kubelet/dockertools/docker_manager.go # SyncPod

1containerChanges, err := dm.computePodContainerChanges(pod, podStatus)
2for _, containerStatus := range runningContainerStatues {
3_, keep := containerChanges.ContainersToKeep[kubecontainer.DockerID(containerStatus.ID.ID)]
4if !keep && !keepInit {
5if err := dm.KillContainerInPod(containerStatus.ID, podContainer, pod, killMessage, nil); }
6}

pkg/kubelet/dockertools/docker_manager.go# computePodContainerChanges

 1liveness, found := dm.livenessManager.Get(containerStatus.ID)
 2if !found || liveness == proberesults.Success {
 3containersToKeep[containerID] = index
 4continue
 5}
 6if pod.Spec.RestartPolicy != api.RestartPolicyNever {
 7message := fmt.Sprintf(pod %q container %q is unhealthy, it will be killed and re-created., format.Pod(pod), container.Name)
 8glog.Info(message)
 9containersToStart[index] = message
10}

从livenessmanger获取对应container的状态,状态不是Success的都会被kill掉,在这里就是不在containersToKeep和initContainersToKeep的 container都会被杀掉

pkg/kubelet/rkt/rkt.go 中的Runtime的处理也是类似,从livenessmanager中获取状态,并执行杀pod的动作。

1liveness, found := r.livenessManager.Get(c.ID)
2if found && liveness != proberesults.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever {
3glog.Infof(Pod %q container %q is unhealthy, it will be killed and re-created., format.Pod(pod), container.Name)
4restartPod = true
5}
6delete(unidentifiedContainers, c.ID)
7}
8if restartPod {
9if err = r.KillPod(pod, runningPod, nil); }

5 总结

对于文章前probe的处理过程,加上执行的主体,整个过程应该描述如下:

Kubelet 在延迟initialDelaySeconds后以periodSeconds为周期,执行定义的探测(包括httpGet,TcpSocket或者exec),并保存探测结果。对于liveness,kubelet会读取该探测结果,杀掉对应容器。