NIFI-6799: Add PrometheusRecordSink and reset() method to RecordSinkService (#3839)

This closes #3839
This commit is contained in:
Matthew Burgess 2019-10-24 11:53:28 -04:00 committed by Yolanda Davis
parent 147365285c
commit 8771c35f4a
11 changed files with 512 additions and 108 deletions

View File

@ -40,6 +40,21 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<artifactId>nifi-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-sink-api</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<!-- The client -->
<dependency>
<groupId>io.prometheus</groupId>

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@Tags({"record", "send", "write", "prometheus"})
@CapabilityDescription("Specifies a Record Sink Service that exposes data points to a Prometheus scraping service. Numeric fields are exposed as Gauges, String fields are the "
+ "label values for the gauges, and all other fields are ignored.")
public class PrometheusRecordSink extends AbstractControllerService implements RecordSinkService {
private volatile PrometheusServer prometheusServer;
private volatile RecordSchema recordSchema;
private volatile String[] labelNames;
private volatile Map<String, Gauge> gauges;
private static final CollectorRegistry RECORD_REGISTRY = new CollectorRegistry();
private static final List<PropertyDescriptor> properties;
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT);
props.add(PrometheusMetricsUtil.INSTANCE_ID);
props.add(PrometheusMetricsUtil.SSL_CONTEXT);
props.add(PrometheusMetricsUtil.CLIENT_AUTH);
properties = Collections.unmodifiableList(props);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@OnEnabled
public void onScheduled(final ConfigurationContext context) {
RECORD_REGISTRY.clear();
SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class);
final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue();
try {
List<Function<ReportingContext, CollectorRegistry>> metricsCollectors = new ArrayList<>();
if (sslContextService == null) {
prometheusServer = new PrometheusServer(new InetSocketAddress(Integer.parseInt(metricsEndpointPort)), getLogger());
} else {
final String clientAuthValue = context.getProperty(PrometheusMetricsUtil.CLIENT_AUTH).getValue();
final boolean need;
final boolean want;
if (PrometheusMetricsUtil.CLIENT_NEED.getValue().equals(clientAuthValue)) {
need = true;
want = false;
} else if (PrometheusMetricsUtil.CLIENT_WANT.getValue().equals(clientAuthValue)) {
need = false;
want = true;
} else {
need = false;
want = false;
}
prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort), sslContextService, getLogger(), need, want);
}
Function<ReportingContext, CollectorRegistry> nifiMetrics = (reportingContext) -> RECORD_REGISTRY;
metricsCollectors.add(nifiMetrics);
prometheusServer.setMetricsCollectors(metricsCollectors);
getLogger().info("Started JETTY server");
} catch (Exception e) {
getLogger().error("Failed to start Jetty server", e);
}
}
@Override
public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
WriteResult writeResult = null;
if (recordSchema == null) {
// The first time through, create the registry, then create the Gauges and register them
recordSchema = recordSet.getSchema();
RECORD_REGISTRY.clear();
// String fields are labels, collect them first
labelNames = recordSchema.getFields().stream().filter(
(f) -> isLabel(f.getDataType().getFieldType())).map(RecordField::getFieldName).toArray(String[]::new);
gauges = new HashMap<>();
recordSchema.getFields().stream().filter((field) -> isNumeric(field.getDataType().getFieldType())).forEach(
// Create, register, and add gauge to the list
(field) -> gauges.put(field.getFieldName(), Gauge.build()
.name(field.getFieldName())
.help("Metric for " + field.getFieldName())
.labelNames(labelNames)
.register(RECORD_REGISTRY))
);
}
int recordCount = 0;
Record r;
while ((r = recordSet.next()) != null) {
final Record record = r;
// Get label values, set empty strings for null values
String[] labelValues = Arrays.stream(labelNames).map((labelName) -> {
String value = record.getAsString(labelName);
return (value != null) ? value : "";
}).toArray(String[]::new);
// Get value for each gauge and update the data point
gauges.forEach((name, gauge) -> {
Optional<DataType> dataType = record.getSchema().getDataType(name);
if (dataType.isPresent()) {
RecordFieldType recordFieldType = dataType.get().getFieldType();
// Change boolean fields to doubles
final double value;
if (RecordFieldType.BOOLEAN.equals(recordFieldType)) {
value = record.getAsBoolean(name) ? 1.0 : 0.0;
} else {
value = record.getAsDouble(name);
}
gauge.labels(labelValues).set(value);
}
});
recordCount++;
}
attributes.put("record.count", Integer.toString(recordCount));
writeResult = WriteResult.of(recordCount, attributes);
return writeResult;
}
@OnDisabled
public void onStopped() throws Exception {
Server server = prometheusServer.getServer();
server.stop();
recordSchema = null;
}
@OnShutdown
public void onShutDown() throws Exception {
Server server = prometheusServer.getServer();
server.stop();
recordSchema = null;
}
@Override
public void reset() {
// Reset the schema in order to support different RecordSet schemas
recordSchema = null;
}
private boolean isNumeric(RecordFieldType dataType) {
// Numeric fields are metrics
return RecordFieldType.INT.equals(dataType)
|| RecordFieldType.SHORT.equals(dataType)
|| RecordFieldType.LONG.equals(dataType)
|| RecordFieldType.BIGINT.equals(dataType)
|| RecordFieldType.FLOAT.equals(dataType)
|| RecordFieldType.DOUBLE.equals(dataType)
|| RecordFieldType.BOOLEAN.equals(dataType);
}
private boolean isLabel(RecordFieldType dataType) {
return RecordFieldType.STRING.equals(dataType)
|| RecordFieldType.CHAR.equals(dataType);
}
}

