NIFI-11036: Add Cluster Summary Metrics to Prometheus endpoint

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #6834.
This commit is contained in:
Matthew Burgess 2023-01-10 18:33:22 -05:00 committed by Pierre Villard
parent 2abb8921e7
commit 6adbc1aeb0
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
5 changed files with 117 additions and 5 deletions

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
import io.prometheus.client.Gauge;
/**
 * Registry of Prometheus gauges that summarize the cluster state of a NiFi instance:
 * whether it is clustered, whether it is connected to its cluster, how many nodes are
 * connected, and how many nodes the cluster has in total.
 * <p>
 * Every gauge is stored in the inherited name-to-gauge map under an upper-case key and is
 * registered with the inherited collector registry.
 */
public class ClusterMetricsRegistry extends AbstractMetricsRegistry {

    /** Label naming the reporting NiFi instance; every gauge in this registry carries it. */
    private static final String INSTANCE_LABEL = "instance";

    /**
     * Creates the registry and registers the four cluster summary gauges.
     */
    public ClusterMetricsRegistry() {
        addGauge("IS_CLUSTERED",
                "cluster_is_clustered",
                "Whether this NiFi instance is clustered. Values are 0 or 1",
                INSTANCE_LABEL);

        addGauge("IS_CONNECTED_TO_CLUSTER",
                "cluster_is_connected_to_cluster",
                "Whether this NiFi instance is connected to a cluster. Values are 0 or 1",
                INSTANCE_LABEL);

        // The only gauge with a second label; callers must supply two label values for it.
        addGauge("CONNECTED_NODE_COUNT",
                "cluster_connected_node_count",
                "The number of connected nodes in this cluster",
                INSTANCE_LABEL, "connected_nodes");

        addGauge("TOTAL_NODE_COUNT",
                "cluster_total_node_count",
                "The total number of nodes in this cluster",
                INSTANCE_LABEL);
    }

    /**
     * Builds a gauge, registers it with this registry's collector registry and files it in
     * the name-to-gauge map.
     *
     * @param key        key under which the gauge is stored in the name-to-gauge map
     * @param metricName Prometheus metric name of the gauge
     * @param helpText   help text published with the gauge
     * @param labels     label names of the gauge, in the order label values must be supplied
     */
    private void addGauge(final String key, final String metricName, final String helpText, final String... labels) {
        final Gauge gauge = Gauge.build()
                .name(metricName)
                .help(helpText)
                .labelNames(labels)
                .register(registry);
        nameToGaugeMap.put(key, gauge);
    }
}

View File

@ -512,4 +512,15 @@ public class PrometheusMetricsUtil {
return niFiMetricsRegistry.getRegistry();
}
/**
 * Writes the cluster summary data points into the given registry and returns its collector registry.
 *
 * @param clusterMetricsRegistry registry that receives the data points
 * @param instId                 identifier used as the instance label value; when empty, the default label string is used
 * @param isClustered            recorded as 1 when true, 0 when false
 * @param isConnectedToCluster   recorded as 1 when true, 0 when false
 * @param connectedNodes         value of the second label on the connected-node-count data point
 * @param connectedNodeCount     value recorded for the connected node count
 * @param totalNodeCount         value recorded for the total node count
 * @return the collector registry obtained from {@code clusterMetricsRegistry}
 */
public static CollectorRegistry createClusterMetrics(final ClusterMetricsRegistry clusterMetricsRegistry, final String instId, final boolean isClustered, final boolean isConnectedToCluster,
        final String connectedNodes, final int connectedNodeCount, final int totalNodeCount) {
    // Fall back to the default label so the instance label is never empty.
    final String instanceLabel;
    if (StringUtils.isEmpty(instId)) {
        instanceLabel = DEFAULT_LABEL_STRING;
    } else {
        instanceLabel = instId;
    }

    // Booleans are published as 0/1 gauge values.
    final int clusteredValue = isClustered ? 1 : 0;
    final int connectedValue = isConnectedToCluster ? 1 : 0;

    clusterMetricsRegistry.setDataPoint(clusteredValue, "IS_CLUSTERED", instanceLabel);
    clusterMetricsRegistry.setDataPoint(connectedValue, "IS_CONNECTED_TO_CLUSTER", instanceLabel);
    clusterMetricsRegistry.setDataPoint(connectedNodeCount, "CONNECTED_NODE_COUNT", instanceLabel, connectedNodes);
    clusterMetricsRegistry.setDataPoint(totalNodeCount, "TOTAL_NODE_COUNT", instanceLabel);

    return clusterMetricsRegistry.getRegistry();
}
}

View File

