NIFI-7379: Support multiple instances of Prometheus registries/metrics (#4229)

* NIFI-7379: Support multiple instances of Prometheus registries/metrics

* NIFI-7379: Refactored Prometheus objects to support multiple instances
This commit is contained in:
Matthew Burgess 2020-04-28 16:56:49 -04:00 committed by GitHub
parent a7008903ed
commit 1259bd5bd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 607 additions and 432 deletions

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import java.util.HashMap;
import java.util.Map;
public class AbstractMetricsRegistry {
protected final CollectorRegistry registry = new CollectorRegistry();
protected final Map<String, Gauge> nameToGaugeMap = new HashMap<>();
protected final Map<String, Counter> nameToCounterMap = new HashMap<>();
public CollectorRegistry getRegistry() {
return registry;
}
public void setDataPoint(double val, String gaugeName, String... labels) {
Gauge gauge = nameToGaugeMap.get(gaugeName);
if (gauge == null) {
throw new IllegalArgumentException("Gauge '" + gaugeName + "' does not exist in this registry");
}
gauge.labels(labels).set(val);
}
public void incrementCounter(double val, String counterName, String... labels) {
Counter counter = nameToCounterMap.get(counterName);
if (counter == null) {
throw new IllegalArgumentException("Counter '" + counterName + "' does not exist in this registry");
}
counter.labels(labels).inc(val);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
import io.prometheus.client.Gauge;
public class BulletinMetricsRegistry extends AbstractMetricsRegistry {
public BulletinMetricsRegistry() {
nameToGaugeMap.put("BULLETIN", Gauge.build()
.name("nifi_bulletin")
.help("Bulletin reported by the NiFi instance")
.labelNames("instance", "component_type", "component_id", "parent_id",
"node_address", "category", "source_name", "source_id", "level")
.register(registry));
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
import io.prometheus.client.Gauge;
public class ConnectionAnalyticsMetricsRegistry extends AbstractMetricsRegistry {
public ConnectionAnalyticsMetricsRegistry() {
// Connection status analytics metrics
nameToGaugeMap.put("TIME_TO_BYTES_BACKPRESSURE_PREDICTION", Gauge.build()
.name("nifi_time_to_bytes_backpressure_prediction")
.help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to bytes in the queue")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("TIME_TO_COUNT_BACKPRESSURE_PREDICTION", Gauge.build()
.name("nifi_time_to_count_backpressure_prediction")
.help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to number of objects in the queue")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("BYTES_AT_NEXT_INTERVAL_PREDICTION", Gauge.build()
.name("nifi_bytes_at_next_interval_prediction")
.help("Predicted number of bytes in the queue at the next configured interval")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("COUNT_AT_NEXT_INTERVAL_PREDICTION", Gauge.build()
.name("nifi_count_at_next_interval_prediction")
.help("Predicted number of objects in the queue at the next configured interval")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
import io.prometheus.client.Gauge;
public class JvmMetricsRegistry extends AbstractMetricsRegistry {
public JvmMetricsRegistry() {
///////////////////////////////////////////////////////////////
// JVM Metrics
///////////////////////////////////////////////////////////////
nameToGaugeMap.put("JVM_HEAP_USED", Gauge.build()
.name("nifi_jvm_heap_used")
.help("NiFi JVM heap used")
.labelNames("instance")
.register(registry));
nameToGaugeMap.put("JVM_HEAP_USAGE", Gauge.build()
.name("nifi_jvm_heap_usage")
.help("NiFi JVM heap usage")
.labelNames("instance")
.register(registry));
nameToGaugeMap.put("JVM_HEAP_NON_USAGE", Gauge.build()
.name("nifi_jvm_heap_non_usage")
.help("NiFi JVM heap non usage")
.labelNames("instance")
.register(registry));
nameToGaugeMap.put("JVM_THREAD_COUNT", Gauge.build()
.name("nifi_jvm_thread_count")
.help("NiFi JVM thread count")
.labelNames("instance")
.register(registry));
nameToGaugeMap.put("JVM_DAEMON_THREAD_COUNT", Gauge.build()
.name("nifi_jvm_daemon_thread_count")
.help("NiFi JVM daemon thread count")
.labelNames("instance")
.register(registry));
nameToGaugeMap.put("JVM_UPTIME", Gauge.build()
.name("nifi_jvm_uptime")
.help("NiFi JVM uptime")
.labelNames("instance")
.register(registry));
nameToGaugeMap.put("JVM_FILE_DESCRIPTOR_USAGE", Gauge.build()
.name("nifi_jvm_file_descriptor_usage")
.help("NiFi JVM file descriptor usage")
.labelNames("instance")
.register(registry));
nameToGaugeMap.put("JVM_GC_RUNS", Gauge.build()
.name("nifi_jvm_gc_runs")
.help("NiFi JVM GC number of runs")
.labelNames("instance", "gc_name")
.register(registry));
nameToGaugeMap.put("JVM_GC_TIME", Gauge.build()
.name("nifi_jvm_gc_time")
.help("NiFi JVM GC time in milliseconds")
.labelNames("instance", "gc_name")
.register(registry));
}
}

View File

@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prometheus.util;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
public class NiFiMetricsRegistry extends AbstractMetricsRegistry {
public NiFiMetricsRegistry() {
// Processor / Process Group metrics
nameToGaugeMap.put("AMOUNT_FLOWFILES_SENT", Gauge.build()
.name("nifi_amount_flowfiles_sent")
.help("Total number of FlowFiles sent by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_FLOWFILES_TRANSFERRED", Gauge.build()
.name("nifi_amount_flowfiles_transferred")
.help("Total number of FlowFiles transferred by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_FLOWFILES_RECEIVED", Gauge.build()
.name("nifi_amount_flowfiles_received")
.help("Total number of FlowFiles received by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_FLOWFILES_REMOVED", Gauge.build()
.name("nifi_amount_flowfiles_removed")
.help("Total number of FlowFiles removed by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_BYTES_SENT", Gauge.build()
.name("nifi_amount_bytes_sent")
.help("Total number of bytes sent by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_BYTES_READ", Gauge.build()
.name("nifi_amount_bytes_read")
.help("Total number of bytes read by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToCounterMap.put("TOTAL_BYTES_READ", Counter.build().name("nifi_total_bytes_read")
.help("Total number of bytes read by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToCounterMap.put("TOTAL_BYTES_WRITTEN", Counter.build().name("nifi_total_bytes_written")
.help("Total number of bytes written by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_BYTES_WRITTEN", Gauge.build()
.name("nifi_amount_bytes_written")
.help("Total number of bytes written by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_BYTES_RECEIVED", Gauge.build()
.name("nifi_amount_bytes_received")
.help("Total number of bytes received by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_BYTES_TRANSFERRED", Gauge.build()
.name("nifi_amount_bytes_transferred")
.help("Total number of Bytes transferred by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_THREADS_TOTAL_ACTIVE", Gauge.build()
.name("nifi_amount_threads_active")
.help("Total number of threads active for the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_THREADS_TOTAL_TERMINATED", Gauge.build()
.name("nifi_amount_threads_terminated")
.help("Total number of threads terminated for the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("SIZE_CONTENT_OUTPUT_TOTAL", Gauge.build()
.name("nifi_size_content_output_total")
.help("Total size of content output by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("SIZE_CONTENT_INPUT_TOTAL", Gauge.build()
.name("nifi_size_content_input_total")
.help("Total size of content input by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("SIZE_CONTENT_QUEUED_TOTAL", Gauge.build()
.name("nifi_size_content_queued_total")
.help("Total size of content queued in the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("AMOUNT_ITEMS_OUTPUT", Gauge.build()
.name("nifi_amount_items_output")
.help("Total number of items output by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("AMOUNT_ITEMS_INPUT", Gauge.build()
.name("nifi_amount_items_input")
.help("Total number of items input by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("AMOUNT_ITEMS_QUEUED", Gauge.build()
.name("nifi_amount_items_queued")
.help("Total number of items queued by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
// Processor metrics
nameToGaugeMap.put("PROCESSOR_COUNTERS", Gauge.build()
.name("nifi_processor_counters")
.help("Counters exposed by NiFi Processors")
.labelNames("processor_name", "counter_name", "processor_id", "instance")
.register(registry));
// Connection metrics
nameToGaugeMap.put("BACKPRESSURE_BYTES_THRESHOLD", Gauge.build()
.name("nifi_backpressure_bytes_threshold")
.help("The number of bytes that can be queued before backpressure is applied")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("BACKPRESSURE_OBJECT_THRESHOLD", Gauge.build()
.name("nifi_backpressure_object_threshold")
.help("The number of flow files that can be queued before backpressure is applied")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("IS_BACKPRESSURE_ENABLED", Gauge.build()
.name("nifi_backpressure_enabled")
.help("Whether backpressure has been applied for this component. Values are 0 or 1")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
// Port metrics
nameToGaugeMap.put("IS_TRANSMITTING", Gauge.build()
.name("nifi_transmitting")
.help("Whether this component is transmitting data. Values are 0 or 1")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status")
.register(registry));
// Remote Process Group (RPG) metrics
nameToGaugeMap.put("ACTIVE_REMOTE_PORT_COUNT", Gauge.build()
.name("nifi_active_remote_port_count")
.help("The number of active remote ports associated with this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("INACTIVE_REMOTE_PORT_COUNT", Gauge.build()
.name("nifi_inactive_remote_port_count")
.help("The number of inactive remote ports associated with this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
nameToGaugeMap.put("AVERAGE_LINEAGE_DURATION", Gauge.build()
.name("nifi_average_lineage_duration")
.help("The average lineage duration (in milliseconds) for all flow file processed by this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(registry));
}
}

View File

@ -19,12 +19,9 @@ package org.apache.nifi.prometheus.util;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.prometheus.client.Counter;
import io.prometheus.client.SimpleCollector; import io.prometheus.client.SimpleCollector;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
@ -34,7 +31,6 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.ProcessorStatus;
import io.prometheus.client.CollectorRegistry; import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.TransmissionStatus; import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalytics; import org.apache.nifi.controller.status.analytics.StatusAnalytics;
@ -53,13 +49,9 @@ public class PrometheusMetricsUtil {
public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components", public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components",
"Send metrics for each component in the system, to include processors, connections, controller services, etc."); "Send metrics for each component in the system, to include processors, connections, controller services, etc.");
private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry CONNECTION_ANALYTICS_REGISTRY = new CollectorRegistry(); private static final CollectorRegistry CONNECTION_ANALYTICS_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry BULLETIN_REGISTRY = new CollectorRegistry(); private static final CollectorRegistry BULLETIN_REGISTRY = new CollectorRegistry();
public static final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(NIFI_REGISTRY, CONNECTION_ANALYTICS_REGISTRY, BULLETIN_REGISTRY, JVM_REGISTRY);
protected static final String DEFAULT_LABEL_STRING = ""; protected static final String DEFAULT_LABEL_STRING = "";
// Common properties/values // Common properties/values
@ -101,277 +93,8 @@ public class PrometheusMetricsUtil {
.defaultValue(CLIENT_NONE.getValue()) .defaultValue(CLIENT_NONE.getValue())
.build(); .build();
// Processor / Process Group metrics public static CollectorRegistry createNifiMetrics(NiFiMetricsRegistry nifiMetricsRegistry, ProcessGroupStatus status,
private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build() String instId, String parentProcessGroupId, String compType, String metricsStrategy) {
.name("nifi_amount_flowfiles_sent")
.help("Total number of FlowFiles sent by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build()
.name("nifi_amount_flowfiles_transferred")
.help("Total number of FlowFiles transferred by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build()
.name("nifi_amount_flowfiles_received")
.help("Total number of FlowFiles received by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_REMOVED = Gauge.build()
.name("nifi_amount_flowfiles_removed")
.help("Total number of FlowFiles removed by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
.name("nifi_amount_bytes_sent")
.help("Total number of bytes sent by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_READ = Gauge.build()
.name("nifi_amount_bytes_read")
.help("Total number of bytes read by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Counter TOTAL_BYTES_READ = Counter.build().name("nifi_total_bytes_read")
.help("Total number of bytes read by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Counter TOTAL_BYTES_WRITTEN = Counter.build().name("nifi_total_bytes_written")
.help("Total number of bytes written by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
.name("nifi_amount_bytes_written")
.help("Total number of bytes written by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build()
.name("nifi_amount_bytes_received")
.help("Total number of bytes received by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build()
.name("nifi_amount_bytes_transferred")
.help("Total number of Bytes transferred by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build()
.name("nifi_amount_threads_active")
.help("Total number of threads active for the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_THREADS_TOTAL_TERMINATED = Gauge.build()
.name("nifi_amount_threads_terminated")
.help("Total number of threads terminated for the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
.name("nifi_size_content_output_total")
.help("Total size of content output by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build()
.name("nifi_size_content_input_total")
.help("Total size of content input by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build()
.name("nifi_size_content_queued_total")
.help("Total size of content queued in the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build()
.name("nifi_amount_items_output")
.help("Total number of items output by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build()
.name("nifi_amount_items_input")
.help("Total number of items input by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build()
.name("nifi_amount_items_queued")
.help("Total number of items queued by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
// Processor metrics
private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
.name("nifi_processor_counters")
.help("Counters exposed by NiFi Processors")
.labelNames("processor_name", "counter_name", "processor_id", "instance")
.register(NIFI_REGISTRY);
// Connection metrics
private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build()
.name("nifi_backpressure_bytes_threshold")
.help("The number of bytes that can be queued before backpressure is applied")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build()
.name("nifi_backpressure_object_threshold")
.help("The number of flow files that can be queued before backpressure is applied")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build()
.name("nifi_backpressure_enabled")
.help("Whether backpressure has been applied for this component. Values are 0 or 1")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
// Port metrics
private static final Gauge IS_TRANSMITTING = Gauge.build()
.name("nifi_transmitting")
.help("Whether this component is transmitting data. Values are 0 or 1")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status")
.register(NIFI_REGISTRY);
// Remote Process Group (RPG) metrics
private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build()
.name("nifi_active_remote_port_count")
.help("The number of active remote ports associated with this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build()
.name("nifi_inactive_remote_port_count")
.help("The number of inactive remote ports associated with this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build()
.name("nifi_average_lineage_duration")
.help("The average lineage duration (in milliseconds) for all flow file processed by this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
// Connection status analytics metrics
private static final Gauge TIME_TO_BYTES_BACKPRESSURE_PREDICTION = Gauge.build()
.name("nifi_time_to_bytes_backpressure_prediction")
.help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to bytes in the queue")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(CONNECTION_ANALYTICS_REGISTRY);
private static final Gauge TIME_TO_COUNT_BACKPRESSURE_PREDICTION = Gauge.build()
.name("nifi_time_to_count_backpressure_prediction")
.help("Predicted time (in milliseconds) until backpressure will be applied on the connection due to number of objects in the queue")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(CONNECTION_ANALYTICS_REGISTRY);
private static final Gauge BYTES_AT_NEXT_INTERVAL_PREDICTION = Gauge.build()
.name("nifi_bytes_at_next_interval_prediction")
.help("Predicted number of bytes in the queue at the next configured interval")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(CONNECTION_ANALYTICS_REGISTRY);
private static final Gauge COUNT_AT_NEXT_INTERVAL_PREDICTION = Gauge.build()
.name("nifi_count_at_next_interval_prediction")
.help("Predicted number of objects in the queue at the next configured interval")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(CONNECTION_ANALYTICS_REGISTRY);
private static final Gauge BULLETIN = Gauge.build()
.name("nifi_bulletin")
.help("Bulletin reported by the NiFi instance")
.labelNames("instance", "component_type", "component_id", "parent_id",
"node_address", "category", "source_name", "source_id", "level")
.register(BULLETIN_REGISTRY);
///////////////////////////////////////////////////////////////
// JVM Metrics
///////////////////////////////////////////////////////////////
private static final Gauge JVM_HEAP_USED = Gauge.build()
.name("nifi_jvm_heap_used")
.help("NiFi JVM heap used")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_HEAP_USAGE = Gauge.build()
.name("nifi_jvm_heap_usage")
.help("NiFi JVM heap usage")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_HEAP_NON_USAGE = Gauge.build()
.name("nifi_jvm_heap_non_usage")
.help("NiFi JVM heap non usage")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_THREAD_COUNT = Gauge.build()
.name("nifi_jvm_thread_count")
.help("NiFi JVM thread count")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_DAEMON_THREAD_COUNT = Gauge.build()
.name("nifi_jvm_daemon_thread_count")
.help("NiFi JVM daemon thread count")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_UPTIME = Gauge.build()
.name("nifi_jvm_uptime")
.help("NiFi JVM uptime")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_FILE_DESCRIPTOR_USAGE = Gauge.build()
.name("nifi_jvm_file_descriptor_usage")
.help("NiFi JVM file descriptor usage")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_GC_RUNS = Gauge.build()
.name("nifi_jvm_gc_runs")
.help("NiFi JVM GC number of runs")
.labelNames("instance", "gc_name")
.register(JVM_REGISTRY);
private static final Gauge JVM_GC_TIME = Gauge.build()
.name("nifi_jvm_gc_time")
.help("NiFi JVM GC time in milliseconds")
.labelNames("instance", "gc_name")
.register(JVM_REGISTRY);
public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instId, String parentProcessGroupId, String compType, String metricsStrategy) {
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
final String parentPGId = StringUtils.isEmpty(parentProcessGroupId) ? DEFAULT_LABEL_STRING : parentProcessGroupId; final String parentPGId = StringUtils.isEmpty(parentProcessGroupId) ? DEFAULT_LABEL_STRING : parentProcessGroupId;
@ -393,40 +116,37 @@ public class PrometheusMetricsUtil {
} }
} }
AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent()); nifiMetricsRegistry.setDataPoint(status.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, componentType, componentName, componentId, parentPGId);
AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred()); nifiMetricsRegistry.setDataPoint(status.getFlowFilesTransferred(), "AMOUNT_FLOWFILES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId);
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived()); nifiMetricsRegistry.setDataPoint(status.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId);
AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent()); nifiMetricsRegistry.setDataPoint(status.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, componentType, componentName, componentId, parentPGId);
AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead()); nifiMetricsRegistry.setDataPoint(status.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, componentType, componentName, componentId, parentPGId);
AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten()); nifiMetricsRegistry.setDataPoint(status.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, componentType, componentName, componentId, parentPGId);
TOTAL_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).inc(status.getBytesRead()); nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, componentType, componentName, componentId, parentPGId);
TOTAL_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).inc(status.getBytesWritten()); nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, componentType, componentName, componentId, parentPGId);
AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived()); nifiMetricsRegistry.setDataPoint(status.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId);
AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred()); nifiMetricsRegistry.setDataPoint(status.getBytesTransferred(), "AMOUNT_BYTES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId);
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") nifiMetricsRegistry.setDataPoint(status.getOutputContentSize(), "SIZE_CONTENT_OUTPUT_TOTAL",
.set(status.getOutputContentSize()); instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") nifiMetricsRegistry.setDataPoint(status.getInputContentSize(), "SIZE_CONTENT_INPUT_TOTAL",
.set(status.getInputContentSize()); instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") nifiMetricsRegistry.setDataPoint(status.getQueuedContentSize(), "SIZE_CONTENT_QUEUED_TOTAL",
.set(status.getQueuedContentSize()); instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") nifiMetricsRegistry.setDataPoint(status.getOutputCount(), "AMOUNT_ITEMS_OUTPUT", instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
.set(status.getOutputCount()); nifiMetricsRegistry.setDataPoint(status.getInputCount(), "AMOUNT_ITEMS_INPUT", instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "");
AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "") nifiMetricsRegistry.setDataPoint(status.getQueuedCount(), "AMOUNT_ITEMS_QUEUED", instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "");
.set(status.getInputCount());
AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "")
.set(status.getQueuedCount());
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId) nifiMetricsRegistry.setDataPoint(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE",
.set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount()); instanceId, componentType, componentName, componentId, parentPGId);
AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, componentType, componentName, componentId, parentPGId) nifiMetricsRegistry.setDataPoint(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount(), "AMOUNT_THREADS_TOTAL_TERMINATED",
.set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount()); instanceId, componentType, componentName, componentId, parentPGId);
// Report metrics for child process groups if specified // Report metrics for child process groups if specified
if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, componentId, "ProcessGroup", metricsStrategy)); status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(nifiMetricsRegistry, childGroupStatus, instanceId, componentId, "ProcessGroup", metricsStrategy));
} }
if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) { if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
@ -435,8 +155,8 @@ public class PrometheusMetricsUtil {
Map<String, Long> counters = processorStatus.getCounters(); Map<String, Long> counters = processorStatus.getCounters();
if (counters != null) { if (counters != null) {
counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS counters.entrySet().stream().forEach(entry -> nifiMetricsRegistry.setDataPoint(entry.getValue(), "PROCESSOR_COUNTERS",
.labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue())); processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId));
} }
final String procComponentType = "Processor"; final String procComponentType = "Processor";
@ -444,34 +164,36 @@ public class PrometheusMetricsUtil {
final String procComponentName = StringUtils.isEmpty(processorStatus.getName()) ? DEFAULT_LABEL_STRING : processorStatus.getName(); final String procComponentName = StringUtils.isEmpty(processorStatus.getName()) ? DEFAULT_LABEL_STRING : processorStatus.getName();
final String parentId = StringUtils.isEmpty(processorStatus.getGroupId()) ? DEFAULT_LABEL_STRING : processorStatus.getGroupId(); final String parentId = StringUtils.isEmpty(processorStatus.getGroupId()) ? DEFAULT_LABEL_STRING : processorStatus.getGroupId();
AMOUNT_FLOWFILES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getFlowFilesSent()); nifiMetricsRegistry.setDataPoint(processorStatus.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, procComponentType, procComponentName, procComponentId, parentId);
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getFlowFilesReceived()); nifiMetricsRegistry.setDataPoint(processorStatus.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED",
AMOUNT_FLOWFILES_REMOVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getFlowFilesRemoved()); instanceId, procComponentType, procComponentName, procComponentId, parentId);
nifiMetricsRegistry.setDataPoint(processorStatus.getFlowFilesRemoved(), "AMOUNT_FLOWFILES_REMOVED",
instanceId, procComponentType, procComponentName, procComponentId, parentId);
AMOUNT_BYTES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getBytesSent()); nifiMetricsRegistry.setDataPoint(processorStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, procComponentType, procComponentName, procComponentId, parentId);
AMOUNT_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getBytesRead()); nifiMetricsRegistry.setDataPoint(processorStatus.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, procComponentType, procComponentName, procComponentId, parentId);
AMOUNT_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getBytesWritten()); nifiMetricsRegistry.setDataPoint(processorStatus.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, procComponentType, procComponentName, procComponentId, parentId);
TOTAL_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).inc(status.getBytesRead()); nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, procComponentType, procComponentName, procComponentId, parentId);
TOTAL_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).inc(status.getBytesWritten()); nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, procComponentType, procComponentName, procComponentId, parentId);
AMOUNT_BYTES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId).set(processorStatus.getBytesReceived()); nifiMetricsRegistry.setDataPoint(processorStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, procComponentType, procComponentName, procComponentId, parentId);
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(processorStatus.getOutputBytes(), "SIZE_CONTENT_OUTPUT_TOTAL",
.set(processorStatus.getOutputBytes()); instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(processorStatus.getInputBytes(), "SIZE_CONTENT_INPUT_TOTAL",
.set(processorStatus.getInputBytes()); instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");
AMOUNT_ITEMS_OUTPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(processorStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
.set(processorStatus.getOutputCount()); instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");
AMOUNT_ITEMS_INPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(processorStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
.set(processorStatus.getInputCount()); instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");
AVERAGE_LINEAGE_DURATION.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(processorStatus.getAverageLineageDuration(), "AVERAGE_LINEAGE_DURATION",
.set(processorStatus.getAverageLineageDuration()); instanceId, procComponentType, procComponentName, procComponentId, parentId, "", "", "", "");
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId) nifiMetricsRegistry.setDataPoint(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE",
.set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount()); instanceId, procComponentType, procComponentName, procComponentId, parentId);
AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId) nifiMetricsRegistry.setDataPoint(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount(), "AMOUNT_THREADS_TOTAL_TERMINATED",
.set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount()); instanceId, procComponentType, procComponentName, procComponentId, parentId);
} }
for (ConnectionStatus connectionStatus : status.getConnectionStatus()) { for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
@ -483,80 +205,80 @@ public class PrometheusMetricsUtil {
final String destinationName = StringUtils.isEmpty(connectionStatus.getDestinationName()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationName(); final String destinationName = StringUtils.isEmpty(connectionStatus.getDestinationName()) ? DEFAULT_LABEL_STRING : connectionStatus.getDestinationName();
final String parentId = StringUtils.isEmpty(connectionStatus.getGroupId()) ? DEFAULT_LABEL_STRING : connectionStatus.getGroupId(); final String parentId = StringUtils.isEmpty(connectionStatus.getGroupId()) ? DEFAULT_LABEL_STRING : connectionStatus.getGroupId();
final String connComponentType = "Connection"; final String connComponentType = "Connection";
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) nifiMetricsRegistry.setDataPoint(connectionStatus.getOutputBytes(), "SIZE_CONTENT_OUTPUT_TOTAL",
.set(connectionStatus.getOutputBytes()); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) nifiMetricsRegistry.setDataPoint(connectionStatus.getInputBytes(), "SIZE_CONTENT_INPUT_TOTAL",
.set(connectionStatus.getInputBytes()); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) nifiMetricsRegistry.setDataPoint(connectionStatus.getQueuedBytes(), "SIZE_CONTENT_QUEUED_TOTAL",
.set(connectionStatus.getQueuedBytes()); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) nifiMetricsRegistry.setDataPoint(connectionStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
.set(connectionStatus.getOutputCount()); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) nifiMetricsRegistry.setDataPoint(connectionStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
.set(connectionStatus.getInputCount()); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) nifiMetricsRegistry.setDataPoint(connectionStatus.getQueuedCount(), "AMOUNT_ITEMS_QUEUED",
.set(connectionStatus.getQueuedCount()); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) nifiMetricsRegistry.setDataPoint(connectionStatus.getBackPressureBytesThreshold(), "BACKPRESSURE_BYTES_THRESHOLD",
.set(connectionStatus.getBackPressureBytesThreshold()); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) nifiMetricsRegistry.setDataPoint(connectionStatus.getBackPressureObjectThreshold(), "BACKPRESSURE_OBJECT_THRESHOLD",
.set(connectionStatus.getBackPressureObjectThreshold()); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount()) boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount())
|| (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes()); || (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes());
IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) nifiMetricsRegistry.setDataPoint(isBackpressureEnabled ? 1 : 0, "IS_BACKPRESSURE_ENABLED",
.set(isBackpressureEnabled ? 1 : 0); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
} }
for (PortStatus portStatus : status.getInputPortStatus()) { for (PortStatus portStatus : status.getInputPortStatus()) {
final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId(); final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getId(); final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getId();
final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getId(); final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
final String portComponentType = "InputPort"; final String portComponentType = "InputPort";
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived()); nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent()); nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes()); nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes()); nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
TOTAL_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesRead()); nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
TOTAL_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesWritten()); nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived()); nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
.set(portStatus.getOutputCount()); instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");
AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(portStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
.set(portStatus.getInputCount()); instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");
final Boolean isTransmitting = portStatus.isTransmitting(); final Boolean isTransmitting = portStatus.isTransmitting();
IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name()) nifiMetricsRegistry.setDataPoint(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0), "IS_TRANSMITTING",
.set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0)); instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name());
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); nifiMetricsRegistry.setDataPoint(portStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE", instanceId, portComponentType, portComponentName, portComponentId, parentId);
} }
for (PortStatus portStatus : status.getOutputPortStatus()) { for (PortStatus portStatus : status.getOutputPortStatus()) {
final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId(); final String portComponentId = StringUtils.isEmpty(portStatus.getId()) ? DEFAULT_LABEL_STRING : portStatus.getId();
final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getName(); final String portComponentName = StringUtils.isEmpty(portStatus.getName()) ? DEFAULT_LABEL_STRING : portStatus.getName();
final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getGroupId(); final String parentId = StringUtils.isEmpty(portStatus.getGroupId()) ? DEFAULT_LABEL_STRING : portStatus.getGroupId();
final String portComponentType = "OutputPort"; final String portComponentType = "OutputPort";
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent()); nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesSent(), "AMOUNT_FLOWFILES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived()); nifiMetricsRegistry.setDataPoint(portStatus.getFlowFilesReceived(), "AMOUNT_FLOWFILES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent()); nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes()); nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes()); nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
TOTAL_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesRead()); nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
TOTAL_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).inc(status.getBytesWritten()); nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived()); nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
.set(portStatus.getOutputCount()); instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");
AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(portStatus.getInputCount(), "AMOUNT_ITEMS_INPUT",
.set(portStatus.getInputCount()); instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "");
final Boolean isTransmitting = portStatus.isTransmitting(); final Boolean isTransmitting = portStatus.isTransmitting();
IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name()) nifiMetricsRegistry.setDataPoint(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0), "IS_TRANSMITTING",
.set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0)); instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name());
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount()); nifiMetricsRegistry.setDataPoint(portStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE", instanceId, portComponentType, portComponentName, portComponentId, parentId);
} }
for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
final String rpgComponentId = StringUtils.isEmpty(remoteProcessGroupStatus.getId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getId(); final String rpgComponentId = StringUtils.isEmpty(remoteProcessGroupStatus.getId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getId();
@ -564,54 +286,54 @@ public class PrometheusMetricsUtil {
final String parentId = StringUtils.isEmpty(remoteProcessGroupStatus.getGroupId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getGroupId(); final String parentId = StringUtils.isEmpty(remoteProcessGroupStatus.getGroupId()) ? DEFAULT_LABEL_STRING : remoteProcessGroupStatus.getGroupId();
final String rpgComponentType = "RemoteProcessGroup"; final String rpgComponentType = "RemoteProcessGroup";
AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize()); nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getSentContentSize(), "AMOUNT_BYTES_WRITTEN", instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId);
AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize()); nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getReceivedContentSize(), "AMOUNT_BYTES_RECEIVED", instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId);
AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getSentCount(), "AMOUNT_ITEMS_OUTPUT",
.set(remoteProcessGroupStatus.getSentCount()); instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");
AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getReceivedCount(), "AMOUNT_ITEMS_INPUT",
.set(remoteProcessGroupStatus.getReceivedCount()); instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");
ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getActiveRemotePortCount(), "ACTIVE_REMOTE_PORT_COUNT",
.set(remoteProcessGroupStatus.getActiveRemotePortCount()); instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");
INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getInactiveRemotePortCount(), "INACTIVE_REMOTE_PORT_COUNT",
.set(remoteProcessGroupStatus.getInactiveRemotePortCount()); instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");
AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "") nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getAverageLineageDuration(), "AVERAGE_LINEAGE_DURATION",
.set(remoteProcessGroupStatus.getAverageLineageDuration()); instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "");
IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name()) nifiMetricsRegistry.setDataPoint(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0, "IS_TRANSMITTING",
.set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0); instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name());
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount()); nifiMetricsRegistry.setDataPoint(remoteProcessGroupStatus.getActiveThreadCount(), "AMOUNT_THREADS_TOTAL_ACTIVE",
instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId);
} }
} }
return NIFI_REGISTRY; return nifiMetricsRegistry.getRegistry();
} }
public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instId) { public static CollectorRegistry createJvmMetrics(JvmMetricsRegistry jvmMetricsRegistry, JvmMetrics jvmMetrics, String instId) {
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
JVM_HEAP_USED.labels(instanceId).set(jvmMetrics.heapUsed(DataUnit.B)); jvmMetricsRegistry.setDataPoint(jvmMetrics.heapUsed(DataUnit.B), "JVM_HEAP_USED", instanceId);
JVM_HEAP_USAGE.labels(instanceId).set(jvmMetrics.heapUsage()); jvmMetricsRegistry.setDataPoint(jvmMetrics.heapUsage(), "JVM_HEAP_USAGE", instanceId);
JVM_HEAP_NON_USAGE.labels(instanceId).set(jvmMetrics.nonHeapUsage()); jvmMetricsRegistry.setDataPoint(jvmMetrics.nonHeapUsage(), "JVM_HEAP_NON_USAGE", instanceId);
jvmMetricsRegistry.setDataPoint(jvmMetrics.threadCount(), "JVM_THREAD_COUNT", instanceId);
JVM_THREAD_COUNT.labels(instanceId).set(jvmMetrics.threadCount()); jvmMetricsRegistry.setDataPoint(jvmMetrics.daemonThreadCount(), "JVM_DAEMON_THREAD_COUNT", instanceId);
JVM_DAEMON_THREAD_COUNT.labels(instanceId).set(jvmMetrics.daemonThreadCount()); jvmMetricsRegistry.setDataPoint(jvmMetrics.uptime(), "JVM_UPTIME", instanceId);
jvmMetricsRegistry.setDataPoint(jvmMetrics.fileDescriptorUsage(), "JVM_FILE_DESCRIPTOR_USAGE", instanceId);
JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime());
JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage());
jvmMetrics.garbageCollectors() jvmMetrics.garbageCollectors()
.forEach((name, stat) -> { .forEach((name, stat) -> {
JVM_GC_RUNS.labels(instanceId, name).set(stat.getRuns()); jvmMetricsRegistry.setDataPoint(stat.getRuns(), "JVM_GC_RUNS", instanceId, name);
JVM_GC_TIME.labels(instanceId, name).set(stat.getTime(TimeUnit.MILLISECONDS)); jvmMetricsRegistry.setDataPoint(stat.getTime(TimeUnit.MILLISECONDS), "JVM_GC_TIME", instanceId, name);
}); });
return JVM_REGISTRY; return jvmMetricsRegistry.getRegistry();
} }
public static CollectorRegistry createConnectionStatusAnalyticsMetrics(StatusAnalytics statusAnalytics, String instId, String connComponentType, String connName, public static CollectorRegistry createConnectionStatusAnalyticsMetrics(ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry, StatusAnalytics statusAnalytics,
String instId, String connComponentType, String connName,
String connId, String pgId, String srcId, String srcName, String destId, String destName) { String connId, String pgId, String srcId, String srcName, String destId, String destName) {
if(statusAnalytics != null) { if(statusAnalytics != null) {
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
@ -625,20 +347,24 @@ public class PrometheusMetricsUtil {
Map<String, Long> predictions = statusAnalytics.getPredictions(); Map<String, Long> predictions = statusAnalytics.getPredictions();
TIME_TO_BYTES_BACKPRESSURE_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) connectionAnalyticsMetricsRegistry.setDataPoint(predictions.get("timeToBytesBackpressureMillis"),
.set(predictions.get("timeToBytesBackpressureMillis")); "TIME_TO_BYTES_BACKPRESSURE_PREDICTION",
TIME_TO_COUNT_BACKPRESSURE_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
.set(predictions.get("timeToCountBackpressureMillis")); connectionAnalyticsMetricsRegistry.setDataPoint(predictions.get("timeToCountBackpressureMillis"),
BYTES_AT_NEXT_INTERVAL_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) "TIME_TO_COUNT_BACKPRESSURE_PREDICTION",
.set(predictions.get("nextIntervalBytes")); instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
COUNT_AT_NEXT_INTERVAL_PREDICTION.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName) connectionAnalyticsMetricsRegistry.setDataPoint(predictions.get("nextIntervalBytes"),
.set(predictions.get("nextIntervalCount")); "BYTES_AT_NEXT_INTERVAL_PREDICTION",
instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
connectionAnalyticsMetricsRegistry.setDataPoint(predictions.get("nextIntervalCount"),
"COUNT_AT_NEXT_INTERVAL_PREDICTION",
instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName);
} }
return CONNECTION_ANALYTICS_REGISTRY; return connectionAnalyticsMetricsRegistry.getRegistry();
} }
public static CollectorRegistry createBulletinMetrics(String instId, String compType, String compId, String pgId, String nodeAddr, public static CollectorRegistry createBulletinMetrics(BulletinMetricsRegistry bulletinMetricsRegistry, String instId, String compType, String compId, String pgId, String nodeAddr,
String cat, String srcName, String srcId, String lvl) { String cat, String srcName, String srcId, String lvl) {
final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId; final String instanceId = StringUtils.isEmpty(instId) ? DEFAULT_LABEL_STRING : instId;
final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType; final String componentType = StringUtils.isEmpty(compType) ? DEFAULT_LABEL_STRING : compType;
@ -649,7 +375,7 @@ public class PrometheusMetricsUtil {
final String category = StringUtils.isEmpty(cat) ? DEFAULT_LABEL_STRING : cat; final String category = StringUtils.isEmpty(cat) ? DEFAULT_LABEL_STRING : cat;
final String parentId = StringUtils.isEmpty(pgId) ? DEFAULT_LABEL_STRING : pgId; final String parentId = StringUtils.isEmpty(pgId) ? DEFAULT_LABEL_STRING : pgId;
final String level = StringUtils.isEmpty(lvl) ? DEFAULT_LABEL_STRING : lvl; final String level = StringUtils.isEmpty(lvl) ? DEFAULT_LABEL_STRING : lvl;
BULLETIN.labels(instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level).set(1); bulletinMetricsRegistry.setDataPoint(1, "BULLETIN", instanceId, componentType, componentId, parentId, nodeAddress, category, sourceName, sourceId, level);
return BULLETIN_REGISTRY; return bulletinMetricsRegistry.getRegistry();
} }
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.web; package org.apache.nifi.web;
import io.prometheus.client.CollectorRegistry;
import org.apache.nifi.authorization.AuthorizeAccess; import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUser;
@ -317,7 +318,7 @@ public interface NiFiServiceFacade {
/** /**
* Gets the metrics for the flow. * Gets the metrics for the flow.
*/ */
void generateFlowMetrics(); Collection<CollectorRegistry> generateFlowMetrics();
/** /**
* Updates the configuration for this controller. * Updates the configuration for this controller.

View File

@ -17,6 +17,7 @@
package org.apache.nifi.web; package org.apache.nifi.web;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.prometheus.client.CollectorRegistry;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.nifi.action.Action; import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component; import org.apache.nifi.action.Component;
@ -101,6 +102,10 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor; import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterReferenceManager; import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.StandardParameterContext; import org.apache.nifi.parameter.StandardParameterContext;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil; import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.authorization.Permissions; import org.apache.nifi.registry.authorization.Permissions;
@ -393,6 +398,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private AuthorizableLookup authorizableLookup; private AuthorizableLookup authorizableLookup;
// Prometheus Metrics objects
private NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry();
private JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
private ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
private BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry();
public final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(
nifiMetricsRegistry.getRegistry(),
jvmMetricsRegistry.getRegistry(),
connectionAnalyticsMetricsRegistry.getRegistry(),
bulletinMetricsRegistry.getRegistry()
);
// ----------------------------------------- // -----------------------------------------
// Synchronization methods // Synchronization methods
// ----------------------------------------- // -----------------------------------------
@ -5297,20 +5316,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
} }
@Override @Override
public void generateFlowMetrics() { public Collection<CollectorRegistry> generateFlowMetrics() {
String instanceId = controllerFacade.getInstanceId(); String instanceId = controllerFacade.getInstanceId();
ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root"); ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root");
PrometheusMetricsUtil.createNifiMetrics(rootPGStatus, instanceId, "", "RootProcessGroup", PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", "RootProcessGroup",
PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue()); PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
PrometheusMetricsUtil.createJvmMetrics(JmxJvmMetrics.getInstance(), instanceId); PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), instanceId);
// Get Connection Status Analytics (predictions, e.g.) // Get Connection Status Analytics (predictions, e.g.)
Set<Connection> connections = controllerFacade.getFlowManager().findAllConnections(); Set<Connection> connections = controllerFacade.getFlowManager().findAllConnections();
for (Connection c : connections) { for (Connection c : connections) {
// If a ResourceNotFoundException is thrown, analytics hasn't been enabled // If a ResourceNotFoundException is thrown, analytics hasn't been enabled
try { try {
PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(controllerFacade.getConnectionStatusAnalytics(c.getIdentifier()), PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(connectionAnalyticsMetricsRegistry, controllerFacade.getConnectionStatusAnalytics(c.getIdentifier()),
instanceId, instanceId,
"Connection", "Connection",
c.getName(), c.getName(),
@ -5332,7 +5351,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
for(BulletinEntity bulletinEntity : bulletinBoardDTO.getBulletins()) { for(BulletinEntity bulletinEntity : bulletinBoardDTO.getBulletins()) {
BulletinDTO bulletin = bulletinEntity.getBulletin(); BulletinDTO bulletin = bulletinEntity.getBulletin();
if(bulletin != null) { if(bulletin != null) {
PrometheusMetricsUtil.createBulletinMetrics(instanceId, PrometheusMetricsUtil.createBulletinMetrics(bulletinMetricsRegistry, instanceId,
"Bulletin", "Bulletin",
String.valueOf(bulletin.getId()), String.valueOf(bulletin.getId()),
bulletin.getGroupId() == null ? "" : bulletin.getGroupId(), bulletin.getGroupId() == null ? "" : bulletin.getGroupId(),
@ -5344,6 +5363,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
); );
} }
} }
return ALL_REGISTRIES;
} }
@Override @Override

View File

@ -45,7 +45,6 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.NarClassLoadersHolder; import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.IllegalClusterResourceRequestException; import org.apache.nifi.web.IllegalClusterResourceRequestException;
@ -138,6 +137,7 @@ import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
import java.text.Collator; import java.text.Collator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
@ -425,11 +425,11 @@ public class FlowResource extends ApplicationResource {
if ("prometheus".equalsIgnoreCase(producer)) { if ("prometheus".equalsIgnoreCase(producer)) {
// get this process group flow // get this process group flow
serviceFacade.generateFlowMetrics(); final Collection<CollectorRegistry> allRegistries = serviceFacade.generateFlowMetrics();
// generate a streaming response // generate a streaming response
final StreamingOutput response = output -> { final StreamingOutput response = output -> {
Writer writer = new BufferedWriter(new OutputStreamWriter(output)); Writer writer = new BufferedWriter(new OutputStreamWriter(output));
for (CollectorRegistry collectorRegistry : PrometheusMetricsUtil.ALL_REGISTRIES) { for (CollectorRegistry collectorRegistry : allRegistries) {
TextFormat.write004(writer, collectorRegistry.metricFamilySamples()); TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
// flush the response // flush the response
output.flush(); output.flush();

View File

@ -34,6 +34,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics; import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil; import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
@ -128,13 +130,15 @@ public class PrometheusReportingTask extends AbstractReportingTask {
ProcessGroupStatus rootGroupStatus = reportingContext.getEventAccess().getControllerStatus(); ProcessGroupStatus rootGroupStatus = reportingContext.getEventAccess().getControllerStatus();
String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue(); String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue();
String metricsStrategy = reportingContext.getProperty(METRICS_STRATEGY).getValue(); String metricsStrategy = reportingContext.getProperty(METRICS_STRATEGY).getValue();
return PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, instanceId, "", "RootProcessGroup", metricsStrategy); NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry();
return PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootGroupStatus, instanceId, "", "RootProcessGroup", metricsStrategy);
}; };
metricsCollectors.add(nifiMetrics); metricsCollectors.add(nifiMetrics);
if (context.getProperty(SEND_JVM_METRICS).asBoolean()) { if (context.getProperty(SEND_JVM_METRICS).asBoolean()) {
Function<ReportingContext, CollectorRegistry> jvmMetrics = (reportingContext) -> { Function<ReportingContext, CollectorRegistry> jvmMetrics = (reportingContext) -> {
String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue(); String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue();
return PrometheusMetricsUtil.createJvmMetrics(JmxJvmMetrics.getInstance(), instanceId); JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
return PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), instanceId);
}; };
metricsCollectors.add(jvmMetrics); metricsCollectors.add(jvmMetrics);
} }