cgroup monitors: Add mem/disk/cpu usage metrics for V2 (#16905)

* cgroup monitors: Add mem/disk/cpu usage metrics for V2

* intellij inspection

* docs and checks

* fix-dos

* add comments

* comments
This commit is contained in:
Adithya Chakilam 2024-09-23 22:32:01 -05:00 committed by GitHub
parent ba8245f114
commit 8eaac2c051
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 832 additions and 35 deletions

View File

@ -396,6 +396,9 @@ Metric monitoring is an essential part of Druid operations. The following monito
|`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup.|
|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistics as per the blkio cgroup.|
|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistics as per the memory cgroup.|
|`org.apache.druid.java.util.metrics.CgroupV2CpuMonitor`| **EXPERIMENTAL** Reports CPU usage from `cpu.stat` file. Only applicable to `cgroupv2`.|
|`org.apache.druid.java.util.metrics.CgroupV2DiskMonitor`| **EXPERIMENTAL** Reports disk usage from `io.stat` file. Only applicable to `cgroupv2`.|
|`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`| **EXPERIMENTAL** Reports memory usage from `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.|
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
|`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|

View File

@ -38,12 +38,6 @@ public class CgroupCpuMonitor extends FeedDefiningMonitor
{
private static final Logger LOG = new Logger(CgroupCpuMonitor.class);
private static final Long DEFAULT_USER_HZ = 100L;
public static final String TOTAL_USAGE_METRIC = "cgroup/cpu/usage/total/percentage";
public static final String USER_USAGE_METRIC = "cgroup/cpu/usage/user/percentage";
public static final String SYS_USAGE_METRIC = "cgroup/cpu/usage/sys/percentage";
private static final String TOTAL = "total";
private static final String USER = "user";
private static final String SYSTEM = "system";
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private Long userHz;
@ -111,18 +105,18 @@ public class CgroupCpuMonitor extends FeedDefiningMonitor
final Map<String, Long> elapsedJiffies = jiffies.to(
"usage",
ImmutableMap.<String, Long>builder()
.put(USER, cpuSnapshot.getUserJiffies())
.put(SYSTEM, cpuSnapshot.getSystemJiffies())
.put(TOTAL, cpuSnapshot.getTotalJiffies())
.put(CgroupUtil.USER, cpuSnapshot.getUserJiffies())
.put(CgroupUtil.SYSTEM, cpuSnapshot.getSystemJiffies())
.put(CgroupUtil.TOTAL, cpuSnapshot.getTotalJiffies())
.build()
);
if (elapsedJiffies != null) {
double totalUsagePct = 100.0 * elapsedJiffies.get(TOTAL) / userHz / elapsedJiffiesSnapshotSecs;
double sysUsagePct = 100.0 * elapsedJiffies.get(SYSTEM) / userHz / elapsedJiffiesSnapshotSecs;
double userUsagePct = 100.0 * elapsedJiffies.get(USER) / userHz / elapsedJiffiesSnapshotSecs;
emitter.emit(builder.setMetric(TOTAL_USAGE_METRIC, totalUsagePct));
emitter.emit(builder.setMetric(SYS_USAGE_METRIC, sysUsagePct));
emitter.emit(builder.setMetric(USER_USAGE_METRIC, userUsagePct));
double totalUsagePct = 100.0 * elapsedJiffies.get(CgroupUtil.TOTAL) / userHz / elapsedJiffiesSnapshotSecs;
double sysUsagePct = 100.0 * elapsedJiffies.get(CgroupUtil.SYSTEM) / userHz / elapsedJiffiesSnapshotSecs;
double userUsagePct = 100.0 * elapsedJiffies.get(CgroupUtil.USER) / userHz / elapsedJiffiesSnapshotSecs;
emitter.emit(builder.setMetric(CgroupUtil.CPU_TOTAL_USAGE_METRIC, totalUsagePct));
emitter.emit(builder.setMetric(CgroupUtil.CPU_SYS_USAGE_METRIC, sysUsagePct));
emitter.emit(builder.setMetric(CgroupUtil.CPU_USER_USAGE_METRIC, userUsagePct));
}
}
return true;

View File

@ -64,10 +64,10 @@ public class CgroupDiskMonitor extends FeedDefiningMonitor
final Map<String, Long> stats = diff.to(
entry.getKey(),
ImmutableMap.<String, Long>builder()
.put("cgroup/disk/read/bytes", entry.getValue().getReadBytes())
.put("cgroup/disk/read/count", entry.getValue().getReadCount())
.put("cgroup/disk/write/bytes", entry.getValue().getWriteBytes())
.put("cgroup/disk/write/count", entry.getValue().getWriteCount())
.put(CgroupUtil.DISK_READ_BYTES_METRIC, entry.getValue().getReadBytes())
.put(CgroupUtil.DISK_READ_COUNT_METRIC, entry.getValue().getReadCount())
.put(CgroupUtil.DISK_WRITE_BYTES_METRIC, entry.getValue().getWriteBytes())
.put(CgroupUtil.DISK_WRITE_COUNT_METRIC, entry.getValue().getWriteCount())
.build()
);

View File

@ -60,7 +60,7 @@ public class CgroupMemoryMonitor extends FeedDefiningMonitor
public boolean doMonitor(ServiceEmitter emitter)
{
final Memory memory = new Memory(cgroupDiscoverer);
final Memory.MemoryStat stat = memory.snapshot();
final Memory.MemoryStat stat = memory.snapshot(memoryUsageFile(), memoryLimitFile());
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("cgroup/memory/usage/bytes", stat.getUsage()));
@ -77,4 +77,14 @@ public class CgroupMemoryMonitor extends FeedDefiningMonitor
});
return true;
}
/**
 * Name of the cgroup file that {@code doMonitor} passes to {@code Memory.snapshot} as the
 * source of the current memory usage. Kept as an overridable method so that a subclass can
 * point the monitor at a differently named file.
 *
 * @return the usage file name, {@code memory.usage_in_bytes} for this monitor
 */
public String memoryUsageFile()
{
return "memory.usage_in_bytes";
}
/**
 * Name of the cgroup file that {@code doMonitor} passes to {@code Memory.snapshot} as the
 * source of the memory limit. Kept as an overridable method so that a subclass can point
 * the monitor at a differently named file.
 *
 * @return the limit file name, {@code memory.limit_in_bytes} for this monitor
 */
public String memoryLimitFile()
{
return "memory.limit_in_bytes";
}
}

View File

@ -35,6 +35,17 @@ public class CgroupUtil
private static final Logger LOG = new Logger(CgroupUtil.class);
public static final String SPACE_MATCH = Pattern.quote(" ");
public static final String COMMA_MATCH = Pattern.quote(",");
public static final String TOTAL = "total";
public static final String USER = "user";
public static final String SYSTEM = "system";
public static final String CPU_TOTAL_USAGE_METRIC = "cgroup/cpu/usage/total/percentage";
public static final String CPU_USER_USAGE_METRIC = "cgroup/cpu/usage/user/percentage";
public static final String CPU_SYS_USAGE_METRIC = "cgroup/cpu/usage/sys/percentage";
public static final String DISK_READ_BYTES_METRIC = "cgroup/disk/read/bytes";
public static final String DISK_READ_COUNT_METRIC = "cgroup/disk/read/count";
public static final String DISK_WRITE_BYTES_METRIC = "cgroup/disk/write/bytes";
public static final String DISK_WRITE_COUNT_METRIC = "cgroup/disk/write/count";
public static long readLongValue(CgroupDiscoverer discoverer, String cgroup, String fileName, long defaultValue)
{

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.Cpu;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
 * Monitor that reports cpu usage stats by reading `cpu.stat` reported by cgroupv2.
 *
 * <p>Each invocation reads the cumulative {@code usage_usec}, {@code user_usec} and
 * {@code system_usec} counters, diffs them against the previous invocation and emits the
 * consumed CPU time as a percentage of the wall-clock time elapsed between both invocations.
 * Nothing is emitted while no previous reading is available to diff against, or when the
 * current reading is incomplete.
 */
public class CgroupV2CpuMonitor extends FeedDefiningMonitor
{
  private static final Logger LOG = new Logger(CgroupV2CpuMonitor.class);
  private static final String CPU_STAT_FILE = "cpu.stat";
  private static final String SNAPSHOT = "snapshot";
  // Compiled once instead of once per line read from cpu.stat.
  private static final Pattern SPACE_PATTERN = Pattern.compile(Pattern.quote(" "));
  // Marker stored in a Snapshot for a counter that could not be read.
  private static final long MISSING_VALUE = -1L;

  final CgroupDiscoverer cgroupDiscoverer;
  final Map<String, String[]> dimensions;
  private final KeyedDiff diff = new KeyedDiff();

  /**
   * @param cgroupDiscoverer used to locate the directory that contains {@code cpu.stat}
   * @param dimensions       extra dimensions attached to every emitted metric
   * @param feed             feed name handed to {@link FeedDefiningMonitor}
   */
  public CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
  {
    super(feed);
    this.cgroupDiscoverer = cgroupDiscoverer;
    this.dimensions = dimensions;
  }

  @VisibleForTesting
  CgroupV2CpuMonitor(CgroupDiscoverer cgroupDiscoverer)
  {
    this(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED);
  }

  CgroupV2CpuMonitor()
  {
    this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class));
  }

  /**
   * Emits total/system/user cpu usage percentages computed from the change in {@code cpu.stat}
   * counters since the previous call.
   *
   * @param emitter destination of the metric events
   * @return always {@code true}
   */
  @Override
  public boolean doMonitor(ServiceEmitter emitter)
  {
    final Snapshot snapshot = snapshot();
    if (snapshot.getUsageUsec() < 0 || snapshot.getUserUsec() < 0 || snapshot.getSystemUsec() < 0) {
      // An incomplete reading must not be fed into the diff: it would produce bogus (negative)
      // deltas now and a bogus baseline for the next call.
      LOG.debug("Skipping cpu usage metrics, incomplete snapshot of [%s]", CPU_STAT_FILE);
      return true;
    }

    final ServiceMetricEvent.Builder builder = builder();
    MonitorUtils.addDimensionsToBuilder(builder, dimensions);
    final Map<String, Long> elapsed = diff.to(
        "usage",
        ImmutableMap.<String, Long>builder()
                    .put(CgroupUtil.USER, snapshot.getUserUsec())
                    .put(CgroupUtil.SYSTEM, snapshot.getSystemUsec())
                    .put(CgroupUtil.TOTAL, snapshot.getUsageUsec())
                    // wall-clock time of this reading in microseconds, diffed like the counters
                    .put(SNAPSHOT, ChronoUnit.MICROS.between(Instant.EPOCH, Instant.now()))
                    .build()
    );

    if (elapsed != null) {
      final long elapsedUsecs = elapsed.get(SNAPSHOT);
      // Guard against a zero (or backwards) clock delta, which would yield NaN/Infinity metrics.
      if (elapsedUsecs > 0) {
        double totalUsagePct = 100.0 * elapsed.get(CgroupUtil.TOTAL) / elapsedUsecs;
        double sysUsagePct = 100.0 * elapsed.get(CgroupUtil.SYSTEM) / elapsedUsecs;
        double userUsagePct = 100.0 * elapsed.get(CgroupUtil.USER) / elapsedUsecs;
        emitter.emit(builder.setMetric(CgroupUtil.CPU_TOTAL_USAGE_METRIC, totalUsagePct));
        emitter.emit(builder.setMetric(CgroupUtil.CPU_SYS_USAGE_METRIC, sysUsagePct));
        emitter.emit(builder.setMetric(CgroupUtil.CPU_USER_USAGE_METRIC, userUsagePct));
      }
    }
    return true;
  }

  /*
  file: cpu.stat

  sample content:
  usage_usec 2379951538
  user_usec 1802023024
  system_usec 577928513
  nr_periods 1581231
  nr_throttled 59
  throttled_usec 3095133
  */

  /**
   * Reads the current {@code cpu.stat} counters.
   *
   * <p>Reading is best-effort: failures are logged, never thrown. Lines that are not of the
   * form {@code <key> <number>} are ignored.
   *
   * @return the counters read; a counter that is absent or unreadable is reported as {@code -1}
   */
  public Snapshot snapshot()
  {
    final Map<String, Long> entries = new HashMap<>();
    try (final BufferedReader reader = Files.newBufferedReader(
        Paths.get(cgroupDiscoverer.discover(Cpu.CGROUP).toString(), CPU_STAT_FILE)
    )) {
      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
        final String[] parts = SPACE_PATTERN.split(line);
        if (parts.length != 2) {
          // ignore
          continue;
        }
        // tryParse returns null for a non-numeric value; never store nulls so that the
        // lookups below cannot unbox one.
        final Long value = Longs.tryParse(parts[1]);
        if (value != null) {
          entries.put(parts[0], value);
        }
      }
    }
    catch (IOException | RuntimeException ex) {
      LOG.error(ex, "Unable to fetch cpu snapshot");
    }

    return new Snapshot(
        entries.getOrDefault("usage_usec", MISSING_VALUE),
        entries.getOrDefault("user_usec", MISSING_VALUE),
        entries.getOrDefault("system_usec", MISSING_VALUE)
    );
  }

  /**
   * Immutable holder of the three cumulative cpu time counters, in microseconds as named by
   * the {@code *_usec} keys they are read from.
   */
  public static class Snapshot
  {
    private final long usageUsec;
    private final long userUsec;
    private final long systemUsec;

    public Snapshot(long usageUsec, long userUsec, long systemUsec)
    {
      this.usageUsec = usageUsec;
      this.userUsec = userUsec;
      this.systemUsec = systemUsec;
    }

    public long getUsageUsec()
    {
      return usageUsec;
    }

    public long getUserUsec()
    {
      return userUsec;
    }

    public long getSystemUsec()
    {
      return systemUsec;
    }
  }
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.Disk;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
 * Monitor that reports disk usage stats by reading `io.stat` reported by cgroupv2.
 *
 * <p>Each invocation reads the cumulative per-device counters, diffs them against the previous
 * invocation for the same device and emits the deltas with a {@code diskName} dimension.
 * Nothing is emitted for a device while no previous reading is available to diff against.
 */
public class CgroupV2DiskMonitor extends FeedDefiningMonitor
{
  private static final Logger LOG = new Logger(CgroupV2DiskMonitor.class);
  private static final String IO_STAT = "io.stat";
  // Compiled once instead of once per line read from io.stat.
  private static final Pattern SPACE_PATTERN = Pattern.compile(Pattern.quote(" "));

  final CgroupDiscoverer cgroupDiscoverer;
  final Map<String, String[]> dimensions;
  private final KeyedDiff diff = new KeyedDiff();

  /**
   * @param cgroupDiscoverer used to locate the directory that contains {@code io.stat}
   * @param dimensions       extra dimensions attached to every emitted metric
   * @param feed             feed name handed to {@link FeedDefiningMonitor}
   */
  public CgroupV2DiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
  {
    super(feed);
    this.cgroupDiscoverer = cgroupDiscoverer;
    this.dimensions = dimensions;
  }

  @VisibleForTesting
  CgroupV2DiskMonitor(CgroupDiscoverer cgroupDiscoverer)
  {
    this(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED);
  }

  CgroupV2DiskMonitor()
  {
    this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class));
  }

  /**
   * Emits, per device, the change in read/write bytes and read/write operation counts since
   * the previous call.
   *
   * @param emitter destination of the metric events
   * @return always {@code true}
   */
  @Override
  public boolean doMonitor(ServiceEmitter emitter)
  {
    for (Disk.Metrics entry : snapshot()) {
      final Map<String, Long> stats = diff.to(
          entry.getDiskName(),
          ImmutableMap.<String, Long>builder()
                      .put(CgroupUtil.DISK_READ_BYTES_METRIC, entry.getReadBytes())
                      .put(CgroupUtil.DISK_READ_COUNT_METRIC, entry.getReadCount())
                      .put(CgroupUtil.DISK_WRITE_BYTES_METRIC, entry.getWriteBytes())
                      .put(CgroupUtil.DISK_WRITE_COUNT_METRIC, entry.getWriteCount())
                      .build()
      );

      if (stats != null) {
        final ServiceMetricEvent.Builder builder = builder()
            .setDimension("diskName", entry.getDiskName());
        MonitorUtils.addDimensionsToBuilder(builder, dimensions);
        for (Map.Entry<String, Long> stat : stats.entrySet()) {
          emitter.emit(builder.setMetric(stat.getKey(), stat.getValue()));
        }
      }
    }
    return true;
  }

  /*
  file: io.stat

  sample content:
  8:0 rbytes=933898 wbytes=110870538 rios=238 wios=7132 dbytes=0 dios=0
  15:0 rbytes=34566 wbytes=3466756 rios=23 wios=71 dbytes=0 dios=0
  */

  /**
   * Reads the current per-device counters from {@code io.stat}.
   *
   * <p>Reading is best-effort: failures are logged, never thrown, and whatever was parsed
   * before the failure is still returned. Blank lines are skipped.
   *
   * @return one entry per parsed line, possibly empty
   */
  public List<Disk.Metrics> snapshot()
  {
    final List<Disk.Metrics> diskStats = new ArrayList<>();
    try (final BufferedReader reader = Files.newBufferedReader(
        Paths.get(cgroupDiscoverer.discover("disk").toString(), IO_STAT))) {
      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
        if (line.trim().isEmpty()) {
          // a blank line carries no device name and would otherwise become a disk named ""
          continue;
        }
        diskStats.add(getDiskMetrics(line));
      }
    }
    catch (IOException | RuntimeException ex) {
      LOG.error(ex, "Unable to fetch disk snapshot");
    }
    return diskStats;
  }

  /**
   * Parses one {@code io.stat} line: the first space-separated token is the device name, the
   * remaining tokens are {@code key=value} pairs. Tokens without exactly one {@code =} are
   * ignored, and a counter that is absent from the line is reported as 0 instead of failing.
   *
   * @throws NumberFormatException if a value is not a valid long
   */
  private static Disk.Metrics getDiskMetrics(String line)
  {
    final String[] parts = SPACE_PATTERN.split(line);
    final Disk.Metrics disk = new Disk.Metrics(parts[0]);
    final Map<String, Long> stats = new HashMap<>();
    for (int i = 1; i < parts.length; i++) {
      final String[] keyValue = parts[i].split("=");
      if (keyValue.length == 2) {
        stats.put(keyValue[0], Long.parseLong(keyValue[1]));
      }
    }

    // getOrDefault: unboxing a missing key would throw and abort the read of every following device.
    disk.setReadBytes(stats.getOrDefault("rbytes", 0L));
    disk.setReadCount(stats.getOrDefault("rios", 0L));
    disk.setWriteBytes(stats.getOrDefault("wbytes", 0L));
    disk.setWriteCount(stats.getOrDefault("wios", 0L));
    return disk;
  }
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
/**
 * Memory monitor for cgroupv2. It reuses all of {@link CgroupMemoryMonitor} and only swaps the
 * names of the usage and limit files for their cgroupv2 counterparts
 * ({@code memory.current} and {@code memory.max}).
 */
public class CgroupV2MemoryMonitor extends CgroupMemoryMonitor
{
  /**
   * No-arg constructor used when the monitor is specified in config. Nothing in the code base
   * calls it directly, hence the suppression (needed to satisfy intellij-inspections).
   */
  @SuppressWarnings("unused")
  CgroupV2MemoryMonitor()
  {
    this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class));
  }

  @VisibleForTesting
  CgroupV2MemoryMonitor(CgroupDiscoverer cgroupDiscoverer)
  {
    super(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED);
  }

  /**
   * @return the cgroupv2 file holding the memory limit
   */
  @Override
  public String memoryLimitFile()
  {
    return "memory.max";
  }

  /**
   * @return the cgroupv2 file holding the current memory usage
   */
  @Override
  public String memoryUsageFile()
  {
    return "memory.current";
  }
}

View File

@ -34,8 +34,8 @@ import java.util.regex.Pattern;
*/
public class Cpu
{
public static final String CGROUP = "cpu";
private static final Logger LOG = new Logger(Cpu.class);
private static final String CGROUP = "cpu";
private static final String CPUACCT_STAT_FILE = "cpuacct.stat";
private static final String CPU_SHARES_FILE = "cpu.shares";
private static final String CPU_QUOTA_FILE = "cpu.cfs_quota_us";

View File

@ -153,6 +153,26 @@ public class Disk
return diskName;
}
/**
 * Overwrites the read count held by this metrics object.
 *
 * @param readCount the new read count
 */
public void setReadCount(long readCount)
{
this.readCount = readCount;
}
/**
 * Overwrites the write count held by this metrics object.
 *
 * @param writeCount the new write count
 */
public void setWriteCount(long writeCount)
{
this.writeCount = writeCount;
}
/**
 * Overwrites the number of bytes read held by this metrics object.
 *
 * @param readBytes the new read byte count
 */
public void setReadBytes(long readBytes)
{
this.readBytes = readBytes;
}
/**
 * Overwrites the number of bytes written held by this metrics object.
 *
 * @param writeBytes the new written byte count
 */
public void setWriteBytes(long writeBytes)
{
this.writeBytes = writeBytes;
}
@Override
public boolean equals(Object o)
{

View File

@ -37,8 +37,6 @@ public class Memory
private static final Logger LOG = new Logger(Memory.class);
private static final String CGROUP = "memory";
private static final String CGROUP_MEMORY_FILE = "memory.stat";
private static final String MEMORY_USAGE_FILE = "memory.usage_in_bytes";
private static final String MEMORY_LIMIT_FILE = "memory.limit_in_bytes";
private static final String CGROUP_MEMORY_NUMA_FILE = "memory.numa_stat";
private final CgroupDiscoverer cgroupDiscoverer;
@ -47,11 +45,11 @@ public class Memory
this.cgroupDiscoverer = cgroupDiscoverer;
}
public MemoryStat snapshot()
public MemoryStat snapshot(String memoryUsageFile, String memoryLimitFile)
{
final MemoryStat memoryStat = new MemoryStat();
memoryStat.usage = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, MEMORY_USAGE_FILE, -1);
memoryStat.limit = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, MEMORY_LIMIT_FILE, -1);
memoryStat.usage = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, memoryUsageFile, -1);
memoryStat.limit = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, memoryLimitFile, -1);
try (final BufferedReader reader = Files.newBufferedReader(
Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CGROUP_MEMORY_FILE)

View File

@ -41,7 +41,7 @@ public class ProcCgroupDiscoverer implements CgroupDiscoverer
{
private static final String CGROUP_TYPE = "cgroup";
private final File procDir;
protected final File procDir;
/**
* Create a proc discovery mechanism based on a `/proc` directory.
@ -118,7 +118,7 @@ public class ProcCgroupDiscoverer implements CgroupDiscoverer
throw new RE("Hierarchy for [%s] not found", cgroup);
}
private ProcMountsEntry getMountEntry(final File procMounts, final String cgroup)
protected ProcMountsEntry getMountEntry(final File procMounts, final String cgroup)
{
final List<String> lines;
try {

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics.cgroups;
import com.google.common.io.Files;
import org.apache.druid.java.util.common.RE;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
/**
 * Discoverer for cgroupv2. Instead of resolving a path per controller, it scans the
 * {@code mounts} file of the proc directory and returns the path of the first mount whose
 * type is {@code cgroup2}.
 */
public class ProcCgroupV2Discoverer extends ProcCgroupDiscoverer
{
  private static final String CGROUP_TYPE = "cgroup2";

  /**
   * Create a proc discovery mechanism based on a `/proc` directory.
   *
   * @param procDir The directory under proc. This is usually `/proc/self` or `/proc/#pid`
   */
  public ProcCgroupV2Discoverer(Path procDir)
  {
    super(procDir);
  }

  /**
   * Locates the cgroup2 mount point.
   *
   * @param cgroup not consulted by this implementation; every name resolves to the same mount
   * @return path of the first {@code cgroup2} entry listed in {@code mounts}
   * @throws RuntimeException if {@code mounts} cannot be read, or (as {@link RE}) if it lists
   *                          no {@code cgroup2} entry
   */
  @Override
  public Path discover(String cgroup)
  {
    final File mountsFile = new File(procDir, "mounts");
    try {
      for (final String mountLine : Files.readLines(mountsFile, StandardCharsets.UTF_8)) {
        final ProcMountsEntry mount = ProcMountsEntry.parse(mountLine);
        if (!CGROUP_TYPE.equals(mount.type)) {
          continue;
        }
        return mount.path;
      }
    }
    catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }

    throw new RE("Cgroup location not found");
  }
}

