Add cgroup cpu/mem/disk usage metrics (#16472)

* Add cgroup cpu/mem usage metrics

* checks

* comments

* docs fix

* add disk metrics

* fapi check

* checkstyle

* issues

* spelling

* change asserts

* checks

* use proc builder instead of runtime

* specify charset

* spotbug
This commit is contained in:
Adithya Chakilam 2024-05-29 14:44:37 -05:00 committed by GitHub
parent 75937c98e8
commit a9044ac235
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 717 additions and 33 deletions

View File

@ -394,6 +394,7 @@ Metric monitoring is an essential part of Druid operations. The following monito
|`org.apache.druid.java.util.metrics.JvmThreadsMonitor`|Reports Thread statistics in the JVM, like numbers of total, daemon, started, died threads.|
|`org.apache.druid.java.util.metrics.CgroupCpuMonitor`|Reports CPU shares and quotas as per the `cpu` cgroup.|
|`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup.|
|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistics as per the `blkio` cgroup.|
|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistics as per the `memory` cgroup.|
|`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|

View File

@ -516,8 +516,17 @@ These metrics are available on operating systems with the cgroup kernel feature.
|------|-----------|----------|------------|
|`cgroup/cpu/shares`|Relative value of CPU time available to this process. Read from `cpu.shares`.||Varies|
|`cgroup/cpu/cores_quota`|Number of cores available to this process. Derived from `cpu.cfs_quota_us`/`cpu.cfs_period_us`.||Varies. A value of -1 indicates there is no explicit quota set.|
|`cgroup/cpu/usage/total/percentage`|Total CPU percentage used by the cgroup of the running process.||0-100|
|`cgroup/cpu/usage/user/percentage`|User CPU percentage used by the cgroup of the running process.||0-100|
|`cgroup/cpu/usage/sys/percentage`|System CPU percentage used by the cgroup of the running process.||0-100|
|`cgroup/disk/read/bytes`|Reports the number of bytes read from specific devices by the cgroup of the running process.|`diskName`|Varies|
|`cgroup/disk/write/bytes`|Reports the number of bytes written to specific devices by the cgroup of the running process.|`diskName`|Varies|
|`cgroup/disk/read/count`|Reports the number of read operations performed on specific devices by the cgroup of the running process.|`diskName`|Varies|
|`cgroup/disk/write/count`|Reports the number of write operations performed on specific devices by the cgroup of the running process.|`diskName`|Varies|
|`cgroup/memory/*`|Memory stats for this process, such as `cache` and `total_swap`. Each stat produces a separate metric. Read from `memory.stat`.||Varies|
|`cgroup/memory_numa/*/pages`|Memory stats, per NUMA node, for this process, such as `total` and `unevictable`. Each stat produces a separate metric. Read from `memory.numa_stat`.|`numaZone`|Varies|
|`cgroup/memory/limit/bytes`|Reports the maximum memory that can be used by processes in the cgroup (in bytes)||Varies|
|`cgroup/memory/usage/bytes`|Reports the amount of memory currently used by processes in the cgroup, including file cache (in bytes). Read from `memory.usage_in_bytes`.||Varies|
|`cgroup/cpuset/cpu_count`|Total number of CPUs available to the process. Derived from `cpuset.cpus`.||Varies|
|`cgroup/cpuset/effective_cpu_count`|Total number of active CPUs available to the process. Derived from `cpuset.effective_cpus`.||Varies|
|`cgroup/cpuset/mems_count`|Total number of memory nodes available to the process. Derived from `cpuset.mems`.||Varies|

View File

@ -20,28 +20,64 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.Cpu;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Map;
public class CgroupCpuMonitor extends FeedDefiningMonitor
{
private static final Logger LOG = new Logger(CgroupCpuMonitor.class);
private static final Long DEFAULT_USER_HZ = 100L;
public static final String TOTAL_USAGE_METRIC = "cgroup/cpu/usage/total/percentage";
public static final String USER_USAGE_METRIC = "cgroup/cpu/usage/user/percentage";
public static final String SYS_USAGE_METRIC = "cgroup/cpu/usage/sys/percentage";
private static final String TOTAL = "total";
private static final String USER = "user";
private static final String SYSTEM = "system";
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private Long userHz;
private KeyedDiff jiffies = new KeyedDiff();
private long prevJiffiesSnapshotAt = 0;
public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;
try {
Process p = new ProcessBuilder("getconf", "CLK_TCK").start();
try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
String line = in.readLine();
if (line != null) {
userHz = Long.valueOf(line.trim());
}
}
}
catch (IOException | NumberFormatException e) {
LOG.warn(e, "Error getting the USER_HZ value");
}
finally {
if (userHz == null) {
LOG.warn("Using default value for USER_HZ");
userHz = DEFAULT_USER_HZ;
}
}
}
public CgroupCpuMonitor(final Map<String, String[]> dimensions, String feed)
{
this(null, dimensions, feed);
this(new ProcSelfCgroupDiscoverer(), dimensions, feed);
}
public CgroupCpuMonitor(final Map<String, String[]> dimensions)
@ -58,7 +94,8 @@ public class CgroupCpuMonitor extends FeedDefiningMonitor
public boolean doMonitor(ServiceEmitter emitter)
{
final Cpu cpu = new Cpu(cgroupDiscoverer);
final Cpu.CpuAllocationMetric cpuSnapshot = cpu.snapshot();
final Cpu.CpuMetrics cpuSnapshot = cpu.snapshot();
long now = Instant.now().getEpochSecond();
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
@ -68,6 +105,26 @@ public class CgroupCpuMonitor extends FeedDefiningMonitor
computeProcessorQuota(cpuSnapshot.getQuotaUs(), cpuSnapshot.getPeriodUs())
));
long elapsedJiffiesSnapshotSecs = now - prevJiffiesSnapshotAt;
if (elapsedJiffiesSnapshotSecs > 0) {
prevJiffiesSnapshotAt = now;
final Map<String, Long> elapsedJiffies = jiffies.to(
"usage",
ImmutableMap.<String, Long>builder()
.put(USER, cpuSnapshot.getUserJiffies())
.put(SYSTEM, cpuSnapshot.getSystemJiffies())
.put(TOTAL, cpuSnapshot.getTotalJiffies())
.build()
);
if (elapsedJiffies != null) {
double totalUsagePct = 100.0 * elapsedJiffies.get(TOTAL) / userHz / elapsedJiffiesSnapshotSecs;
double sysUsagePct = 100.0 * elapsedJiffies.get(SYSTEM) / userHz / elapsedJiffiesSnapshotSecs;
double userUsagePct = 100.0 * elapsedJiffies.get(USER) / userHz / elapsedJiffiesSnapshotSecs;
emitter.emit(builder.setMetric(TOTAL_USAGE_METRIC, totalUsagePct));
emitter.emit(builder.setMetric(SYS_USAGE_METRIC, sysUsagePct));
emitter.emit(builder.setMetric(USER_USAGE_METRIC, userUsagePct));
}
}
return true;
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.Disk;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
import java.util.Map;
public class CgroupDiskMonitor extends FeedDefiningMonitor
{
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private final KeyedDiff diff = new KeyedDiff();
public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;
}
public CgroupDiskMonitor(final Map<String, String[]> dimensions, String feed)
{
this(new ProcSelfCgroupDiscoverer(), dimensions, feed);
}
public CgroupDiskMonitor(final Map<String, String[]> dimensions)
{
this(dimensions, DEFAULT_METRICS_FEED);
}
public CgroupDiskMonitor()
{
this(ImmutableMap.of());
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
Map<String, Disk.Metrics> snapshot = new Disk(cgroupDiscoverer).snapshot();
for (Map.Entry<String, Disk.Metrics> entry : snapshot.entrySet()) {
final Map<String, Long> stats = diff.to(
entry.getKey(),
ImmutableMap.<String, Long>builder()
.put("cgroup/disk/read/bytes", entry.getValue().getReadBytes())
.put("cgroup/disk/read/count", entry.getValue().getReadCount())
.put("cgroup/disk/write/bytes", entry.getValue().getWriteBytes())
.put("cgroup/disk/write/count", entry.getValue().getWriteCount())
.build()
);
if (stats != null) {
final ServiceMetricEvent.Builder builder = builder()
.setDimension("diskName", entry.getValue().getDiskName());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry<String, Long> stat : stats.entrySet()) {
emitter.emit(builder.setMetric(stat.getKey(), stat.getValue()));
}
}
}
return true;
}
}

