ARTEMIS-3280 Netty Pool micrometer metric plugin

This commit is contained in:
franz1981 2020-07-30 12:15:50 +02:00 committed by Justin Bertram
parent 3ca02955bf
commit 14dddb04d7
11 changed files with 412 additions and 2 deletions

View File

@ -630,6 +630,9 @@ public final class ActiveMQDefaultConfiguration {
// Number of concurrent workers for a core bridge
public static int DEFAULT_BRIDGE_CONCURRENCY = 1;
// Whether or not to report Netty pool metrics
private static final boolean DEFAULT_NETTY_POOL_METRICS = false;
/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
@ -1721,4 +1724,11 @@ public final class ActiveMQDefaultConfiguration {
public static int getDefaultBridgeConcurrency() {
return DEFAULT_BRIDGE_CONCURRENCY;
}
/**
* Whether or not to report Netty pool metrics
*/
public static Boolean getDefaultNettyPoolMetrics() {
return DEFAULT_NETTY_POOL_METRICS;
}
}

View File

@ -27,6 +27,7 @@ public class MetricsConfiguration implements Serializable {
private boolean jvmMemory = ActiveMQDefaultConfiguration.getDefaultJvmMemoryMetrics();
private boolean jvmGc = ActiveMQDefaultConfiguration.getDefaultJvmGcMetrics();
private boolean jvmThread = ActiveMQDefaultConfiguration.getDefaultJvmThreadMetrics();
private boolean nettyPool = ActiveMQDefaultConfiguration.getDefaultNettyPoolMetrics();
private ActiveMQMetricsPlugin plugin;
public boolean isJvmMemory() {
@ -56,6 +57,15 @@ public class MetricsConfiguration implements Serializable {
return this;
}
public boolean isNettyPool() {
return nettyPool;
}
public MetricsConfiguration setNettyPool(boolean nettyPool) {
this.nettyPool = nettyPool;
return this;
}
public ActiveMQMetricsPlugin getPlugin() {
return plugin;
}

View File

@ -907,6 +907,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
metricsConfiguration.setJvmMemory(XMLUtil.parseBoolean(child));
} else if (child.getNodeName().equals("jvm-threads")) {
metricsConfiguration.setJvmThread(XMLUtil.parseBoolean(child));
} else if (child.getNodeName().equals("netty-pool")) {
metricsConfiguration.setNettyPool(XMLUtil.parseBoolean(child));
} else if (child.getNodeName().equals("plugin")) {
metricsConfiguration.setPlugin(parseMetricsPlugin(child, config));
}

View File

@ -31,6 +31,7 @@ import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@ -65,6 +66,9 @@ public class MetricsManager {
if (metricsConfiguration.isJvmThread()) {
new JvmThreadMetrics().bindTo(meterRegistry);
}
if (metricsConfiguration.isNettyPool()) {
new NettyPooledAllocatorMetrics(PooledByteBufAllocator.DEFAULT.metric()).bindTo(meterRegistry);
}
}
public MeterRegistry getMeterRegistry() {

View File

@ -0,0 +1,369 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.metrics;
import java.util.List;
import java.util.function.Function;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.netty.buffer.PoolChunkListMetric;
import io.netty.buffer.PoolChunkMetric;
import io.netty.buffer.PoolSubpageMetric;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.micrometer.core.instrument.FunctionCounter;
import io.netty.buffer.PoolArenaMetric;
public final class NettyPooledAllocatorMetrics implements MeterBinder {
private static final String BYTES_UNIT = "bytes";
private final PooledByteBufAllocatorMetric metric;
public NettyPooledAllocatorMetrics(final PooledByteBufAllocatorMetric pooledAllocatorMetric) {
this.metric = pooledAllocatorMetric;
}
@Override
public void bindTo(final MeterRegistry registry) {
Gauge.builder("netty.pooled.used.memory", this.metric, metric -> metric.usedDirectMemory())
.tags("type", "direct")
.description("The used memory")
.baseUnit(BYTES_UNIT).register(registry);
Gauge.builder("netty.pooled.used.memory", this.metric, metric -> metric.usedHeapMemory())
.tags("type", "heap")
.description("The used memory")
.baseUnit(BYTES_UNIT).register(registry);
Gauge.builder("netty.pooled.arenas.num", this.metric, metric -> metric.numDirectArenas())
.tags("type", "direct")
.description("The number of arenas")
.register(registry);
Gauge.builder("netty.pooled.arenas.num", this.metric, metric -> metric.numHeapArenas())
.tags("type", "heap").description("The number or arenas")
.register(registry);
Gauge.builder("netty.pooled.cachesize", this.metric, metric -> metric.tinyCacheSize())
.tags("type", "tiny")
.description("The cachesize used by this netty allocator")
.register(registry);
Gauge.builder("netty.pooled.cachesize", this.metric, metric -> metric.smallCacheSize())
.tags("type", "small")
.description("The cachesize used by this netty allocator")
.register(registry);
Gauge.builder("netty.pooled.cachesize", this.metric, metric -> metric.normalCacheSize())
.tags("type", "normal")
.description("The cachesize used by this netty allocator")
.register(registry);
Gauge.builder("netty.pooled.threadlocalcache.num", this.metric, metric -> metric.numThreadLocalCaches())
.description("The number of thread local caches used by this netty allocator")
.register(registry);
Gauge.builder("netty.pooled.chunk.size", this.metric, metric -> metric.chunkSize())
.description("The arena chunk size of this netty allocator")
.baseUnit(BYTES_UNIT)
.register(registry);
{
int i = 0;
for (final PoolArenaMetric poolArenaMetric : metric.directArenas()) {
metricsOfPoolArena(registry, poolArenaMetric, i++, "direct");
}
}
{
int i = 0;
for (final PoolArenaMetric poolArenaMetric : metric.heapArenas()) {
metricsOfPoolArena(registry, poolArenaMetric, i++, "heap");
}
}
}
private void metricsOfPoolArena(final MeterRegistry registry,
final PoolArenaMetric poolArenaMetric,
final int poolArenaIndex,
final String poolArenaType) {
/**
* the number of thread caches backed by this arena.
*/
final String poolArenaIndexString = Integer.toString(poolArenaIndex);
Gauge.builder("netty.pooled.arena.threadcaches.num", poolArenaMetric, metric -> metric.numThreadCaches())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString)
.description("The number of thread caches backed by this arena")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.allocations.num", poolArenaMetric,
metric -> metric.numAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "all")
.description("The number of allocations done via the arena. This includes all sizes")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.allocations.num", poolArenaMetric,
metric -> metric.numTinyAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "tiny")
.description("The number of tiny allocations done via the arena")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.allocations.num", poolArenaMetric,
metric -> metric.numSmallAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "small")
.description("The number of small allocations done via the arena")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.allocations.num", poolArenaMetric,
metric -> metric.numNormalAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "normal")
.description("The number of normal allocations done via the arena")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.allocations.num", poolArenaMetric,
metric -> metric.numHugeAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "huge")
.description("The number of huge allocations done via the arena")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.deallocations.num", poolArenaMetric,
metric -> metric.numDeallocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "all")
.description("The number of deallocations done via the arena. This includes all sizes")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.deallocations.num", poolArenaMetric,
metric -> metric.numTinyDeallocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "tiny")
.description("The number of tiny deallocations done via the arena")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.deallocations.num", poolArenaMetric,
metric -> metric.numSmallDeallocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "small")
.description("The number of small deallocations done via the arena")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.deallocations.num", poolArenaMetric,
metric -> metric.numNormalDeallocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "normal")
.description("The number of normal deallocations done via the arena")
.register(registry);
FunctionCounter.builder("netty.pooled.arena.deallocations.num", poolArenaMetric,
metric -> metric.numHugeDeallocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "huge")
.description("The number of huge deallocations done via the arena")
.register(registry);
Gauge.builder("netty.pooled.arena.active.allocations.num", poolArenaMetric,
metric -> metric.numActiveAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "all")
.description("The number of currently active allocations")
.register(registry);
Gauge.builder("netty.pooled.arena.active.allocations.num", poolArenaMetric,
metric -> metric.numActiveTinyAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "tiny")
.description("The number of currently active tiny allocations")
.register(registry);
Gauge.builder("netty.pooled.arena.active.allocations.num", poolArenaMetric,
metric -> metric.numActiveSmallAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "small")
.description("The number of currently active small allocations")
.register(registry);
Gauge.builder("netty.pooled.arena.active.allocations.num", poolArenaMetric,
metric -> metric.numActiveNormalAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "normal")
.description("The number of currently active normal allocations")
.register(registry);
Gauge.builder("netty.pooled.arena.active.allocations.num", poolArenaMetric,
metric -> metric.numActiveHugeAllocations())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "huge")
.description("The number of currently active huge allocations")
.register(registry);
Gauge.builder("netty.pooled.arena.active.allocated.num", poolArenaMetric,
metric -> metric.numActiveBytes())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString)
.description("The number of active bytes that are currently allocated by the arena")
.baseUnit(BYTES_UNIT).register(registry);
Gauge.builder("netty.pooled.arena.chunk.num", poolArenaMetric,
metric -> metric.numChunkLists())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString)
.description("The number of chunk lists for the arena")
.register(registry);
Gauge.builder("netty.pooled.arena.subpages.num", poolArenaMetric,
metric -> metric.numTinySubpages())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "tiny")
.description("The number of tiny sub-pages for the arena")
.register(registry);
Gauge.builder("netty.pooled.arena.subpages.num", poolArenaMetric,
metric -> metric.numSmallSubpages())
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndexString, "size", "small")
.description("The number of small sub-pages for the arena")
.register(registry);
List<PoolChunkListMetric> poolChunkListMetrics = poolArenaMetric.chunkLists();
assert poolChunkListMetrics.size() == 6;
for (PoolChunkListMetric poolChunkListMetric : poolChunkListMetrics) {
final String poolChunkListType = usageTypeOf(poolChunkListMetric);
metricsOfPoolChunkListMetric(registry, poolChunkListMetric, poolArenaIndexString, poolArenaType, poolChunkListType);
}
// smallSubpages metrics
metricsOfPoolSubpageMetric(registry, poolArenaMetric, PoolArenaMetric::smallSubpages,
poolArenaIndexString, poolArenaType, "small");
// tinySubpages metrics
metricsOfPoolSubpageMetric(registry, poolArenaMetric, PoolArenaMetric::tinySubpages,
poolArenaIndexString, poolArenaType, "tiny");
}
private void metricsOfPoolSubpageMetric(final MeterRegistry registry,
final PoolArenaMetric poolArenaMetric,
final Function<? super PoolArenaMetric, List<PoolSubpageMetric>> poolSubpageMetricProvider,
final String poolArenaIndex,
final String poolArenaType,
final String size) {
Gauge.builder("netty.pooled.arena.subpages.count", poolArenaMetric, metric -> {
long total = 0;
for (PoolSubpageMetric poolSubpageMetric : poolSubpageMetricProvider.apply(metric)) {
total++;
}
return total;
})
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndex, "size", size)
.description("The total count of subpages")
.register(registry);
Gauge.builder("netty.pooled.arena.subpages.elementsize", poolArenaMetric, metric -> {
long total = 0;
for (PoolSubpageMetric poolSubpageMetric : poolSubpageMetricProvider.apply(metric)) {
total += poolSubpageMetric.elementSize();
}
return total;
})
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndex, "size", size)
.description("The total size (in bytes) of the elements that will be allocated")
.baseUnit(BYTES_UNIT)
.register(registry);
Gauge.builder("netty.pooled.arena.subpages.maxnumelements", poolArenaMetric, metric -> {
long total = 0;
for (PoolSubpageMetric poolSubpageMetric : poolSubpageMetricProvider.apply(metric)) {
total += poolSubpageMetric.maxNumElements();
}
return total;
})
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndex, "size", size)
.description("The total number of maximal elements that can be allocated out of the sub-page.")
.register(registry);
Gauge.builder("netty.pooled.arena.subpages.numavailable", poolArenaMetric, metric -> {
long total = 0;
for (PoolSubpageMetric poolSubpageMetric : poolSubpageMetricProvider.apply(metric)) {
total += poolSubpageMetric.numAvailable();
}
return total;
})
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndex, "size", size)
.description("The total number of available elements to be allocated")
.register(registry);
Gauge.builder("netty.pooled.arena.subpages.pagesize", poolArenaMetric, metric -> {
long total = 0;
for (PoolSubpageMetric poolSubpageMetric : poolSubpageMetricProvider.apply(metric)) {
total += poolSubpageMetric.pageSize();
}
return total;
})
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndex, "size", size)
.description("The total size (in bytes) of the pages")
.baseUnit(BYTES_UNIT)
.register(registry);
}
private static String usageTypeOf(PoolChunkListMetric metric) {
// metrics.add(qInit); 0~25% -> new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
// metrics.add(q000); 0~50% -> new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
// metrics.add(q025); 25~75% -> new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
// metrics.add(q050); 50~100% -> new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
// metrics.add(q075); 75~100% -> new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
// metrics.add(q100); 100% -> new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
final StringBuilder builder = new StringBuilder("75~100%".length());
int minUsage = metric.minUsage();
if (minUsage <= 1) {
minUsage = 0;
}
builder.append(minUsage);
int maxUsage = metric.maxUsage();
if (maxUsage != minUsage) {
builder.append('~').append(maxUsage);
}
builder.append('%');
return builder.toString();
}
private void metricsOfPoolChunkListMetric(final MeterRegistry registry,
final PoolChunkListMetric poolChunkListMetrics,
final String poolArenaIndex,
final String poolArenaType,
final String poolChunkListType) {
Gauge.builder("netty.pooled.arena.chunk.list.capacity", poolChunkListMetrics, metric -> {
long total = 0;
for (PoolChunkMetric poolChunkMetric : poolChunkListMetrics) {
total += poolChunkMetric.chunkSize();
}
return total;
})
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndex, "pool_chunk_list_type", poolChunkListType)
.description("The total capacity in bytes of the chunks in the list")
.baseUnit(BYTES_UNIT)
.register(registry);
Gauge.builder("netty.pooled.arena.chunk.list.free", poolChunkListMetrics, metric -> {
long total = 0;
for (PoolChunkMetric poolChunkMetric : poolChunkListMetrics) {
total += poolChunkMetric.freeBytes();
}
return total;
})
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndex, "pool_chunk_list_type", poolChunkListType)
.description("The total free bytes of the chunks in the list")
.baseUnit(BYTES_UNIT)
.register(registry);
Gauge.builder("netty.pooled.arena.chunk.list.count", poolChunkListMetrics, metric -> {
long total = 0;
for (PoolChunkMetric poolChunkMetric : poolChunkListMetrics) {
total++;
}
return total;
})
.tags("pool_arena_type", poolArenaType, "pool_arena_index", poolArenaIndex, "pool_chunk_list_type", poolChunkListType)
.description("The number of chunks in the list")
.register(registry);
}
}

