NIFI-7796: Add Prometheus counters for total bytes sent/received (#4522)

* NIFI-7796: Add Prometheus metrics for total bytes sent/received, fixed read/written metrics

* NIFI-7796: Incorporated review comments
This commit is contained in:
Matthew Burgess 2020-10-06 09:26:27 -04:00 committed by GitHub
parent f4473afad6
commit 7cc3713389
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 160 additions and 16 deletions

View File

@ -63,4 +63,32 @@ public interface EventAccess {
*/
List<Action> getFlowChanges(int firstActionId, final int maxActions);
/**
* Returns the total number of bytes read by this instance (at the root process group level, i.e. all events) since the instance started
*
* @return the total number of bytes read by this instance
*/
long getTotalBytesRead();
/**
* Returns the total number of bytes written by this instance (at the root process group level, i.e. all events) since the instance started
*
* @return the total number of bytes written by this instance
*/
long getTotalBytesWritten();
/**
* Returns the total number of bytes sent by this instance (at the root process group level) since the instance started
*
* @return the total number of bytes sent by this instance
*/
long getTotalBytesSent();
/**
* Returns the total number of bytes received by this instance (at the root process group level) since the instance started
*
* @return the total number of bytes received by this instance
*/
long getTotalBytesReceived();
}

View File

@ -104,4 +104,19 @@ public class MockEventAccess implements EventAccess {
this.flowChanges.add(action);
}
public long getTotalBytesRead() {
return -1;
}
public long getTotalBytesWritten() {
return -1;
}
public long getTotalBytesSent() {
return -1;
}
public long getTotalBytesReceived() {
return -1;
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.prometheus.util;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
public class NiFiMetricsRegistry extends AbstractMetricsRegistry {
@ -54,19 +53,25 @@ public class NiFiMetricsRegistry extends AbstractMetricsRegistry {
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("TOTAL_BYTES_SENT", Gauge.build()
.name("nifi_total_bytes_sent")
.help("Running total number of bytes sent by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_BYTES_READ", Gauge.build()
.name("nifi_amount_bytes_read")
.help("Total number of bytes read by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToCounterMap.put("TOTAL_BYTES_READ", Counter.build().name("nifi_total_bytes_read")
.help("Total number of bytes read by the component")
nameToGaugeMap.put("TOTAL_BYTES_READ", Gauge.build().name("nifi_total_bytes_read")
.help("Running total number of bytes read by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToCounterMap.put("TOTAL_BYTES_WRITTEN", Counter.build().name("nifi_total_bytes_written")
.help("Total number of bytes written by the component")
nameToGaugeMap.put("TOTAL_BYTES_WRITTEN", Gauge.build().name("nifi_total_bytes_written")
.help("Running total number of bytes written by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
@ -82,6 +87,12 @@ public class NiFiMetricsRegistry extends AbstractMetricsRegistry {
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("TOTAL_BYTES_RECEIVED", Gauge.build()
.name("nifi_total_bytes_received")
.help("Running total number of bytes received by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(registry));
nameToGaugeMap.put("AMOUNT_BYTES_TRANSFERRED", Gauge.build()
.name("nifi_amount_bytes_transferred")
.help("Total number of Bytes transferred by the component")

View File

@ -123,8 +123,6 @@ public class PrometheusMetricsUtil {
nifiMetricsRegistry.setDataPoint(status.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, componentType, componentName, componentId, parentPGId);
nifiMetricsRegistry.setDataPoint(status.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, componentType, componentName, componentId, parentPGId);
nifiMetricsRegistry.setDataPoint(status.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, componentType, componentName, componentId, parentPGId);
nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, componentType, componentName, componentId, parentPGId);
nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, componentType, componentName, componentId, parentPGId);
nifiMetricsRegistry.setDataPoint(status.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, componentType, componentName, componentId, parentPGId);
nifiMetricsRegistry.setDataPoint(status.getBytesTransferred(), "AMOUNT_BYTES_TRANSFERRED", instanceId, componentType, componentName, componentId, parentPGId);
@ -173,8 +171,6 @@ public class PrometheusMetricsUtil {
nifiMetricsRegistry.setDataPoint(processorStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, procComponentType, procComponentName, procComponentId, parentId);
nifiMetricsRegistry.setDataPoint(processorStatus.getBytesRead(), "AMOUNT_BYTES_READ", instanceId, procComponentType, procComponentName, procComponentId, parentId);
nifiMetricsRegistry.setDataPoint(processorStatus.getBytesWritten(), "AMOUNT_BYTES_WRITTEN", instanceId, procComponentType, procComponentName, procComponentId, parentId);
nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, procComponentType, procComponentName, procComponentId, parentId);
nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, procComponentType, procComponentName, procComponentId, parentId);
nifiMetricsRegistry.setDataPoint(processorStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, procComponentType, procComponentName, procComponentId, parentId);
nifiMetricsRegistry.setDataPoint(processorStatus.getOutputBytes(), "SIZE_CONTENT_OUTPUT_TOTAL",
@ -245,8 +241,6 @@ public class PrometheusMetricsUtil {
nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",
@ -271,8 +265,6 @@ public class PrometheusMetricsUtil {
nifiMetricsRegistry.setDataPoint(portStatus.getBytesSent(), "AMOUNT_BYTES_SENT", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.setDataPoint(portStatus.getInputBytes(), "AMOUNT_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.setDataPoint(portStatus.getOutputBytes(), "AMOUNT_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.incrementCounter(status.getBytesRead(), "TOTAL_BYTES_READ", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.incrementCounter(status.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.setDataPoint(portStatus.getBytesReceived(), "AMOUNT_BYTES_RECEIVED", instanceId, portComponentType, portComponentName, portComponentId, parentId);
nifiMetricsRegistry.setDataPoint(portStatus.getOutputCount(), "AMOUNT_ITEMS_OUTPUT",

View File

@ -51,4 +51,10 @@ public interface FlowFileEventRepository extends Closeable {
* @param componentIdentifier Identifier of the component
*/
void purgeTransferEvents(String componentIdentifier);
/**
* Reports aggregate metrics for all flowfile events
* @return a report of processing activity
*/
FlowFileEvent reportAggregateEvent();
}

View File

@ -3003,6 +3003,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return result;
}
public FlowFileEventRepository getFlowFileEventRepository() {
return flowFileEventRepository;
}
private static class HeartbeatBean {
private final ProcessGroup rootGroup;

View File

@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
public class RingBufferEventRepository implements FlowFileEventRepository {
private final int numMinutes;
private final EventSumValue aggregateValues = new EventSumValue(0L);
private final ConcurrentMap<String, EventContainer> componentEventMap = new ConcurrentHashMap<>();
public RingBufferEventRepository(final int numMinutes) {
@ -40,6 +41,7 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
public void updateRepository(final FlowFileEvent event, final String componentId) {
final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes));
eventContainer.addEvent(event);
aggregateValues.add(event);
}
@Override
@ -61,4 +63,8 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
componentEventMap.remove(componentIdentifier);
}
@Override
public FlowFileEvent reportAggregateEvent() {
return aggregateValues.toFlowFileEvent();
}
}

View File

@ -744,4 +744,44 @@ public class StandardEventAccess implements UserAwareEventAccess {
return status;
}
/**
* Returns the total number of bytes read by this instance (at the root process group level, i.e. all events) since the instance started
*
* @return the total number of bytes read by this instance
*/
@Override
public long getTotalBytesRead() {
return flowFileEventRepository.reportAggregateEvent().getBytesRead();
}
/**
* Returns the total number of bytes written by this instance (at the root process group level, i.e. all events) since the instance started
*
* @return the total number of bytes written by this instance
*/
@Override
public long getTotalBytesWritten() {
return flowFileEventRepository.reportAggregateEvent().getBytesWritten();
}
/**
* Returns the total number of bytes sent by this instance (at the root process group level) since the instance started
*
* @return the total number of bytes sent by this instance
*/
@Override
public long getTotalBytesSent() {
return flowFileEventRepository.reportAggregateEvent().getBytesSent();
}
/**
* Returns the total number of bytes received by this instance (at the root process group level) since the instance started
*
* @return the total number of bytes received by this instance
*/
@Override
public long getTotalBytesReceived() {
return flowFileEventRepository.reportAggregateEvent().getBytesReceived();
}
}

View File

@ -81,6 +81,8 @@ import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
@ -152,6 +154,7 @@ import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
@ -5376,10 +5379,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public Collection<CollectorRegistry> generateFlowMetrics() {
String instanceId = controllerFacade.getInstanceId();
final String instanceId = StringUtils.isEmpty(controllerFacade.getInstanceId()) ? "" : controllerFacade.getInstanceId();
ProcessGroupStatus rootPGStatus = controllerFacade.getProcessGroupStatus("root");
PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootPGStatus, instanceId, "", "RootProcessGroup",
PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
// Add the total byte counts (read/written) to the NiFi metrics registry
FlowFileEventRepository flowFileEventRepository = controllerFacade.getFlowFileEventRepository();
final String rootPGId = StringUtils.isEmpty(rootPGStatus.getId()) ? "" : rootPGStatus.getId();
final String rootPGName = StringUtils.isEmpty(rootPGStatus.getName()) ? "" : rootPGStatus.getName();
final FlowFileEvent aggregateEvent = flowFileEventRepository.reportAggregateEvent();
nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesRead(), "TOTAL_BYTES_READ",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesWritten(), "TOTAL_BYTES_WRITTEN",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesSent(), "TOTAL_BYTES_SENT",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
nifiMetricsRegistry.setDataPoint(aggregateEvent.getBytesReceived(), "TOTAL_BYTES_RECEIVED",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), instanceId);
// Get Connection Status Analytics (predictions, e.g.)

View File

@ -48,6 +48,7 @@ import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@ -1645,6 +1646,10 @@ public class ControllerFacade implements Authorizable {
return dtoFactory.createProcessorDiagnosticsDto(processor, processorStatus, bulletinRepository, flowController, serviceEntityFactory);
}
public FlowFileEventRepository getFlowFileEventRepository() {
return flowController.getFlowFileEventRepository();
}
/*
* setters
*/

View File

@ -33,10 +33,12 @@ import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import org.eclipse.jetty.server.Server;
import java.net.InetSocketAddress;
@ -130,11 +132,28 @@ public class PrometheusReportingTask extends AbstractReportingTask {
this.prometheusServer = new PrometheusServer(Integer.parseInt(metricsEndpointPort), sslContextService, getLogger(), need, want);
}
Function<ReportingContext, CollectorRegistry> nifiMetrics = (reportingContext) -> {
ProcessGroupStatus rootGroupStatus = reportingContext.getEventAccess().getControllerStatus();
EventAccess eventAccess = reportingContext.getEventAccess();
ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus();
String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue();
if (instanceId == null) {
instanceId = "";
}
String metricsStrategy = reportingContext.getProperty(METRICS_STRATEGY).getValue();
NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry();
return PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootGroupStatus, instanceId, "", "RootProcessGroup", metricsStrategy);
CollectorRegistry collectorRegistry = PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootGroupStatus, instanceId, "", "RootProcessGroup", metricsStrategy);
// Add the total byte counts (read/written) to the NiFi metrics registry
final String rootPGId = StringUtils.isEmpty(rootGroupStatus.getId()) ? "" : rootGroupStatus.getId();
final String rootPGName = StringUtils.isEmpty(rootGroupStatus.getName()) ? "" : rootGroupStatus.getName();
nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesRead(), "TOTAL_BYTES_READ",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesWritten(), "TOTAL_BYTES_WRITTEN",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesSent(), "TOTAL_BYTES_SENT",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesReceived(), "TOTAL_BYTES_RECEIVED",
instanceId, "RootProcessGroup", rootPGName, rootPGId, "");
return collectorRegistry;
};
metricsCollectors.add(nifiMetrics);
if (context.getProperty(SEND_JVM_METRICS).asBoolean()) {