@ -132,6 +132,7 @@ import org.apache.nifi.parameter.StandardParameterContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ClusterMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
@ -438,19 +439,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
private final ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
private final BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry();
private final ClusterMetricsRegistry clusterMetricsRegistry = new ClusterMetricsRegistry();
private final Collection<AbstractMetricsRegistry> configuredRegistries = Arrays.asList(
nifiMetricsRegistry,
jvmMetricsRegistry,
connectionAnalyticsMetricsRegistry,
bulletinMetricsRegistry
bulletinMetricsRegistry,
clusterMetricsRegistry
);
private final Collection<CollectorRegistry> metricsRegistries = Arrays.asList(
nifiMetricsRegistry.getRegistry(),
jvmMetricsRegistry.getRegistry(),
connectionAnalyticsMetricsRegistry.getRegistry(),
bulletinMetricsRegistry.getRegistry()
bulletinMetricsRegistry.getRegistry(),
clusterMetricsRegistry.getRegistry()
);
@ -6192,6 +6196,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
);
}
}
// Collect cluster summary metrics
int connectedNodeCount = 0;
int totalNodeCount = 0;
String connectedNodesLabel = "Not clustered";
if (clusterCoordinator != null && clusterCoordinator.isConnected()) {
final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = clusterCoordinator.getConnectionStates();
for (final List<NodeIdentifier> nodeList : stateMap.values()) {
totalNodeCount += nodeList.size();
}
final List<NodeIdentifier> connectedNodeIds = stateMap.get(NodeConnectionState.CONNECTED);
connectedNodeCount = (connectedNodeIds == null) ? 0 : connectedNodeIds.size();
connectedNodesLabel = connectedNodeCount + " / " + totalNodeCount;
}
final boolean isClustered = clusterCoordinator != null;
final boolean isConnectedToCluster = isClustered() && clusterCoordinator.isConnected();
PrometheusMetricsUtil.createClusterMetrics(clusterMetricsRegistry, instanceId, isClustered, isConnectedToCluster, connectedNodesLabel, connectedNodeCount, totalNodeCount);
return metricsRegistries;
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.web.api.request;
import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ClusterMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
@ -32,7 +33,9 @@ public enum FlowMetricsRegistry {
BULLETIN("BULLETIN", BulletinMetricsRegistry.class),
CONNECTION("CONNECTION", ConnectionAnalyticsMetricsRegistry.class);
CONNECTION("CONNECTION", ConnectionAnalyticsMetricsRegistry.class),
CLUSTER("CLUSTER", ClusterMetricsRegistry.class);
private final String registry;

View File

@ -28,6 +28,7 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ClusterMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
@ -83,6 +84,8 @@ public class TestFlowResource {
private static final String SAMPLE_LABEL_VALUES_PROCESS_GROUP = "ProcessGroup";
private static final String COMPONENT_TYPE_LABEL = "component_type";
private static final int COMPONENT_TYPE_VALUE_INDEX = 1;
private static final String CLUSTER_TYPE_LABEL = "cluster";
private static final String CLUSTER_LABEL_KEY = "instance";
@InjectMocks
private FlowResource resource = new FlowResource();
@ -193,12 +196,13 @@ public class TestFlowResource {
assertThat(metrics, hasKey(ROOT_FIELD_NAME));
final List<Sample> registryList = metrics.get(ROOT_FIELD_NAME);
assertThat(registryList, hasSize(9));
assertThat(registryList, hasSize(13));
final Map<String, Long> result = getResult(registryList);
assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM)));
assertThat(4L, equalTo(result.get(SAMPLE_LABEL_VALUES_PROCESS_GROUP)));
assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP)));
assertThat(4L, equalTo(result.get(CLUSTER_LABEL_KEY)));
}
@Test
@ -317,7 +321,13 @@ public class TestFlowResource {
}
/**
 * Chooses the key under which a sample is grouped when results are tallied.
 *
 * @param sample the sample to classify
 * @return the label value at the component-type index when the sample carries the
 *         component-type label; otherwise the cluster key when the sample name starts
 *         with the cluster prefix; otherwise the JVM sample name
 */
private String getResultKey(final Sample sample) {
    // Component samples take precedence over the name-based checks below.
    if (sample.labelNames.contains(COMPONENT_TYPE_LABEL)) {
        return sample.labelValues.get(COMPONENT_TYPE_VALUE_INDEX);
    }
    // Cluster samples have no component-type label, so they are recognized by name prefix.
    if (sample.name.startsWith(CLUSTER_TYPE_LABEL)) {
        return CLUSTER_LABEL_KEY;
    }
    return SAMPLE_NAME_JVM;
}
private static List<CollectorRegistry> getCollectorRegistriesForJson() {
@ -327,6 +337,7 @@ public class TestFlowResource {
registryList.add(getConnectionMetricsRegistry());
registryList.add(getJvmMetricsRegistry());
registryList.add(getBulletinMetricsRegistry());
registryList.add(getClusterMetricsRegistry());
return registryList;
@ -378,6 +389,18 @@ public class TestFlowResource {
return bulletinMetricsRegistry.getRegistry();
}
/**
 * Builds a cluster metrics registry populated with fixed sample values for a single instance.
 *
 * @return the collector registry of the populated cluster metrics registry
 */
private static CollectorRegistry getClusterMetricsRegistry() {
    final String instanceId = "B1Id";
    final ClusterMetricsRegistry registry = new ClusterMetricsRegistry();

    // Flags are expressed as 1 (true).
    registry.setDataPoint(1, "IS_CLUSTERED", instanceId);
    registry.setDataPoint(1, "IS_CONNECTED_TO_CLUSTER", instanceId);

    // The connected node count takes a second label value in addition to the instance id.
    registry.setDataPoint(2, "CONNECTED_NODE_COUNT", instanceId, "2 / 3");
    registry.setDataPoint(3, "TOTAL_NODE_COUNT", instanceId);

    return registry.getRegistry();
}
private static class SampleDeserializer extends StdDeserializer<Sample> {
protected SampleDeserializer() {
super(Sample.class);