View File

@ -24,11 +24,21 @@ import java.nio.file.Paths;
public class ProcSelfCgroupDiscoverer implements CgroupDiscoverer
{
private final ProcCgroupDiscoverer delegate;
private final CgroupDiscoverer delegate;
public ProcSelfCgroupDiscoverer()
{
delegate = new ProcCgroupDiscoverer(Paths.get("/proc/self"));
this(ProcCgroupDiscoverer.class);
}
/**
 * Creates a discoverer that delegates to an instance of the given class, rooted at
 * {@code /proc/self}. The delegate is created reflectively through the constructor that the
 * class declares with a single {@code Path} parameter.
 *
 * @param discoverer class of the delegate; it must declare a constructor taking a {@code Path}
 * @throws RuntimeException wrapping any exception raised while looking up or invoking that
 *                          constructor
 */
public ProcSelfCgroupDiscoverer(Class<? extends CgroupDiscoverer> discoverer)
{
try {
delegate = discoverer.getDeclaredConstructor(Path.class).newInstance(Paths.get("/proc/self"));
}
catch (Exception e) {
// the reflective failure is kept as the cause
throw new RuntimeException(e);
}
}
@Override

View File

@ -100,9 +100,9 @@ public class CgroupCpuMonitorTest
.collect(Collectors.toList())
.containsAll(
ImmutableSet.of(
CgroupCpuMonitor.TOTAL_USAGE_METRIC,
CgroupCpuMonitor.USER_USAGE_METRIC,
CgroupCpuMonitor.SYS_USAGE_METRIC
CgroupUtil.CPU_TOTAL_USAGE_METRIC,
CgroupUtil.CPU_USER_USAGE_METRIC,
CgroupUtil.CPU_SYS_USAGE_METRIC
)));
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Exercises {@link CgroupV2CpuMonitor} against a temporary directory laid out like a cgroupv2
 * mount, using two successive versions of {@code cpu.stat}.
 */