View File

@ -25,6 +25,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.Memory;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;
import java.util.Map;
@ -42,7 +43,7 @@ public class CgroupMemoryMonitor extends FeedDefiningMonitor
public CgroupMemoryMonitor(final Map<String, String[]> dimensions, String feed)
{
this(null, dimensions, feed);
this(new ProcSelfCgroupDiscoverer(), dimensions, feed);
}
public CgroupMemoryMonitor(final Map<String, String[]> dimensions)
@ -60,16 +61,18 @@ public class CgroupMemoryMonitor extends FeedDefiningMonitor
{
final Memory memory = new Memory(cgroupDiscoverer);
final Memory.MemoryStat stat = memory.snapshot();
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("cgroup/memory/usage/bytes", stat.getUsage()));
emitter.emit(builder.setMetric("cgroup/memory/limit/bytes", stat.getLimit()));
stat.getMemoryStats().forEach((key, value) -> {
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
// See https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
// There are inconsistent units for these. Most are bytes.
emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory/%s", key), value));
});
stat.getNumaMemoryStats().forEach((key, value) -> {
final ServiceMetricEvent.Builder builder = builder().setDimension("numaZone", Long.toString(key));
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder().setDimension("numaZone", Long.toString(key));
value.forEach((k, v) -> emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory_numa/%s/pages", k), v)));
});
return true;

View File

@ -19,10 +19,32 @@
package org.apache.druid.java.util.metrics;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
/**
 * Shared helpers for reading cgroup files.
 */
public class CgroupUtil
{
  private static final Logger LOG = new Logger(CgroupUtil.class);
  public static final String SPACE_MATCH = Pattern.quote(" ");
  public static final String COMMA_MATCH = Pattern.quote(",");

  /**
   * Reads a cgroup file and returns the first line that parses as a long.
   *
   * @param discoverer   used to locate the directory of the given cgroup
   * @param cgroup       cgroup name passed to the discoverer
   * @param fileName     file to read inside the discovered directory
   * @param defaultValue value returned when no line parses or the file cannot be read
   * @return the first parseable long in the file, or {@code defaultValue}
   */
  public static long readLongValue(CgroupDiscoverer discoverer, String cgroup, String fileName, long defaultValue)
  {
    final List<String> lines;
    try {
      final String cgroupDir = discoverer.discover(cgroup).toString();
      lines = Files.readAllLines(Paths.get(cgroupDir, fileName));
    }
    catch (RuntimeException | IOException ex) {
      LOG.warn(ex, "Unable to fetch %s", fileName);
      return defaultValue;
    }

    for (String line : lines) {
      final Long parsed = Longs.tryParse(line);
      if (Objects.nonNull(parsed)) {
        return parsed;
      }
    }
    return defaultValue;
  }
}

View File

@ -21,12 +21,13 @@ package org.apache.druid.java.util.metrics.cgroups;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.CgroupUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
/**
* Collect CPU share and quota information from cpu cgroup files.
@ -35,6 +36,7 @@ public class Cpu
{
private static final Logger LOG = new Logger(Cpu.class);
private static final String CGROUP = "cpu";
private static final String CPUACCT_STAT_FILE = "cpuacct.stat";
private static final String CPU_SHARES_FILE = "cpu.shares";
private static final String CPU_QUOTA_FILE = "cpu.cfs_quota_us";
private static final String CPU_PERIOD_FILE = "cpu.cfs_period_us";
@ -51,28 +53,43 @@ public class Cpu
*
* @return A snapshot with the data populated.
*/
public CpuAllocationMetric snapshot()
public CpuMetrics snapshot()
{
return new CpuAllocationMetric(
readLongValue(CPU_SHARES_FILE, -1),
readLongValue(CPU_QUOTA_FILE, 0),
readLongValue(CPU_PERIOD_FILE, 0)
long userJiffies = -1L;
long systemJiffies = -1L;
try (final BufferedReader reader = Files.newBufferedReader(
Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CPUACCT_STAT_FILE)
)) {
for (String line = reader.readLine(); line != null; line = reader.readLine()) {
final String[] parts = line.split(Pattern.quote(" "));
if (parts.length != 2) {
// ignore
continue;
}
switch (parts[0]) {
case "user":
userJiffies = Longs.tryParse(parts[1]);
break;
case "system":
systemJiffies = Longs.tryParse(parts[1]);
break;
}
}
}
catch (IOException | RuntimeException ex) {
LOG.error(ex, "Unable to fetch cpu snapshot");
}
return new CpuMetrics(
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_SHARES_FILE, -1),
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_QUOTA_FILE, 0),
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_PERIOD_FILE, 0),
systemJiffies, userJiffies
);
}
private long readLongValue(String fileName, long defaultValeue)
{
try {
List<String> lines = Files.readAllLines(Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), fileName));
return lines.stream().map(Longs::tryParse).filter(Objects::nonNull).findFirst().orElse(defaultValeue);
}
catch (RuntimeException | IOException ex) {
LOG.error(ex, "Unable to fetch %s", fileName);
return defaultValeue;
}
}
public static class CpuAllocationMetric
public static class CpuMetrics
{
// Maps to cpu.shares - the share of CPU given to the process
private final long shares;
@ -85,11 +102,19 @@ public class Cpu
// bandwidth decisions
private final long periodUs;
CpuAllocationMetric(long shares, long quotaUs, long periodUs)
// Maps to user value at cpuacct.stat
private final long userJiffies;
// Maps to system value at cpuacct.stat
private final long systemJiffies;
CpuMetrics(long shares, long quotaUs, long periodUs, long systemJiffis, long userJiffies)
{
this.shares = shares;
this.quotaUs = quotaUs;
this.periodUs = periodUs;
this.userJiffies = userJiffies;
this.systemJiffies = systemJiffis;
}
public final long getShares()
@ -106,5 +131,20 @@ public class Cpu
{
return periodUs;
}
public long getUserJiffies()
{
return userJiffies;
}
public long getSystemJiffies()
{
return systemJiffies;
}
public long getTotalJiffies()
{
return userJiffies + systemJiffies;
}
}
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics.cgroups;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
/**
 * Collects per-device read/write counters from the {@code blkio} cgroup files
 * {@code blkio.throttle.io_serviced} and {@code blkio.throttle.io_service_bytes}.
 */
