Add cgroup memory monitor (#5866)

* Add cgroup memory monitor

* Port of https://github.com/metamx/java-util/pull/67

* Fix copyright

* Don't use `String.format`
This commit is contained in:
Charles Allen 2018-06-18 10:03:44 -07:00 committed by GitHub
parent b4b1b2a020
commit 8dc4aca25f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 449 additions and 10 deletions

View File

@ -0,0 +1,77 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.metrics;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import io.druid.java.util.metrics.cgroups.Memory;
import java.util.Map;
public class CgroupMemoryMonitor extends FeedDefiningMonitor
{
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
public CgroupMemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;
}
public CgroupMemoryMonitor(final Map<String, String[]> dimensions, String feed)
{
this(null, dimensions, feed);
}
public CgroupMemoryMonitor(final Map<String, String[]> dimensions)
{
this(dimensions, DEFAULT_METRICS_FEED);
}
public CgroupMemoryMonitor()
{
this(ImmutableMap.of());
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final Memory memory = new Memory(cgroupDiscoverer);
final Memory.MemoryStat stat = memory.snapshot();
stat.getMemoryStats().forEach((key, value) -> {
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
// See https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
// There are inconsistent units for these. Most are bytes.
emitter.emit(builder.build(StringUtils.format("cgroup/memory/%s", key), value));
});
stat.getNumaMemoryStats().forEach((key, value) -> {
final ServiceMetricEvent.Builder builder = builder().setDimension("numaZone", Long.toString(key));
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
value.forEach((k, v) -> emitter.emit(builder.build(StringUtils.format("cgroup/memory_numa/%s/pages", k), v)));
});
return true;
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.metrics.cgroups;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Longs;
import io.druid.java.util.common.logger.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
public class Memory
{
private static final Logger LOG = new Logger(Memory.class);
private static final String CGROUP = "memory";
private static final String CGROUP_MEMORY_FILE = "memory.stat";
private static final String CGROUP_MEMORY_NUMA_FILE = "memory.numa_stat";
private final CgroupDiscoverer cgroupDiscoverer;
public Memory(CgroupDiscoverer cgroupDiscoverer)
{
this.cgroupDiscoverer = cgroupDiscoverer;
}
public MemoryStat snapshot()
{
final MemoryStat memoryStat = new MemoryStat();
try (final BufferedReader reader = Files.newBufferedReader(
Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CGROUP_MEMORY_FILE)
)) {
for (String line = reader.readLine(); line != null; line = reader.readLine()) {
final String[] parts = line.split(Pattern.quote(" "));
if (parts.length != 2) {
// ignore
break;
}
final Long val = Longs.tryParse(parts[1]);
if (val == null) {
// Ignore
break;
}
memoryStat.memoryStats.put(parts[0], val);
}
}
catch (IOException | RuntimeException ex) {
LOG.error(ex, "Unable to fetch memory snapshot");
}
try (final BufferedReader reader = Files.newBufferedReader(
Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CGROUP_MEMORY_NUMA_FILE)
)) {
for (String line = reader.readLine(); line != null; line = reader.readLine()) {
// No safety checks here. Just fail as RuntimeException and catch later
final String[] parts = line.split(Pattern.quote(" "));
final String[] macro = parts[0].split(Pattern.quote("="));
final String label = macro[0];
// Ignored
//final long total = Long.parseLong(macro[1]);
for (int i = 1; i < macro.length; ++i) {
final String[] numaParts = parts[i].split(Pattern.quote("="));
final long nodeNum = Long.parseLong(numaParts[0].substring(1));
final long val = Long.parseLong(numaParts[1]);
final Map<String, Long> nodeMetrics = memoryStat.numaMemoryStats.computeIfAbsent(
nodeNum,
l -> new HashMap<>()
);
nodeMetrics.put(label, val);
}
}
}
catch (RuntimeException | IOException e) {
LOG.error(e, "Unable to fetch memory_numa snapshot");
}
return memoryStat;
}
public static class MemoryStat
{
private final Map<String, Long> memoryStats = new HashMap<>();
private final Map<Long, Map<String, Long>> numaMemoryStats = new HashMap<>();
public Map<String, Long> getMemoryStats()
{
return ImmutableMap.copyOf(memoryStats);
}
public Map<Long, Map<String, Long>> getNumaMemoryStats()
{
// They can modify the inner map... but why?
return ImmutableMap.copyOf(numaMemoryStats);
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.metrics;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.emitter.core.Event;
import io.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import io.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
import io.druid.java.util.metrics.cgroups.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class CgroupMemoryMonitorTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File procDir;
private File cgroupDir;
private CgroupDiscoverer discoverer;
@Before
public void setUp() throws IOException
{
cgroupDir = temporaryFolder.newFolder();
procDir = temporaryFolder.newFolder();
discoverer = new ProcCgroupDiscoverer(procDir.toPath());
TestUtils.setUpCgroups(procDir, cgroupDir);
final File memoryDir = new File(
cgroupDir,
"memory/system.slice/some.service"
);
Assert.assertTrue((memoryDir.isDirectory() && memoryDir.exists()) || memoryDir.mkdirs());
TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat"));
TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat"));
}
@Test
public void testMonitor()
{
final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, ImmutableMap.of(), "some_feed");
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
final List<Event> actualEvents = emitter.getEvents();
Assert.assertEquals(44, actualEvents.size());
}
}

View File

@ -51,7 +51,7 @@ public class CpuAcctDeltaMonitorTest
TestUtils.setUpCgroups(procDir, cgroupDir);
cpuacctDir = new File(
cgroupDir,
"cpu,cpuacct/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
"cpu,cpuacct/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
);
Assert.assertTrue((cpuacctDir.isDirectory() && cpuacctDir.exists()) || cpuacctDir.mkdirs());
TestUtils.copyResource("/cpuacct.usage_all", new File(cpuacctDir, "cpuacct.usage_all"));

View File

@ -52,7 +52,7 @@ public class CpuAcctTest
TestUtils.setUpCgroups(procDir, cgroupDir);
final File cpuacctDir = new File(
cgroupDir,
"cpu,cpuacct/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
"cpu,cpuacct/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
);
Assert.assertTrue((cpuacctDir.isDirectory() && cpuacctDir.exists()) || cpuacctDir.mkdirs());
TestUtils.copyResource("/cpuacct.usage_all", new File(cpuacctDir, "cpuacct.usage_all"));

View File

@ -0,0 +1,128 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.metrics.cgroups;
import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
public class MemoryTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File procDir;
private File cgroupDir;
private CgroupDiscoverer discoverer;
@Before
public void setUp() throws Exception
{
cgroupDir = temporaryFolder.newFolder();
procDir = temporaryFolder.newFolder();
discoverer = new ProcCgroupDiscoverer(procDir.toPath());
TestUtils.setUpCgroups(procDir, cgroupDir);
final File memoryDir = new File(
cgroupDir,
"memory/system.slice/some.service"
);
Assert.assertTrue((memoryDir.isDirectory() && memoryDir.exists()) || memoryDir.mkdirs());
TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat"));
TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat"));
}
@Test
public void testWontCrash()
{
final Memory memory = new Memory((cgroup) -> {
throw new RuntimeException("shouldContinue");
});
final Memory.MemoryStat stat = memory.snapshot();
Assert.assertEquals(ImmutableMap.of(), stat.getNumaMemoryStats());
Assert.assertEquals(ImmutableMap.of(), stat.getMemoryStats());
}
@Test
public void testSimpleSnapshot()
{
final Memory memory = new Memory(discoverer);
final Memory.MemoryStat stat = memory.snapshot();
final Map<String, Long> expectedMemoryStats = new HashMap<>();
expectedMemoryStats.put("inactive_anon", 0L);
expectedMemoryStats.put("total_pgfault", 13137L);
expectedMemoryStats.put("total_unevictable", 0L);
expectedMemoryStats.put("pgfault", 13137L);
expectedMemoryStats.put("mapped_file", 1327104L);
expectedMemoryStats.put("total_pgpgout", 5975L);
expectedMemoryStats.put("total_active_anon", 1757184L);
expectedMemoryStats.put("total_rss", 1818624L);
expectedMemoryStats.put("rss", 1818624L);
expectedMemoryStats.put("total_inactive_anon", 0L);
expectedMemoryStats.put("active_file", 5873664L);
expectedMemoryStats.put("total_swap", 0L);
expectedMemoryStats.put("dirty", 0L);
expectedMemoryStats.put("total_mapped_file", 1327104L);
expectedMemoryStats.put("total_rss_huge", 0L);
expectedMemoryStats.put("total_inactive_file", 2019328L);
expectedMemoryStats.put("cache", 7892992L);
expectedMemoryStats.put("rss_huge", 0L);
expectedMemoryStats.put("shmem", 0L);
expectedMemoryStats.put("swap", 0L);
expectedMemoryStats.put("total_pgpgin", 8346L);
expectedMemoryStats.put("unevictable", 0L);
expectedMemoryStats.put("active_anon", 1757184L);
expectedMemoryStats.put("total_dirty", 0L);
expectedMemoryStats.put("total_active_file", 5873664L);
expectedMemoryStats.put("hierarchical_memory_limit", 9223372036854771712L);
expectedMemoryStats.put("total_cache", 7892992L);
expectedMemoryStats.put("pgpgin", 8346L);
expectedMemoryStats.put("pgmajfault", 120L);
expectedMemoryStats.put("inactive_file", 2019328L);
expectedMemoryStats.put("hierarchical_memsw_limit", 9223372036854771712L);
expectedMemoryStats.put("writeback", 0L);
expectedMemoryStats.put("total_shmem", 0L);
expectedMemoryStats.put("pgpgout", 5975L);
expectedMemoryStats.put("total_pgmajfault", 120L);
expectedMemoryStats.put("total_writeback", 0L);
Assert.assertEquals(expectedMemoryStats, stat.getMemoryStats());
final Map<Long, Map<String, Long>> expectedMemoryNumaStats = new HashMap<>();
final Map<String, Long> expectedNumaNode0Stats = new HashMap<>();
expectedNumaNode0Stats.put("anon", 432L);
expectedNumaNode0Stats.put("total", 2359L);
expectedNumaNode0Stats.put("hierarchical_total", 2359L);
expectedNumaNode0Stats.put("file", 1927L);
expectedNumaNode0Stats.put("unevictable", 0L);
expectedNumaNode0Stats.put("hierarchical_file", 1927L);
expectedNumaNode0Stats.put("hierarchical_anon", 432L);
expectedNumaNode0Stats.put("hierarchical_unevictable", 0L);
expectedMemoryNumaStats.put(0L, expectedNumaNode0Stats);
Assert.assertEquals(expectedMemoryNumaStats, stat.getNumaMemoryStats());
}
}

