diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java index 54be7ba757..5a4f64a908 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessorStatus.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.controller.status; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -42,6 +44,7 @@ public class ProcessorStatus implements Cloneable { private long bytesReceived; private int flowFilesSent; private long bytesSent; + private Map counters; public String getId() { return id; @@ -211,6 +214,14 @@ public class ProcessorStatus implements Cloneable { this.bytesSent = bytesSent; } + public Map getCounters() { + return counters; + } + + public void setCounters(final Map counters) { + this.counters = counters; + } + @Override public ProcessorStatus clone() { final ProcessorStatus clonedObj = new ProcessorStatus(); @@ -234,6 +245,7 @@ public class ProcessorStatus implements Cloneable { clonedObj.flowFilesRemoved = flowFilesRemoved; clonedObj.runStatus = runStatus; clonedObj.type = type; + clonedObj.counters = counters == null ? null : new HashMap<>(counters); return clonedObj; } @@ -268,6 +280,8 @@ public class ProcessorStatus implements Cloneable { builder.append(processingNanos); builder.append(", activeThreadCount="); builder.append(activeThreadCount); + builder.append(", counters="); + builder.append(counters); builder.append("]"); return builder.toString(); } diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java index 1042c3fbdf..553903f986 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepository.java @@ -103,10 +103,12 @@ public interface ComponentStatusRepository { * If the date range is large, the total number of data points could be far * too many to process. Therefore, this parameter allows the requestor to * indicate how many samples to return. + * @param includeCounters specifies whether or not metrics from Processor counters + * should be included in the StatusHistory. * @return a {@link StatusHistory} that provides the status information * about the Processor with the given ID during the given time period */ - StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints); + StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints, boolean includeCounters); /** * @param remoteGroupId to get history of diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java index f1bb946837..75c609d3cf 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistory.java @@ -41,4 +41,5 @@ public interface StatusHistory { * @return List of snapshots for a given component */ List getStatusSnapshots(); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java index 0a3c41856f..34ee3150dd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/StatusDescriptorDTO.java @@ -17,6 +17,9 @@ package org.apache.nifi.web.api.dto.status; import com.wordnik.swagger.annotations.ApiModelProperty; + +import java.util.Objects; + import javax.xml.bind.annotation.XmlType; /** @@ -102,4 +105,23 @@ public class StatusDescriptorDTO { this.formatter = formatter; } + @Override + public int hashCode() { + return 31 + 41 * (field == null ? 0 : field.hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof StatusDescriptorDTO)) { + return false; + } + final StatusDescriptorDTO other = (StatusDescriptorDTO) obj; + return Objects.equals(field, other.field); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java index 60b40f7ac7..eb96adb2e8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java @@ -16,31 +16,38 @@ */ package org.apache.nifi.cluster.coordination.http.endpoints; +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; +import org.apache.nifi.controller.status.history.MetricDescriptor; +import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; +import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; +import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.StandardMetricDescriptor; +import org.apache.nifi.controller.status.history.StandardStatusSnapshot; +import org.apache.nifi.controller.status.history.StatusHistoryUtil; +import org.apache.nifi.controller.status.history.StatusSnapshot; +import org.apache.nifi.controller.status.history.ValueMapper; +import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; +import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO; +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; + import java.net.URI; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.regex.Pattern; -import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; -import org.apache.nifi.controller.status.history.MetricDescriptor; -import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; -import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; -import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; -import org.apache.nifi.controller.status.history.StandardStatusSnapshot; -import org.apache.nifi.controller.status.history.StatusHistoryUtil; -import org.apache.nifi.controller.status.history.StatusSnapshot; -import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; -import org.apache.nifi.web.api.entity.StatusHistoryEntity; public class StatusHistoryEndpointMerger implements EndpointResponseMerger { @@ -55,7 +62,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { this.componentStatusSnapshotMillis = componentStatusSnapshotMillis; } - private Map> getMetricDescriptors(final URI uri) { + private Map> getStandardMetricDescriptors(final URI uri) { final String path = uri.getPath(); final Map> metricDescriptors = new HashMap<>(); @@ -87,16 +94,19 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { return false; } - final Map> descriptors = getMetricDescriptors(uri); + final Map> descriptors = getStandardMetricDescriptors(uri); return descriptors != null && !descriptors.isEmpty(); } @Override public NodeResponse merge(URI uri, String method, Set successfulResponses, Set problematicResponses, NodeResponse clientResponse) { - final Map> metricDescriptors = getMetricDescriptors(uri); + final Map> metricDescriptors = getStandardMetricDescriptors(uri); final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class); + final Set fieldDescriptors = new LinkedHashSet<>(); + + boolean includeCounters = true; StatusHistoryDTO lastStatusHistory = null; final List nodeStatusSnapshots = new ArrayList<>(successfulResponses.size()); LinkedHashMap noReadPermissionsComponentDetails = null; @@ -109,6 +119,10 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { noReadPermissionsComponentDetails = nodeStatus.getComponentDetails(); } + if (!Boolean.TRUE.equals(nodeResponseEntity.getCanRead())) { + includeCounters = false; + } + final NodeIdentifier nodeId = nodeResponse.getNodeId(); final NodeStatusSnapshotsDTO nodeStatusSnapshot = new NodeStatusSnapshotsDTO(); nodeStatusSnapshot.setNodeId(nodeId.getId()); @@ -116,6 +130,38 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { nodeStatusSnapshot.setApiPort(nodeId.getApiPort()); nodeStatusSnapshot.setStatusSnapshots(nodeStatus.getAggregateSnapshots()); nodeStatusSnapshots.add(nodeStatusSnapshot); + + final List descriptors = nodeStatus.getFieldDescriptors(); + if (descriptors != null) { + fieldDescriptors.addAll(descriptors); + } + } + + // If there's a status descriptor that is in the fieldDescriptors, but is not in the standard metric descriptors that we find, + // then it is a counter metric and should be included only if all StatusHistoryDTO's include counter metrics. This is done because + // we include counters in the status history only if the user is authorized to read the Processor. Since it's possible for the nodes + // to disagree about who is authorized (if, for example, the authorizer is asynchronously updated), then if any node indicates that + // the user is not authorized, we want to assume that the user is, in fact, not authorized. + if (includeCounters) { + for (final StatusDescriptorDTO descriptorDto : fieldDescriptors) { + final String fieldName = descriptorDto.getField(); + + if (!metricDescriptors.containsKey(fieldName)) { + final ValueMapper valueMapper = s -> { + final Map counters = s.getCounters(); + if (counters == null) { + return 0L; + } + + return counters.getOrDefault(descriptorDto.getField(), 0L); + }; + + final MetricDescriptor metricDescriptor = new StandardMetricDescriptor<>(descriptorDto.getField(), + descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper); + + metricDescriptors.put(fieldName, metricDescriptor); + } + } } final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO(); @@ -124,8 +170,8 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots); if (lastStatusHistory != null) { clusterStatusHistory.setComponentDetails(noReadPermissionsComponentDetails == null ? lastStatusHistory.getComponentDetails() : noReadPermissionsComponentDetails); - clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors()); } + clusterStatusHistory.setFieldDescriptors(new ArrayList<>(fieldDescriptors)); final StatusHistoryEntity clusterEntity = new StatusHistoryEntity(); clusterEntity.setStatusHistory(clusterStatusHistory); @@ -177,6 +223,19 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger { final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(); snapshot.setTimestamp(snapshotDto.getTimestamp()); + // Default all metrics to 0 so that if a counter has not yet been registered, it will have a value of 0 instead + // of missing all together. + for (final MetricDescriptor descriptor : metricDescriptors.values()) { + snapshot.addStatusMetric(descriptor, 0L); + + // If the DTO doesn't have an entry for the metric, add with a value of 0. + final Map dtoMetrics = snapshotDto.getStatusMetrics(); + final String field = descriptor.getField(); + if (!dtoMetrics.containsKey(field)) { + dtoMetrics.put(field, 0L); + } + } + final Map metrics = snapshotDto.getStatusMetrics(); for (final Map.Entry entry : metrics.entrySet()) { final String metricId = entry.getKey(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java index f07a5308dd..26cea50e74 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.controller.repository; +import java.util.Map; + public interface FlowFileEvent { String getComponentIdentifier(); @@ -51,4 +53,7 @@ public interface FlowFileEvent { long getBytesSent(); int getInvocations(); + + Map getCounters(); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 9438946892..8da894f055 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2943,6 +2943,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R status.setFlowFilesSent(entry.getFlowFilesSent()); status.setBytesSent(entry.getBytesSent()); status.setFlowFilesRemoved(entry.getFlowFilesRemoved()); + + if (isProcessorAuthorized) { + status.setCounters(entry.getCounters()); + } } // Determine the run status and get any validation error... only validating while STOPPED @@ -4482,12 +4486,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getConnectionStatusHistory(connectionId, startTime, endTime, preferredDataPoints)); } - public StatusHistoryDTO getProcessorStatusHistory(final String processorId) { - return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE); + public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final boolean includeCounters) { + return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE, includeCounters); } - public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startTime, final Date endTime, final int preferredDataPoints) { - return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints)); + public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startTime, final Date endTime, final int preferredDataPoints, final boolean includeCounters) { + return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints, includeCounters)); } public StatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java deleted file mode 100644 index fb3cdd2eb8..0000000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RingBufferEventRepository.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; - -public class RingBufferEventRepository implements FlowFileEventRepository { - - private final int numMinutes; - private final ConcurrentMap componentEventMap = new ConcurrentHashMap<>(); - - public RingBufferEventRepository(final int numMinutes) { - this.numMinutes = numMinutes; - } - - @Override - public void close() throws IOException { - } - - @Override - public void updateRepository(final FlowFileEvent event) { - final String componentId = event.getComponentIdentifier(); - EventContainer eventContainer = componentEventMap.get(componentId); - if (eventContainer == null) { - eventContainer = new SecondPrecisionEventContainer(numMinutes); - final EventContainer oldEventContainer = componentEventMap.putIfAbsent(componentId, eventContainer); - if (oldEventContainer != null) { - eventContainer = oldEventContainer; - } - } - - eventContainer.addEvent(event); - } - - @Override - public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) { - final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport(); - - for (final Map.Entry entry : componentEventMap.entrySet()) { - final String consumerId = entry.getKey(); - final EventContainer container = entry.getValue(); - - final FlowFileEvent reportEntry = container.generateReport(consumerId, sinceEpochMillis); - report.addReportEntry(reportEntry); - } - - return report; - } - - @Override - public void purgeTransferEvents(final long cutoffEpochMilliseconds) { - // This is done so that if a processor is removed from the graph, its events - // will be removed rather than being kept in memory - for (final EventContainer container : componentEventMap.values()) { - container.purgeEvents(cutoffEpochMilliseconds); - } - } - - private static interface EventContainer { - - public void addEvent(FlowFileEvent event); - - public void purgeEvents(long cutoffEpochMillis); - - public FlowFileEvent generateReport(String consumerId, long sinceEpochMillis); - } - - private class EventSum { - - private final AtomicReference ref = new AtomicReference<>(new EventSumValue()); - - private void add(final FlowFileEvent event) { - EventSumValue newValue; - EventSumValue value; - do { - value = ref.get(); - newValue = new EventSumValue(value, - event.getFlowFilesIn(), event.getFlowFilesOut(), event.getFlowFilesRemoved(), - event.getContentSizeIn(), event.getContentSizeOut(), event.getContentSizeRemoved(), - event.getBytesRead(), event.getBytesWritten(), - event.getFlowFilesReceived(), event.getBytesReceived(), - event.getFlowFilesSent(), event.getBytesSent(), - event.getProcessingNanoseconds(), event.getInvocations(), event.getAggregateLineageMillis()); - } while (!ref.compareAndSet(value, newValue)); - } - - public EventSumValue getValue() { - return ref.get(); - } - - public void addOrReset(final FlowFileEvent event) { - final long expectedMinute = System.currentTimeMillis() / 60000; - - final EventSumValue curValue = ref.get(); - if (curValue.getMinuteTimestamp() != expectedMinute) { - ref.compareAndSet(curValue, new EventSumValue()); - } - add(event); - } - } - - private static class EventSumValue { - - private final int flowFilesIn, flowFilesOut, flowFilesRemoved; - private final long contentSizeIn, contentSizeOut, contentSizeRemoved; - private final long bytesRead, bytesWritten; - private final int flowFilesReceived, flowFilesSent; - private final long bytesReceived, bytesSent; - private final long processingNanos; - private final long aggregateLineageMillis; - private final int invocations; - - private final long minuteTimestamp; - private final long millisecondTimestamp; - - public EventSumValue() { - flowFilesIn = flowFilesOut = flowFilesRemoved = 0; - contentSizeIn = contentSizeOut = contentSizeRemoved = 0; - bytesRead = bytesWritten = 0; - flowFilesReceived = flowFilesSent = 0; - bytesReceived = bytesSent = 0L; - processingNanos = invocations = 0; - aggregateLineageMillis = 0L; - this.millisecondTimestamp = System.currentTimeMillis(); - this.minuteTimestamp = millisecondTimestamp / 60000; - } - - public EventSumValue(final EventSumValue base, final int flowFilesIn, final int flowFilesOut, final int flowFilesRemoved, - final long contentSizeIn, final long contentSizeOut, final long contentSizeRemoved, - final long bytesRead, final long bytesWritten, - final int flowFilesReceived, final long bytesReceived, - final int flowFilesSent, final long bytesSent, - final long processingNanos, final int invocations, final long aggregateLineageMillis) { - this.flowFilesIn = base.flowFilesIn + flowFilesIn; - this.flowFilesOut = base.flowFilesOut + flowFilesOut; - this.flowFilesRemoved = base.flowFilesRemoved + flowFilesRemoved; - this.contentSizeIn = base.contentSizeIn + contentSizeIn; - this.contentSizeOut = base.contentSizeOut + contentSizeOut; - this.contentSizeRemoved = base.contentSizeRemoved + contentSizeRemoved; - this.bytesRead = base.bytesRead + bytesRead; - this.bytesWritten = base.bytesWritten + bytesWritten; - this.flowFilesReceived = base.flowFilesReceived + flowFilesReceived; - this.bytesReceived = base.bytesReceived + bytesReceived; - this.flowFilesSent = base.flowFilesSent + flowFilesSent; - this.bytesSent = base.bytesSent + bytesSent; - this.processingNanos = base.processingNanos + processingNanos; - this.invocations = base.invocations + invocations; - this.aggregateLineageMillis = base.aggregateLineageMillis + aggregateLineageMillis; - this.millisecondTimestamp = System.currentTimeMillis(); - this.minuteTimestamp = millisecondTimestamp / 60000; - } - - public long getTimestamp() { - return millisecondTimestamp; - } - - public long getMinuteTimestamp() { - return minuteTimestamp; - } - - public long getBytesRead() { - return bytesRead; - } - - public long getBytesWritten() { - return bytesWritten; - } - - public int getFlowFilesIn() { - return flowFilesIn; - } - - public int getFlowFilesOut() { - return flowFilesOut; - } - - public long getContentSizeIn() { - return contentSizeIn; - } - - public long getContentSizeOut() { - return contentSizeOut; - } - - public int getFlowFilesRemoved() { - return flowFilesRemoved; - } - - public long getContentSizeRemoved() { - return contentSizeRemoved; - } - - public long getProcessingNanoseconds() { - return processingNanos; - } - - public int getInvocations() { - return invocations; - } - - public long getAggregateLineageMillis() { - return aggregateLineageMillis; - } - - public int getFlowFilesReceived() { - return flowFilesReceived; - } - - public int getFlowFilesSent() { - return flowFilesSent; - } - - public long getBytesReceived() { - return bytesReceived; - } - - public long getBytesSent() { - return bytesSent; - } - } - - private class SecondPrecisionEventContainer implements EventContainer { - - private final int numBins; - private final EventSum[] sums; - - public SecondPrecisionEventContainer(final int numMinutes) { - numBins = 1 + numMinutes * 60; - sums = new EventSum[numBins]; - - for (int i = 0; i < numBins; i++) { - sums[i] = new EventSum(); - } - } - - @Override - public void addEvent(final FlowFileEvent event) { - final int second = (int) (System.currentTimeMillis() / 1000); - final int binIdx = (int) (second % numBins); - final EventSum sum = sums[binIdx]; - - sum.addOrReset(event); - } - - @Override - public void purgeEvents(final long cutoffEpochMilliseconds) { - // no need to do anything - } - - @Override - public FlowFileEvent generateReport(final String consumerId, final long sinceEpochMillis) { - int flowFilesIn = 0, flowFilesOut = 0, flowFilesRemoved = 0; - long contentSizeIn = 0L, contentSizeOut = 0L, contentSizeRemoved = 0L; - long bytesRead = 0L, bytesWritten = 0L; - int invocations = 0; - long processingNanos = 0L; - long aggregateLineageMillis = 0L; - int flowFilesReceived = 0, flowFilesSent = 0; - long bytesReceived = 0L, bytesSent = 0L; - - final long second = sinceEpochMillis / 1000; - final int startBinIdx = (int) (second % numBins); - - for (int i = 0; i < numBins; i++) { - int binIdx = (startBinIdx + i) % numBins; - final EventSum sum = sums[binIdx]; - - final EventSumValue sumValue = sum.getValue(); - if (sumValue.getTimestamp() >= sinceEpochMillis) { - flowFilesIn += sumValue.getFlowFilesIn(); - flowFilesOut += sumValue.getFlowFilesOut(); - flowFilesRemoved += sumValue.getFlowFilesRemoved(); - contentSizeIn += sumValue.getContentSizeIn(); - contentSizeOut += sumValue.getContentSizeOut(); - contentSizeRemoved += sumValue.getContentSizeRemoved(); - bytesRead += sumValue.getBytesRead(); - bytesWritten += sumValue.getBytesWritten(); - flowFilesReceived += sumValue.getFlowFilesReceived(); - bytesReceived += sumValue.getBytesReceived(); - flowFilesSent += sumValue.getFlowFilesSent(); - bytesSent += sumValue.getBytesSent(); - invocations += sumValue.getInvocations(); - processingNanos += sumValue.getProcessingNanoseconds(); - aggregateLineageMillis += sumValue.getAggregateLineageMillis(); - } - } - - return new StandardFlowFileEvent(consumerId, flowFilesIn, contentSizeIn, - flowFilesOut, contentSizeOut, flowFilesRemoved, contentSizeRemoved, - bytesRead, bytesWritten, flowFilesReceived, bytesReceived, flowFilesSent, bytesSent, - invocations, aggregateLineageMillis, processingNanos); - } - } -} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index d2a6af67bf..6393014bcb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -57,6 +57,7 @@ import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream; import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream; import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream; import org.apache.nifi.controller.repository.io.LimitedInputStream; +import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.FlowFileFilter; @@ -108,7 +109,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final Map records = new HashMap<>(); private final Map connectionCounts = new HashMap<>(); private final Map> unacknowledgedFlowFiles = new HashMap<>(); - private final Map counters = new HashMap<>(); private final Map appendableStreams = new HashMap<>(); private final ProcessContext context; private final Map readRecursionSet = new HashMap<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring @@ -117,6 +117,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final long sessionId; private final String connectableDescription; + private Map countersOnCommit; + private Map immediateCounters; + private final Set removedFlowFiles = new HashSet<>(); private final Set createdFlowFiles = new HashSet<>(); @@ -438,8 +441,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } - for (final Map.Entry entry : checkpoint.counters.entrySet()) { - adjustCounter(entry.getKey(), entry.getValue(), true); + for (final Map.Entry entry : checkpoint.countersOnCommit.entrySet()) { + context.adjustCounter(entry.getKey(), entry.getValue()); } acknowledgeRecords(); @@ -533,6 +536,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } flowFileEvent.setAggregateLineageMillis(lineageMillis); + final Map counters = combineCounters(checkpoint.countersOnCommit, checkpoint.immediateCounters); + flowFileEvent.setCounters(counters); + context.getFlowFileEventRepository().updateRepository(flowFileEvent); for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) { @@ -543,6 +549,23 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + private Map combineCounters(final Map first, final Map second) { + if (first == null && second == null) { + return null; + } + if (first == null) { + return second; + } + if (second == null) { + return first; + } + + final Map combined = new HashMap<>(); + combined.putAll(first); + combined.putAll(second); + return combined; + } + private void addEventType(final Map> map, final String id, final ProvenanceEventType eventType) { Set eventTypes = map.get(id); if (eventTypes == null) { @@ -1013,6 +1036,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier()); flowFileEvent.setBytesRead(bytesRead); flowFileEvent.setBytesWritten(bytesWritten); + flowFileEvent.setCounters(immediateCounters); // update event repository try { @@ -1106,7 +1130,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE connectionCounts.clear(); createdFlowFiles.clear(); removedFlowFiles.clear(); - counters.clear(); + if (countersOnCommit != null) { + countersOnCommit.clear(); + } + if (immediateCounters != null) { + immediateCounters.clear(); + } generatedProvenanceEvents.clear(); forkEventBuilders.clear(); @@ -1441,12 +1470,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void adjustCounter(final String name, final long delta, final boolean immediate) { + final Map counters; if (immediate) { - context.adjustCounter(name, delta); - return; + if (immediateCounters == null) { + immediateCounters = new HashMap<>(); + } + counters = immediateCounters; + } else { + if (countersOnCommit == null) { + countersOnCommit = new HashMap<>(); + } + counters = countersOnCommit; } adjustCounter(name, delta, counters); + + if (immediate) { + context.adjustCounter(name, delta); + } } private void adjustCounter(final String name, final long delta, final Map map) { @@ -3216,7 +3257,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final Map records = new HashMap<>(); private final Map connectionCounts = new HashMap<>(); private final Map> unacknowledgedFlowFiles = new HashMap<>(); - private final Map counters = new HashMap<>(); + + private Map countersOnCommit = new HashMap<>(); + private Map immediateCounters = new HashMap<>(); private final Map deleteOnCommit = new HashMap<>(); private final Set removedFlowFiles = new HashSet<>(); @@ -3242,7 +3285,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE this.records.putAll(session.records); this.connectionCounts.putAll(session.connectionCounts); this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles); - this.counters.putAll(session.counters); + + if (session.countersOnCommit != null) { + if (this.countersOnCommit == null) { + this.countersOnCommit = new HashMap<>(); + } + + this.countersOnCommit.putAll(session.countersOnCommit); + } + + if (session.immediateCounters != null) { + if (this.immediateCounters == null) { + this.immediateCounters = new HashMap<>(); + } + + this.immediateCounters.putAll(session.immediateCounters); + } this.deleteOnCommit.putAll(session.deleteOnCommit); this.removedFlowFiles.addAll(session.removedFlowFiles); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java new file mode 100644 index 0000000000..9dd3c8ef06 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.metrics; + +import org.apache.nifi.controller.repository.FlowFileEvent; + +public interface EventContainer { + public void addEvent(FlowFileEvent event); + + public void purgeEvents(long cutoffEpochMillis); + + public FlowFileEvent generateReport(String componentId, long sinceEpochMillis); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java new file mode 100644 index 0000000000..b1c9120392 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.metrics; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.controller.repository.FlowFileEvent; + +public class EventSum { + + private final AtomicReference ref = new AtomicReference<>(); + + public EventSumValue getValue() { + final EventSumValue value = ref.get(); + return value == null ? new EventSumValue() : value; + } + + public void addOrReset(final FlowFileEvent event) { + final long expectedMinute = System.currentTimeMillis() / 60000; + + EventSumValue curValue; + while (true) { + curValue = ref.get(); + if (curValue == null || curValue.getMinuteTimestamp() != expectedMinute) { + final EventSumValue newValue = new EventSumValue(); + final boolean replaced = ref.compareAndSet(curValue, newValue); + if (replaced) { + curValue = newValue; + break; + } + } else { + break; + } + } + + curValue.add(event); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java new file mode 100644 index 0000000000..3306e2b9cd --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.metrics; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.controller.repository.FlowFileEvent; + +public class EventSumValue { + + private final AtomicInteger flowFilesIn = new AtomicInteger(0); + private final AtomicInteger flowFilesOut = new AtomicInteger(0); + private final AtomicInteger flowFilesRemoved = new AtomicInteger(0); + private final AtomicInteger flowFilesReceived = new AtomicInteger(0); + private final AtomicInteger flowFilesSent = new AtomicInteger(0); + + private final AtomicLong contentSizeIn = new AtomicLong(0L); + private final AtomicLong contentSizeOut = new AtomicLong(0L); + private final AtomicLong contentSizeRemoved = new AtomicLong(0L); + private final AtomicLong bytesRead = new AtomicLong(0L); + private final AtomicLong bytesWritten = new AtomicLong(0L); + + private final AtomicLong bytesReceived = new AtomicLong(0L); + private final AtomicLong bytesSent = new AtomicLong(0L); + private final AtomicLong processingNanos = new AtomicLong(0L); + private final AtomicLong aggregateLineageMillis = new AtomicLong(0L); + private final AtomicInteger invocations = new AtomicInteger(0); + private final ConcurrentMap counters = new ConcurrentHashMap<>(); + + private final long minuteTimestamp; + private final long millisecondTimestamp; + + + public EventSumValue() { + this.millisecondTimestamp = System.currentTimeMillis(); + this.minuteTimestamp = millisecondTimestamp / 60000; + } + + public void add(final FlowFileEvent flowFileEvent) { + this.aggregateLineageMillis.addAndGet(flowFileEvent.getAggregateLineageMillis()); + this.bytesRead.addAndGet(flowFileEvent.getBytesRead()); + this.bytesReceived.addAndGet(flowFileEvent.getBytesReceived()); + this.bytesSent.addAndGet(flowFileEvent.getBytesSent()); + this.bytesWritten.addAndGet(flowFileEvent.getBytesWritten()); + this.contentSizeIn.addAndGet(flowFileEvent.getContentSizeIn()); + this.contentSizeOut.addAndGet(flowFileEvent.getContentSizeOut()); + this.contentSizeRemoved.addAndGet(flowFileEvent.getContentSizeRemoved()); + this.flowFilesIn.addAndGet(flowFileEvent.getFlowFilesIn()); + this.flowFilesOut.addAndGet(flowFileEvent.getFlowFilesOut()); + this.flowFilesReceived.addAndGet(flowFileEvent.getFlowFilesReceived()); + this.flowFilesRemoved.addAndGet(flowFileEvent.getFlowFilesRemoved()); + this.flowFilesSent.addAndGet(flowFileEvent.getFlowFilesSent()); + this.invocations.addAndGet(flowFileEvent.getInvocations()); + this.processingNanos.addAndGet(flowFileEvent.getProcessingNanoseconds()); + + final Map eventCounters = flowFileEvent.getCounters(); + if (eventCounters != null) { + for (final Map.Entry entry : eventCounters.entrySet()) { + final String counterName = entry.getKey(); + final Long counterValue = entry.getValue(); + + counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue); + } + } + } + + public FlowFileEvent toFlowFileEvent(final String componentId) { + final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId); + event.setAggregateLineageMillis(getAggregateLineageMillis()); + event.setBytesRead(getBytesRead()); + event.setBytesReceived(getBytesReceived()); + event.setBytesSent(getBytesSent()); + event.setBytesWritten(getBytesWritten()); + event.setContentSizeIn(getContentSizeIn()); + event.setContentSizeOut(getContentSizeOut()); + event.setContentSizeRemoved(getContentSizeRemoved()); + event.setFlowFilesIn(getFlowFilesIn()); + event.setFlowFilesOut(getFlowFilesOut()); + event.setFlowFilesReceived(getFlowFilesReceived()); + event.setFlowFilesRemoved(getFlowFilesRemoved()); + event.setFlowFilesSent(getFlowFilesSent()); + event.setInvocations(getInvocations()); + event.setProcessingNanos(getProcessingNanoseconds()); + event.setCounters(Collections.unmodifiableMap(this.counters)); + return event; + } + + public void add(final EventSumValue other) { + this.aggregateLineageMillis.addAndGet(other.getAggregateLineageMillis()); + this.bytesRead.addAndGet(other.getBytesRead()); + this.bytesReceived.addAndGet(other.getBytesReceived()); + this.bytesSent.addAndGet(other.getBytesSent()); + this.bytesWritten.addAndGet(other.getBytesWritten()); + this.contentSizeIn.addAndGet(other.getContentSizeIn()); + this.contentSizeOut.addAndGet(other.getContentSizeOut()); + this.contentSizeRemoved.addAndGet(other.getContentSizeRemoved()); + this.flowFilesIn.addAndGet(other.getFlowFilesIn()); + this.flowFilesOut.addAndGet(other.getFlowFilesOut()); + this.flowFilesReceived.addAndGet(other.getFlowFilesReceived()); + this.flowFilesRemoved.addAndGet(other.getFlowFilesRemoved()); + this.flowFilesSent.addAndGet(other.getFlowFilesSent()); + this.invocations.addAndGet(other.getInvocations()); + this.processingNanos.addAndGet(other.getProcessingNanoseconds()); + + final Map eventCounters = other.getCounters(); + if (eventCounters != null) { + for (final Map.Entry entry : eventCounters.entrySet()) { + final String counterName = entry.getKey(); + final Long counterValue = entry.getValue(); + + counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue); + } + } + } + + public long getTimestamp() { + return millisecondTimestamp; + } + + public long getMinuteTimestamp() { + return minuteTimestamp; + } + + public long getBytesRead() { + return bytesRead.get(); + } + + public long getBytesWritten() { + return bytesWritten.get(); + } + + public int getFlowFilesIn() { + return flowFilesIn.get(); + } + + public int getFlowFilesOut() { + return flowFilesOut.get(); + } + + public long getContentSizeIn() { + return contentSizeIn.get(); + } + + public long getContentSizeOut() { + return contentSizeOut.get(); + } + + public int getFlowFilesRemoved() { + return flowFilesRemoved.get(); + } + + public long getContentSizeRemoved() { + return contentSizeRemoved.get(); + } + + public long getProcessingNanoseconds() { + return processingNanos.get(); + } + + public int getInvocations() { + return invocations.get(); + } + + public long getAggregateLineageMillis() { + return aggregateLineageMillis.get(); + } + + public int getFlowFilesReceived() { + return flowFilesReceived.get(); + } + + public int getFlowFilesSent() { + return flowFilesSent.get(); + } + + public long getBytesReceived() { + return bytesReceived.get(); + } + + public long getBytesSent() { + return bytesSent.get(); + } + + public Map getCounters() { + return counters; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java new file mode 100644 index 0000000000..b9a82edf8a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.metrics; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.nifi.controller.repository.FlowFileEvent; +import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.repository.StandardRepositoryStatusReport; + +public class RingBufferEventRepository implements FlowFileEventRepository { + + private final int numMinutes; + private final ConcurrentMap componentEventMap = new ConcurrentHashMap<>(); + + public RingBufferEventRepository(final int numMinutes) { + this.numMinutes = numMinutes; + } + + @Override + public void close() throws IOException { + } + + @Override + public void updateRepository(final FlowFileEvent event) { + final String componentId = event.getComponentIdentifier(); + final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes)); + eventContainer.addEvent(event); + } + + @Override + public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) { + final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport(); + + componentEventMap.entrySet().stream() + .map(entry -> entry.getValue().generateReport(entry.getKey(), sinceEpochMillis)) + .forEach(event -> report.addReportEntry(event)); + + return report; + } + + @Override + public void purgeTransferEvents(final long cutoffEpochMilliseconds) { + // This is done so that if a processor is removed from the graph, its events + // will be removed rather than being kept in memory + for (final EventContainer container : componentEventMap.values()) { + container.purgeEvents(cutoffEpochMilliseconds); + } + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java new file mode 100644 index 0000000000..72a8cfc39a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java @@ -0,0 +1,69 @@ +/* +e * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.metrics; + +import org.apache.nifi.controller.repository.FlowFileEvent; + +public class SecondPrecisionEventContainer implements EventContainer { + private final int numBins; + private final EventSum[] sums; + + public SecondPrecisionEventContainer(final int numMinutes) { + numBins = 1 + numMinutes * 60; + sums = new EventSum[numBins]; + + for (int i = 0; i < numBins; i++) { + sums[i] = new EventSum(); + } + } + + @Override + public void addEvent(final FlowFileEvent event) { + final int second = (int) (System.currentTimeMillis() / 1000); + final int binIdx = second % numBins; + final EventSum sum = sums[binIdx]; + + sum.addOrReset(event); + } + + @Override + public void purgeEvents(final long cutoffEpochMilliseconds) { + // no need to do anything + } + + @Override + public FlowFileEvent generateReport(final String componentId, final long sinceEpochMillis) { + final EventSumValue eventSumValue = new EventSumValue(); + final long second = sinceEpochMillis / 1000; + final int startBinIdx = (int) (second % numBins); + + for (int i = 0; i < numBins; i++) { + int binIdx = (startBinIdx + i) % numBins; + final EventSum sum = sums[binIdx]; + + final EventSumValue sumValue = sum.getValue(); + if (sumValue.getTimestamp() >= sinceEpochMillis) { + eventSumValue.add(sumValue); + } + } + + final FlowFileEvent flowFileEvent = eventSumValue.toFlowFileEvent(componentId); + return flowFileEvent; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java similarity index 68% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java index d584735237..40ec983ca7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileEvent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java @@ -14,7 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.controller.repository; +package org.apache.nifi.controller.repository.metrics; + +import java.util.Map; + +import org.apache.nifi.controller.repository.FlowFileEvent; public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable { @@ -35,56 +39,12 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable { private int flowFilesSent; private long bytesSent; private int invocations; + private Map counters; public StandardFlowFileEvent(final String componentId) { this.componentId = componentId; } - public StandardFlowFileEvent(final String componentId, - final int flowFilesIn, final long contentSizeIn, - final int flowFilesOut, final long contentSizeOut, - final int flowFilesRemoved, final long contentSizeRemoved, - final long bytesRead, final long bytesWritten, - final int flowFilesReceived, final long bytesReceived, - final int flowFilesSent, final long bytesSent, - final int invocations, final long averageLineageMillis, final long processingNanos) { - this.componentId = componentId; - this.flowFilesIn = flowFilesIn; - this.contentSizeIn = contentSizeIn; - this.flowFilesOut = flowFilesOut; - this.contentSizeOut = contentSizeOut; - this.flowFilesRemoved = flowFilesRemoved; - this.contentSizeRemoved = contentSizeRemoved; - this.bytesRead = bytesRead; - this.bytesWritten = bytesWritten; - this.invocations = invocations; - this.flowFilesReceived = flowFilesReceived; - this.bytesReceived = bytesReceived; - this.flowFilesSent = flowFilesSent; - this.bytesSent = bytesSent; - this.aggregateLineageMillis = averageLineageMillis; - this.processingNanos = processingNanos; - } - - public StandardFlowFileEvent(final FlowFileEvent other) { - this.componentId = other.getComponentIdentifier(); - this.flowFilesIn = other.getFlowFilesIn(); - this.contentSizeIn = other.getContentSizeIn(); - this.flowFilesOut = other.getFlowFilesOut(); - this.contentSizeOut = other.getContentSizeOut(); - this.flowFilesRemoved = other.getFlowFilesRemoved(); - this.contentSizeRemoved = other.getContentSizeRemoved(); - this.bytesRead = other.getBytesRead(); - this.bytesWritten = other.getBytesWritten(); - this.invocations = other.getInvocations(); - this.flowFilesReceived = other.getFlowFilesReceived(); - this.bytesReceived = other.getBytesReceived(); - this.flowFilesSent = other.getFlowFilesSent(); - this.bytesSent = other.getBytesSent(); - this.aggregateLineageMillis = other.getAggregateLineageMillis(); - this.processingNanos = other.getProcessingNanoseconds(); - } - @Override public String getComponentIdentifier() { return componentId; @@ -234,4 +194,12 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable { return aggregateLineageMillis; } + @Override + public Map getCounters() { + return counters; + } + + public void setCounters(final Map counters) { + this.counters = counters; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 22684a6d88..0af8657ba2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -33,9 +33,9 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.repository.BatchingSessionFactory; import org.apache.nifi.controller.repository.ProcessContext; -import org.apache.nifi.controller.repository.StandardFlowFileEvent; import org.apache.nifi.controller.repository.StandardProcessSession; import org.apache.nifi.controller.repository.StandardProcessSessionFactory; +import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java index 756e5760c4..a1265893d4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -37,11 +38,25 @@ public class StatusHistoryUtil { final Set> metricDescriptors = new LinkedHashSet<>(); final LinkedHashMap componentDetails = new LinkedHashMap<>(statusHistory.getComponentDetails()); + final Set metricNames = new HashSet<>(); for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) { - snapshotDtos.add(StatusHistoryUtil.createStatusSnapshotDto(snapshot)); + final StatusSnapshotDTO snapshotDto = StatusHistoryUtil.createStatusSnapshotDto(snapshot); + snapshotDtos.add(snapshotDto); + metricNames.addAll(snapshotDto.getStatusMetrics().keySet()); metricDescriptors.addAll(snapshot.getStatusMetrics().keySet()); } + // We need to ensure that the 'aggregate snapshot' has an entry for every metric, including counters. + // So for any metric that has is not in the aggregate snapshot, add it with a value of 0 + for (final StatusSnapshotDTO snapshotDto : snapshotDtos) { + final Map metrics = snapshotDto.getStatusMetrics(); + for (final String metricName : metricNames) { + if (!metrics.containsKey(metricName)) { + metrics.put(metricName, 0L); + } + } + } + final StatusHistoryDTO dto = new StatusHistoryDTO(); dto.setGenerated(new Date()); dto.setComponentDetails(componentDetails); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java index 320397267b..50a5123bd9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java @@ -20,6 +20,7 @@ import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; import org.apache.nifi.util.ComponentStatusReport; import org.apache.nifi.util.ComponentStatusReport.ComponentType; import org.apache.nifi.util.NiFiProperties; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; +import java.util.Map; public class VolatileComponentStatusRepository implements ComponentStatusRepository { @@ -72,7 +74,7 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit } @Override - public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints) { + public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) { final StandardStatusHistory history = new StandardStatusHistory(); history.setComponentDetail(COMPONENT_DETAIL_ID, processorId); @@ -98,6 +100,21 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit } } + if (includeCounters) { + final Map counters = status.getCounters(); + if (counters != null) { + for (final Map.Entry entry : counters.entrySet()) { + final String counterName = entry.getKey(); + + final String label = entry.getKey() + " (5 mins)"; + final MetricDescriptor metricDescriptor = new StandardMetricDescriptor<>(entry.getKey(), label, label, Formatter.COUNT, + s -> s.getCounters() == null ? null : s.getCounters().get(counterName)); + + snapshot.addStatusMetric(metricDescriptor, entry.getValue()); + } + } + } + history.addStatusSnapshot(snapshot); return true; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index f2a7eee13b..8c2707860a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -25,9 +25,9 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.repository.BatchingSessionFactory; import org.apache.nifi.controller.repository.ProcessContext; -import org.apache.nifi.controller.repository.StandardFlowFileEvent; import org.apache.nifi.controller.repository.StandardProcessSession; import org.apache.nifi.controller.repository.StandardProcessSessionFactory; +import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.scheduling.ProcessContextFactory; import org.apache.nifi.controller.scheduling.ScheduleState; import org.apache.nifi.controller.scheduling.SchedulingAgent; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java index f861c7abc6..c253a2cd41 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/RingBufferEventRepositoryBean.java @@ -16,8 +16,7 @@ */ package org.apache.nifi.spring; -import org.apache.nifi.controller.repository.RingBufferEventRepository; - +import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository; import org.springframework.beans.factory.FactoryBean; public class RingBufferEventRepositoryBean implements FactoryBean { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java index a160b9ea3a..cb5c3069c9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java @@ -16,10 +16,12 @@ */ package org.apache.nifi.controller.repository; -import org.apache.nifi.controller.repository.RingBufferEventRepository; import org.apache.nifi.controller.repository.StandardRepositoryStatusReport; +import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository; import org.apache.nifi.controller.repository.FlowFileEvent; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -133,6 +135,11 @@ public class TestRingBufferEventRepository { public long getBytesSent() { return 0; } + + @Override + public Map getCounters() { + return Collections.emptyMap(); + } }; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 420022ff4d..66a50731f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -289,10 +289,12 @@ public class ControllerFacade implements Authorizable { throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId)); } - final StatusHistoryDTO statusHistory = flowController.getProcessorStatusHistory(processorId); + final boolean authorized = processor.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + + final StatusHistoryDTO statusHistory = flowController.getProcessorStatusHistory(processorId, authorized); // if not authorized - if (!processor.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser())) { + if (!authorized) { statusHistory.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, processorId); statusHistory.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, "Processor"); }