public class Disk
{
  private static final Logger LOG = new Logger(Disk.class);
  private static final String CGROUP = "blkio";
  private static final String IO_SERVICED_FILE = "blkio.throttle.io_serviced";
  private static final String IO_SERVICE_BYTES_FILE = "blkio.throttle.io_service_bytes";
  private static final String READ = "Read";
  private static final String WRITE = "Write";
  private static final String SPACE_MATCH = Pattern.quote(" ");

  private final CgroupDiscoverer cgroupDiscoverer;

  public Disk(CgroupDiscoverer cgroupDiscoverer)
  {
    this.cgroupDiscoverer = cgroupDiscoverer;
  }

  /**
   * Take a snapshot of blkio cgroup data.
   *
   * A file that cannot be read is logged and skipped, so the result may be empty or only partly
   * populated; this method does not throw for I/O or discovery failures.
   *
   * @return the counters found, keyed by the device identifier in the first column of each line
   */
  public Map<String, Metrics> snapshot()
  {
    final Map<String, Metrics> statsByDisk = new HashMap<>();
    readStatFile(IO_SERVICED_FILE, false, statsByDisk);
    readStatFile(IO_SERVICE_BYTES_FILE, true, statsByDisk);
    return statsByDisk;
  }

  /**
   * Parses one blkio file of {@code <device> <operation> <value>} lines into {@code statsByDisk}.
   *
   * @param fileName    file to read inside the discovered blkio cgroup directory
   * @param byteValues  {@code true} to store values as byte counters, {@code false} as operation counts
   * @param statsByDisk accumulator, updated in place
   */
  private void readStatFile(String fileName, boolean byteValues, Map<String, Metrics> statsByDisk)
  {
    try (final BufferedReader reader = Files.newBufferedReader(
        Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), fileName))) {
      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
        final String[] parts = line.split(SPACE_MATCH);
        if (parts.length != 3) {
          // ignore
          continue;
        }
        final Metrics metrics = statsByDisk.computeIfAbsent(parts[0], Metrics::new);
        final boolean isRead = READ.equals(parts[1]);
        if (!isRead && !WRITE.equals(parts[1])) {
          // Only read and write counters are reported.
          continue;
        }
        // tryParse returns null for a malformed number; skip that line rather than failing on
        // unboxing and losing the remainder of the file.
        final Long value = Longs.tryParse(parts[2]);
        if (value == null) {
          LOG.warn("Ignoring unparseable value in %s: %s", fileName, line);
          continue;
        }
        if (isRead) {
          if (byteValues) {
            metrics.readBytes = value;
          } else {
            metrics.readCount = value;
          }
        } else {
          if (byteValues) {
            metrics.writeBytes = value;
          } else {
            metrics.writeCount = value;
          }
        }
      }
    }
    catch (IOException | RuntimeException ex) {
      LOG.error(ex, "Unable to fetch disk snapshot from %s", fileName);
    }
  }

  /**
   * Read/write counters for a single block device.
   */
  public static class Metrics
  {
    String diskName;
    long readCount;
    long writeCount;
    long readBytes;
    long writeBytes;

    /**
     * @param majorMinor device identifier as it appears in the blkio files; it is resolved through
     *                   {@code /sys/dev/block/} to a canonical path when that entry exists, and is
     *                   used unchanged as the disk name otherwise
     */
    public Metrics(String majorMinor)
    {
      try {
        File deviceFile = new File("/sys/dev/block/" + majorMinor);
        if (deviceFile.exists()) {
          diskName = deviceFile.getCanonicalPath();
        }
      }
      catch (IOException e) {
        LOG.warn(e, "Unable to get disk name for %s", majorMinor);
      }
      finally {
        if (diskName == null) {
          diskName = majorMinor;
        }
      }
    }

    public long getReadCount()
    {
      return readCount;
    }

    public long getWriteCount()
    {
      return writeCount;
    }

    public long getReadBytes()
    {
      return readBytes;
    }

    public long getWriteBytes()
    {
      return writeBytes;
    }

    public String getDiskName()
    {
      return diskName;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      Metrics metrics = (Metrics) o;
      return readCount == metrics.readCount
             && writeCount == metrics.writeCount
             && readBytes == metrics.readBytes
             && writeBytes == metrics.writeBytes
             && Objects.equals(diskName, metrics.diskName);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(diskName, readCount, writeCount, readBytes, writeBytes);
    }
  }
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.java.util.metrics.cgroups;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.CgroupUtil;
import java.io.BufferedReader;
import java.io.IOException;
@ -36,6 +37,8 @@ public class Memory
private static final Logger LOG = new Logger(Memory.class);
private static final String CGROUP = "memory";
private static final String CGROUP_MEMORY_FILE = "memory.stat";
private static final String MEMORY_USAGE_FILE = "memory.usage_in_bytes";
private static final String MEMORY_LIMIT_FILE = "memory.limit_in_bytes";
private static final String CGROUP_MEMORY_NUMA_FILE = "memory.numa_stat";
private final CgroupDiscoverer cgroupDiscoverer;
@ -47,6 +50,8 @@ public class Memory
public MemoryStat snapshot()
{
final MemoryStat memoryStat = new MemoryStat();
memoryStat.usage = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, MEMORY_USAGE_FILE, -1);
memoryStat.limit = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, MEMORY_LIMIT_FILE, -1);
try (final BufferedReader reader = Files.newBufferedReader(
Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CGROUP_MEMORY_FILE)
@ -102,6 +107,8 @@ public class Memory
{
private final Map<String, Long> memoryStats = new HashMap<>();
private final Map<Long, Map<String, Long>> numaMemoryStats = new HashMap<>();
private long usage;
private long limit;
public Map<String, Long> getMemoryStats()
{
@ -113,5 +120,15 @@ public class Memory
// They can modify the inner map... but why?
return ImmutableMap.copyOf(numaMemoryStats);
}
public long getUsage()
{
return usage;
}
public long getLimit()
{
return limit;
}
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
@ -36,6 +37,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class CgroupCpuMonitorTest
{
@ -45,6 +47,7 @@ public class CgroupCpuMonitorTest
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File procDir;
private File cgroupDir;
private File statFile;
private CgroupDiscoverer discoverer;
@Before
@ -60,13 +63,15 @@ public class CgroupCpuMonitorTest
);
FileUtils.mkdirp(cpuDir);
statFile = new File(cpuDir, "cpuacct.stat");
TestUtils.copyOrReplaceResource("/cpu.shares", new File(cpuDir, "cpu.shares"));
TestUtils.copyOrReplaceResource("/cpu.cfs_quota_us", new File(cpuDir, "cpu.cfs_quota_us"));
TestUtils.copyOrReplaceResource("/cpu.cfs_period_us", new File(cpuDir, "cpu.cfs_period_us"));
TestUtils.copyOrReplaceResource("/cpuacct.stat", statFile);
}
@Test
public void testMonitor()
public void testMonitor() throws IOException, InterruptedException
{
final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, ImmutableMap.of(), "some_feed");
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
@ -79,6 +84,26 @@ public class CgroupCpuMonitorTest
Assert.assertEquals(1024L, sharesEvent.get("value"));
Assert.assertEquals("cgroup/cpu/cores_quota", coresEvent.get("metric"));
Assert.assertEquals(3.0D, coresEvent.get("value"));
emitter.flush();
TestUtils.copyOrReplaceResource("/cpuacct.stat-2", statFile);
// At least one second must elapse between the two snapshots: the monitor only computes
// usage when the elapsed time is greater than zero, which avoids dividing by zero.
Thread.sleep(1000);
Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertTrue(
emitter
.getEvents()
.stream()
.map(e -> e.toMap().get("metric"))
.collect(Collectors.toList())
.containsAll(
ImmutableSet.of(
CgroupCpuMonitor.TOTAL_USAGE_METRIC,
CgroupCpuMonitor.USER_USAGE_METRIC,
CgroupCpuMonitor.SYS_USAGE_METRIC
)));
}
@Test

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
/**
 * Verifies that {@link CgroupDiskMonitor} emits nothing on its first run and emits one delta per
 * counter once the blkio files have changed.
 */