public class CgroupV2CpuMonitorTest
{
  @Rule
  public ExpectedException expectedException = ExpectedException.none();
  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  private File procDir;
  private File cgroupDir;
  private File statFile;
  private CgroupDiscoverer discoverer;

  @Before
  public void setUp() throws IOException
  {
    cgroupDir = temporaryFolder.newFolder();
    procDir = temporaryFolder.newFolder();
    discoverer = new ProcCgroupV2Discoverer(procDir.toPath());
    TestUtils.setUpCgroupsV2(procDir, cgroupDir);

    // initial counters; replaced with a second version in the middle of the test
    statFile = new File(cgroupDir, "cpu.stat");
    TestUtils.copyOrReplaceResource("/cgroupv2/cpu.stat", statFile);
  }

  @Test
  public void testMonitor() throws IOException, InterruptedException
  {
    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
    final CgroupV2CpuMonitor monitor = new CgroupV2CpuMonitor(discoverer);

    // The first pass has nothing to diff against, so no event is expected.
    Assert.assertTrue(monitor.doMonitor(emitter));
    final List<Event> actualEvents = emitter.getEvents();
    Assert.assertEquals(0, actualEvents.size());

    emitter.flush();
    TestUtils.copyOrReplaceResource("/cgroupv2/cpu.stat-2", statFile);
    // Let at least a second pass between both readings so that the elapsed time used as
    // divisor in the usage calculation is not zero.
    Thread.sleep(1000);

    Assert.assertTrue(monitor.doMonitor(emitter));
    final List<Object> emittedMetrics = emitter
        .getEvents()
        .stream()
        .map(event -> event.toMap().get("metric"))
        .collect(Collectors.toList());
    Assert.assertTrue(
        emittedMetrics.containsAll(
            ImmutableSet.of(
                CgroupUtil.CPU_TOTAL_USAGE_METRIC,
                CgroupUtil.CPU_USER_USAGE_METRIC,
                CgroupUtil.CPU_SYS_USAGE_METRIC
            )
        )
    );
  }
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
/**
 * Exercises {@link CgroupV2DiskMonitor} against a temporary directory laid out like a cgroupv2
 * mount, using two successive versions of {@code io.stat}.
 */
public class CgroupV2DiskMonitorTest
{
  @Rule
  public ExpectedException expectedException = ExpectedException.none();
  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  private File procDir;
  private File cgroupDir;
  private File statFile;
  private CgroupDiscoverer discoverer;

