diff --git a/java-util/src/main/java/io/druid/java/util/metrics/CgroupMemoryMonitor.java b/java-util/src/main/java/io/druid/java/util/metrics/CgroupMemoryMonitor.java new file mode 100644 index 00000000000..df48ccdf768 --- /dev/null +++ b/java-util/src/main/java/io/druid/java/util/metrics/CgroupMemoryMonitor.java @@ -0,0 +1,77 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package io.druid.java.util.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.emitter.service.ServiceMetricEvent;
+import io.druid.java.util.metrics.cgroups.CgroupDiscoverer;
+import io.druid.java.util.metrics.cgroups.Memory;
+
+import java.util.Map;
+
+/**
+ * Monitor that takes a {@link Memory} snapshot on every cycle and emits one metric per statistic:
+ * {@code cgroup/memory/%s} for every plain entry and {@code cgroup/memory_numa/%s/pages}, tagged with a
+ * {@code numaZone} dimension, for every per-node entry.
+ */
+public class CgroupMemoryMonitor extends FeedDefiningMonitor
+{
+  final CgroupDiscoverer cgroupDiscoverer;
+  final Map dimensions;
+
+  /**
+   * @param cgroupDiscoverer handed to {@link Memory} on every cycle
+   * @param dimensions       extra dimensions added to every emitted event
+   * @param feed             feed name forwarded to the parent monitor
+   */
+  public CgroupMemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed)
+  {
+    super(feed);
+    this.cgroupDiscoverer = cgroupDiscoverer;
+    this.dimensions = dimensions;
+  }
+
+  /**
+   * Same as the three-argument constructor, with {@code null} as the discoverer.
+   *
+   * NOTE(review): {@link Memory#snapshot()} calls {@code discover} on that reference, so a monitor built this way
+   * looks unable to read anything -- confirm which default discoverer is intended.
+   */
+  public CgroupMemoryMonitor(final Map dimensions, String feed)
+  {
+    this(null, dimensions, feed);
+  }
+
+  /**
+   * Uses {@code DEFAULT_METRICS_FEED} as the feed.
+   */
+  public CgroupMemoryMonitor(final Map dimensions)
+  {
+    this(dimensions, DEFAULT_METRICS_FEED);
+  }
+
+  /**
+   * Uses no extra dimensions and {@code DEFAULT_METRICS_FEED} as the feed.
+   */
+  public CgroupMemoryMonitor()
+  {
+    this(ImmutableMap.of());
+  }
+
+  /**
+   * Takes one snapshot and emits the plain statistics followed by the per-node statistics.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    final Memory.MemoryStat snapshot = new Memory(cgroupDiscoverer).snapshot();
+    emitMemoryStats(emitter, snapshot);
+    emitNumaStats(emitter, snapshot);
+    return true;
+  }
+
+  /**
+   * Emits one {@code cgroup/memory/%s} event per plain statistic, each from a fresh builder.
+   */
+  private void emitMemoryStats(ServiceEmitter emitter, Memory.MemoryStat snapshot)
+  {
+    // Units are described in https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
+    // They are not uniform across statistics, although most are bytes.
+    snapshot.getMemoryStats().forEach((statName, statValue) -> {
+      final ServiceMetricEvent.Builder eventBuilder = builder();
+      MonitorUtils.addDimensionsToBuilder(eventBuilder, dimensions);
+      final String metric = StringUtils.format("cgroup/memory/%s", statName);
+      emitter.emit(eventBuilder.build(metric, statValue));
+    });
+  }
+
+  /**
+   * Emits one {@code cgroup/memory_numa/%s/pages} event per statistic of every node. All events of a node are
+   * built from the same builder, which carries the {@code numaZone} dimension.
+   */
+  private void emitNumaStats(ServiceEmitter emitter, Memory.MemoryStat snapshot)
+  {
+    snapshot.getNumaMemoryStats().forEach((node, nodeStats) -> {
+      final ServiceMetricEvent.Builder eventBuilder = builder().setDimension("numaZone", Long.toString(node));
+      MonitorUtils.addDimensionsToBuilder(eventBuilder, dimensions);
+      nodeStats.forEach((statName, pages) -> {
+        final String metric = StringUtils.format("cgroup/memory_numa/%s/pages", statName);
+        emitter.emit(eventBuilder.build(metric, pages));
+      });
+    });
+  }
+}
diff --git a/java-util/src/main/java/io/druid/java/util/metrics/cgroups/Memory.java b/java-util/src/main/java/io/druid/java/util/metrics/cgroups/Memory.java
new file mode 100644
index 00000000000..e491b52f4f4
--- /dev/null
+++ b/java-util/src/main/java/io/druid/java/util/metrics/cgroups/Memory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.java.util.metrics.cgroups;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Longs;
+import io.druid.java.util.common.logger.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Reads the {@code memory.stat} and {@code memory.numa_stat} files of the {@code memory} cgroup located by a
+ * {@link CgroupDiscoverer}. Reading is best effort: failures are logged and whatever was parsed so far is returned.
+ */
+public class Memory
+{
+  private static final Logger LOG = new Logger(Memory.class);
+  private static final String CGROUP = "memory";
+  private static final String CGROUP_MEMORY_FILE = "memory.stat";
+  private static final String CGROUP_MEMORY_NUMA_FILE = "memory.numa_stat";
+  // Compiled once instead of once per parsed line.
+  private static final Pattern SPACE = Pattern.compile(Pattern.quote(" "));
+  private static final Pattern EQUALS = Pattern.compile(Pattern.quote("="));
+  private final CgroupDiscoverer cgroupDiscoverer;
+
+  /**
+   * @param cgroupDiscoverer used on every snapshot to resolve the directory of the {@code memory} cgroup
+   */
+  public Memory(CgroupDiscoverer cgroupDiscoverer)
+  {
+    this.cgroupDiscoverer = cgroupDiscoverer;
+  }
+
+  /**
+   * Reads both cgroup files and returns what could be parsed. The two files are read independently, so a failure
+   * on one does not prevent the other from being reported.
+   *
+   * @return a new {@link MemoryStat}; never null, possibly empty or partially filled. No exception from the
+   * discoverer or from parsing propagates to the caller.
+   */
+  public MemoryStat snapshot()
+  {
+    final MemoryStat memoryStat = new MemoryStat();
+    readMemoryStat(memoryStat);
+    readNumaStat(memoryStat);
+    return memoryStat;
+  }
+
+  /**
+   * Parses {@code memory.stat}, whose lines have the form {@code <name> <value>}, into {@code memoryStat}.
+   */
+  private void readMemoryStat(MemoryStat memoryStat)
+  {
+    // The discoverer is called inside the try so that its failures are logged rather than propagated.
+    try (final BufferedReader reader = Files.newBufferedReader(
+        Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CGROUP_MEMORY_FILE)
+    )) {
+      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+        final String[] parts = SPACE.split(line);
+        if (parts.length != 2) {
+          // Skip only the malformed line; the remaining lines are still worth reporting.
+          continue;
+        }
+        final Long val = Longs.tryParse(parts[1]);
+        if (val == null) {
+          // Value is not a long: skip this line only.
+          continue;
+        }
+        memoryStat.memoryStats.put(parts[0], val);
+      }
+    }
+    catch (IOException | RuntimeException ex) {
+      LOG.error(ex, "Unable to fetch memory snapshot");
+    }
+  }
+
+  /**
+   * Parses {@code memory.numa_stat}, whose lines have the form {@code <label>=<total> N<id>=<value> ...}, into
+   * {@code memoryStat}, keyed by node id and then by label.
+   */
+  private void readNumaStat(MemoryStat memoryStat)
+  {
+    try (final BufferedReader reader = Files.newBufferedReader(
+        Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CGROUP_MEMORY_NUMA_FILE)
+    )) {
+      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+        // No safety checks here. A malformed line fails as a RuntimeException, which is caught below.
+        final String[] parts = SPACE.split(line);
+        // parts[0] is "<label>=<total>"; the total is not recorded.
+        final String label = EQUALS.split(parts[0])[0];
+        // Every remaining token describes one node, so the bound is the number of tokens on the line.
+        for (int i = 1; i < parts.length; ++i) {
+          final String[] numaParts = EQUALS.split(parts[i]);
+          // substring(1) drops the leading "N" of the node token.
+          final long nodeNum = Long.parseLong(numaParts[0].substring(1));
+          final long val = Long.parseLong(numaParts[1]);
+          final Map<String, Long> nodeMetrics = memoryStat.numaMemoryStats.computeIfAbsent(
+              nodeNum,
+              l -> new HashMap<>()
+          );
+          nodeMetrics.put(label, val);
+        }
+      }
+    }
+    catch (RuntimeException | IOException e) {
+      LOG.error(e, "Unable to fetch memory_numa snapshot");
+    }
+  }
+
+
+  /**
+   * Result of one {@link #snapshot()} call.
+   */
+  public static class MemoryStat
+  {
+    private final Map<String, Long> memoryStats = new HashMap<>();
+    private final Map<Long, Map<String, Long>> numaMemoryStats = new HashMap<>();
+
+    /**
+     * @return an immutable copy of the {@code memory.stat} values keyed by statistic name
+     */
+    public Map<String, Long> getMemoryStats()
+    {
+      return ImmutableMap.copyOf(memoryStats);
+    }
+
+    /**
+     * @return an immutable copy of the outer map, keyed by node id; the per-node maps are the internal mutable
+     * instances and should not be modified by callers
+     */
+    public Map<Long, Map<String, Long>> getNumaMemoryStats()
+    {
+      return ImmutableMap.copyOf(numaMemoryStats);
+    }
+  }
+}
diff --git a/java-util/src/test/java/io/druid/java/util/metrics/CgroupMemoryMonitorTest.java b/java-util/src/test/java/io/druid/java/util/metrics/CgroupMemoryMonitorTest.java
new file mode 100644
index 00000000000..5a1aae34268
--- /dev/null
+++ b/java-util/src/test/java/io/druid/java/util/metrics/CgroupMemoryMonitorTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.java.util.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import io.druid.java.util.emitter.core.Event;
+import io.druid.java.util.metrics.cgroups.CgroupDiscoverer;
+import io.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
+import io.druid.java.util.metrics.cgroups.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Runs {@link CgroupMemoryMonitor} against a fake cgroup tree populated from the {@code memory.stat} and
+ * {@code memory.numa_stat} test resources.
+ */
+public class CgroupMemoryMonitorTest
+{
+  // The memory.stat fixture has 36 entries; the memory.numa_stat fixture has 8 labels for a single node.
+  private static final int MEMORY_STAT_ENTRIES = 36;
+  private static final int NUMA_STAT_ENTRIES = 8;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private File procDir;
+  private File cgroupDir;
+  private CgroupDiscoverer discoverer;
+
+  /**
+   * Creates the fake proc and cgroup directories and copies both memory fixtures into the memory cgroup.
+   */
+  @Before
+  public void setUp() throws IOException
+  {
+    cgroupDir = temporaryFolder.newFolder();
+    procDir = temporaryFolder.newFolder();
+    discoverer = new ProcCgroupDiscoverer(procDir.toPath());
+    TestUtils.setUpCgroups(procDir, cgroupDir);
+    final File memoryDir = new File(
+        cgroupDir,
+        "memory/system.slice/some.service"
+    );
+    // isDirectory() already implies exists().
+    Assert.assertTrue(memoryDir.isDirectory() || memoryDir.mkdirs());
+    TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat"));
+    TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat"));
+  }
+
+  /**
+   * One event is expected per fixture entry.
+   */
+  @Test
+  public void testMonitor()
+  {
+    final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, ImmutableMap.of(), "some_feed");
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+    Assert.assertTrue(monitor.doMonitor(emitter));
+    final List<Event> actualEvents = emitter.getEvents();
+    Assert.assertEquals(MEMORY_STAT_ENTRIES + NUMA_STAT_ENTRIES, actualEvents.size());
+  }
+}
diff --git a/java-util/src/test/java/io/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java b/java-util/src/test/java/io/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
index fd69e268aec..cbdbdcca780 100644
--- a/java-util/src/test/java/io/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
+++ b/java-util/src/test/java/io/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java
@@ -51,7 +51,7 @@ public class CpuAcctDeltaMonitorTest
     TestUtils.setUpCgroups(procDir, cgroupDir);
     cpuacctDir = new File(
         cgroupDir,
-        "cpu,cpuacct/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
+        "cpu,cpuacct/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
     );
     Assert.assertTrue((cpuacctDir.isDirectory() && cpuacctDir.exists()) || cpuacctDir.mkdirs());
     TestUtils.copyResource("/cpuacct.usage_all", new File(cpuacctDir, "cpuacct.usage_all"));