public class CgroupDiskMonitorTest
{
  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();
  private File procDir;
  private File cgroupDir;
  private File servicedFile;
  private File serviceBytesFile;
  private CgroupDiscoverer discoverer;

  @Before
  public void setUp() throws IOException
  {
    cgroupDir = temporaryFolder.newFolder();
    procDir = temporaryFolder.newFolder();
    discoverer = new ProcCgroupDiscoverer(procDir.toPath());
    TestUtils.setUpCgroups(procDir, cgroupDir);

    final File blkioDir = new File(cgroupDir, "blkio/system.slice/some.service/");
    FileUtils.mkdirp(blkioDir);

    servicedFile = new File(blkioDir, "blkio.throttle.io_serviced");
    serviceBytesFile = new File(blkioDir, "blkio.throttle.io_service_bytes");
    TestUtils.copyResource("/blkio.throttle.io_service_bytes", serviceBytesFile);
    TestUtils.copyResource("/blkio.throttle.io_serviced", servicedFile);
  }

  @Test
  public void testMonitor() throws IOException
  {
    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
    final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, ImmutableMap.of(), "some_feed");

    // First pass only records a baseline.
    Assert.assertTrue(monitor.doMonitor(emitter));
    Assert.assertEquals(0, emitter.getEvents().size());