  @Before
  public void setUp() throws IOException
  {
    cgroupDir = temporaryFolder.newFolder();
    procDir = temporaryFolder.newFolder();
    discoverer = new ProcCgroupV2Discoverer(procDir.toPath());
    TestUtils.setUpCgroupsV2(procDir, cgroupDir);

    // initial counters; replaced with a second version in the middle of the test
    statFile = new File(cgroupDir, "io.stat");
    TestUtils.copyOrReplaceResource("/cgroupv2/io.stat", statFile);
  }

  @Test
  public void testMonitor() throws IOException
  {
    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
    final CgroupV2DiskMonitor monitor = new CgroupV2DiskMonitor(discoverer);

    // The first pass has nothing to diff against, so no event is expected.
    Assert.assertTrue(monitor.doMonitor(emitter));
    Assert.assertEquals(0, emitter.getEvents().size());

    emitter.flush();
    TestUtils.copyOrReplaceResource("/cgroupv2/io.stat-2", statFile);
    Assert.assertTrue(monitor.doMonitor(emitter));

    // One event per metric of the single device; every counter grew by 10 between fixtures.
    Assert.assertEquals(4, emitter.getEvents().size());
    final Long expectedDelta = 10L;
    Assert.assertTrue(
        emitter
            .getEvents()
            .stream()
            .map(event -> event.toMap().get("value"))
            .allMatch(expectedDelta::equals)
    );
  }
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
/**
 * Exercises {@link CgroupV2MemoryMonitor} against a temporary directory laid out like a
 * cgroupv2 mount. The existing memory fixtures are copied under the cgroupv2 file names.
 */
public class CgroupV2MemoryMonitorTest
{
  @Rule
  public ExpectedException expectedException = ExpectedException.none();
  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