View File

@ -1187,6 +1187,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="netty-pool" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
whether or not to report Netty pool metrics
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="plugin" maxOccurs="1" minOccurs="0">
<xsd:complexType>
<xsd:annotation>

View File

@ -494,6 +494,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertFalse(metricsConfiguration.isJvmMemory());
assertTrue(metricsConfiguration.isJvmGc());
assertTrue(metricsConfiguration.isJvmThread());
assertTrue(metricsConfiguration.isNettyPool());
}
private void verifyAddresses() {
@ -797,6 +798,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertTrue(metricsConfiguration.isJvmMemory());
assertTrue(metricsConfiguration.isJvmGc());
assertTrue(metricsConfiguration.isJvmThread());
assertTrue(metricsConfiguration.isNettyPool());
ActiveMQMetricsPlugin metricPlugin = metricsConfiguration.getPlugin();
assertTrue(metricPlugin instanceof FakeMetricPlugin);

View File

@ -326,6 +326,7 @@
<jvm-memory>false</jvm-memory>
<jvm-gc>true</jvm-gc>
<jvm-threads>true</jvm-threads>
<netty-pool>true</netty-pool>
<plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin">
<property key="foo" value="x"/>
<property key="bar" value="y"/>

View File

@ -210,6 +210,7 @@
<jvm-memory>false</jvm-memory>
<jvm-gc>true</jvm-gc>
<jvm-threads>true</jvm-threads>
<netty-pool>true</netty-pool>
<plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin">
<property key="foo" value="x"/>
<property key="bar" value="y"/>

View File

@ -23,6 +23,7 @@
<jvm-memory>true</jvm-memory>
<jvm-gc>true</jvm-gc>
<jvm-threads>true</jvm-threads>
<netty-pool>true</netty-pool>
<plugin class-name="org.apache.activemq.artemis.core.config.impl.FileConfigurationTest$FakeMetricPlugin">
<property key="key1" value="value1"/>
<property key="key2" value="value2"/>

View File

@ -82,8 +82,9 @@ message count). However, these metrics can be deduced by aggregating the
lower level metrics (e.g. aggregate the message.count metrics from all queues
to get the total).
JVM memory metrics are also exported by default and GC and thread metrics can
be configured
JVM memory metrics are also exported by default and GC, thread metrics, and
[Netty](https://netty.io/4.1/api/io/netty/buffer/PooledByteBufAllocatorMetric.html)
metrics can be configured
## Configuration
@ -100,6 +101,7 @@ JVM metrics:
<jvm-memory>true</jvm-memory> <!-- defaults to true -->
<jvm-gc>true</jvm-gc> <!-- defaults to false -->
<jvm-threads>true</jvm-threads> <!-- defaults to false -->
<netty-pool>true</netty-pool> <!-- defaults to false -->
<plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.LoggingMetricsPlugin"/>
</metrics>
```