From 8771c35f4a01c38ab94ab80b96997ee5f82af094 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Thu, 24 Oct 2019 11:53:28 -0400 Subject: [PATCH] NIFI-6799: Add PrometheusRecordSink and reset() method to RecordSinkService (#3839) This closes #3839 --- .../nifi-prometheus-reporting-task/pom.xml | 15 ++ .../prometheus/PrometheusRecordSink.java | 208 ++++++++++++++++++ .../prometheus/PrometheusReportingTask.java | 97 +++----- .../prometheus/PrometheusServer.java | 48 ++-- .../prometheus/api/PrometheusMetricsUtil.java | 52 +++++ ...g.apache.nifi.controller.ControllerService | 15 ++ .../prometheus/TestPrometheusRecordSink.java | 140 ++++++++++++ .../TestPrometheusReportingTask.java | 8 +- .../reporting/sql/QueryNiFiReportingTask.java | 13 +- .../sql/TestQueryNiFiReportingTask.java | 6 +- .../nifi/record/sink/RecordSinkService.java | 18 ++ 11 files changed, 512 insertions(+), 108 deletions(-) create mode 100644 nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java create mode 100644 nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml index 90f141c213..ff4804a86a 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/pom.xml @@ -40,6 +40,21 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma nifi-utils 1.10.0-SNAPSHOT + + org.apache.nifi + nifi-record-sink-api + 1.10.0-SNAPSHOT + + + org.apache.nifi + nifi-record-serialization-service-api + 1.10.0-SNAPSHOT + + + org.apache.nifi + nifi-record + 1.10.0-SNAPSHOT + io.prometheus diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java new file mode 100644 index 0000000000..75ca809ada --- /dev/null +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusRecordSink.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.prometheus; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Gauge; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.record.sink.RecordSinkService; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.ssl.SSLContextService; +import org.eclipse.jetty.server.Server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +@Tags({"record", "send", "write", "prometheus"}) +@CapabilityDescription("Specifies a Record Sink Service that exposes data points to a Prometheus scraping service. Numeric fields are exposed as Gauges, String fields are the " + + "label values for the gauges, and all other fields are ignored.") + +public class PrometheusRecordSink extends AbstractControllerService implements RecordSinkService { + + private volatile PrometheusServer prometheusServer; + private volatile RecordSchema recordSchema; + private volatile String[] labelNames; + private volatile Map gauges; + private static final CollectorRegistry RECORD_REGISTRY = new CollectorRegistry(); + + private static final List properties; + + static { + List props = new ArrayList<>(); + props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT); + props.add(PrometheusMetricsUtil.INSTANCE_ID); + props.add(PrometheusMetricsUtil.SSL_CONTEXT); + props.add(PrometheusMetricsUtil.CLIENT_AUTH); + properties = Collections.unmodifiableList(props); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @OnEnabled + public void onScheduled(final ConfigurationContext context) { + RECORD_REGISTRY.clear(); + SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class); + final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue(); + + try { + List> metricsCollectors = new ArrayList<>(); + if (sslContextService == null) { + prometheusServer = new PrometheusServer(new InetSocketAddress(Integer.parseInt(metricsEndpointPort)), getLogger()); + } else { + final String clientAuthValue = context.getProperty(PrometheusMetricsUtil.CLIENT_AUTH).getValue(); + final boolean need; + final boolean want; + if (PrometheusMetricsUtil.CLIENT_NEED.getValue().equals(clientAuthValue)) { + need = true; + want = false; + } else if (PrometheusMetricsUtil.CLIENT_WANT.getValue().equals(clientAuthValue)) { + need = false; + want = true; + } else { + need = false; + want = false; + } + prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort), sslContextService, getLogger(), need, want); + } + Function nifiMetrics = (reportingContext) -> RECORD_REGISTRY; + metricsCollectors.add(nifiMetrics); + + prometheusServer.setMetricsCollectors(metricsCollectors); + getLogger().info("Started JETTY server"); + } catch (Exception e) { + getLogger().error("Failed to start Jetty server", e); + } + } + + @Override + public WriteResult sendData(RecordSet recordSet, Map attributes, boolean sendZeroResults) throws IOException { + WriteResult writeResult = null; + if (recordSchema == null) { + // The first time through, create the registry, then create the Gauges and register them + recordSchema = recordSet.getSchema(); + RECORD_REGISTRY.clear(); + + // String fields are labels, collect them first + labelNames = recordSchema.getFields().stream().filter( + (f) -> isLabel(f.getDataType().getFieldType())).map(RecordField::getFieldName).toArray(String[]::new); + + gauges = new HashMap<>(); + recordSchema.getFields().stream().filter((field) -> isNumeric(field.getDataType().getFieldType())).forEach( + // Create, register, and add gauge to the list + (field) -> gauges.put(field.getFieldName(), Gauge.build() + .name(field.getFieldName()) + .help("Metric for " + field.getFieldName()) + .labelNames(labelNames) + .register(RECORD_REGISTRY)) + ); + } + int recordCount = 0; + Record r; + while ((r = recordSet.next()) != null) { + final Record record = r; + // Get label values, set empty strings for null values + String[] labelValues = Arrays.stream(labelNames).map((labelName) -> { + String value = record.getAsString(labelName); + return (value != null) ? value : ""; + }).toArray(String[]::new); + + // Get value for each gauge and update the data point + + gauges.forEach((name, gauge) -> { + Optional dataType = record.getSchema().getDataType(name); + if (dataType.isPresent()) { + RecordFieldType recordFieldType = dataType.get().getFieldType(); + // Change boolean fields to doubles + final double value; + if (RecordFieldType.BOOLEAN.equals(recordFieldType)) { + value = record.getAsBoolean(name) ? 1.0 : 0.0; + } else { + value = record.getAsDouble(name); + } + gauge.labels(labelValues).set(value); + } + }); + recordCount++; + } + attributes.put("record.count", Integer.toString(recordCount)); + writeResult = WriteResult.of(recordCount, attributes); + return writeResult; + } + + @OnDisabled + public void onStopped() throws Exception { + Server server = prometheusServer.getServer(); + server.stop(); + recordSchema = null; + } + + @OnShutdown + public void onShutDown() throws Exception { + Server server = prometheusServer.getServer(); + server.stop(); + recordSchema = null; + } + + @Override + public void reset() { + // Reset the schema in order to support different RecordSet schemas + recordSchema = null; + } + + private boolean isNumeric(RecordFieldType dataType) { + // Numeric fields are metrics + return RecordFieldType.INT.equals(dataType) + || RecordFieldType.SHORT.equals(dataType) + || RecordFieldType.LONG.equals(dataType) + || RecordFieldType.BIGINT.equals(dataType) + || RecordFieldType.FLOAT.equals(dataType) + || RecordFieldType.DOUBLE.equals(dataType) + || RecordFieldType.BOOLEAN.equals(dataType); + + } + + private boolean isLabel(RecordFieldType dataType) { + return RecordFieldType.STRING.equals(dataType) + || RecordFieldType.CHAR.equals(dataType); + } +} diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java index 7dcfe739f7..c8a269a1b5 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java @@ -21,7 +21,9 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.function.Function; +import io.prometheus.client.CollectorRegistry; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -29,14 +31,13 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.metrics.jvm.JmxJvmMetrics; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil; import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; import org.eclipse.jetty.server.Server; @@ -48,39 +49,9 @@ import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.MET @CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application." + " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance") @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "60 sec") - public class PrometheusReportingTask extends AbstractReportingTask { private PrometheusServer prometheusServer; - private SSLContextService sslContextService; - - public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication", - "ReportingTask will not authenticate clients. Anyone can communicate with this ReportingTask anonymously"); - public static final AllowableValue CLIENT_WANT = new AllowableValue("Want Authentication", "Want Authentication", - "ReportingTask will try to verify the client but if unable to verify will allow the client to communicate anonymously"); - public static final AllowableValue CLIENT_NEED = new AllowableValue("Need Authentication", "Need Authentication", - "ReportingTask will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore" - + "specified in the SSL Context Service"); - - public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder() - .name("prometheus-reporting-task-metrics-endpoint-port") - .displayName("Prometheus Metrics Endpoint Port") - .description("The Port where prometheus metrics can be accessed") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .defaultValue("9092") - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .build(); - - public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder() - .name("prometheus-reporting-task-instance-id") - .displayName("Instance ID") - .description("Id of this NiFi instance to be included in the metrics sent to Prometheus") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .defaultValue("${hostname(true)}") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder() .name("prometheus-reporting-task-metrics-strategy") @@ -100,35 +71,16 @@ public class PrometheusReportingTask extends AbstractReportingTask { .required(true) .build(); - public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() - .name("prometheus-reporting-task-ssl-context") - .displayName("SSL Context Service") - .description("The SSL Context Service to use in order to secure the server. If specified, the server will" - + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests") - .required(false) - .identifiesControllerService(RestrictedSSLContextService.class) - .build(); - - public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() - .name("prometheus-reporting-task-client-auth") - .displayName("Client Authentication") - .description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the " - + "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.") - .required(true) - .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED) - .defaultValue(CLIENT_NONE.getValue()) - .build(); - private static final List properties; static { List props = new ArrayList<>(); - props.add(METRICS_ENDPOINT_PORT); - props.add(INSTANCE_ID); + props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT); + props.add(PrometheusMetricsUtil.INSTANCE_ID); props.add(METRICS_STRATEGY); props.add(SEND_JVM_METRICS); - props.add(SSL_CONTEXT); - props.add(CLIENT_AUTH); + props.add(PrometheusMetricsUtil.SSL_CONTEXT); + props.add(PrometheusMetricsUtil.CLIENT_AUTH); properties = Collections.unmodifiableList(props); } @@ -139,30 +91,44 @@ public class PrometheusReportingTask extends AbstractReportingTask { @OnScheduled public void onScheduled(final ConfigurationContext context) { - sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); - final String metricsEndpointPort = context.getProperty(METRICS_ENDPOINT_PORT).getValue(); + SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class); + final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue(); try { + List> metricsCollectors = new ArrayList<>(); if (sslContextService == null) { this.prometheusServer = new PrometheusServer(new InetSocketAddress(Integer.parseInt(metricsEndpointPort)), getLogger()); } else { - final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); + final String clientAuthValue = context.getProperty(PrometheusMetricsUtil.CLIENT_AUTH).getValue(); final boolean need; final boolean want; - if (CLIENT_NEED.equals(clientAuthValue)) { + if (PrometheusMetricsUtil.CLIENT_NEED.getValue().equals(clientAuthValue)) { need = true; want = false; - } else if (CLIENT_WANT.equals(clientAuthValue)) { + } else if (PrometheusMetricsUtil.CLIENT_WANT.getValue().equals(clientAuthValue)) { need = false; want = true; } else { need = false; want = false; } - this.prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort),sslContextService, getLogger(), need, want); + this.prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort), sslContextService, getLogger(), need, want); } - this.prometheusServer.setInstanceId(context.getProperty(INSTANCE_ID).evaluateAttributeExpressions().getValue()); - this.prometheusServer.setSendJvmMetrics(context.getProperty(SEND_JVM_METRICS).asBoolean()); + Function nifiMetrics = (reportingContext) -> { + ProcessGroupStatus rootGroupStatus = reportingContext.getEventAccess().getControllerStatus(); + String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue(); + String metricsStrategy = reportingContext.getProperty(METRICS_STRATEGY).getValue(); + return PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, instanceId, "", "RootProcessGroup", metricsStrategy); + }; + metricsCollectors.add(nifiMetrics); + if (context.getProperty(SEND_JVM_METRICS).asBoolean()) { + Function jvmMetrics = (reportingContext) -> { + String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue(); + return PrometheusMetricsUtil.createJvmMetrics(JmxJvmMetrics.getInstance(), instanceId); + }; + metricsCollectors.add(jvmMetrics); + } + this.prometheusServer.setMetricsCollectors(metricsCollectors); getLogger().info("Started JETTY server"); } catch (Exception e) { getLogger().error("Failed to start Jetty server", e); @@ -184,6 +150,5 @@ public class PrometheusReportingTask extends AbstractReportingTask { @Override public void onTrigger(final ReportingContext context) { this.prometheusServer.setReportingContext(context); - this.prometheusServer.setMetricsStrategy(context.getProperty(METRICS_STRATEGY).getValue()); } } diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java index 521d9cab84..50d5ee1bc3 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.net.HttpURLConnection; import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; @@ -28,11 +31,8 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.metrics.jvm.JmxJvmMetrics; import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil; import org.apache.nifi.ssl.SSLContextService; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; @@ -53,13 +53,10 @@ public class PrometheusServer { private Server server; private ServletContextHandler handler; private ReportingContext context; - private String metricsStrategy; - private boolean sendJvmMetrics; - private String instanceId; + + private List> metricsCollectors; class MetricsServlet extends HttpServlet { - private CollectorRegistry nifiRegistry, jvmRegistry; - private ProcessGroupStatus rootGroupStatus; @Override protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) throws ServletException, IOException { @@ -67,16 +64,12 @@ public class PrometheusServer { logger.debug("PrometheusServer Do get called"); } - rootGroupStatus = PrometheusServer.this.context.getEventAccess().getControllerStatus(); ServletOutputStream response = resp.getOutputStream(); OutputStreamWriter osw = new OutputStreamWriter(response); - nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId, "", "RootProcessGroup", metricsStrategy); - TextFormat.write004(osw, nifiRegistry.metricFamilySamples()); - - if (PrometheusServer.this.sendJvmMetrics == true) { - jvmRegistry = PrometheusMetricsUtil.createJvmMetrics(JmxJvmMetrics.getInstance(), PrometheusServer.this.instanceId); - TextFormat.write004(osw, jvmRegistry.metricFamilySamples()); + for(Function mc : metricsCollectors) { + CollectorRegistry collectorRegistry = mc.apply(getReportingContext()); + TextFormat.write004(osw, collectorRegistry.metricFamilySamples()); } osw.flush(); @@ -91,6 +84,7 @@ public class PrometheusServer { public PrometheusServer(InetSocketAddress addr, ComponentLog logger) throws Exception { PrometheusServer.logger = logger; + metricsCollectors = Collections.emptyList(); this.server = new Server(addr); this.handler = new ServletContextHandler(server, "/metrics"); @@ -148,31 +142,15 @@ public class PrometheusServer { return this.context; } - public boolean getSendJvmMetrics() { - return this.sendJvmMetrics; - } - - public String getInstanceId() { - return this.instanceId; - } - public void setReportingContext(ReportingContext rc) { this.context = rc; } - public void setSendJvmMetrics(boolean jvm) { - this.sendJvmMetrics = jvm; + public List> getMetricsCollectors() { + return metricsCollectors; } - public void setInstanceId(String iid) { - this.instanceId = iid; - } - - public String getMetricsStrategy() { - return metricsStrategy; - } - - public void setMetricsStrategy(String metricsStrategy) { - this.metricsStrategy = metricsStrategy; + public void setMetricsCollectors(List> metricsCollectors) { + this.metricsCollectors = metricsCollectors; } } diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java index fec33383ef..14372394a7 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import io.prometheus.client.SimpleCollector; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -33,8 +34,11 @@ import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.TransmissionStatus; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.metrics.jvm.JvmMetrics; import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.RestrictedSSLContextService; public class PrometheusMetricsUtil { @@ -48,6 +52,54 @@ public class PrometheusMetricsUtil { private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry(); private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry(); + // Common properties/values + public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication", + "ReportingTask will not authenticate clients. Anyone can communicate with this ReportingTask anonymously"); + public static final AllowableValue CLIENT_WANT = new AllowableValue("Want Authentication", "Want Authentication", + "ReportingTask will try to verify the client but if unable to verify will allow the client to communicate anonymously"); + public static final AllowableValue CLIENT_NEED = new AllowableValue("Need Authentication", "Need Authentication", + "ReportingTask will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore" + + "specified in the SSL Context Service"); + + public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder() + .name("prometheus-reporting-task-metrics-endpoint-port") + .displayName("Prometheus Metrics Endpoint Port") + .description("The Port where prometheus metrics can be accessed") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("9092") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder() + .name("prometheus-reporting-task-instance-id") + .displayName("Instance ID") + .description("Id of this NiFi instance to be included in the metrics sent to Prometheus") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("${hostname(true)}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() + .name("prometheus-reporting-task-ssl-context") + .displayName("SSL Context Service") + .description("The SSL Context Service to use in order to secure the server. If specified, the server will" + + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests") + .required(false) + .identifiesControllerService(RestrictedSSLContextService.class) + .build(); + + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("prometheus-reporting-task-client-auth") + .displayName("Client Authentication") + .description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the " + + "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.") + .required(true) + .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED) + .defaultValue(CLIENT_NONE.getValue()) + .build(); + // Processor / Process Group metrics private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build() .name("nifi_amount_flowfiles_sent") diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..f6ee427c11 --- /dev/null +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.reporting.prometheus.PrometheusRecordSink \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java new file mode 100644 index 0000000000..1aaf095449 --- /dev/null +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusRecordSink.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.prometheus; + + +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.ListRecordSet; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockControllerServiceInitializationContext; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestPrometheusRecordSink { + + private static final String portString = "7077"; + + @Test + public void testSendData() throws IOException, InitializationException { + PrometheusRecordSink sink = initTask(); + + List recordFields = Arrays.asList( + new RecordField("field1", RecordFieldType.INT.getDataType()), + new RecordField("field2", RecordFieldType.STRING.getDataType()) + ); + RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + + Map row1 = new HashMap<>(); + row1.put("field1", 15); + row1.put("field2", "Hello"); + + Map row2 = new HashMap<>(); + row2.put("field1", 6); + row2.put("field2", "World!"); + + RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList( + new MapRecord(recordSchema, row1), + new MapRecord(recordSchema, row2) + )); + + Map attributes = new HashMap<>(); + attributes.put("a", "Hello"); + WriteResult writeResult = sink.sendData(recordSet, attributes, true); + assertNotNull(writeResult); + assertEquals(2, writeResult.getRecordCount()); + assertEquals("Hello", writeResult.getAttributes().get("a")); + + final String content = getMetrics(); + assertTrue(content.contains("field1{field2=\"Hello\",} 15.0\nfield1{field2=\"World!\",} 6.0\n")); + + try { + sink.onStopped(); + } catch (Exception e) { + // Do nothing, just need to shut down the server before the next run + } + } + + private String getMetrics() throws IOException { + URL url = new URL("http://localhost:" + portString + "/metrics"); + HttpURLConnection con = (HttpURLConnection) url.openConnection(); + con.setRequestMethod("GET"); + int status = con.getResponseCode(); + Assert.assertEquals(HttpURLConnection.HTTP_OK, status); + + HttpClient client = HttpClientBuilder.create().build(); + HttpGet request = new HttpGet("http://localhost:" + portString + "/metrics"); + HttpResponse response = client.execute(request); + HttpEntity entity = response.getEntity(); + return EntityUtils.toString(entity); + } + + private PrometheusRecordSink initTask() throws InitializationException { + + final ComponentLog logger = mock(ComponentLog.class); + final PrometheusRecordSink task = new PrometheusRecordSink(); + ConfigurationContext context = mock(ConfigurationContext.class); + final StateManager stateManager = new MockStateManager(task); + + final PropertyValue pValue = mock(StandardPropertyValue.class); + when(context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT)).thenReturn(new MockPropertyValue(portString)); + when(context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT)).thenReturn(pValue); + when(pValue.asControllerService(SSLContextService.class)).thenReturn(null); + + final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager); + task.initialize(initContext); + task.onScheduled(context); + + return task; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java index 8fa9237022..aef9e0cb25 100644 --- a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java +++ b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java @@ -29,6 +29,7 @@ import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockComponentLog; import org.apache.nifi.util.MockConfigurationContext; @@ -59,7 +60,7 @@ public class TestPrometheusReportingTask { reportingContextStub = new MockReportingContext(Collections.emptyMap(), new MockStateManager(testedReportingTask), new MockVariableRegistry()); - reportingContextStub.setProperty(PrometheusReportingTask.INSTANCE_ID.getName(), "localhost"); + reportingContextStub.setProperty(PrometheusMetricsUtil.INSTANCE_ID.getName(), "localhost"); configurationContextStub = new MockConfigurationContext(reportingContextStub.getProperties(), reportingContextStub.getControllerServiceLookup()); @@ -121,9 +122,6 @@ public class TestPrometheusReportingTask { HttpGet request = new HttpGet("http://localhost:9092/metrics"); HttpResponse response = client.execute(request); HttpEntity entity = response.getEntity(); - String content = EntityUtils.toString(entity); - - return content; + return EntityUtils.toString(entity); } - } diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java index ed20acfe22..ae0e326e12 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java @@ -20,10 +20,12 @@ import org.apache.calcite.config.Lex; import org.apache.calcite.sql.parser.SqlParser; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.record.sink.RecordSinkService; import org.apache.nifi.reporting.AbstractReportingTask; @@ -32,6 +34,7 @@ import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.serialization.record.ResultSetRecordSet; import org.apache.nifi.util.StopWatch; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -77,6 +80,9 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { .build(); private List properties; + + private volatile RecordSinkService recordSinkService; + private MetricsQueryService metricsQueryService; @Override @@ -94,11 +100,16 @@ public class QueryNiFiReportingTask extends AbstractReportingTask { return properties; } + @OnScheduled + public void setup(final ConfigurationContext context) throws IOException { + recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class); + recordSinkService.reset(); + } + @Override public void onTrigger(ReportingContext context) { final StopWatch stopWatch = new StopWatch(true); try { - final RecordSinkService recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class); final String sql = context.getProperty(QUERY).evaluateAttributeExpressions().getValue(); final QueryResult queryResult = metricsQueryService.query(context, sql); final ResultSetRecordSet recordSet; diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java index 0a4800b234..9f4cb0a0ac 100644 --- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java +++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java @@ -20,6 +20,7 @@ package org.apache.nifi.reporting.sql; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; @@ -265,10 +266,13 @@ public class TestQueryNiFiReportingTask { Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService); + ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class); + Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue); + reportingTask.setup(configContext); + return reportingTask; } private static final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask { - } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RecordSinkService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RecordSinkService.java index 73f3fcf570..461153e27d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RecordSinkService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RecordSinkService.java @@ -34,6 +34,9 @@ import java.util.Map; @CapabilityDescription("Specifies a Controller Service that specifies a Record Writer as well as some transport mechanism to write the records to some destination (external system, e.g.)") public interface RecordSinkService extends ControllerService { + /** + * A standard Record Writer property for use in RecordSinkService implementations + */ PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() .name("record-sink-record-writer") .displayName("Record Writer") @@ -42,5 +45,20 @@ public interface RecordSinkService extends ControllerService { .required(true) .build(); + /** + * Sends the record set to the RecordSinkService + * @param recordSet The RecordSet to transmit + * @param attributes Attributes associated with the RecordSet + * @param sendZeroResults Whether to transmit empty record sets + * @return a WriteResult object containing the number of records transmitted, as well as any metadata in the form of attributes + * @throws IOException if any error occurs during transmission of the record set + */ WriteResult sendData(final RecordSet recordSet, final Map attributes, final boolean sendZeroResults) throws IOException; + + /** + * Resets the RecordSinkService. This is useful when the service uses the record set's schema in order to transmit the data correctly. If subsequent + * RecordSets have different schemas, this can cause issues with schema handling. Calling reset() should perform operations such as clearing the schema + * and any appropriate data related to possibly different RecordSets. The default implementation is a no-op + */ + default void reset() {} }