  private File procDir;
  private File cgroupDir;
  private CgroupDiscoverer discoverer;

  @Before
  public void setUp() throws IOException
  {
    cgroupDir = temporaryFolder.newFolder();
    procDir = temporaryFolder.newFolder();
    discoverer = new ProcCgroupV2Discoverer(procDir.toPath());
    TestUtils.setUpCgroupsV2(procDir, cgroupDir);

    // files that keep their name
    TestUtils.copyResource("/memory.stat", new File(cgroupDir, "memory.stat"));
    TestUtils.copyResource("/memory.numa_stat", new File(cgroupDir, "memory.numa_stat"));
    // usage and limit fixtures, stored under the names the V2 monitor reads
    TestUtils.copyResource("/memory.usage_in_bytes", new File(cgroupDir, "memory.current"));
    TestUtils.copyResource("/memory.limit_in_bytes", new File(cgroupDir, "memory.max"));
  }

  @Test
  public void testMonitor()
  {
    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
    final CgroupMemoryMonitor monitor = new CgroupV2MemoryMonitor(discoverer);

    Assert.assertTrue(monitor.doMonitor(emitter));

    final List<Event> actualEvents = emitter.getEvents();
    Assert.assertEquals(46, actualEvents.size());
  }
}

View File

@ -67,7 +67,7 @@ public class MemoryTest
final Memory memory = new Memory((cgroup) -> {
throw new RuntimeException("shouldContinue");
});
final Memory.MemoryStat stat = memory.snapshot();
final Memory.MemoryStat stat = memory.snapshot("memory.usage_in_bytes", "memory.limit_in_bytes");
Assert.assertEquals(ImmutableMap.of(), stat.getNumaMemoryStats());
Assert.assertEquals(ImmutableMap.of(), stat.getMemoryStats());
}
@ -76,7 +76,7 @@ public class MemoryTest
public void testSimpleSnapshot()
{
final Memory memory = new Memory(discoverer);
final Memory.MemoryStat stat = memory.snapshot();
final Memory.MemoryStat stat = memory.snapshot("memory.usage_in_bytes", "memory.limit_in_bytes");
Assert.assertEquals(5000000, stat.getUsage());
Assert.assertEquals(8000000, stat.getLimit());

View File

@ -30,6 +30,21 @@ import java.nio.file.StandardCopyOption;
public class TestUtils
{
/**
 * Prepares a fake {@code mounts} file for cgroup v2 tests.
 *
 * <p>Copies the {@code /cgroupv2/proc.mounts} resource to {@code <procDir>/mounts}, then rewrites that file
 * so that the {@code /sys/fs/cgroup} path in it is replaced by the absolute path of {@code cgroupDir}.
 *
 * @param procDir   directory that receives the {@code mounts} file
 * @param cgroupDir directory whose absolute path is substituted for {@code /sys/fs/cgroup}
 * @throws IOException if the resource cannot be copied or the file cannot be read or written
 */
public static void setUpCgroupsV2(
    File procDir,
    File cgroupDir
) throws IOException
{
  final File mountsFile = new File(procDir, "mounts");
  copyResource("/cgroupv2/proc.mounts", mountsFile);

  // Read the copied template back, retarget the cgroup mount path, and overwrite the file in place.
  final byte[] templateBytes = Files.readAllBytes(mountsFile.toPath());
  final String templateText = StringUtils.fromUtf8(templateBytes);
  final String retargetedText = StringUtils.replace(templateText, "/sys/fs/cgroup", cgroupDir.getAbsolutePath());
  final byte[] retargetedBytes = StringUtils.toUtf8(retargetedText);
  Files.write(mountsFile.toPath(), retargetedBytes);
}
public static void setUpCgroups(
File procDir,
File cgroupDir

View File

@ -0,0 +1,6 @@
usage_usec 2379931538
user_usec 1802013024
system_usec 577918513
nr_periods 1581231
nr_throttled 59
throttled_usec 3095133

View File

@ -0,0 +1,6 @@
usage_usec 2379951538
user_usec 1802023024
system_usec 577928513
nr_periods 1581231
nr_throttled 59
throttled_usec 3095133

View File

@ -0,0 +1 @@
8:0 rbytes=933888 wbytes=110870528 rios=228 wios=7122 dbytes=0 dios=0

View File

@ -0,0 +1 @@
8:0 rbytes=933898 wbytes=110870538 rios=238 wios=7132 dbytes=0 dios=0

View File

@ -0,0 +1,58 @@
overlay / overlay rw,relatime,lowerdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1276/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1275/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1274/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/694/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/690/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/689/fs:/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/687/fs,upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1411/fs,workdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1411/work 0 0
proc /proc proc rw,nosuid,nodev,noexec,relatime 0 0
tmpfs /dev tmpfs rw,nosuid,size=65536k,mode=755,inode64 0 0
devpts /dev/pts devpts rw,nosuid,noexec,relatime,gid=5,mode=620,ptmxmode=666 0 0
mqueue /dev/mqueue mqueue rw,nosuid,nodev,noexec,relatime 0 0
sysfs /sys sysfs ro,nosuid,nodev,noexec,relatime 0 0
cgroup /sys/fs/cgroup cgroup2 ro,nosuid,nodev,noexec,relatime 0 0
/dev/root /run-druid ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /handle-coredump ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /root/handle-coredump ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /etc/hosts ext4 rw,relatime,discard,errors=remount-ro 0 0
/dev/root /root/fetch-additional-files ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /root/get-keycloak-token ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /root/check-health ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /root/post-init-actions-k8s ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /root/readiness-check ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /root/remove-node ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /root/remove-node-v2 ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /root/run-druid ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /root/become-druid ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /mnt/var ext4 rw,relatime,discard,errors=remount-ro 0 0
/dev/root /etc/hostname ext4 rw,relatime,discard,errors=remount-ro 0 0
/dev/root /etc/resolv.conf ext4 rw,relatime,discard,errors=remount-ro 0 0
shm /dev/shm tmpfs rw,nosuid,nodev,noexec,relatime,size=65536k,inode64 0 0
/dev/root /dev/termination-log ext4 rw,relatime,discard,errors=remount-ro 0 0
/dev/root /root/download-bundle ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/grove-init.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /etc/syslog-ng/conf.d/udp_source.conf ext4 ro,relatime,discard,errors=remount-ro 0 0
tmpfs /run/secrets/azure/tokens tmpfs ro,relatime,size=7812500k,inode64 0 0
/dev/root /opt/imply/conf/logging/logging-config ext4 ro,relatime,discard,errors=remount-ro 0 0
tmpfs /run/secrets/kubernetes.io/serviceaccount tmpfs ro,relatime,size=7812500k,inode64 0 0
/dev/root /opt/imply/conf/supervise/master.conf ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/supervise/druid.conf ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/overlord/jvm.config ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/overlord/1-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/overlord/2-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/overlord/3-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/overlord/4-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/overlord/base-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/overlord/runtime.properties ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/overlord/main.config ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/coordinator/runtime.properties ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/coordinator/main.config ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/coordinator/jvm.config ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/_common/roles.json ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/_common/log4j2.xml ext4 ro,relatime,discard,errors=remount-ro 0 0
/dev/root /opt/imply/conf/druid/_common/common.runtime.properties ext4 ro,relatime,discard,errors=remount-ro 0 0
proc /proc/bus proc ro,nosuid,nodev,noexec,relatime 0 0
proc /proc/fs proc ro,nosuid,nodev,noexec,relatime 0 0
proc /proc/irq proc ro,nosuid,nodev,noexec,relatime 0 0
proc /proc/sys proc ro,nosuid,nodev,noexec,relatime 0 0
proc /proc/sysrq-trigger proc ro,nosuid,nodev,noexec,relatime 0 0
tmpfs /proc/acpi tmpfs ro,relatime,inode64 0 0
tmpfs /proc/kcore tmpfs rw,nosuid,size=65536k,mode=755,inode64 0 0
tmpfs /proc/keys tmpfs rw,nosuid,size=65536k,mode=755,inode64 0 0
tmpfs /proc/timer_list tmpfs rw,nosuid,size=65536k,mode=755,inode64 0 0
tmpfs /proc/scsi tmpfs ro,relatime,inode64 0 0
tmpfs /sys/firmware tmpfs ro,relatime,inode64 0 0