Core implementation approach:
Coroot stores data in SQLite (ClickHouse in production deployments) and builds on Prometheus and OpenTelemetry to visualize application behavior (network, I/O, disk, files, etc.).
coroot-node-agent uses eBPF (tracepoints + uprobes) together with container networking facilities (cgroups + namespaces) to collect information about the node and the containers running on it, and exposes the data as Prometheus metrics; the minimum supported Linux kernel version is 4.16.
Introduction
Prerequisites
- Coroot relies on eBPF; the minimum supported Linux kernel version is 4.16.
- eBPF-based continuous profiling relies on CO-RE. Most modern Linux distributions support CO-RE, for example:
  - Ubuntu 20.10 and later
  - Debian 11 and later
  - RHEL 8.2 and later
- Coroot collects metrics, logs, traces, and profiling data, and every piece of telemetry is associated with a container. In this context, a container is a group of processes running in a dedicated cgroup (see the classification sketch after this list). The following kinds of containers are supported:
  - Kubernetes Pods using Docker, containerd, or CRI-O as the runtime
  - Standalone containers: Docker, containerd, CRI-O, etc.
  - Docker Swarm tasks, etc.
  - Systemd units: any systemd service is also treated as a container.
- Supported container orchestration systems:
  - Kubernetes: self-managed clusters, EKS (including AWS Fargate), GKE, AKS, OKE
  - OpenShift
  - K3s
  - MicroK8s
  - Docker Swarm
- Limitations
  - Due to eBPF limitations, Coroot does not support Docker-in-Docker environments such as Minikube
  - WSL (Windows Subsystem for Linux) is not supported yet
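To make the container definition above concrete, here is a minimal, illustrative sketch (not coroot's actual code) that classifies a cgroup path into one of the supported container types; the path patterns are simplified assumptions:
package main

import (
	"fmt"
	"strings"
)

// classifyCgroup guesses the container type from a cgroup path.
// The patterns below are simplified assumptions; real layouts differ
// between cgroup drivers (cgroupfs vs systemd) and container runtimes.
func classifyCgroup(path string) string {
	switch {
	case strings.Contains(path, "kubepods"):
		return "kubernetes pod container"
	case strings.Contains(path, "/docker/") || strings.Contains(path, "docker-"):
		return "standalone docker container"
	case strings.Contains(path, "containerd") || strings.Contains(path, "crio-"):
		return "standalone containerd/CRI-O container"
	case strings.HasSuffix(path, ".service"):
		return "systemd unit"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classifyCgroup("/system.slice/kubelet.service"))
	fmt.Println(classifyCgroup("/kubepods/burstable/pod0f1e.../docker-abc.scope"))
}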
coroot
https://github.com/coroot/coroot is an open-source, eBPF-based observability tool that turns telemetry data into actionable views, helping you quickly identify and resolve application issues. An official demo is available at https://community-demo.coroot.com/p/qcih204s. Coroot's architecture is built around Prometheus and relies on eBPF; the project also open-sources a number of exporters, such as node, pg, and aws.
Key features
- Zero-instrumentation observability
- Distributed systems are no longer black boxes
- Built-in expertise
- Explore any outlier requests with distributed tracing
- Grasp insights from logs with just a quick glance
- Profile any application in 1 click
- Deployment Tracking
- Cost Monitoring
coroot-node-agent
https://github.com/coroot/coroot-node-agent collects information about the node and the containers running on it using eBPF and network-related tooling, and exposes these metrics in Prometheus format. Because it relies on eBPF to trace container-related events such as TCP connections, the minimum supported Linux kernel version is 4.16.
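Before deploying the agent you can check that the node's kernel meets the 4.16 requirement; a minimal sketch (not part of the agent) using uname(2), with the same golang.org/x/mod/semver style of comparison that ebpftracer.go uses further below:
package main

import (
	"fmt"
	"strings"

	"golang.org/x/mod/semver"
	"golang.org/x/sys/unix"
)

// kernelSupported reads the running kernel release via uname(2) and compares
// its major.minor part against the 4.16 minimum required by the agent.
func kernelSupported() (bool, string, error) {
	var uts unix.Utsname
	if err := unix.Uname(&uts); err != nil {
		return false, "", err
	}
	release := unix.ByteSliceToString(uts.Release[:]) // e.g. "5.15.0-91-generic"
	majorMinor := release
	if parts := strings.SplitN(release, ".", 3); len(parts) >= 2 {
		majorMinor = parts[0] + "." + parts[1]
	}
	return semver.Compare("v"+majorMinor, "v4.16") >= 0, release, nil
}

func main() {
	ok, release, err := kernelSupported()
	fmt.Println(ok, release, err)
}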
Key features
- TCP connection tracing
- Log patterns extraction (illustrative sketch after this list)
- Delay accounting
- Out-of-memory events tracing
- Instance meta information
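To illustrate log patterns extraction (exposed later as container_log_messages_total with a pattern_hash label): a minimal sketch of the idea, not the agent's actual algorithm, that masks the variable parts of a message and hashes the resulting pattern:
package main

import (
	"crypto/sha1"
	"fmt"
	"regexp"
)

// Illustrative only: mask digits and hex-like tokens so that similar messages
// collapse into one pattern, then hash the pattern for a stable label value.
// The agent's real pattern extraction is more sophisticated.
var variableParts = regexp.MustCompile(`\b(0x[0-9a-fA-F]+|[0-9]+)\b`)

func logPattern(msg string) (pattern, hash string) {
	pattern = variableParts.ReplaceAllString(msg, "<*>")
	sum := sha1.Sum([]byte(pattern))
	return pattern, fmt.Sprintf("%x", sum[:4])
}

func main() {
	p, h := logPattern("request 42 to backend failed after 1500 ms")
	fmt.Println(p, h) // request <*> to backend failed after <*> ms  <hash>
}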
The architecture diagram is shown below:
A snippet of the agent's C code (correlating connect() calls with file descriptors) is shown below:
struct trace_event_raw_args_with_fd__stub {
    __u64 unused;
    long int id;
    __u64 fd;
};

// On connect() entry, remember which fd the calling thread is connecting on,
// keyed by pid_tgid, so that subsequent events can be correlated with this socket.
SEC("tracepoint/syscalls/sys_enter_connect")
int sys_enter_connect(void *ctx) {
    struct trace_event_raw_args_with_fd__stub args = {};
    if (bpf_probe_read(&args, sizeof(args), ctx) < 0) {
        return 0;
    }
    __u64 id = bpf_get_current_pid_tgid();
    bpf_map_update_elem(&fd_by_pid_tgid, &id, &args.fd, BPF_ANY);
    return 0;
}

// On connect() exit, drop the entry so the fd_by_pid_tgid map does not grow unbounded.
SEC("tracepoint/syscalls/sys_exit_connect")
int sys_exit_connect(void *ctx) {
    __u64 id = bpf_get_current_pid_tgid();
    bpf_map_delete_elem(&fd_by_pid_tgid, &id);
    return 0;
}
Metrics: for the full list of metrics exposed by coroot-node-agent, see metric-exporters. The core network, I/O, disk, and file metrics look like this:
root@cce-lo9d0ykc-lq252ra3:/# curl http://10.2.1.243/metrics
# HELP container_application_type Type of the application running in the container (e.g. memcached, postgres, mysql)
# TYPE container_application_type gauge
container_application_type{application_type="envoy",container_id="/k8s/istio-system/istio-egressgateway-6664b79cb7-ch98h/istio-proxy",machine_id="8a79accec39b4434969f48d0d837c935"} 1
container_application_type{application_type="envoy",container_id="/k8s/istio-system/istio-ingressgateway-787dc49b87-w4lw8/istio-proxy",machine_id="8a79accec39b4434969f48d0d837c935"} 1
container_application_type{application_type="kubelet",container_id="/system.slice/kubelet.service",machine_id="8a79accec39b4434969f48d0d837c935"} 1
# HELP node_net_received_packets_total The total number of packets received
# TYPE node_net_received_packets_total counter
node_net_received_packets_total{interface="eth0",machine_id="8a79accec39b4434969f48d0d837c935"} 1.20680598e+08
# HELP node_net_transmitted_bytes_total The total number of bytes transmitted
# TYPE node_net_transmitted_bytes_total counter
node_net_transmitted_bytes_total{interface="eth0",machine_id="8a79accec39b4434969f48d0d837c935"} 9.801357895e+09
# HELP node_net_transmitted_packets_total The total number of packets transmitted
# TYPE node_net_transmitted_packets_total counter
node_net_transmitted_packets_total{interface="eth0",machine_id="8a79accec39b4434969f48d0d837c935"} 9.4272602e+07
# HELP node_resources_cpu_logical_cores The number of logical CPU cores
# TYPE node_resources_cpu_logical_cores gauge
node_resources_cpu_logical_cores{machine_id="8a79accec39b4434969f48d0d837c935"} 2
# HELP node_resources_cpu_usage_seconds_total The amount of CPU time spent in each mode
# TYPE node_resources_cpu_usage_seconds_total counter
node_resources_cpu_usage_seconds_total{machine_id="8a79accec39b4434969f48d0d837c935",mode="idle"} 4.22894521e+06
node_resources_cpu_usage_seconds_total{machine_id="8a79accec39b4434969f48d0d837c935",mode="iowait"} 1582.97
node_resources_cpu_usage_seconds_total{machine_id="8a79accec39b4434969f48d0d837c935",mode="irq"} 0
node_resources_cpu_usage_seconds_total{machine_id="8a79accec39b4434969f48d0d837c935",mode="nice"} 349.25
node_resources_cpu_usage_seconds_total{machine_id="8a79accec39b4434969f48d0d837c935",mode="softirq"} 4837.57
node_resources_cpu_usage_seconds_total{machine_id="8a79accec39b4434969f48d0d837c935",mode="steal"} 0
node_resources_cpu_usage_seconds_total{machine_id="8a79accec39b4434969f48d0d837c935",mode="system"} 25167.31
node_resources_cpu_usage_seconds_total{machine_id="8a79accec39b4434969f48d0d837c935",mode="user"} 56604.1
# HELP node_resources_disk_io_time_seconds_total The total number of seconds the disk spent doing I/O
# TYPE node_resources_disk_io_time_seconds_total counter
node_resources_disk_io_time_seconds_total{device="vda",machine_id="8a79accec39b4434969f48d0d837c935"} 4896.908
......
Core implementation: tracepoints (plus uprobes) on the kernel side, cilium/ebpf on the Go side
- SEC("tracepoint/sock/inet_sock_set_state"), SEC("tracepoint/syscalls/sys_enter_connect"), SEC("tracepoint/syscalls/sys_exit_connect")
- SEC("tracepoint/tcp/tcp_retransmit_skb")
- SEC("tracepoint/syscalls/sys_enter_open"), SEC("tracepoint/syscalls/sys_exit_open"), SEC("tracepoint/syscalls/sys_enter_openat"), SEC("tracepoint/syscalls/sys_exit_openat")
- SEC("tracepoint/task/task_newtask"), SEC("tracepoint/sched/sched_process_exit"), SEC("tracepoint/oom/mark_victim")
- SEC("uprobe/go_crypto_tls_write_enter"), SEC("uprobe/go_crypto_tls_read_enter"), SEC("uprobe/go_crypto_tls_read_exit") (uprobe attachment sketch after this list)
- SEC("tracepoint/syscalls/sys_enter_write"), SEC("tracepoint/syscalls/sys_enter_writev"), SEC("tracepoint/syscalls/sys_enter_sendmsg"), SEC("tracepoint/syscalls/sys_enter_sendto"), SEC("tracepoint/syscalls/sys_enter_read"), SEC("tracepoint/syscalls/sys_enter_readv"), SEC("tracepoint/syscalls/sys_enter_recvmsg"), SEC("tracepoint/syscalls/sys_enter_recvfrom"), SEC("tracepoint/syscalls/sys_exit_read"), SEC("tracepoint/syscalls/sys_exit_readv"), SEC("tracepoint/syscalls/sys_exit_recvmsg"), SEC("tracepoint/syscalls/sys_exit_recvfrom")
- SEC("uprobe/openssl_SSL_write_enter"), SEC("uprobe/openssl_SSL_write_enter_v1_1_1"), SEC("uprobe/openssl_SSL_write_enter_v3_0"), SEC("uprobe/openssl_SSL_read_enter"), SEC("uprobe/openssl_SSL_read_ex_enter"), SEC("uprobe/openssl_SSL_read_enter_v1_1_1"), SEC("uprobe/openssl_SSL_read_ex_enter_v1_1_1"), SEC("uprobe/openssl_SSL_read_enter_v3_0"), SEC("uprobe/openssl_SSL_read_ex_enter_v3_0"), SEC("uprobe/openssl_SSL_read_exit")
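The tracepoint programs above are attached globally, while the go_crypto_tls_* and openssl_SSL_* uprobes must be attached to each target binary at runtime (the agent does this per process via attachTlsUprobes, shown later). A minimal cilium/ebpf sketch of attaching a uprobe to a Go binary's crypto/tls write path; the binary path and program variable are assumptions:
package tlsprobe

import (
	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/link"
)

// attachGoTLSWriteUprobe attaches an already-loaded eBPF program to the
// crypto/tls.(*Conn).Write symbol of one specific Go binary. The binary path
// (e.g. /proc/<pid>/exe) and the program are placeholders: in the agent the
// program would come from the loaded collection, e.g.
// collection.Programs["go_crypto_tls_write_enter"].
func attachGoTLSWriteUprobe(binPath string, prog *ebpf.Program) (link.Link, error) {
	ex, err := link.OpenExecutable(binPath)
	if err != nil {
		return nil, err
	}
	return ex.Uprobe("crypto/tls.(*Conn).Write", prog, nil)
}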
Demo
Installation
Install coroot by following the coroot deployment documentation.
- Deploy and create StorageClasses for dynamically provisioning PVs
- Create two StorageClasses
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: cds-hp
parameters:
cdsSizeInGB: "100"
dynamicVolume: "true"
paymentTiming: Prepaid
reservationLength: "3"
storageType: hdd
provisioner: csi-cdsplugin
reclaimPolicy: Retain
volumeBindingMode: Immediate
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: cds-hp-test
parameters:
cdsSizeInGB: "100"
dynamicVolume: "true"
paymentTiming: Prepaid
reservationLength: "3"
storageType: hdd
provisioner: csi-cdsplugin
reclaimPolicy: Retain
volumeBindingMode: Immediate
- Install coroot
helm repo add coroot https://coroot.github.io/helm-charts
helm repo update
helm install --namespace coroot --create-namespace coroot coroot/coroot
- Bind each PVC to a StorageClass: first delete all PVCs in the coroot namespace, then recreate them from the manifest below, where the storageClassName field declares which StorageClass the PVC is bound to.
apiVersion: v1
items:
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
annotations:
meta.helm.sh/release-name: coroot
meta.helm.sh/release-namespace: coroot
finalizers:
- kubernetes.io/pvc-protection
labels:
app.kubernetes.io/instance: coroot
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: coroot
app.kubernetes.io/version: 0.19.2
helm.sh/chart: coroot-0.4.7
name: coroot-data
namespace: coroot
spec:
accessModes:
- ReadWriteOnce
storageClassName: cds-hp-test
resources:
requests:
storage: 10Gi
volumeMode: Filesystem
status:
phase: Pending
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
annotations:
meta.helm.sh/release-name: coroot
meta.helm.sh/release-namespace: coroot
finalizers:
- kubernetes.io/pvc-protection
labels:
app: prometheus
app.kubernetes.io/managed-by: Helm
chart: prometheus-15.16.1
component: server
heritage: Helm
release: coroot
name: coroot-prometheus-server
namespace: coroot
spec:
accessModes:
- ReadWriteOnce
storageClassName: cds-hp-test
resources:
requests:
storage: 10Gi
volumeMode: Filesystem
status:
phase: Pending
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
annotations:
meta.helm.sh/release-name: coroot
meta.helm.sh/release-namespace: coroot
finalizers:
- kubernetes.io/pvc-protection
labels:
app.kubernetes.io/instance: coroot
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: pyroscope
app.kubernetes.io/version: 0.37.2
helm.sh/chart: pyroscope-0.2.92
name: coroot-pyroscope
namespace: coroot
spec:
accessModes:
- ReadWriteOnce
storageClassName: cds-hp-test
resources:
requests:
storage: 30Gi
volumeMode: Filesystem
status:
phase: Pending
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
finalizers:
- kubernetes.io/pvc-protection
labels:
app.kubernetes.io/component: clickhouse
app.kubernetes.io/instance: coroot
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: clickhouse
helm.sh/chart: clickhouse-3.1.6
name: data-coroot-clickhouse-shard0-0
namespace: coroot
spec:
accessModes:
- ReadWriteOnce
storageClassName: cds-hp
resources:
requests:
storage: 50Gi
volumeMode: Filesystem
status:
phase: Pending
kind: List
metadata:
resourceVersion: ""
- Restart all affected Pods
- View the UI at http://localhost:8080/ after port-forwarding:
kubectl port-forward -n coroot service/coroot 8080:8080
Example
Pyroscope is an open-source continuous profiling system written in Go. The server side provides a web UI with rich analysis features; the client side supports Go, Java, Python, Ruby, PHP, .NET, and other languages, and both push and pull collection modes are available.
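As a quick illustration of the push mode, a Go service can push its profiles to a Pyroscope server using the pyroscope-io Go client; a minimal sketch (server address and application name are placeholders, and the exact client package and fields may differ between versions):
package main

import (
	"log"

	"github.com/pyroscope-io/client/pyroscope"
)

func main() {
	// Push mode: the client samples this process and pushes profiles to the
	// Pyroscope server. Address and application name are placeholders.
	_, err := pyroscope.Start(pyroscope.Config{
		ApplicationName: "demo.app",
		ServerAddress:   "http://coroot-pyroscope.coroot.svc:4040",
	})
	if err != nil {
		log.Fatal(err)
	}
	select {} // keep running so profiles keep flowing
}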
Code
registry.go
func NewRegistry(reg prometheus.Registerer, kernelVersion string) (*Registry, error) {
ns, err := proc.GetSelfNetNs()
if err != nil {
return nil, err
}
selfNetNs = ns
hostNetNs, err := proc.GetHostNetNs()
if err != nil {
return nil, err
}
defer hostNetNs.Close()
hostNetNsId = hostNetNs.UniqueId()
err = proc.ExecuteInNetNs(hostNetNs, selfNetNs, func() error {
if err := TaskstatsInit(); err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
// initialize cgroup handling (entering the host cgroup namespace if necessary)
if err := cgroup.Init(); err != nil {
return nil, err
}
// initialize the dockerd client
if err := DockerdInit(); err != nil {
klog.Warningln(err)
}
// initialize the containerd client
if err := ContainerdInit(); err != nil {
klog.Warningln(err)
}
// initialize the CRI-O client
if err := CrioInit(); err != nil {
klog.Warningln(err)
}
// initialize journald
if err := JournaldInit(); err != nil {
klog.Warningln(err)
}
// initialize conntrack in the host network namespace
ct, err := NewConntrack(hostNetNs)
if err != nil {
return nil, err
}
r := &Registry{
reg: reg,
events: make(chan ebpftracer.Event, 10000),
hostConntrack: ct,
containersById: map[ContainerID]*Container{},
containersByCgroupId: map[string]*Container{},
containersByPid: map[uint32]*Container{},
tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing),
}
// handle events: collect container- and host-level metrics
go r.handleEvents(r.events)
// start the eBPF tracer; the events channel is the exchange medium between the eBPF programs and the collector
if err = r.tracer.Run(r.events); err != nil {
close(r.events)
return nil, err
}
return r, nil
}
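NewRegistry only wires collectors into the given prometheus.Registerer; a minimal sketch of how it could be exposed over HTTP with the standard Prometheus client (not the agent's actual main(); the kernel version string and listen address are placeholders):
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// A dedicated registry; containers.NewRegistry registers one collector per
	// discovered container into it (labelled with container_id).
	reg := prometheus.NewRegistry()

	// Hypothetical call; in the real agent the kernel version is detected at
	// startup, "5.15.0" is just a placeholder:
	// _, err := containers.NewRegistry(reg, "5.15.0")

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":80", nil)
}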
cgroup_linux.go
func Init() error {
selfNs, err := netns.GetFromPath("/proc/self/ns/cgroup")
if err != nil {
return err
}
defer selfNs.Close()
hostNs, err := netns.GetFromPath("/proc/1/ns/cgroup")
if err != nil {
return err
}
defer hostNs.Close()
if selfNs.Equal(hostNs) {
return nil
}
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if err := unix.Setns(int(hostNs), unix.CLONE_NEWCGROUP); err != nil {
return err
}
cg, err := NewFromProcessCgroupFile("/proc/self/cgroup")
if err != nil {
return err
}
baseCgroupPath = cg.Id
if err := unix.Setns(int(selfNs), unix.CLONE_NEWCGROUP); err != nil {
return err
}
return nil
}
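NewFromProcessCgroupFile parses /proc/self/cgroup; a minimal, simplified sketch of what parsing such a file involves (cgroup v2 has a single "0::<path>" line, v1 has one line per controller; the agent's real parser keeps per-controller paths and derives a container id from them):
package cgroupsketch

import (
	"bufio"
	"os"
	"strings"
)

// readCgroupPath returns the unified (v2) cgroup path if present, otherwise the
// path from the first v1 controller line. Simplified for illustration.
func readCgroupPath(file string) (string, error) {
	f, err := os.Open(file)
	if err != nil {
		return "", err
	}
	defer f.Close()
	var fallback string
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		// Each line has the form <hierarchy-id>:<controllers>:<path>
		parts := strings.SplitN(scanner.Text(), ":", 3)
		if len(parts) != 3 {
			continue
		}
		if parts[0] == "0" && parts[1] == "" { // cgroup v2 unified entry
			return parts[2], nil
		}
		if fallback == "" {
			fallback = parts[2]
		}
	}
	return fallback, scanner.Err()
}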
handleEvents.go
func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
gcTicker := time.NewTicker(gcInterval)
defer gcTicker.Stop()
for {
select {
case now := <-gcTicker.C:
for pid, c := range r.containersByPid {
cg, err := proc.ReadCgroup(pid)
if err != nil {
delete(r.containersByPid, pid)
if c != nil {
c.onProcessExit(pid, false)
}
continue
}
if c != nil && cg.Id != c.cgroup.Id {
delete(r.containersByPid, pid)
c.onProcessExit(pid, false)
}
}
for id, c := range r.containersById {
if !c.Dead(now) {
continue
}
klog.Infoln("deleting dead container:", id)
for cg, cc := range r.containersByCgroupId {
if cc == c {
delete(r.containersByCgroupId, cg)
}
}
for pid, cc := range r.containersByPid {
if cc == c {
delete(r.containersByPid, pid)
}
}
if ok := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Unregister(c); !ok {
klog.Warningln("failed to unregister container:", id)
}
delete(r.containersById, id)
c.Close()
}
case e, more := <-ch:
if !more {
return
}
switch e.Type {
case ebpftracer.EventTypeProcessStart:
c, seen := r.containersByPid[e.Pid]
switch { // possible pids wraparound + missed `process-exit` event
case c == nil && seen: // ignored
delete(r.containersByPid, e.Pid)
case c != nil: // revalidating by cgroup
cg, err := proc.ReadCgroup(e.Pid)
if err != nil || cg.Id != c.cgroup.Id {
delete(r.containersByPid, e.Pid)
c.onProcessExit(e.Pid, false)
}
}
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onProcessStart(e.Pid)
}
case ebpftracer.EventTypeProcessExit:
if c := r.containersByPid[e.Pid]; c != nil {
c.onProcessExit(e.Pid, e.Reason == ebpftracer.EventReasonOOMKill)
}
delete(r.containersByPid, e.Pid)
case ebpftracer.EventTypeFileOpen:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onFileOpen(e.Pid, e.Fd)
}
case ebpftracer.EventTypeListenOpen:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onListenOpen(e.Pid, e.SrcAddr, false)
} else {
klog.Infoln("TCP listen open from unknown container", e)
}
case ebpftracer.EventTypeListenClose:
if c := r.containersByPid[e.Pid]; c != nil {
c.onListenClose(e.Pid, e.SrcAddr)
}
case ebpftracer.EventTypeConnectionOpen:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false)
c.attachTlsUprobes(r.tracer, e.Pid)
} else {
klog.Infoln("TCP connection from unknown container", e)
}
case ebpftracer.EventTypeConnectionError:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true)
} else {
klog.Infoln("TCP connection error from unknown container", e)
}
case ebpftracer.EventTypeConnectionClose:
srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
for _, c := range r.containersById {
if c.onConnectionClose(srcDst) {
break
}
}
case ebpftracer.EventTypeTCPRetransmit:
srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
for _, c := range r.containersById {
if c.onRetransmit(srcDst) {
break
}
}
case ebpftracer.EventTypeL7Request:
if e.L7Request == nil {
continue
}
if c := r.containersByPid[e.Pid]; c != nil {
c.onL7Request(e.Pid, e.Fd, e.Timestamp, e.L7Request)
}
}
}
}
}
onL7Request.go
func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
c.lock.Lock()
defer c.lock.Unlock()
var dest AddrPair
var conn *ActiveConnection
var found bool
for dest, conn = range c.connectionsActive {
if conn.Pid == pid && conn.Fd == fd && (timestamp == 0 || conn.Timestamp == timestamp) {
found = true
break
}
}
if !found {
return
}
stats := c.l7Stats.get(r.Protocol, dest.dst, conn.ActualDest)
trace := tracing.NewTrace(string(c.id), conn.ActualDest)
switch r.Protocol {
case l7.ProtocolHTTP:
stats.observe(r.Status.Http(), "", r.Duration)
method, path := l7.ParseHttp(r.Payload)
trace.HttpRequest(method, path, r.Status, r.Duration)
case l7.ProtocolHTTP2:
if conn.http2Parser == nil {
conn.http2Parser = l7.NewHttp2Parser()
}
requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
for _, req := range requests {
stats.observe(req.Status.Http(), "", req.Duration)
trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
}
case l7.ProtocolPostgres:
if r.Method != l7.MethodStatementClose {
stats.observe(r.Status.String(), "", r.Duration)
}
if conn.postgresParser == nil {
conn.postgresParser = l7.NewPostgresParser()
}
query := conn.postgresParser.Parse(r.Payload)
trace.PostgresQuery(query, r.Status.Error(), r.Duration)
case l7.ProtocolMysql:
if r.Method != l7.MethodStatementClose {
stats.observe(r.Status.String(), "", r.Duration)
}
if conn.mysqlParser == nil {
conn.mysqlParser = l7.NewMysqlParser()
}
query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
trace.MysqlQuery(query, r.Status.Error(), r.Duration)
case l7.ProtocolMemcached:
stats.observe(r.Status.String(), "", r.Duration)
cmd, items := l7.ParseMemcached(r.Payload)
trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
case l7.ProtocolRedis:
stats.observe(r.Status.String(), "", r.Duration)
cmd, args := l7.ParseRedis(r.Payload)
trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
case l7.ProtocolMongo:
stats.observe(r.Status.String(), "", r.Duration)
query := l7.ParseMongo(r.Payload)
trace.MongoQuery(query, r.Status.Error(), r.Duration)
case l7.ProtocolKafka, l7.ProtocolCassandra:
stats.observe(r.Status.String(), "", r.Duration)
case l7.ProtocolRabbitmq, l7.ProtocolNats:
stats.observe(r.Status.String(), r.Method.String(), 0)
}
}
How does a container collect its own metrics? The Collect.go code is shown below:
func (c *Container) Collect(ch chan<- prometheus.Metric) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.metadata.image != "" {
ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image)
}
ch <- counter(metrics.Restarts, float64(c.restarts))
if cpu, err := c.cgroup.CpuStat(); err == nil {
if cpu.LimitCores > 0 {
ch <- gauge(metrics.CPULimit, cpu.LimitCores)
}
ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
}
if taskstatsClient != nil {
c.updateDelays()
ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
}
if s, err := c.cgroup.MemoryStat(); err == nil {
ch <- gauge(metrics.MemoryRss, float64(s.RSS))
ch <- gauge(metrics.MemoryCache, float64(s.Cache))
if s.Limit > 0 {
ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
}
}
if c.oomKills > 0 {
ch <- counter(metrics.OOMKills, float64(c.oomKills))
}
if disks, err := node.GetDisks(); err == nil {
ioStat, _ := c.cgroup.IOStat()
for majorMinor, mounts := range c.getMounts() {
dev := disks.GetParentBlockDevice(majorMinor)
if dev == nil {
continue
}
for mountPoint, fsStat := range mounts {
dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
if io, ok := ioStat[majorMinor]; ok {
ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
}
}
}
}
for addr, open := range c.getListens() {
ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
}
for proxy, addrs := range c.getProxiedListens() {
for addr := range addrs {
ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
}
}
for d, count := range c.connectsSuccessful {
ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
}
for dst, count := range c.connectsFailed {
ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
}
for d, count := range c.retransmits {
ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
}
connections := map[AddrPair]int{}
for c, conn := range c.connectionsActive {
if !conn.Closed.IsZero() {
continue
}
connections[AddrPair{src: c.dst, dst: conn.ActualDest}]++
}
for d, count := range connections {
ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
}
for source, p := range c.logParsers {
for _, c := range p.parser.GetCounters() {
ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
}
}
appTypes := map[string]struct{}{}
seenJvms := map[string]bool{}
for pid := range c.processes {
cmdline := proc.GetCmdline(pid)
if len(cmdline) == 0 {
continue
}
appType := guessApplicationType(cmdline)
if appType != "" {
appTypes[appType] = struct{}{}
}
if isJvm(cmdline) {
jvm, jMetrics := jvmMetrics(pid)
if len(jMetrics) > 0 && !seenJvms[jvm] {
seenJvms[jvm] = true
for _, m := range jMetrics {
ch <- m
}
}
}
}
for appType := range appTypes {
ch <- gauge(metrics.ApplicationType, 1, appType)
}
c.l7Stats.collect(ch)
if !*flags.DisablePinger {
for ip, rtt := range c.ping() {
ch <- gauge(metrics.NetLatency, rtt, ip.String())
}
}
}
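Collect (together with a Describe method not shown here) makes *Container a prometheus.Collector. Judging from the Unregister call in handleEvents, each container is presumably registered behind a container_id label wrapper; a sketch of what that registration might look like inside the same package (assumed, mirroring the unregister path):
// Assumed registration counterpart of the Unregister call in handleEvents:
// wrap the registerer so every metric this container emits carries its
// container_id label, then register the container as a collector.
func registerContainer(reg prometheus.Registerer, id ContainerID, c *Container) error {
	wrapped := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, reg)
	return wrapped.Register(c)
}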
Which metrics does coroot-node-agent expose? The metric.go code is shown below:
package containers
import (
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
"github.com/prometheus/client_golang/prometheus"
)
var metrics = struct {
ContainerInfo *prometheus.Desc
Restarts *prometheus.Desc
CPULimit *prometheus.Desc
CPUUsage *prometheus.Desc
CPUDelay *prometheus.Desc
ThrottledTime *prometheus.Desc
MemoryLimit *prometheus.Desc
MemoryRss *prometheus.Desc
MemoryCache *prometheus.Desc
OOMKills *prometheus.Desc
DiskDelay *prometheus.Desc
DiskSize *prometheus.Desc
DiskUsed *prometheus.Desc
DiskReserved *prometheus.Desc
DiskReadOps *prometheus.Desc
DiskReadBytes *prometheus.Desc
DiskWriteOps *prometheus.Desc
DiskWriteBytes *prometheus.Desc
NetListenInfo *prometheus.Desc
NetConnectsSuccessful *prometheus.Desc
NetConnectsFailed *prometheus.Desc
NetConnectionsActive *prometheus.Desc
NetRetransmits *prometheus.Desc
NetLatency *prometheus.Desc
LogMessages *prometheus.Desc
ApplicationType *prometheus.Desc
JvmInfo *prometheus.Desc
JvmHeapSize *prometheus.Desc
JvmHeapUsed *prometheus.Desc
JvmGCTime *prometheus.Desc
JvmSafepointTime *prometheus.Desc
JvmSafepointSyncTime *prometheus.Desc
}{
ContainerInfo: metric("container_info", "Meta information about the container", "image"),
Restarts: metric("container_restarts_total", "Number of times the container was restarted"),
CPULimit: metric("container_resources_cpu_limit_cores", "CPU limit of the container"),
CPUUsage: metric("container_resources_cpu_usage_seconds_total", "Total CPU time consumed by the container"),
CPUDelay: metric("container_resources_cpu_delay_seconds_total", "Total time duration processes of the container have been waiting for a CPU (while being runnable)"),
ThrottledTime: metric("container_resources_cpu_throttled_seconds_total", "Total time duration the container has been throttled"),
MemoryLimit: metric("container_resources_memory_limit_bytes", "Memory limit of the container"),
MemoryRss: metric("container_resources_memory_rss_bytes", "Amount of physical memory used by the container (doesn't include page cache)"),
MemoryCache: metric("container_resources_memory_cache_bytes", "Amount of page cache memory allocated by the container"),
OOMKills: metric("container_oom_kills_total", "Total number of times the container was terminated by the OOM killer"),
DiskDelay: metric("container_resources_disk_delay_seconds_total", "Total time duration processes of the container have been waiting for I/Os to complete"),
DiskSize: metric("container_resources_disk_size_bytes", "Total capacity of the volume", "mount_point", "device", "volume"),
DiskUsed: metric("container_resources_disk_used_bytes", "Used capacity of the volume", "mount_point", "device", "volume"),
DiskReserved: metric("container_resources_disk_reserved_bytes", "Reserved capacity of the volume", "mount_point", "device", "volume"),
DiskReadOps: metric("container_resources_disk_reads_total", "Total number of reads completed successfully by the container", "mount_point", "device", "volume"),
DiskReadBytes: metric("container_resources_disk_read_bytes_total", "Total number of bytes read from the disk by the container", "mount_point", "device", "volume"),
DiskWriteOps: metric("container_resources_disk_writes_total", "Total number of writes completed successfully by the container", "mount_point", "device", "volume"),
DiskWriteBytes: metric("container_resources_disk_written_bytes_total", "Total number of bytes written to the disk by the container", "mount_point", "device", "volume"),
NetListenInfo: metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr", "proxy"),
NetConnectsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
NetConnectsFailed: metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
NetConnectionsActive: metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
NetRetransmits: metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
NetLatency: metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),
LogMessages: metric("container_log_messages_total", "Number of messages grouped by the automatically extracted repeated pattern", "source", "level", "pattern_hash", "sample"),
ApplicationType: metric("container_application_type", "Type of the application running in the container (e.g. memcached, postgres, mysql)", "application_type"),
JvmInfo: metric("container_jvm_info", "Meta information about the JVM", "jvm", "java_version"),
JvmHeapSize: metric("container_jvm_heap_size_bytes", "Total heap size in bytes", "jvm"),
JvmHeapUsed: metric("container_jvm_heap_used_bytes", "Used heap size in bytes", "jvm"),
JvmGCTime: metric("container_jvm_gc_time_seconds", "Time spent in the given JVM garbage collector in seconds", "jvm", "gc"),
JvmSafepointTime: metric("container_jvm_safepoint_time_seconds", "Time the application has been stopped for safepoint operations in seconds", "jvm"),
JvmSafepointSyncTime: metric("container_jvm_safepoint_sync_time_seconds", "Time spent getting to safepoints in seconds", "jvm"),
}
var (
L7Requests = map[l7.Protocol]prometheus.CounterOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
l7.ProtocolRedis: {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
l7.ProtocolMysql: {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
l7.ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
l7.ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
l7.ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
l7.ProtocolNats: {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
}
L7Latency = map[l7.Protocol]prometheus.HistogramOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
l7.ProtocolRedis: {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
l7.ProtocolMysql: {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
l7.ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
l7.ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
}
)
func metric(name, help string, labels ...string) *prometheus.Desc {
return prometheus.NewDesc(name, help, labels, nil)
}
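The gauge() and counter() helpers used throughout Collect are not shown above; presumably they are thin wrappers around prometheus.MustNewConstMetric, roughly like this sketch (assumed, same package):
// Assumed shape of the helpers used in Collect: turn a *prometheus.Desc plus a
// value and label values into a constant metric for the scrape in progress.
func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
	return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
}

func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
	return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
}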
ebpftracer.go
package ebpftracer
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
"github.com/coroot/coroot-node-agent/proc"
"golang.org/x/mod/semver"
"golang.org/x/sys/unix"
"inet.af/netaddr"
"k8s.io/klog/v2"
"os"
"runtime"
"strconv"
"strings"
"time"
)
const MaxPayloadSize = 1024
type EventType uint32
type EventReason uint32
const (
EventTypeProcessStart EventType = 1
EventTypeProcessExit EventType = 2
EventTypeConnectionOpen EventType = 3
EventTypeConnectionClose EventType = 4
EventTypeConnectionError EventType = 5
EventTypeListenOpen EventType = 6
EventTypeListenClose EventType = 7
EventTypeFileOpen EventType = 8
EventTypeTCPRetransmit EventType = 9
EventTypeL7Request EventType = 10
EventReasonNone EventReason = 0
EventReasonOOMKill EventReason = 1
)
type Event struct {
Type EventType
Reason EventReason
Pid uint32
SrcAddr netaddr.IPPort
DstAddr netaddr.IPPort
Fd uint64
Timestamp uint64
L7Request *l7.RequestData
}
type Tracer struct {
kernelVersion string
disableL7Tracing bool
collection *ebpf.Collection
readers map[string]*perf.Reader
links []link.Link
uprobes map[string]*ebpf.Program
}
func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
if disableL7Tracing {
klog.Infoln("L7 tracing is disabled")
}
return &Tracer{
kernelVersion: kernelVersion,
disableL7Tracing: disableL7Tracing,
readers: map[string]*perf.Reader{},
uprobes: map[string]*ebpf.Program{},
}
}
func (t *Tracer) Run(events chan<- Event) error {
if err := t.ebpf(events); err != nil {
return err
}
if err := t.init(events); err != nil {
return err
}
return nil
}
func (t *Tracer) Close() {
for _, p := range t.uprobes {
_ = p.Close()
}
for _, l := range t.links {
_ = l.Close()
}
for _, r := range t.readers {
_ = r.Close()
}
t.collection.Close()
}
func (t *Tracer) init(ch chan<- Event) error {
pids, err := proc.ListPids()
if err != nil {
return fmt.Errorf("failed to list pids: %w", err)
}
for _, pid := range pids {
ch <- Event{Type: EventTypeProcessStart, Pid: pid}
}
fds, sockets := readFds(pids)
for _, fd := range fds {
ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
}
listens := map[uint64]bool{}
for _, s := range sockets {
if s.Listen {
listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
}
}
for _, s := range sockets {
typ := EventTypeConnectionOpen
if s.Listen {
typ = EventTypeListenOpen
} else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
continue
}
ch <- Event{
Type: typ,
Pid: s.pid,
Fd: s.fd,
SrcAddr: s.SAddr,
DstAddr: s.DAddr,
}
}
return nil
}
type perfMap struct {
name string
perCPUBufferSizePages int
event rawEvent
}
func (t *Tracer) ebpf(ch chan<- Event) error {
if _, ok := ebpfProg[runtime.GOARCH]; !ok {
return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
}
kv := "v" + common.KernelMajorMinor(t.kernelVersion)
var prg []byte
for _, p := range ebpfProg[runtime.GOARCH] {
if semver.Compare(kv, p.v) >= 0 {
prg = p.p
break
}
}
if len(prg) == 0 {
return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
}
if _, err := os.Stat("/sys/kernel/debug/tracing"); err != nil {
return fmt.Errorf("kernel tracing is not available: %w", err)
}
collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
if err != nil {
return fmt.Errorf("failed to load collection spec: %w", err)
}
_ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
c, err := ebpf.NewCollectionWithOptions(collectionSpec, ebpf.CollectionOptions{
//Programs: ebpf.ProgramOptions{LogLevel: 2, LogSize: 20 * 1024 * 1024},
})
if err != nil {
var verr *ebpf.VerifierError
if errors.As(err, &verr) {
klog.Errorf("%+v", verr)
}
return fmt.Errorf("failed to load collection: %w", err)
}
t.collection = c
// core part: each name corresponds to a BPF map name in the C code, see the C example below
perfMaps := []perfMap{
{name: "proc_events", event: &procEvent{}, perCPUBufferSizePages: 4},
{name: "tcp_listen_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
{name: "tcp_connect_events", event: &tcpEvent{}, perCPUBufferSizePages: 8},
{name: "tcp_retransmit_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
{name: "file_events", event: &fileEvent{}, perCPUBufferSizePages: 4},
}
if !t.disableL7Tracing {
perfMaps = append(perfMaps, perfMap{name: "l7_events", event: &l7Event{}, perCPUBufferSizePages: 32})
}
for _, pm := range perfMaps {
r, err := perf.NewReader(t.collection.Maps[pm.name], pm.perCPUBufferSizePages*os.Getpagesize())
if err != nil {
t.Close()
return fmt.Errorf("failed to create ebpf reader: %w", err)
}
t.readers[pm.name] = r
go runEventsReader(pm.name, r, ch, pm.event)
}
for _, programSpec := range collectionSpec.Programs {
program := t.collection.Programs[programSpec.Name]
if t.disableL7Tracing {
switch programSpec.Name {
case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg":
continue
case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
continue
case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
continue
}
}
var l link.Link
switch programSpec.Type {
case ebpf.TracePoint:
parts := strings.SplitN(programSpec.AttachTo, "/", 2)
l, err = link.Tracepoint(parts[0], parts[1], program, nil)
case ebpf.Kprobe:
if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
t.uprobes[programSpec.Name] = program
continue
}
l, err = link.Kprobe(programSpec.AttachTo, program, nil)
}
if err != nil {
t.Close()
return fmt.Errorf("failed to link program: %w", err)
}
t.links = append(t.links, l)
}
return nil
}
func (t EventType) String() string {
switch t {
case EventTypeProcessStart:
return "process-start"
case EventTypeProcessExit:
return "process-exit"
case EventTypeConnectionOpen:
return "connection-open"
case EventTypeConnectionClose:
return "connection-close"
case EventTypeConnectionError:
return "connection-error"
case EventTypeListenOpen:
return "listen-open"
case EventTypeListenClose:
return "listen-close"
case EventTypeFileOpen:
return "file-open"
case EventTypeTCPRetransmit:
return "tcp-retransmit"
case EventTypeL7Request:
return "l7-request"
}
return "unknown: " + strconv.Itoa(int(t))
}
func (t EventReason) String() string {
switch t {
case EventReasonNone:
return "none"
case EventReasonOOMKill:
return "oom-kill"
}
return "unknown: " + strconv.Itoa(int(t))
}
type rawEvent interface {
Event() Event
}
type procEvent struct {
Type uint32
Pid uint32
Reason uint32
}
func (e procEvent) Event() Event {
return Event{Type: EventType(e.Type), Reason: EventReason(e.Reason), Pid: e.Pid}
}
type tcpEvent struct {
Fd uint64
Timestamp uint64
Type uint32
Pid uint32
SPort uint16
DPort uint16
SAddr [16]byte
DAddr [16]byte
}
func (e tcpEvent) Event() Event {
return Event{
Type: EventType(e.Type),
Pid: e.Pid,
SrcAddr: ipPort(e.SAddr, e.SPort),
DstAddr: ipPort(e.DAddr, e.DPort),
Fd: e.Fd,
Timestamp: e.Timestamp,
}
}
type fileEvent struct {
Type uint32
Pid uint32
Fd uint64
}
func (e fileEvent) Event() Event {
return Event{Type: EventType(e.Type), Pid: e.Pid, Fd: e.Fd}
}
type l7Event struct {
Fd uint64
ConnectionTimestamp uint64
Pid uint32
Status uint32
Duration uint64
Protocol uint8
Method uint8
Padding uint16
StatementId uint32
PayloadSize uint64
Payload [MaxPayloadSize]byte
}
func (e l7Event) Event() Event {
r := &l7.RequestData{
Protocol: l7.Protocol(e.Protocol),
Status: l7.Status(e.Status),
Duration: time.Duration(e.Duration),
Method: l7.Method(e.Method),
StatementId: e.StatementId,
}
switch {
case e.PayloadSize == 0:
case e.PayloadSize > MaxPayloadSize:
r.Payload = e.Payload[:MaxPayloadSize]
default:
r.Payload = e.Payload[:e.PayloadSize]
}
return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, Timestamp: e.ConnectionTimestamp, L7Request: r}
}
func runEventsReader(name string, r *perf.Reader, ch chan<- Event, e rawEvent) {
for {
rec, err := r.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
break
}
continue
}
if rec.LostSamples > 0 {
klog.Errorln(name, "lost samples:", rec.LostSamples)
continue
}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, e); err != nil {
klog.Warningln("failed to read msg:", err)
continue
}
ch <- e.Event()
}
}
func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
i, _ := netaddr.FromStdIP(ip[:])
return netaddr.IPPortFrom(i, port)
}
The TCP retransmit tracing code in retransmit.c is shown below:
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(int));
__uint(value_size, sizeof(int));
} tcp_retransmit_events SEC(".maps");
struct trace_event_raw_tcp_event_sk_skb__stub {
__u64 unused;
void *skbaddr;
void *skaddr;
#if __KERNEL_FROM >= 420
int state;
#endif
__u16 sport;
__u16 dport;
#if __KERNEL_FROM >= 512
__u16 family;
#endif
__u8 saddr[4];
__u8 daddr[4];
__u8 saddr_v6[16];
__u8 daddr_v6[16];
};
SEC("tracepoint/tcp/tcp_retransmit_skb")
int tcp_retransmit_skb(struct trace_event_raw_tcp_event_sk_skb__stub *args)
{
struct tcp_event e = {
.type = EVENT_TYPE_TCP_RETRANSMIT,
.sport = args->sport,
.dport = args->dport,
};
__builtin_memcpy(&e.saddr, &args->saddr_v6, sizeof(e.saddr));
__builtin_memcpy(&e.daddr, &args->daddr_v6, sizeof(e.daddr));
bpf_perf_event_output(args, &tcp_retransmit_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
return 0;
}
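On the Go side, the tcp_retransmit_events perf map above is read by the perfMap entry of the same name in ebpftracer.go ({name: "tcp_retransmit_events", event: &tcpEvent{}, ...}): every record written by bpf_perf_event_output is decoded into a tcpEvent via binary.Read, surfaced to the registry as an EventTypeTCPRetransmit event, and ultimately counted in container_net_tcp_retransmits_total.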