View File

@ -55,7 +55,7 @@ public class ProcCgroupDiscovererTest
Assert.assertEquals(
new File(
cgroupDir,
"cpu,cpuacct/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
"cpu,cpuacct/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
).toPath(),
discoverer.discover("cpu")
);

View File

@ -49,7 +49,7 @@ public class TestUtils
Assert.assertTrue(new File(
cgroupDir,
"cpu,cpuacct/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
"cpu,cpuacct/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
).mkdirs());
copyResource("/proc.pid.cgroup", new File(procDir, "cgroup"));
}

View File

@ -0,0 +1,8 @@
total=2359 N0=2359
file=1927 N0=1927
anon=432 N0=432
unevictable=0 N0=0
hierarchical_total=2359 N0=2359
hierarchical_file=1927 N0=1927
hierarchical_anon=432 N0=432
hierarchical_unevictable=0 N0=0

View File

@ -0,0 +1,36 @@
cache 7892992
rss 1818624
rss_huge 0
shmem 0
mapped_file 1327104
dirty 0
writeback 0
swap 0
pgpgin 8346
pgpgout 5975
pgfault 13137
pgmajfault 120
inactive_anon 0
active_anon 1757184
inactive_file 2019328
active_file 5873664
unevictable 0
hierarchical_memory_limit 9223372036854771712
hierarchical_memsw_limit 9223372036854771712
total_cache 7892992
total_rss 1818624
total_rss_huge 0
total_shmem 0
total_mapped_file 1327104
total_dirty 0
total_writeback 0
total_swap 0
total_pgpgin 8346
total_pgpgout 5975
total_pgfault 13137
total_pgmajfault 120
total_inactive_anon 0
total_active_anon 1757184
total_inactive_file 2019328
total_active_file 5873664
total_unevictable 0

View File

@ -1,11 +1,11 @@
11:pids:/system.slice/mesos-agent-druid.service
11:pids:/system.slice/some.service
10:perf_event:/
9:cpu,cpuacct:/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee
8:blkio:/system.slice/mesos-agent-druid.service
7:devices:/system.slice/mesos-agent-druid.service
6:freezer:/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee
9:cpu,cpuacct:/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee
8:blkio:/system.slice/some.service
7:devices:/system.slice/some.service
6:freezer:/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee
5:hugetlb:/
4:cpuset:/
3:memory:/system.slice/mesos-agent-druid.service
3:memory:/system.slice/some.service
2:net_cls,net_prio:/
1:name=systemd:/mesos_executors.slice