diff --git a/java-util/src/test/java/io/druid/java/util/metrics/cgroups/CpuAcctTest.java b/java-util/src/test/java/io/druid/java/util/metrics/cgroups/CpuAcctTest.java
index 22b8973fbe8..ec0109c206c 100644
--- a/java-util/src/test/java/io/druid/java/util/metrics/cgroups/CpuAcctTest.java
+++ b/java-util/src/test/java/io/druid/java/util/metrics/cgroups/CpuAcctTest.java
@@ -52,7 +52,7 @@ public class CpuAcctTest
     TestUtils.setUpCgroups(procDir, cgroupDir);
     final File cpuacctDir = new File(
         cgroupDir,
-        "cpu,cpuacct/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
+        "cpu,cpuacct/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
     );
     Assert.assertTrue((cpuacctDir.isDirectory() && cpuacctDir.exists()) || cpuacctDir.mkdirs());
     TestUtils.copyResource("/cpuacct.usage_all", new
File(cpuacctDir, "cpuacct.usage_all")); diff --git a/java-util/src/test/java/io/druid/java/util/metrics/cgroups/MemoryTest.java b/java-util/src/test/java/io/druid/java/util/metrics/cgroups/MemoryTest.java new file mode 100644 index 00000000000..250d26c88cd --- /dev/null +++ b/java-util/src/test/java/io/druid/java/util/metrics/cgroups/MemoryTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package io.druid.java.util.metrics.cgroups;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests {@link Memory} against a fake cgroup tree populated from the {@code memory.stat} and
+ * {@code memory.numa_stat} test resources.
+ */
+public class MemoryTest
+{
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private File procDir;
+  private File cgroupDir;
+  private CgroupDiscoverer discoverer;
+
+  /**
+   * Creates the fake proc and cgroup directories and copies both memory fixtures into the memory cgroup.
+   */
+  @Before
+  public void setUp() throws Exception
+  {
+    cgroupDir = temporaryFolder.newFolder();
+    procDir = temporaryFolder.newFolder();
+    discoverer = new ProcCgroupDiscoverer(procDir.toPath());
+    TestUtils.setUpCgroups(procDir, cgroupDir);
+    final File memoryDir = new File(
+        cgroupDir,
+        "memory/system.slice/some.service"
+    );
+    // isDirectory() already implies exists().
+    Assert.assertTrue(memoryDir.isDirectory() || memoryDir.mkdirs());
+    TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat"));
+    TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat"));
+  }
+
+  /**
+   * A discoverer that always throws must yield an empty snapshot instead of an exception.
+   */
+  @Test
+  public void testWontCrash()
+  {
+    final Memory memory = new Memory((cgroup) -> {
+      throw new RuntimeException("shouldContinue");
+    });
+    final Memory.MemoryStat stat = memory.snapshot();
+    Assert.assertEquals(ImmutableMap.of(), stat.getNumaMemoryStats());
+    Assert.assertEquals(ImmutableMap.of(), stat.getMemoryStats());
+  }
+
+  /**
+   * Every entry of both fixtures must show up in the snapshot with its exact value.
+   */
+  @Test
+  public void testSimpleSnapshot()
+  {
+    final Memory memory = new Memory(discoverer);
+    final Memory.MemoryStat stat = memory.snapshot();
+    final Map<String, Long> expectedMemoryStats = new HashMap<>();
+    expectedMemoryStats.put("inactive_anon", 0L);
+    expectedMemoryStats.put("total_pgfault", 13137L);
+    expectedMemoryStats.put("total_unevictable", 0L);
+    expectedMemoryStats.put("pgfault", 13137L);
+    expectedMemoryStats.put("mapped_file", 1327104L);
+    expectedMemoryStats.put("total_pgpgout", 5975L);
+    expectedMemoryStats.put("total_active_anon", 1757184L);
+    expectedMemoryStats.put("total_rss", 1818624L);
+    expectedMemoryStats.put("rss", 1818624L);
+    expectedMemoryStats.put("total_inactive_anon", 0L);
+    expectedMemoryStats.put("active_file", 5873664L);
+    expectedMemoryStats.put("total_swap", 0L);
+    expectedMemoryStats.put("dirty", 0L);
+    expectedMemoryStats.put("total_mapped_file", 1327104L);
+    expectedMemoryStats.put("total_rss_huge", 0L);
+    expectedMemoryStats.put("total_inactive_file", 2019328L);
+    expectedMemoryStats.put("cache", 7892992L);
+    expectedMemoryStats.put("rss_huge", 0L);
+    expectedMemoryStats.put("shmem", 0L);
+    expectedMemoryStats.put("swap", 0L);
+    expectedMemoryStats.put("total_pgpgin", 8346L);
+    expectedMemoryStats.put("unevictable", 0L);
+    expectedMemoryStats.put("active_anon", 1757184L);
+    expectedMemoryStats.put("total_dirty", 0L);
+    expectedMemoryStats.put("total_active_file", 5873664L);
+    expectedMemoryStats.put("hierarchical_memory_limit", 9223372036854771712L);
+    expectedMemoryStats.put("total_cache", 7892992L);
+    expectedMemoryStats.put("pgpgin", 8346L);
+    expectedMemoryStats.put("pgmajfault", 120L);
+    expectedMemoryStats.put("inactive_file", 2019328L);
+    expectedMemoryStats.put("hierarchical_memsw_limit", 9223372036854771712L);
+    expectedMemoryStats.put("writeback", 0L);
+    expectedMemoryStats.put("total_shmem", 0L);
+    expectedMemoryStats.put("pgpgout", 5975L);
+    expectedMemoryStats.put("total_pgmajfault", 120L);
+    expectedMemoryStats.put("total_writeback", 0L);
+    Assert.assertEquals(expectedMemoryStats, stat.getMemoryStats());
+
+    // The numa fixture describes a single node, N0.
+    final Map<Long, Map<String, Long>> expectedMemoryNumaStats = new HashMap<>();
+    final Map<String, Long> expectedNumaNode0Stats = new HashMap<>();
+    expectedNumaNode0Stats.put("anon", 432L);
+    expectedNumaNode0Stats.put("total", 2359L);
+    expectedNumaNode0Stats.put("hierarchical_total", 2359L);
+    expectedNumaNode0Stats.put("file", 1927L);
+    expectedNumaNode0Stats.put("unevictable", 0L);
+    expectedNumaNode0Stats.put("hierarchical_file", 1927L);
+    expectedNumaNode0Stats.put("hierarchical_anon", 432L);
+    expectedNumaNode0Stats.put("hierarchical_unevictable", 0L);
+    expectedMemoryNumaStats.put(0L, expectedNumaNode0Stats);
+    Assert.assertEquals(expectedMemoryNumaStats, stat.getNumaMemoryStats());
+  }
+}
diff --git a/java-util/src/test/java/io/druid/java/util/metrics/cgroups/ProcCgroupDiscovererTest.java b/java-util/src/test/java/io/druid/java/util/metrics/cgroups/ProcCgroupDiscovererTest.java
index bf3223e354d..37b4b79d170 100644
--- a/java-util/src/test/java/io/druid/java/util/metrics/cgroups/ProcCgroupDiscovererTest.java
+++ b/java-util/src/test/java/io/druid/java/util/metrics/cgroups/ProcCgroupDiscovererTest.java
@@ -55,7 +55,7 @@ public class ProcCgroupDiscovererTest
     Assert.assertEquals(
         new File(
             cgroupDir,
-            "cpu,cpuacct/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
+            "cpu,cpuacct/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
         ).toPath(),
         discoverer.discover("cpu")
     );
