diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/ClusterMetricsRegistry.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/ClusterMetricsRegistry.java new file mode 100644 index 0000000000..6ca836253f --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/ClusterMetricsRegistry.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.prometheus.util; + +import io.prometheus.client.Gauge; + +/** + * This registry contains metrics related to a NiFi cluster, such as connected node count and total node count + */ +public class ClusterMetricsRegistry extends AbstractMetricsRegistry { + + public ClusterMetricsRegistry() { + + nameToGaugeMap.put("IS_CLUSTERED", Gauge.build() + .name("cluster_is_clustered") + .help("Whether this NiFi instance is clustered. Values are 0 or 1") + .labelNames("instance") + .register(registry)); + + nameToGaugeMap.put("IS_CONNECTED_TO_CLUSTER", Gauge.build() + .name("cluster_is_connected_to_cluster") + .help("Whether this NiFi instance is connected to a cluster. Values are 0 or 1") + .labelNames("instance") + .register(registry)); + + nameToGaugeMap.put("CONNECTED_NODE_COUNT", Gauge.build() + .name("cluster_connected_node_count") + .help("The number of connected nodes in this cluster") + .labelNames("instance", "connected_nodes") + .register(registry)); + + nameToGaugeMap.put("TOTAL_NODE_COUNT", Gauge.build() + .name("cluster_total_node_count") + .help("The total number of nodes in this cluster") + .labelNames("instance") + .register(registry)); + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java index 6f1118ec65..f4e87d8a11 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java @@ -512,4 +512,15 @@ public class PrometheusMetricsUtil { return niFiMetricsRegistry.getRegistry(); } + + public static CollectorRegistry createClusterMetrics(final ClusterMetricsRegistry clusterMetricsRegistry, final String instId, final boolean isClustered, final boolean isConnectedToCluster, + final String connectedNodes, final int connectedNodeCount, final int totalNodeCount) { + final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; + clusterMetricsRegistry.setDataPoint(isClustered ? 1 : 0, "IS_CLUSTERED", instanceId); + clusterMetricsRegistry.setDataPoint(isConnectedToCluster ? 1 : 0, "IS_CONNECTED_TO_CLUSTER", instanceId); + clusterMetricsRegistry.setDataPoint(connectedNodeCount, "CONNECTED_NODE_COUNT", instanceId, connectedNodes); + clusterMetricsRegistry.setDataPoint(totalNodeCount, "TOTAL_NODE_COUNT", instanceId); + + return clusterMetricsRegistry.getRegistry(); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 4fb1b2271e..9f4bd64c3b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -132,6 +132,7 @@ import org.apache.nifi.parameter.StandardParameterContext; import org.apache.nifi.processor.VerifiableProcessor; import org.apache.nifi.prometheus.util.AbstractMetricsRegistry; import org.apache.nifi.prometheus.util.BulletinMetricsRegistry; +import org.apache.nifi.prometheus.util.ClusterMetricsRegistry; import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry; import org.apache.nifi.prometheus.util.JvmMetricsRegistry; import org.apache.nifi.prometheus.util.NiFiMetricsRegistry; @@ -438,19 +439,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry(); private final ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry(); private final BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry(); + private final ClusterMetricsRegistry clusterMetricsRegistry = new ClusterMetricsRegistry(); private final Collection configuredRegistries = Arrays.asList( nifiMetricsRegistry, jvmMetricsRegistry, connectionAnalyticsMetricsRegistry, - bulletinMetricsRegistry + bulletinMetricsRegistry, + clusterMetricsRegistry ); private final Collection metricsRegistries = Arrays.asList( nifiMetricsRegistry.getRegistry(), jvmMetricsRegistry.getRegistry(), connectionAnalyticsMetricsRegistry.getRegistry(), - bulletinMetricsRegistry.getRegistry() + bulletinMetricsRegistry.getRegistry(), + clusterMetricsRegistry.getRegistry() ); @@ -6192,6 +6196,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { ); } } + + // Collect cluster summary metrics + int connectedNodeCount = 0; + int totalNodeCount = 0; + String connectedNodesLabel = "Not clustered"; + if (clusterCoordinator != null && clusterCoordinator.isConnected()) { + final Map> stateMap = clusterCoordinator.getConnectionStates(); + for (final List nodeList : stateMap.values()) { + totalNodeCount += nodeList.size(); + } + final List connectedNodeIds = stateMap.get(NodeConnectionState.CONNECTED); + connectedNodeCount = (connectedNodeIds == null) ? 0 : connectedNodeIds.size(); + + connectedNodesLabel = connectedNodeCount + " / " + totalNodeCount; + } + final boolean isClustered = clusterCoordinator != null; + final boolean isConnectedToCluster = isClustered() && clusterCoordinator.isConnected(); + PrometheusMetricsUtil.createClusterMetrics(clusterMetricsRegistry, instanceId, isClustered, isConnectedToCluster, connectedNodesLabel, connectedNodeCount, totalNodeCount); + return metricsRegistries; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java index c827ae822e..46b0d8033a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java @@ -18,6 +18,7 @@ package org.apache.nifi.web.api.request; import org.apache.nifi.prometheus.util.AbstractMetricsRegistry; import org.apache.nifi.prometheus.util.BulletinMetricsRegistry; +import org.apache.nifi.prometheus.util.ClusterMetricsRegistry; import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry; import org.apache.nifi.prometheus.util.JvmMetricsRegistry; import org.apache.nifi.prometheus.util.NiFiMetricsRegistry; @@ -32,7 +33,9 @@ public enum FlowMetricsRegistry { BULLETIN("BULLETIN", BulletinMetricsRegistry.class), - CONNECTION("CONNECTION", ConnectionAnalyticsMetricsRegistry.class); + CONNECTION("CONNECTION", ConnectionAnalyticsMetricsRegistry.class), + + CLUSTER("CLUSTER", ClusterMetricsRegistry.class); private final String registry; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java index 6689a99317..9446df5369 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java @@ -28,6 +28,7 @@ import io.prometheus.client.CollectorRegistry; import io.prometheus.client.exporter.common.TextFormat; import org.apache.nifi.metrics.jvm.JmxJvmMetrics; import org.apache.nifi.prometheus.util.BulletinMetricsRegistry; +import org.apache.nifi.prometheus.util.ClusterMetricsRegistry; import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry; import org.apache.nifi.prometheus.util.JvmMetricsRegistry; import org.apache.nifi.prometheus.util.NiFiMetricsRegistry; @@ -83,6 +84,8 @@ public class TestFlowResource { private static final String SAMPLE_LABEL_VALUES_PROCESS_GROUP = "ProcessGroup"; private static final String COMPONENT_TYPE_LABEL = "component_type"; private static final int COMPONENT_TYPE_VALUE_INDEX = 1; + private static final String CLUSTER_TYPE_LABEL = "cluster"; + private static final String CLUSTER_LABEL_KEY = "instance"; @InjectMocks private FlowResource resource = new FlowResource(); @@ -193,12 +196,13 @@ public class TestFlowResource { assertThat(metrics, hasKey(ROOT_FIELD_NAME)); final List registryList = metrics.get(ROOT_FIELD_NAME); - assertThat(registryList, hasSize(9)); + assertThat(registryList, hasSize(13)); final Map result = getResult(registryList); assertThat(3L, equalTo(result.get(SAMPLE_NAME_JVM))); assertThat(4L, equalTo(result.get(SAMPLE_LABEL_VALUES_PROCESS_GROUP))); assertThat(2L, equalTo(result.get(SAMPLE_LABEL_VALUES_ROOT_PROCESS_GROUP))); + assertThat(4L, equalTo(result.get(CLUSTER_LABEL_KEY))); } @Test @@ -317,7 +321,13 @@ public class TestFlowResource { } private String getResultKey(final Sample sample) { - return sample.labelNames.contains(COMPONENT_TYPE_LABEL) ? sample.labelValues.get(COMPONENT_TYPE_VALUE_INDEX) : SAMPLE_NAME_JVM; + if (sample.labelNames.contains(COMPONENT_TYPE_LABEL)) { + return sample.labelValues.get(COMPONENT_TYPE_VALUE_INDEX); + } + if (sample.name.startsWith(CLUSTER_TYPE_LABEL)) { + return CLUSTER_LABEL_KEY; + } + return SAMPLE_NAME_JVM; } private static List getCollectorRegistriesForJson() { @@ -327,6 +337,7 @@ public class TestFlowResource { registryList.add(getConnectionMetricsRegistry()); registryList.add(getJvmMetricsRegistry()); registryList.add(getBulletinMetricsRegistry()); + registryList.add(getClusterMetricsRegistry()); return registryList; @@ -378,6 +389,18 @@ public class TestFlowResource { return bulletinMetricsRegistry.getRegistry(); } + private static CollectorRegistry getClusterMetricsRegistry() { + final ClusterMetricsRegistry clusterMetricsRegistry = new ClusterMetricsRegistry(); + + clusterMetricsRegistry.setDataPoint(1, "IS_CLUSTERED", "B1Id"); + clusterMetricsRegistry.setDataPoint(1, "IS_CONNECTED_TO_CLUSTER", "B1Id"); + clusterMetricsRegistry.setDataPoint(2, "CONNECTED_NODE_COUNT", "B1Id", "2 / 3"); + clusterMetricsRegistry.setDataPoint(3, "TOTAL_NODE_COUNT", "B1Id"); + + return clusterMetricsRegistry.getRegistry(); + } + + private static class SampleDeserializer extends StdDeserializer { protected SampleDeserializer() { super(Sample.class);