View File

@ -21,7 +21,9 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import io.prometheus.client.CollectorRegistry;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -29,14 +31,13 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Server;
@ -48,39 +49,9 @@ import static org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.MET
@CapabilityDescription("Reports metrics in Prometheus format by creating /metrics http endpoint which can be used for external monitoring of the application."
+ " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "60 sec")
public class PrometheusReportingTask extends AbstractReportingTask {
private PrometheusServer prometheusServer;
private SSLContextService sslContextService;
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
"ReportingTask will not authenticate clients. Anyone can communicate with this ReportingTask anonymously");
public static final AllowableValue CLIENT_WANT = new AllowableValue("Want Authentication", "Want Authentication",
"ReportingTask will try to verify the client but if unable to verify will allow the client to communicate anonymously");
public static final AllowableValue CLIENT_NEED = new AllowableValue("Need Authentication", "Need Authentication",
"ReportingTask will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore"
+ "specified in the SSL Context Service");
public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-metrics-endpoint-port")
.displayName("Prometheus Metrics Endpoint Port")
.description("The Port where prometheus metrics can be accessed")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("9092")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-instance-id")
.displayName("Instance ID")
.description("Id of this NiFi instance to be included in the metrics sent to Prometheus")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("${hostname(true)}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-metrics-strategy")
@ -100,35 +71,16 @@ public class PrometheusReportingTask extends AbstractReportingTask {
.required(true)
.build();
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-ssl-context")
.displayName("SSL Context Service")
.description("The SSL Context Service to use in order to secure the server. If specified, the server will"
+ "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-client-auth")
.displayName("Client Authentication")
.description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the <SSL Context Service> "
+ "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
.required(true)
.allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
.defaultValue(CLIENT_NONE.getValue())
.build();
private static final List<PropertyDescriptor> properties;
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(METRICS_ENDPOINT_PORT);
props.add(INSTANCE_ID);
props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT);
props.add(PrometheusMetricsUtil.INSTANCE_ID);
props.add(METRICS_STRATEGY);
props.add(SEND_JVM_METRICS);
props.add(SSL_CONTEXT);
props.add(CLIENT_AUTH);
props.add(PrometheusMetricsUtil.SSL_CONTEXT);
props.add(PrometheusMetricsUtil.CLIENT_AUTH);
properties = Collections.unmodifiableList(props);
}
@ -139,30 +91,44 @@ public class PrometheusReportingTask extends AbstractReportingTask {
@OnScheduled
public void onScheduled(final ConfigurationContext context) {
sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final String metricsEndpointPort = context.getProperty(METRICS_ENDPOINT_PORT).getValue();
SSLContextService sslContextService = context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT).asControllerService(SSLContextService.class);
final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).getValue();
try {
List<Function<ReportingContext, CollectorRegistry>> metricsCollectors = new ArrayList<>();
if (sslContextService == null) {
this.prometheusServer = new PrometheusServer(new InetSocketAddress(Integer.parseInt(metricsEndpointPort)), getLogger());
} else {
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
final String clientAuthValue = context.getProperty(PrometheusMetricsUtil.CLIENT_AUTH).getValue();
final boolean need;
final boolean want;
if (CLIENT_NEED.equals(clientAuthValue)) {
if (PrometheusMetricsUtil.CLIENT_NEED.getValue().equals(clientAuthValue)) {
need = true;
want = false;
} else if (CLIENT_WANT.equals(clientAuthValue)) {
} else if (PrometheusMetricsUtil.CLIENT_WANT.getValue().equals(clientAuthValue)) {
need = false;
want = true;
} else {
need = false;
want = false;
}
this.prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort),sslContextService, getLogger(), need, want);
this.prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort), sslContextService, getLogger(), need, want);
}
this.prometheusServer.setInstanceId(context.getProperty(INSTANCE_ID).evaluateAttributeExpressions().getValue());
this.prometheusServer.setSendJvmMetrics(context.getProperty(SEND_JVM_METRICS).asBoolean());
Function<ReportingContext, CollectorRegistry> nifiMetrics = (reportingContext) -> {
ProcessGroupStatus rootGroupStatus = reportingContext.getEventAccess().getControllerStatus();
String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue();
String metricsStrategy = reportingContext.getProperty(METRICS_STRATEGY).getValue();
return PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, instanceId, "", "RootProcessGroup", metricsStrategy);
};
metricsCollectors.add(nifiMetrics);
if (context.getProperty(SEND_JVM_METRICS).asBoolean()) {
Function<ReportingContext, CollectorRegistry> jvmMetrics = (reportingContext) -> {
String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue();
return PrometheusMetricsUtil.createJvmMetrics(JmxJvmMetrics.getInstance(), instanceId);
};
metricsCollectors.add(jvmMetrics);
}
this.prometheusServer.setMetricsCollectors(metricsCollectors);
getLogger().info("Started JETTY server");
} catch (Exception e) {
getLogger().error("Failed to start Jetty server", e);
@ -184,6 +150,5 @@ public class PrometheusReportingTask extends AbstractReportingTask {
@Override
public void onTrigger(final ReportingContext context) {
this.prometheusServer.setReportingContext(context);
this.prometheusServer.setMetricsStrategy(context.getProperty(METRICS_STRATEGY).getValue());
}
}

View File

@ -21,6 +21,9 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
@ -28,11 +31,8 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
@ -53,13 +53,10 @@ public class PrometheusServer {
private Server server;
private ServletContextHandler handler;
private ReportingContext context;
private String metricsStrategy;
private boolean sendJvmMetrics;
private String instanceId;
private List<Function<ReportingContext, CollectorRegistry>> metricsCollectors;
class MetricsServlet extends HttpServlet {
private CollectorRegistry nifiRegistry, jvmRegistry;
private ProcessGroupStatus rootGroupStatus;
@Override
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) throws ServletException, IOException {
@ -67,16 +64,12 @@ public class PrometheusServer {
logger.debug("PrometheusServer Do get called");
}
rootGroupStatus = PrometheusServer.this.context.getEventAccess().getControllerStatus();
ServletOutputStream response = resp.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(response);
nifiRegistry = PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus, PrometheusServer.this.instanceId, "", "RootProcessGroup", metricsStrategy);
TextFormat.write004(osw, nifiRegistry.metricFamilySamples());
if (PrometheusServer.this.sendJvmMetrics == true) {
jvmRegistry = PrometheusMetricsUtil.createJvmMetrics(JmxJvmMetrics.getInstance(), PrometheusServer.this.instanceId);
TextFormat.write004(osw, jvmRegistry.metricFamilySamples());
for(Function<ReportingContext, CollectorRegistry> mc : metricsCollectors) {
CollectorRegistry collectorRegistry = mc.apply(getReportingContext());
TextFormat.write004(osw, collectorRegistry.metricFamilySamples());
}
osw.flush();
@ -91,6 +84,7 @@ public class PrometheusServer {
public PrometheusServer(InetSocketAddress addr, ComponentLog logger) throws Exception {
PrometheusServer.logger = logger;
metricsCollectors = Collections.emptyList();
this.server = new Server(addr);
this.handler = new ServletContextHandler(server, "/metrics");
@ -148,31 +142,15 @@ public class PrometheusServer {
return this.context;
}
public boolean getSendJvmMetrics() {
return this.sendJvmMetrics;
}
public String getInstanceId() {
return this.instanceId;
}
public void setReportingContext(ReportingContext rc) {
this.context = rc;
}
public void setSendJvmMetrics(boolean jvm) {
this.sendJvmMetrics = jvm;
public List<Function<ReportingContext, CollectorRegistry>> getMetricsCollectors() {
return metricsCollectors;
}
public void setInstanceId(String iid) {
this.instanceId = iid;
}
public String getMetricsStrategy() {
return metricsStrategy;
}
public void setMetricsStrategy(String metricsStrategy) {
this.metricsStrategy = metricsStrategy;
public void setMetricsCollectors(List<Function<ReportingContext, CollectorRegistry>> metricsCollectors) {
this.metricsCollectors = metricsCollectors;
}
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import io.prometheus.client.SimpleCollector;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@ -33,8 +34,11 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.RestrictedSSLContextService;
public class PrometheusMetricsUtil {
@ -48,6 +52,54 @@ public class PrometheusMetricsUtil {
private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
// Common properties/values
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
"ReportingTask will not authenticate clients. Anyone can communicate with this ReportingTask anonymously");
public static final AllowableValue CLIENT_WANT = new AllowableValue("Want Authentication", "Want Authentication",
"ReportingTask will try to verify the client but if unable to verify will allow the client to communicate anonymously");
public static final AllowableValue CLIENT_NEED = new AllowableValue("Need Authentication", "Need Authentication",
"ReportingTask will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore"
+ "specified in the SSL Context Service");
public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-metrics-endpoint-port")
.displayName("Prometheus Metrics Endpoint Port")
.description("The Port where prometheus metrics can be accessed")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("9092")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-instance-id")
.displayName("Instance ID")
.description("Id of this NiFi instance to be included in the metrics sent to Prometheus")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("${hostname(true)}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-ssl-context")
.displayName("SSL Context Service")
.description("The SSL Context Service to use in order to secure the server. If specified, the server will"
+ "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-client-auth")
.displayName("Client Authentication")
.description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the <SSL Context Service> "
+ "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
.required(true)
.allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
.defaultValue(CLIENT_NONE.getValue())
.build();
// Processor / Process Group metrics
private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
.name("nifi_amount_flowfiles_sent")

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.reporting.prometheus.PrometheusRecordSink

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockControllerServiceInitializationContext;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPrometheusRecordSink {
private static final String portString = "7077";
@Test
public void testSendData() throws IOException, InitializationException {
PrometheusRecordSink sink = initTask();
List<RecordField> recordFields = Arrays.asList(
new RecordField("field1", RecordFieldType.INT.getDataType()),
new RecordField("field2", RecordFieldType.STRING.getDataType())
);
RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
Map<String, Object> row1 = new HashMap<>();
row1.put("field1", 15);
row1.put("field2", "Hello");
Map<String, Object> row2 = new HashMap<>();
row2.put("field1", 6);
row2.put("field2", "World!");
RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList(
new MapRecord(recordSchema, row1),
new MapRecord(recordSchema, row2)
));
Map<String, String> attributes = new HashMap<>();
attributes.put("a", "Hello");
WriteResult writeResult = sink.sendData(recordSet, attributes, true);
assertNotNull(writeResult);
assertEquals(2, writeResult.getRecordCount());
assertEquals("Hello", writeResult.getAttributes().get("a"));
final String content = getMetrics();
assertTrue(content.contains("field1{field2=\"Hello\",} 15.0\nfield1{field2=\"World!\",} 6.0\n"));
try {
sink.onStopped();
} catch (Exception e) {
// Do nothing, just need to shut down the server before the next run
}
}
private String getMetrics() throws IOException {
URL url = new URL("http://localhost:" + portString + "/metrics");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("GET");
int status = con.getResponseCode();
Assert.assertEquals(HttpURLConnection.HTTP_OK, status);
HttpClient client = HttpClientBuilder.create().build();
HttpGet request = new HttpGet("http://localhost:" + portString + "/metrics");
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
return EntityUtils.toString(entity);
}
private PrometheusRecordSink initTask() throws InitializationException {
final ComponentLog logger = mock(ComponentLog.class);
final PrometheusRecordSink task = new PrometheusRecordSink();
ConfigurationContext context = mock(ConfigurationContext.class);
final StateManager stateManager = new MockStateManager(task);
final PropertyValue pValue = mock(StandardPropertyValue.class);
when(context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT)).thenReturn(new MockPropertyValue(portString));
when(context.getProperty(PrometheusMetricsUtil.SSL_CONTEXT)).thenReturn(pValue);
when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
task.initialize(initContext);
task.onScheduled(context);
return task;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;
@ -59,7 +60,7 @@ public class TestPrometheusReportingTask {
reportingContextStub = new MockReportingContext(Collections.emptyMap(),
new MockStateManager(testedReportingTask), new MockVariableRegistry());
reportingContextStub.setProperty(PrometheusReportingTask.INSTANCE_ID.getName(), "localhost");
reportingContextStub.setProperty(PrometheusMetricsUtil.INSTANCE_ID.getName(), "localhost");
configurationContextStub = new MockConfigurationContext(reportingContextStub.getProperties(),
reportingContextStub.getControllerServiceLookup());
@ -121,9 +122,6 @@ public class TestPrometheusReportingTask {
HttpGet request = new HttpGet("http://localhost:9092/metrics");
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
String content = EntityUtils.toString(entity);
return content;
return EntityUtils.toString(entity);
}
}

View File

@ -20,10 +20,12 @@ import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.AbstractReportingTask;
@ -32,6 +34,7 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -77,6 +80,9 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
.build();
private List<PropertyDescriptor> properties;
private volatile RecordSinkService recordSinkService;
private MetricsQueryService metricsQueryService;
@Override
@ -94,11 +100,16 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
return properties;
}
@OnScheduled
public void setup(final ConfigurationContext context) throws IOException {
recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class);
recordSinkService.reset();
}
@Override
public void onTrigger(ReportingContext context) {
final StopWatch stopWatch = new StopWatch(true);
try {
final RecordSinkService recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class);
final String sql = context.getProperty(QUERY).evaluateAttributeExpressions().getValue();
final QueryResult queryResult = metricsQueryService.query(context, sql);
final ResultSetRecordSet recordSet;

View File

@ -20,6 +20,7 @@ package org.apache.nifi.reporting.sql;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
@ -265,10 +266,13 @@ public class TestQueryNiFiReportingTask {
Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
reportingTask.setup(configContext);
return reportingTask;
}
private static final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
}
}

View File

@ -34,6 +34,9 @@ import java.util.Map;
@CapabilityDescription("Specifies a Controller Service that specifies a Record Writer as well as some transport mechanism to write the records to some destination (external system, e.g.)")
public interface RecordSinkService extends ControllerService {
/**
* A standard Record Writer property for use in RecordSinkService implementations
*/
PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
.name("record-sink-record-writer")
.displayName("Record Writer")
@ -42,5 +45,20 @@ public interface RecordSinkService extends ControllerService {
.required(true)
.build();
/**
* Sends the record set to the RecordSinkService
* @param recordSet The RecordSet to transmit
* @param attributes Attributes associated with the RecordSet
* @param sendZeroResults Whether to transmit empty record sets
* @return a WriteResult object containing the number of records transmitted, as well as any metadata in the form of attributes
* @throws IOException if any error occurs during transmission of the record set
*/
WriteResult sendData(final RecordSet recordSet, final Map<String,String> attributes, final boolean sendZeroResults) throws IOException;
/**
* Resets the RecordSinkService. This is useful when the service uses the record set's schema in order to transmit the data correctly. If subsequent
* RecordSets have different schemas, this can cause issues with schema handling. Calling reset() should perform operations such as clearing the schema
* and any appropriate data related to possibly different RecordSets. The default implementation is a no-op
*/
default void reset() {}
}