diff --git a/java-util/src/test/java/io/druid/java/util/metrics/cgroups/TestUtils.java b/java-util/src/test/java/io/druid/java/util/metrics/cgroups/TestUtils.java
index 1ddc006ca56..dfbbfa06913 100644
--- a/java-util/src/test/java/io/druid/java/util/metrics/cgroups/TestUtils.java
+++ b/java-util/src/test/java/io/druid/java/util/metrics/cgroups/TestUtils.java
@@ -49,7 +49,7 @@ public class TestUtils
     Assert.assertTrue(new File(
         cgroupDir,
-        "cpu,cpuacct/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
+        "cpu,cpuacct/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee"
     ).mkdirs());
     copyResource("/proc.pid.cgroup", new File(procDir, "cgroup"));
   }
diff --git a/java-util/src/test/resources/memory.numa_stat b/java-util/src/test/resources/memory.numa_stat
new file mode 100644
index 00000000000..4b887ef1ebe
--- /dev/null
+++
b/java-util/src/test/resources/memory.numa_stat @@ -0,0 +1,8 @@ +total=2359 N0=2359 +file=1927 N0=1927 +anon=432 N0=432 +unevictable=0 N0=0 +hierarchical_total=2359 N0=2359 +hierarchical_file=1927 N0=1927 +hierarchical_anon=432 N0=432 +hierarchical_unevictable=0 N0=0 diff --git a/java-util/src/test/resources/memory.stat b/java-util/src/test/resources/memory.stat new file mode 100644 index 00000000000..e68ddceedfa --- /dev/null +++ b/java-util/src/test/resources/memory.stat @@ -0,0 +1,36 @@ +cache 7892992 +rss 1818624 +rss_huge 0 +shmem 0 +mapped_file 1327104 +dirty 0 +writeback 0 +swap 0 +pgpgin 8346 +pgpgout 5975 +pgfault 13137 +pgmajfault 120 +inactive_anon 0 +active_anon 1757184 +inactive_file 2019328 +active_file 5873664 +unevictable 0 +hierarchical_memory_limit 9223372036854771712 +hierarchical_memsw_limit 9223372036854771712 +total_cache 7892992 +total_rss 1818624 +total_rss_huge 0 +total_shmem 0 +total_mapped_file 1327104 +total_dirty 0 +total_writeback 0 +total_swap 0 +total_pgpgin 8346 +total_pgpgout 5975 +total_pgfault 13137 +total_pgmajfault 120 +total_inactive_anon 0 +total_active_anon 1757184 +total_inactive_file 2019328 +total_active_file 5873664 +total_unevictable 0 diff --git a/java-util/src/test/resources/proc.pid.cgroup b/java-util/src/test/resources/proc.pid.cgroup index 139f81b29c8..e9ad851a940 100644 --- a/java-util/src/test/resources/proc.pid.cgroup +++ b/java-util/src/test/resources/proc.pid.cgroup @@ -1,11 +1,11 @@ -11:pids:/system.slice/mesos-agent-druid.service +11:pids:/system.slice/some.service 10:perf_event:/ -9:cpu,cpuacct:/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee -8:blkio:/system.slice/mesos-agent-druid.service -7:devices:/system.slice/mesos-agent-druid.service -6:freezer:/system.slice/mesos-agent-druid.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee +9:cpu,cpuacct:/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee +8:blkio:/system.slice/some.service +7:devices:/system.slice/some.service 
+6:freezer:/system.slice/some.service/f12ba7e0-fa16-462e-bb9d-652ccc27f0ee 5:hugetlb:/ 4:cpuset:/ -3:memory:/system.slice/mesos-agent-druid.service +3:memory:/system.slice/some.service 2:net_cls,net_prio:/ 1:name=systemd:/mesos_executors.slice