    // Swap in the second set of counters and expect deltas.
    TestUtils.copyOrReplaceResource("/blkio.throttle.io_service_bytes-2", serviceBytesFile);
    TestUtils.copyOrReplaceResource("/blkio.throttle.io_serviced-2", servicedFile);

    Assert.assertTrue(monitor.doMonitor(emitter));
    Assert.assertEquals(8, emitter.getEvents().size());

    final Long expectedDelta = Long.valueOf(10);
    final boolean everyDeltaMatches = emitter
        .getEvents()
        .stream()
        .map(event -> event.toMap().get("value"))
        .allMatch(expectedDelta::equals);
    Assert.assertTrue(everyDeltaMatches);
  }
}

View File

@ -61,6 +61,8 @@ public class CgroupMemoryMonitorTest
FileUtils.mkdirp(memoryDir);
TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat"));
TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat"));
TestUtils.copyResource("/memory.usage_in_bytes", new File(memoryDir, "memory.usage_in_bytes"));
TestUtils.copyResource("/memory.limit_in_bytes", new File(memoryDir, "memory.limit_in_bytes"));
}
@Test
@ -70,6 +72,6 @@ public class CgroupMemoryMonitorTest
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
final List<Event> actualEvents = emitter.getEvents();
Assert.assertEquals(44, actualEvents.size());
Assert.assertEquals(46, actualEvents.size());
}
}

