kubernetes liveness probe 流程
1 概述
kubernetes提供了的Probe可以进行健康检查。
https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/
对pod中的每个容器通过配置liveness或者readiness。
当liveness probe failed后,该container会杀掉,并重新创建;而readinessProbe失败,则该pod ip 会从service的endpoints列表中删除,即隔离到该后端的请求。
如liveness 配置如下:
1livenessProbe:
2 httpGet:
3 port: 10252
4 path: “/healthz”
5 initialDelaySeconds: 15
6 timeoutSeconds: 15
文中尝试端到端的看下整个过程有哪些组件参与进来,怎么配合工作的。
2 配置
pkg/api/types.go#Probe结构描述了Probe的一个定义。其中Handler是执行的动作,initialDelaySeconds表示容器启动后延迟多少秒初始化probe,考虑一般应用启动需要一定时间。periodSeconds 表示周期检查的间隔,默认10秒,最小1秒。timeoutSeconds会告诉健康检查等待多长时间,默认1秒,最小1秒。successThreshold表示连续探测多少次成功才算成功。failureThreshold表示连续探测多少次失败才算失败。默认是3.
1type Probe struct {
2Handler json:",inline"
3InitialDelaySeconds int32 json:"initialDelaySeconds,omitempty"
4TimeoutSeconds int32 json:"timeoutSeconds,omitempty"
5PeriodSeconds int32 json:"periodSeconds,omitempty"
6SuccessThreshold int32 json:"successThreshold,omitempty"
7FailureThreshold int32 json:"failureThreshold,omitempty"
8}
探测动作Handler支持httpget tcd 和exec三种动作。
httpGet对应一个http请求,主要是配置http端口和path;TCPSocket对应一个TCP请求,主要是配置一个TCP端口,EXEC表示执行一个命令。各个handler详细的定义不看了。
1type Handler struct {
2Exec *ExecAction json:"exec,omitempty"
3// HTTPGet specifies the http request to perform.
4// +optional
5HTTPGet *HTTPGetAction json:"httpGet,omitempty"
6// TCPSocket specifies an action involving a TCP port.
7// TODO: implement a realistic TCP lifecycle hook
8// +optional
9TCPSocket *TCPSocketAction json:"tcpSocket,omitempty"
10}
下面看下probe的探测是怎么被执行的,探测结果又是如何处理的。
在kubelet启动的时候根据配置定期执行配置的Probe,写状态。
pkg/kubelet/kubelet.go# NewMainKubelet
1klet.probeManager = prober.NewManager(
2klet.statusManager,
3klet.livenessManager,
4klet.runner,
5containerRefManager,
6kubeDeps.Recorder)
kubelete创建时会创建一个prober_manager. 负责pod 的probe。主要工作是:对每个容器创建一个worker,worker周期的执行探测并保存探测结果。
pkg/kubelet/prober/prober_manager.go
kubelet的addpod的操作HandlePodAdditions会触发prober_manager的AddPod,对pod中每个容器检查是否配置了readiness或者liveness的probe,并创建启动对应的worker。
kubelete创建时会创建一个prober_manager. 负责pod 的probe。主要工作是:对每个容器创建一个worker,worker周期的执行探测并保存探测结果。
pkg/kubelet/prober/prober_manager.go
1func (m *manager) AddPod(pod *api.Pod) {
2for _, c := range pod.Spec.Containers {
3if c.ReadinessProbe != nil {
4key.probeType = readiness
5w := newWorker(m, readiness, pod, c)
6m.workers[key] = w
7go w.run()
8}
9if c.LivenessProbe != nil {
10key.probeType = liveness
11w := newWorker(m, liveness, pod, c)
12m.workers[key] = w
13go w.run()
14}
根据probe中配置的PeriodSeconds周期执行probe动作。
pkg/kubelet/prober/worker.go # doProbe
1func (w *worker) doProbe() (keepGoing bool) {
2result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID, prev)
3w.resultsManager.Set(w.containerID, result, w.pod)
执行prober中的探测动作,并将探测结果保持到resultsManager中。
忽略细节,调用过程是; pkg/kubelet/prober/prober.go#probe –> pkg/kubelet/prober/prober.go#runProbeWithRetries –> pkg/kubelet/prober/prober.go#runProbe
根据配置的是HTTPGET、Exec还是TCPSocket执行不同的探测。
1func (pb *prober) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
2if p.Exec != nil {
3return pb.exec.Probe(pb.newExecInContainer(container, containerID, p.Exec.Command, timeout))
4}
5if p.HTTPGet != nil {
6return pb.http.Probe(url, headers, timeout)
7}
8if p.TCPSocket != nil {
9return pb.tcp.Probe(status.PodIP, port, timeout)
10}
11}<
分别对应pkg/probe/http/http.go,pkg/probe/tcp/tcp.go,pkg/probe/exec/exec.go 三种探测方式。各种方式也就是检查执行结果是否有错。
Tcp探测看连接后是否error
1func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
2conn, err := net.DialTimeout(“tcp”, addr, timeout)
3if err != nil {
4// Convert errors to failures to handle timeouts.
5return probe.Failure, err.Error(), nil
6}
http探测看StatusCode
1func DoHTTPProbe(url *url.URL, headers http.Header, client HTTPGetInterface) (probe.Result, string, error) {
2req, err := http.NewRequest(“GET”, url.String(), nil)
3if err != nil {
4// Convert errors into failures to catch timeouts.
5return probe.Failure, err.Error(), nil
6}
7res, err := client.Do(req)
8if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
9return probe.Success, body, nil
10}
exec探测看执行返回码
1func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
2data, err := e.CombinedOutput()
3if err != nil {
4exit, ok := err.(exec.ExitError)
5if ok {
6if exit.ExitStatus() == 0 {
7return probe.Success, string(data), nil
8} else {
9return probe.Failure, string(data), nil
10}
11}
12return probe.Unknown, “”, err
至此探测流程很简单的看完了,关注下探测的结果的保存。
在prober_manager中,根据不同的探测类型,构造不同的worker,结果会保存到对应的prober_manager的readinessManager或者livenessManger中。
4 根据状态 action
pkg/kubelet/kubelet.go#NewMainKubelet
kubelet启动时候会构造podWorker
1klet.podWorkers = newPodWorkers(klet.syncPod, klet.syncPodWithProcess, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache, klet.processCache)
syncPod是kubelet一个比较核心的方法,做了包括更新为static pod创建mirrorpod,为pod创建数据目录等操作,还有就是调用容器runtime的SyncPod的回调。
1result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
2kl.reasonCache.Update(pod.UID, result)
pkg/kubelet/container/runtime.go
定义了对pod的各种操作。是容器运行时需要实现的接口。
1type Runtime interface {
2Type() string
3Status() (*RuntimeStatus, error)
4GetPods(all bool) ([]*Pod, error)
5GarbageCollect(gcPolicy ContainerGCPolicy, allSourcesReady bool) error
6SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
7KillPod(pod *api.Pod, runningPod Pod, gracePeriodOverride *int64) error
8GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
9GetNetNS(containerID ContainerID) (string, error)
10GetPodContainerID(*Pod) (ContainerID, error)
11GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
12DeleteContainer(containerID ContainerID) error
13UpdatePodCIDR(podCIDR string) error
14}
两种实现pkg/kubelet/dockertools/docker_manager.go 中的DockerManager 和
pkg/kubelet/rkt/rkt.go中Runtime
如pkg/kubelet/dockertools/docker_manager.go # SyncPod
1containerChanges, err := dm.computePodContainerChanges(pod, podStatus)
2for _, containerStatus := range runningContainerStatues {
3_, keep := containerChanges.ContainersToKeep[kubecontainer.DockerID(containerStatus.ID.ID)]
4if !keep && !keepInit {
5if err := dm.KillContainerInPod(containerStatus.ID, podContainer, pod, killMessage, nil); }
6}
pkg/kubelet/dockertools/docker_manager.go# computePodContainerChanges
1liveness, found := dm.livenessManager.Get(containerStatus.ID)
2if !found || liveness == proberesults.Success {
3containersToKeep[containerID] = index
4continue
5}
6if pod.Spec.RestartPolicy != api.RestartPolicyNever {
7message := fmt.Sprintf(“pod %q container %q is unhealthy, it will be killed and re-created.”, format.Pod(pod), container.Name)
8glog.Info(message)
9containersToStart[index] = message
10}
从livenessmanger获取对应container的状态,状态不是Success的都会被kill掉,在这里就是不在containersToKeep和initContainersToKeep的 container都会被杀掉
pkg/kubelet/rkt/rkt.go 中的Runtime的处理也是类似,从livenessmanager中获取状态,并执行杀pod的动作。
1liveness, found := r.livenessManager.Get(c.ID)
2if found && liveness != proberesults.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever {
3glog.Infof(“Pod %q container %q is unhealthy, it will be killed and re-created.”, format.Pod(pod), container.Name)
4restartPod = true
5}
6delete(unidentifiedContainers, c.ID)
7}
8if restartPod {
9if err = r.KillPod(pod, runningPod, nil); }
5 总结
对于文章前probe的处理过程,加上执行的主体,整个过程应该描述如下:
Kubelet 在延迟initialDelaySeconds后以periodSeconds为周期,执行定义的探测(包括httpGet,TcpSocket或者exec),并保存探测结果。对于liveness,kubelet会读取该探测结果,杀掉对应容器。