View File

@ -52,6 +52,8 @@ public class CpuTest
TestUtils.copyOrReplaceResource("/cpu.shares", new File(cpuDir, "cpu.shares"));
TestUtils.copyOrReplaceResource("/cpu.cfs_quota_us", new File(cpuDir, "cpu.cfs_quota_us"));
TestUtils.copyOrReplaceResource("/cpu.cfs_period_us", new File(cpuDir, "cpu.cfs_period_us"));
TestUtils.copyOrReplaceResource("/cpuacct.usage", new File(cpuDir, "cpuacct.usage"));
TestUtils.copyOrReplaceResource("/cpuacct.stat", new File(cpuDir, "cpuacct.stat"));
}
@Test
@ -60,19 +62,24 @@ public class CpuTest
final Cpu cpu = new Cpu(cgroup -> {
throw new RuntimeException("Should still continue");
});
final Cpu.CpuAllocationMetric metric = cpu.snapshot();
final Cpu.CpuMetrics metric = cpu.snapshot();
Assert.assertEquals(-1L, metric.getShares());
Assert.assertEquals(0, metric.getQuotaUs());
Assert.assertEquals(0, metric.getPeriodUs());
Assert.assertEquals(-1L, metric.getSystemJiffies());
Assert.assertEquals(-1L, metric.getUserJiffies());
}
@Test
public void testSimpleLoad()
{
final Cpu cpu = new Cpu(discoverer);
final Cpu.CpuAllocationMetric snapshot = cpu.snapshot();
final Cpu.CpuMetrics snapshot = cpu.snapshot();
Assert.assertEquals(1024, snapshot.getShares());
Assert.assertEquals(300000, snapshot.getQuotaUs());
Assert.assertEquals(100000, snapshot.getPeriodUs());
Assert.assertEquals(143871L, snapshot.getSystemJiffies());
Assert.assertEquals(251183L, snapshot.getUserJiffies());
Assert.assertEquals(395054L, snapshot.getTotalJiffies());
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics.cgroups;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.Map;
/**
 * Tests for {@link Disk}: a failing discoverer must yield an empty snapshot, and the sample blkio
 * files must parse into the expected per-device counters.
 */
public class DiskTest
{
  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();
  private File procDir;
  private File cgroupDir;
  private CgroupDiscoverer discoverer;

  @Before
  public void setUp() throws Exception
  {
    cgroupDir = temporaryFolder.newFolder();
    procDir = temporaryFolder.newFolder();
    discoverer = new ProcCgroupDiscoverer(procDir.toPath());
    TestUtils.setUpCgroups(procDir, cgroupDir);
    final File blkioDir = new File(
        cgroupDir,
        "blkio/system.slice/some.service"
    );

    FileUtils.mkdirp(blkioDir);
    TestUtils.copyResource("/blkio.throttle.io_serviced", new File(blkioDir, "blkio.throttle.io_serviced"));
    TestUtils.copyResource("/blkio.throttle.io_service_bytes", new File(blkioDir, "blkio.throttle.io_service_bytes"));
  }

  @Test
  public void testWontCrash()
  {
    final Disk disk = new Disk((cgroup) -> {
      throw new RuntimeException("shouldContinue");
    });
    final Map<String, Disk.Metrics> stats = disk.snapshot();
    Assert.assertEquals(ImmutableMap.of(), stats);
  }

  @Test
  public void testSimpleSnapshot()
  {
    final Map<String, Disk.Metrics> stats = new Disk(discoverer).snapshot();
    Assert.assertEquals(ImmutableSet.of("259:0", "259:7"), stats.keySet());

    // Expected value goes first so that a failure message reports the values the right way round.
    Assert.assertEquals(98L, stats.get("259:0").getReadCount());
    Assert.assertEquals(756L, stats.get("259:0").getWriteCount());
    Assert.assertEquals(55000L, stats.get("259:0").getReadBytes());
    Assert.assertEquals(6208512L, stats.get("259:0").getWriteBytes());

    Assert.assertEquals(26L, stats.get("259:7").getReadCount());
    Assert.assertEquals(0L, stats.get("259:7").getWriteCount());
    Assert.assertEquals(1773568L, stats.get("259:7").getReadBytes());
    Assert.assertEquals(0L, stats.get("259:7").getWriteBytes());
  }
}

View File

@ -57,6 +57,8 @@ public class MemoryTest
FileUtils.mkdirp(memoryDir);
TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat"));
TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat"));
TestUtils.copyResource("/memory.usage_in_bytes", new File(memoryDir, "memory.usage_in_bytes"));
TestUtils.copyResource("/memory.limit_in_bytes", new File(memoryDir, "memory.limit_in_bytes"));
}
@Test
@ -75,6 +77,10 @@ public class MemoryTest
{
final Memory memory = new Memory(discoverer);
final Memory.MemoryStat stat = memory.snapshot();
Assert.assertEquals(5000000, stat.getUsage());
Assert.assertEquals(8000000, stat.getLimit());
final Map<String, Long> expectedMemoryStats = new HashMap<>();
expectedMemoryStats.put("inactive_anon", 0L);
expectedMemoryStats.put("total_pgfault", 13137L);

View File

@ -26,6 +26,7 @@ import org.junit.Assert;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
public class TestUtils
{
@ -65,7 +66,7 @@ public class TestUtils
/**
 * Copies a classpath resource to {@code out}, overwriting {@code out} if it already exists, then asserts
 * that the target file exists and is non-empty.
 *
 * @param resource classpath resource name, resolved relative to {@link TestUtils}
 * @param out      destination file; replaced when already present
 * @throws IOException if the resource cannot be read or the target cannot be written
 */
public static void copyOrReplaceResource(String resource, File out) throws IOException
{
  // Files.copy(InputStream, ...) does not close its input, so the stream is closed here.
  try (InputStream in = TestUtils.class.getResourceAsStream(resource)) {
    // getResourceAsStream returns null for an unknown resource; fail with a readable message instead of an NPE.
    Assert.assertNotNull("Missing test resource: " + resource, in);
    // A single copy with REPLACE_EXISTING; a plain copy would throw when the target is already there.
    Files.copy(in, out.toPath(), StandardCopyOption.REPLACE_EXISTING);
  }
  Assert.assertTrue(out.exists());
  Assert.assertNotEquals(0, out.length());
}

View File

@ -0,0 +1,13 @@
259:0 Read 55000
259:0 Write 6208512
259:0 Sync 6208512
259:0 Async 0
259:0 Discard 0
259:0 Total 6263512
259:7 Read 1773568
259:7 Write 0
259:7 Sync 1773568
259:7 Async 0
259:7 Discard 0
259:7 Total 1773568
Total 8037080

View File

@ -0,0 +1,13 @@
259:0 Read 55010
259:0 Write 6208522
259:0 Sync 6208522
259:0 Async 0
259:0 Discard 0
259:0 Total 6263532
259:7 Read 1773578
259:7 Write 10
259:7 Sync 1773588
259:7 Async 0
259:7 Discard 0
259:7 Total 1773588
Total 8037120

View File

@ -0,0 +1,13 @@
259:0 Read 98
259:0 Write 756
259:0 Sync 854
259:0 Async 0
259:0 Discard 0
259:0 Total 854
259:7 Read 26
259:7 Write 0
259:7 Sync 26
259:7 Async 0
259:7 Discard 0
259:7 Total 26
Total 880

View File

@ -0,0 +1,13 @@
259:0 Read 108
259:0 Write 766
259:0 Sync 874
259:0 Async 0
259:0 Discard 0
259:0 Total 874
259:7 Read 36
259:7 Write 10
259:7 Sync 46
259:7 Async 0
259:7 Discard 0
259:7 Total 46
Total 920

View File

@ -0,0 +1,2 @@
user 251183
system 143871

View File

@ -0,0 +1,2 @@
user 251208
system 143896

View File

@ -0,0 +1 @@
5000000

View File

@ -0,0 +1 @@
8000000

View File

@ -0,0 +1 @@
5000000

View File

@ -277,6 +277,7 @@ backpressure
base64
big-endian
bigint
blkio
blobstore
Boolean
boolean