NIFI-106:

- Expose processors' counters in Stats History
- Only include counters in Processors' Status History if user has read access to corresponding Processor
- Addressed review feedback. Found and addressed bug where a counter is not present in all of the aggregate snaphot values for status history, resulting in the UI not rendering the chart properly
- This closes #1872
This commit is contained in:
Mark Payne 2017-05-30 14:40:30 -04:00 committed by Matt Gilman
parent c54b2ad81c
commit 695e8aa98f
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
22 changed files with 684 additions and 399 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.status;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@ -42,6 +44,7 @@ public class ProcessorStatus implements Cloneable {
private long bytesReceived;
private int flowFilesSent;
private long bytesSent;
private Map<String, Long> counters;
public String getId() {
return id;
@ -211,6 +214,14 @@ public class ProcessorStatus implements Cloneable {
this.bytesSent = bytesSent;
}
public Map<String, Long> getCounters() {
return counters;
}
public void setCounters(final Map<String, Long> counters) {
this.counters = counters;
}
@Override
public ProcessorStatus clone() {
final ProcessorStatus clonedObj = new ProcessorStatus();
@ -234,6 +245,7 @@ public class ProcessorStatus implements Cloneable {
clonedObj.flowFilesRemoved = flowFilesRemoved;
clonedObj.runStatus = runStatus;
clonedObj.type = type;
clonedObj.counters = counters == null ? null : new HashMap<>(counters);
return clonedObj;
}
@ -268,6 +280,8 @@ public class ProcessorStatus implements Cloneable {
builder.append(processingNanos);
builder.append(", activeThreadCount=");
builder.append(activeThreadCount);
builder.append(", counters=");
builder.append(counters);
builder.append("]");
return builder.toString();
}

View File

@ -103,10 +103,12 @@ public interface ComponentStatusRepository {
* If the date range is large, the total number of data points could be far
* too many to process. Therefore, this parameter allows the requestor to
* indicate how many samples to return.
* @param includeCounters specifies whether or not metrics from Processor counters
* should be included in the StatusHistory.
* @return a {@link StatusHistory} that provides the status information
* about the Processor with the given ID during the given time period
*/
StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints);
StatusHistory getProcessorStatusHistory(String processorId, Date start, Date end, int preferredDataPoints, boolean includeCounters);
/**
* @param remoteGroupId to get history of

View File

@ -41,4 +41,5 @@ public interface StatusHistory {
* @return List of snapshots for a given component
*/
List<StatusSnapshot> getStatusSnapshots();
}

View File

@ -17,6 +17,9 @@
package org.apache.nifi.web.api.dto.status;
import com.wordnik.swagger.annotations.ApiModelProperty;
import java.util.Objects;
import javax.xml.bind.annotation.XmlType;
/**
@ -102,4 +105,23 @@ public class StatusDescriptorDTO {
this.formatter = formatter;
}
@Override
public int hashCode() {
return 31 + 41 * (field == null ? 0 : field.hashCode());
}
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof StatusDescriptorDTO)) {
return false;
}
final StatusDescriptorDTO other = (StatusDescriptorDTO) obj;
return Objects.equals(field, other.field);
}
}

View File

@ -16,31 +16,38 @@
*/
package org.apache.nifi.cluster.coordination.http.endpoints;
import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.status.history.StatusSnapshot;
import org.apache.nifi.controller.status.history.ValueMapper;
import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO;
import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Pattern;
import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor;
import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.status.history.StatusSnapshot;
import org.apache.nifi.web.api.dto.status.NodeStatusSnapshotsDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
@ -55,7 +62,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
this.componentStatusSnapshotMillis = componentStatusSnapshotMillis;
}
private Map<String, MetricDescriptor<?>> getMetricDescriptors(final URI uri) {
private Map<String, MetricDescriptor<?>> getStandardMetricDescriptors(final URI uri) {
final String path = uri.getPath();
final Map<String, MetricDescriptor<?>> metricDescriptors = new HashMap<>();
@ -87,16 +94,19 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
return false;
}
final Map<String, MetricDescriptor<?>> descriptors = getMetricDescriptors(uri);
final Map<String, MetricDescriptor<?>> descriptors = getStandardMetricDescriptors(uri);
return descriptors != null && !descriptors.isEmpty();
}
@Override
public NodeResponse merge(URI uri, String method, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse clientResponse) {
final Map<String, MetricDescriptor<?>> metricDescriptors = getMetricDescriptors(uri);
final Map<String, MetricDescriptor<?>> metricDescriptors = getStandardMetricDescriptors(uri);
final StatusHistoryEntity responseEntity = clientResponse.getClientResponse().getEntity(StatusHistoryEntity.class);
final Set<StatusDescriptorDTO> fieldDescriptors = new LinkedHashSet<>();
boolean includeCounters = true;
StatusHistoryDTO lastStatusHistory = null;
final List<NodeStatusSnapshotsDTO> nodeStatusSnapshots = new ArrayList<>(successfulResponses.size());
LinkedHashMap<String, String> noReadPermissionsComponentDetails = null;
@ -109,6 +119,10 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
noReadPermissionsComponentDetails = nodeStatus.getComponentDetails();
}
if (!Boolean.TRUE.equals(nodeResponseEntity.getCanRead())) {
includeCounters = false;
}
final NodeIdentifier nodeId = nodeResponse.getNodeId();
final NodeStatusSnapshotsDTO nodeStatusSnapshot = new NodeStatusSnapshotsDTO();
nodeStatusSnapshot.setNodeId(nodeId.getId());
@ -116,6 +130,38 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
nodeStatusSnapshot.setApiPort(nodeId.getApiPort());
nodeStatusSnapshot.setStatusSnapshots(nodeStatus.getAggregateSnapshots());
nodeStatusSnapshots.add(nodeStatusSnapshot);
final List<StatusDescriptorDTO> descriptors = nodeStatus.getFieldDescriptors();
if (descriptors != null) {
fieldDescriptors.addAll(descriptors);
}
}
// If there's a status descriptor that is in the fieldDescriptors, but is not in the standard metric descriptors that we find,
// then it is a counter metric and should be included only if all StatusHistoryDTO's include counter metrics. This is done because
// we include counters in the status history only if the user is authorized to read the Processor. Since it's possible for the nodes
// to disagree about who is authorized (if, for example, the authorizer is asynchronously updated), then if any node indicates that
// the user is not authorized, we want to assume that the user is, in fact, not authorized.
if (includeCounters) {
for (final StatusDescriptorDTO descriptorDto : fieldDescriptors) {
final String fieldName = descriptorDto.getField();
if (!metricDescriptors.containsKey(fieldName)) {
final ValueMapper<ProcessorStatus> valueMapper = s -> {
final Map<String, Long> counters = s.getCounters();
if (counters == null) {
return 0L;
}
return counters.getOrDefault(descriptorDto.getField(), 0L);
};
final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(descriptorDto.getField(),
descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
metricDescriptors.put(fieldName, metricDescriptor);
}
}
}
final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
@ -124,8 +170,8 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
clusterStatusHistory.setNodeSnapshots(nodeStatusSnapshots);
if (lastStatusHistory != null) {
clusterStatusHistory.setComponentDetails(noReadPermissionsComponentDetails == null ? lastStatusHistory.getComponentDetails() : noReadPermissionsComponentDetails);
clusterStatusHistory.setFieldDescriptors(lastStatusHistory.getFieldDescriptors());
}
clusterStatusHistory.setFieldDescriptors(new ArrayList<>(fieldDescriptors));
final StatusHistoryEntity clusterEntity = new StatusHistoryEntity();
clusterEntity.setStatusHistory(clusterStatusHistory);
@ -177,6 +223,19 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
snapshot.setTimestamp(snapshotDto.getTimestamp());
// Default all metrics to 0 so that if a counter has not yet been registered, it will have a value of 0 instead
// of missing all together.
for (final MetricDescriptor<?> descriptor : metricDescriptors.values()) {
snapshot.addStatusMetric(descriptor, 0L);
// If the DTO doesn't have an entry for the metric, add with a value of 0.
final Map<String, Long> dtoMetrics = snapshotDto.getStatusMetrics();
final String field = descriptor.getField();
if (!dtoMetrics.containsKey(field)) {
dtoMetrics.put(field, 0L);
}
}
final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
for (final Map.Entry<String, Long> entry : metrics.entrySet()) {
final String metricId = entry.getKey();

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.repository;
import java.util.Map;
public interface FlowFileEvent {
String getComponentIdentifier();
@ -51,4 +53,7 @@ public interface FlowFileEvent {
long getBytesSent();
int getInvocations();
Map<String, Long> getCounters();
}

View File

@ -2943,6 +2943,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setFlowFilesSent(entry.getFlowFilesSent());
status.setBytesSent(entry.getBytesSent());
status.setFlowFilesRemoved(entry.getFlowFilesRemoved());
if (isProcessorAuthorized) {
status.setCounters(entry.getCounters());
}
}
// Determine the run status and get any validation error... only validating while STOPPED
@ -4482,12 +4486,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getConnectionStatusHistory(connectionId, startTime, endTime, preferredDataPoints));
}
public StatusHistoryDTO getProcessorStatusHistory(final String processorId) {
return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE);
public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final boolean includeCounters) {
return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE, includeCounters);
}
public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startTime, final Date endTime, final int preferredDataPoints) {
return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints));
public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startTime, final Date endTime, final int preferredDataPoints, final boolean includeCounters) {
return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints, includeCounters));
}
public StatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) {

View File

@ -1,312 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
public class RingBufferEventRepository implements FlowFileEventRepository {
private final int numMinutes;
private final ConcurrentMap<String, EventContainer> componentEventMap = new ConcurrentHashMap<>();
public RingBufferEventRepository(final int numMinutes) {
this.numMinutes = numMinutes;
}
@Override
public void close() throws IOException {
}
@Override
public void updateRepository(final FlowFileEvent event) {
final String componentId = event.getComponentIdentifier();
EventContainer eventContainer = componentEventMap.get(componentId);
if (eventContainer == null) {
eventContainer = new SecondPrecisionEventContainer(numMinutes);
final EventContainer oldEventContainer = componentEventMap.putIfAbsent(componentId, eventContainer);
if (oldEventContainer != null) {
eventContainer = oldEventContainer;
}
}
eventContainer.addEvent(event);
}
@Override
public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) {
final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport();
for (final Map.Entry<String, EventContainer> entry : componentEventMap.entrySet()) {
final String consumerId = entry.getKey();
final EventContainer container = entry.getValue();
final FlowFileEvent reportEntry = container.generateReport(consumerId, sinceEpochMillis);
report.addReportEntry(reportEntry);
}
return report;
}
@Override
public void purgeTransferEvents(final long cutoffEpochMilliseconds) {
// This is done so that if a processor is removed from the graph, its events
// will be removed rather than being kept in memory
for (final EventContainer container : componentEventMap.values()) {
container.purgeEvents(cutoffEpochMilliseconds);
}
}
private static interface EventContainer {
public void addEvent(FlowFileEvent event);
public void purgeEvents(long cutoffEpochMillis);
public FlowFileEvent generateReport(String consumerId, long sinceEpochMillis);
}
private class EventSum {
private final AtomicReference<EventSumValue> ref = new AtomicReference<>(new EventSumValue());
private void add(final FlowFileEvent event) {
EventSumValue newValue;
EventSumValue value;
do {
value = ref.get();
newValue = new EventSumValue(value,
event.getFlowFilesIn(), event.getFlowFilesOut(), event.getFlowFilesRemoved(),
event.getContentSizeIn(), event.getContentSizeOut(), event.getContentSizeRemoved(),
event.getBytesRead(), event.getBytesWritten(),
event.getFlowFilesReceived(), event.getBytesReceived(),
event.getFlowFilesSent(), event.getBytesSent(),
event.getProcessingNanoseconds(), event.getInvocations(), event.getAggregateLineageMillis());
} while (!ref.compareAndSet(value, newValue));
}
public EventSumValue getValue() {
return ref.get();
}
public void addOrReset(final FlowFileEvent event) {
final long expectedMinute = System.currentTimeMillis() / 60000;
final EventSumValue curValue = ref.get();
if (curValue.getMinuteTimestamp() != expectedMinute) {
ref.compareAndSet(curValue, new EventSumValue());
}
add(event);
}
}
private static class EventSumValue {
private final int flowFilesIn, flowFilesOut, flowFilesRemoved;
private final long contentSizeIn, contentSizeOut, contentSizeRemoved;
private final long bytesRead, bytesWritten;
private final int flowFilesReceived, flowFilesSent;
private final long bytesReceived, bytesSent;
private final long processingNanos;
private final long aggregateLineageMillis;
private final int invocations;
private final long minuteTimestamp;
private final long millisecondTimestamp;
public EventSumValue() {
flowFilesIn = flowFilesOut = flowFilesRemoved = 0;
contentSizeIn = contentSizeOut = contentSizeRemoved = 0;
bytesRead = bytesWritten = 0;
flowFilesReceived = flowFilesSent = 0;
bytesReceived = bytesSent = 0L;
processingNanos = invocations = 0;
aggregateLineageMillis = 0L;
this.millisecondTimestamp = System.currentTimeMillis();
this.minuteTimestamp = millisecondTimestamp / 60000;
}
public EventSumValue(final EventSumValue base, final int flowFilesIn, final int flowFilesOut, final int flowFilesRemoved,
final long contentSizeIn, final long contentSizeOut, final long contentSizeRemoved,
final long bytesRead, final long bytesWritten,
final int flowFilesReceived, final long bytesReceived,
final int flowFilesSent, final long bytesSent,
final long processingNanos, final int invocations, final long aggregateLineageMillis) {
this.flowFilesIn = base.flowFilesIn + flowFilesIn;
this.flowFilesOut = base.flowFilesOut + flowFilesOut;
this.flowFilesRemoved = base.flowFilesRemoved + flowFilesRemoved;
this.contentSizeIn = base.contentSizeIn + contentSizeIn;
this.contentSizeOut = base.contentSizeOut + contentSizeOut;
this.contentSizeRemoved = base.contentSizeRemoved + contentSizeRemoved;
this.bytesRead = base.bytesRead + bytesRead;
this.bytesWritten = base.bytesWritten + bytesWritten;
this.flowFilesReceived = base.flowFilesReceived + flowFilesReceived;
this.bytesReceived = base.bytesReceived + bytesReceived;
this.flowFilesSent = base.flowFilesSent + flowFilesSent;
this.bytesSent = base.bytesSent + bytesSent;
this.processingNanos = base.processingNanos + processingNanos;
this.invocations = base.invocations + invocations;
this.aggregateLineageMillis = base.aggregateLineageMillis + aggregateLineageMillis;
this.millisecondTimestamp = System.currentTimeMillis();
this.minuteTimestamp = millisecondTimestamp / 60000;
}
public long getTimestamp() {
return millisecondTimestamp;
}
public long getMinuteTimestamp() {
return minuteTimestamp;
}
public long getBytesRead() {
return bytesRead;
}
public long getBytesWritten() {
return bytesWritten;
}
public int getFlowFilesIn() {
return flowFilesIn;
}
public int getFlowFilesOut() {
return flowFilesOut;
}
public long getContentSizeIn() {
return contentSizeIn;
}
public long getContentSizeOut() {
return contentSizeOut;
}
public int getFlowFilesRemoved() {
return flowFilesRemoved;
}
public long getContentSizeRemoved() {
return contentSizeRemoved;
}
public long getProcessingNanoseconds() {
return processingNanos;
}
public int getInvocations() {
return invocations;
}
public long getAggregateLineageMillis() {
return aggregateLineageMillis;
}
public int getFlowFilesReceived() {
return flowFilesReceived;
}
public int getFlowFilesSent() {
return flowFilesSent;
}
public long getBytesReceived() {
return bytesReceived;
}
public long getBytesSent() {
return bytesSent;
}
}
private class SecondPrecisionEventContainer implements EventContainer {
private final int numBins;
private final EventSum[] sums;
public SecondPrecisionEventContainer(final int numMinutes) {
numBins = 1 + numMinutes * 60;
sums = new EventSum[numBins];
for (int i = 0; i < numBins; i++) {
sums[i] = new EventSum();
}
}
@Override
public void addEvent(final FlowFileEvent event) {
final int second = (int) (System.currentTimeMillis() / 1000);
final int binIdx = (int) (second % numBins);
final EventSum sum = sums[binIdx];
sum.addOrReset(event);
}
@Override
public void purgeEvents(final long cutoffEpochMilliseconds) {
// no need to do anything
}
@Override
public FlowFileEvent generateReport(final String consumerId, final long sinceEpochMillis) {
int flowFilesIn = 0, flowFilesOut = 0, flowFilesRemoved = 0;
long contentSizeIn = 0L, contentSizeOut = 0L, contentSizeRemoved = 0L;
long bytesRead = 0L, bytesWritten = 0L;
int invocations = 0;
long processingNanos = 0L;
long aggregateLineageMillis = 0L;
int flowFilesReceived = 0, flowFilesSent = 0;
long bytesReceived = 0L, bytesSent = 0L;
final long second = sinceEpochMillis / 1000;
final int startBinIdx = (int) (second % numBins);
for (int i = 0; i < numBins; i++) {
int binIdx = (startBinIdx + i) % numBins;
final EventSum sum = sums[binIdx];
final EventSumValue sumValue = sum.getValue();
if (sumValue.getTimestamp() >= sinceEpochMillis) {
flowFilesIn += sumValue.getFlowFilesIn();
flowFilesOut += sumValue.getFlowFilesOut();
flowFilesRemoved += sumValue.getFlowFilesRemoved();
contentSizeIn += sumValue.getContentSizeIn();
contentSizeOut += sumValue.getContentSizeOut();
contentSizeRemoved += sumValue.getContentSizeRemoved();
bytesRead += sumValue.getBytesRead();
bytesWritten += sumValue.getBytesWritten();
flowFilesReceived += sumValue.getFlowFilesReceived();
bytesReceived += sumValue.getBytesReceived();
flowFilesSent += sumValue.getFlowFilesSent();
bytesSent += sumValue.getBytesSent();
invocations += sumValue.getInvocations();
processingNanos += sumValue.getProcessingNanoseconds();
aggregateLineageMillis += sumValue.getAggregateLineageMillis();
}
}
return new StandardFlowFileEvent(consumerId, flowFilesIn, contentSizeIn,
flowFilesOut, contentSizeOut, flowFilesRemoved, contentSizeRemoved,
bytesRead, bytesWritten, flowFilesReceived, bytesReceived, flowFilesSent, bytesSent,
invocations, aggregateLineageMillis, processingNanos);
}
}
}

View File

@ -57,6 +57,7 @@ import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
@ -108,7 +109,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>();
private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
private final Map<String, Long> counters = new HashMap<>();
private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
private final ProcessContext context;
private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
@ -117,6 +117,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final long sessionId;
private final String connectableDescription;
private Map<String, Long> countersOnCommit;
private Map<String, Long> immediateCounters;
private final Set<String> removedFlowFiles = new HashSet<>();
private final Set<String> createdFlowFiles = new HashSet<>();
@ -438,8 +441,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
for (final Map.Entry<String, Long> entry : checkpoint.counters.entrySet()) {
adjustCounter(entry.getKey(), entry.getValue(), true);
for (final Map.Entry<String, Long> entry : checkpoint.countersOnCommit.entrySet()) {
context.adjustCounter(entry.getKey(), entry.getValue());
}
acknowledgeRecords();
@ -533,6 +536,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
flowFileEvent.setAggregateLineageMillis(lineageMillis);
final Map<String, Long> counters = combineCounters(checkpoint.countersOnCommit, checkpoint.immediateCounters);
flowFileEvent.setCounters(counters);
context.getFlowFileEventRepository().updateRepository(flowFileEvent);
for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
@ -543,6 +549,23 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
private Map<String, Long> combineCounters(final Map<String, Long> first, final Map<String, Long> second) {
if (first == null && second == null) {
return null;
}
if (first == null) {
return second;
}
if (second == null) {
return first;
}
final Map<String, Long> combined = new HashMap<>();
combined.putAll(first);
combined.putAll(second);
return combined;
}
private void addEventType(final Map<String, Set<ProvenanceEventType>> map, final String id, final ProvenanceEventType eventType) {
Set<ProvenanceEventType> eventTypes = map.get(id);
if (eventTypes == null) {
@ -1013,6 +1036,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
flowFileEvent.setBytesRead(bytesRead);
flowFileEvent.setBytesWritten(bytesWritten);
flowFileEvent.setCounters(immediateCounters);
// update event repository
try {
@ -1106,7 +1130,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
connectionCounts.clear();
createdFlowFiles.clear();
removedFlowFiles.clear();
counters.clear();
if (countersOnCommit != null) {
countersOnCommit.clear();
}
if (immediateCounters != null) {
immediateCounters.clear();
}
generatedProvenanceEvents.clear();
forkEventBuilders.clear();
@ -1441,12 +1470,24 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void adjustCounter(final String name, final long delta, final boolean immediate) {
final Map<String, Long> counters;
if (immediate) {
context.adjustCounter(name, delta);
return;
if (immediateCounters == null) {
immediateCounters = new HashMap<>();
}
counters = immediateCounters;
} else {
if (countersOnCommit == null) {
countersOnCommit = new HashMap<>();
}
counters = countersOnCommit;
}
adjustCounter(name, delta, counters);
if (immediate) {
context.adjustCounter(name, delta);
}
}
private void adjustCounter(final String name, final long delta, final Map<String, Long> map) {
@ -3216,7 +3257,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>();
private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
private final Map<String, Long> counters = new HashMap<>();
private Map<String, Long> countersOnCommit = new HashMap<>();
private Map<String, Long> immediateCounters = new HashMap<>();
private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>();
private final Set<String> removedFlowFiles = new HashSet<>();
@ -3242,7 +3285,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.records.putAll(session.records);
this.connectionCounts.putAll(session.connectionCounts);
this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
this.counters.putAll(session.counters);
if (session.countersOnCommit != null) {
if (this.countersOnCommit == null) {
this.countersOnCommit = new HashMap<>();
}
this.countersOnCommit.putAll(session.countersOnCommit);
}
if (session.immediateCounters != null) {
if (this.immediateCounters == null) {
this.immediateCounters = new HashMap<>();
}
this.immediateCounters.putAll(session.immediateCounters);
}
this.deleteOnCommit.putAll(session.deleteOnCommit);
this.removedFlowFiles.addAll(session.removedFlowFiles);

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
import org.apache.nifi.controller.repository.FlowFileEvent;
public interface EventContainer {
public void addEvent(FlowFileEvent event);
public void purgeEvents(long cutoffEpochMillis);
public FlowFileEvent generateReport(String componentId, long sinceEpochMillis);
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.controller.repository.FlowFileEvent;
public class EventSum {
private final AtomicReference<EventSumValue> ref = new AtomicReference<>();
public EventSumValue getValue() {
final EventSumValue value = ref.get();
return value == null ? new EventSumValue() : value;
}
public void addOrReset(final FlowFileEvent event) {
final long expectedMinute = System.currentTimeMillis() / 60000;
EventSumValue curValue;
while (true) {
curValue = ref.get();
if (curValue == null || curValue.getMinuteTimestamp() != expectedMinute) {
final EventSumValue newValue = new EventSumValue();
final boolean replaced = ref.compareAndSet(curValue, newValue);
if (replaced) {
curValue = newValue;
break;
}
} else {
break;
}
}
curValue.add(event);
}
}

View File

@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.repository.FlowFileEvent;
public class EventSumValue {
private final AtomicInteger flowFilesIn = new AtomicInteger(0);
private final AtomicInteger flowFilesOut = new AtomicInteger(0);
private final AtomicInteger flowFilesRemoved = new AtomicInteger(0);
private final AtomicInteger flowFilesReceived = new AtomicInteger(0);
private final AtomicInteger flowFilesSent = new AtomicInteger(0);
private final AtomicLong contentSizeIn = new AtomicLong(0L);
private final AtomicLong contentSizeOut = new AtomicLong(0L);
private final AtomicLong contentSizeRemoved = new AtomicLong(0L);
private final AtomicLong bytesRead = new AtomicLong(0L);
private final AtomicLong bytesWritten = new AtomicLong(0L);
private final AtomicLong bytesReceived = new AtomicLong(0L);
private final AtomicLong bytesSent = new AtomicLong(0L);
private final AtomicLong processingNanos = new AtomicLong(0L);
private final AtomicLong aggregateLineageMillis = new AtomicLong(0L);
private final AtomicInteger invocations = new AtomicInteger(0);
private final ConcurrentMap<String, Long> counters = new ConcurrentHashMap<>();
private final long minuteTimestamp;
private final long millisecondTimestamp;
public EventSumValue() {
this.millisecondTimestamp = System.currentTimeMillis();
this.minuteTimestamp = millisecondTimestamp / 60000;
}
public void add(final FlowFileEvent flowFileEvent) {
this.aggregateLineageMillis.addAndGet(flowFileEvent.getAggregateLineageMillis());
this.bytesRead.addAndGet(flowFileEvent.getBytesRead());
this.bytesReceived.addAndGet(flowFileEvent.getBytesReceived());
this.bytesSent.addAndGet(flowFileEvent.getBytesSent());
this.bytesWritten.addAndGet(flowFileEvent.getBytesWritten());
this.contentSizeIn.addAndGet(flowFileEvent.getContentSizeIn());
this.contentSizeOut.addAndGet(flowFileEvent.getContentSizeOut());
this.contentSizeRemoved.addAndGet(flowFileEvent.getContentSizeRemoved());
this.flowFilesIn.addAndGet(flowFileEvent.getFlowFilesIn());
this.flowFilesOut.addAndGet(flowFileEvent.getFlowFilesOut());
this.flowFilesReceived.addAndGet(flowFileEvent.getFlowFilesReceived());
this.flowFilesRemoved.addAndGet(flowFileEvent.getFlowFilesRemoved());
this.flowFilesSent.addAndGet(flowFileEvent.getFlowFilesSent());
this.invocations.addAndGet(flowFileEvent.getInvocations());
this.processingNanos.addAndGet(flowFileEvent.getProcessingNanoseconds());
final Map<String, Long> eventCounters = flowFileEvent.getCounters();
if (eventCounters != null) {
for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
final String counterName = entry.getKey();
final Long counterValue = entry.getValue();
counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue);
}
}
}
public FlowFileEvent toFlowFileEvent(final String componentId) {
final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId);
event.setAggregateLineageMillis(getAggregateLineageMillis());
event.setBytesRead(getBytesRead());
event.setBytesReceived(getBytesReceived());
event.setBytesSent(getBytesSent());
event.setBytesWritten(getBytesWritten());
event.setContentSizeIn(getContentSizeIn());
event.setContentSizeOut(getContentSizeOut());
event.setContentSizeRemoved(getContentSizeRemoved());
event.setFlowFilesIn(getFlowFilesIn());
event.setFlowFilesOut(getFlowFilesOut());
event.setFlowFilesReceived(getFlowFilesReceived());
event.setFlowFilesRemoved(getFlowFilesRemoved());
event.setFlowFilesSent(getFlowFilesSent());
event.setInvocations(getInvocations());
event.setProcessingNanos(getProcessingNanoseconds());
event.setCounters(Collections.unmodifiableMap(this.counters));
return event;
}
public void add(final EventSumValue other) {
this.aggregateLineageMillis.addAndGet(other.getAggregateLineageMillis());
this.bytesRead.addAndGet(other.getBytesRead());
this.bytesReceived.addAndGet(other.getBytesReceived());
this.bytesSent.addAndGet(other.getBytesSent());
this.bytesWritten.addAndGet(other.getBytesWritten());
this.contentSizeIn.addAndGet(other.getContentSizeIn());
this.contentSizeOut.addAndGet(other.getContentSizeOut());
this.contentSizeRemoved.addAndGet(other.getContentSizeRemoved());
this.flowFilesIn.addAndGet(other.getFlowFilesIn());
this.flowFilesOut.addAndGet(other.getFlowFilesOut());
this.flowFilesReceived.addAndGet(other.getFlowFilesReceived());
this.flowFilesRemoved.addAndGet(other.getFlowFilesRemoved());
this.flowFilesSent.addAndGet(other.getFlowFilesSent());
this.invocations.addAndGet(other.getInvocations());
this.processingNanos.addAndGet(other.getProcessingNanoseconds());
final Map<String, Long> eventCounters = other.getCounters();
if (eventCounters != null) {
for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
final String counterName = entry.getKey();
final Long counterValue = entry.getValue();
counters.compute(counterName, (key, value) -> value == null ? counterValue : value + counterValue);
}
}
}
public long getTimestamp() {
return millisecondTimestamp;
}
public long getMinuteTimestamp() {
return minuteTimestamp;
}
public long getBytesRead() {
return bytesRead.get();
}
public long getBytesWritten() {
return bytesWritten.get();
}
public int getFlowFilesIn() {
return flowFilesIn.get();
}
public int getFlowFilesOut() {
return flowFilesOut.get();
}
public long getContentSizeIn() {
return contentSizeIn.get();
}
public long getContentSizeOut() {
return contentSizeOut.get();
}
public int getFlowFilesRemoved() {
return flowFilesRemoved.get();
}
public long getContentSizeRemoved() {
return contentSizeRemoved.get();
}
public long getProcessingNanoseconds() {
return processingNanos.get();
}
public int getInvocations() {
return invocations.get();
}
public long getAggregateLineageMillis() {
return aggregateLineageMillis.get();
}
public int getFlowFilesReceived() {
return flowFilesReceived.get();
}
public int getFlowFilesSent() {
return flowFilesSent.get();
}
public long getBytesReceived() {
return bytesReceived.get();
}
public long getBytesSent() {
return bytesSent.get();
}
public Map<String, Long> getCounters() {
return counters;
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
public class RingBufferEventRepository implements FlowFileEventRepository {
private final int numMinutes;
private final ConcurrentMap<String, EventContainer> componentEventMap = new ConcurrentHashMap<>();
public RingBufferEventRepository(final int numMinutes) {
this.numMinutes = numMinutes;
}
@Override
public void close() throws IOException {
}
@Override
public void updateRepository(final FlowFileEvent event) {
final String componentId = event.getComponentIdentifier();
final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes));
eventContainer.addEvent(event);
}
@Override
public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) {
final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport();
componentEventMap.entrySet().stream()
.map(entry -> entry.getValue().generateReport(entry.getKey(), sinceEpochMillis))
.forEach(event -> report.addReportEntry(event));
return report;
}
@Override
public void purgeTransferEvents(final long cutoffEpochMilliseconds) {
// This is done so that if a processor is removed from the graph, its events
// will be removed rather than being kept in memory
for (final EventContainer container : componentEventMap.values()) {
container.purgeEvents(cutoffEpochMilliseconds);
}
}
}

View File

@ -0,0 +1,69 @@
/*
e * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
import org.apache.nifi.controller.repository.FlowFileEvent;
public class SecondPrecisionEventContainer implements EventContainer {
private final int numBins;
private final EventSum[] sums;
public SecondPrecisionEventContainer(final int numMinutes) {
numBins = 1 + numMinutes * 60;
sums = new EventSum[numBins];
for (int i = 0; i < numBins; i++) {
sums[i] = new EventSum();
}
}
@Override
public void addEvent(final FlowFileEvent event) {
final int second = (int) (System.currentTimeMillis() / 1000);
final int binIdx = second % numBins;
final EventSum sum = sums[binIdx];
sum.addOrReset(event);
}
@Override
public void purgeEvents(final long cutoffEpochMilliseconds) {
// no need to do anything
}
@Override
public FlowFileEvent generateReport(final String componentId, final long sinceEpochMillis) {
final EventSumValue eventSumValue = new EventSumValue();
final long second = sinceEpochMillis / 1000;
final int startBinIdx = (int) (second % numBins);
for (int i = 0; i < numBins; i++) {
int binIdx = (startBinIdx + i) % numBins;
final EventSum sum = sums[binIdx];
final EventSumValue sumValue = sum.getValue();
if (sumValue.getTimestamp() >= sinceEpochMillis) {
eventSumValue.add(sumValue);
}
}
final FlowFileEvent flowFileEvent = eventSumValue.toFlowFileEvent(componentId);
return flowFileEvent;
}
}

View File

@ -14,7 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
package org.apache.nifi.controller.repository.metrics;
import java.util.Map;
import org.apache.nifi.controller.repository.FlowFileEvent;
public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
@ -35,56 +39,12 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
private int flowFilesSent;
private long bytesSent;
private int invocations;
private Map<String, Long> counters;
public StandardFlowFileEvent(final String componentId) {
this.componentId = componentId;
}
public StandardFlowFileEvent(final String componentId,
final int flowFilesIn, final long contentSizeIn,
final int flowFilesOut, final long contentSizeOut,
final int flowFilesRemoved, final long contentSizeRemoved,
final long bytesRead, final long bytesWritten,
final int flowFilesReceived, final long bytesReceived,
final int flowFilesSent, final long bytesSent,
final int invocations, final long averageLineageMillis, final long processingNanos) {
this.componentId = componentId;
this.flowFilesIn = flowFilesIn;
this.contentSizeIn = contentSizeIn;
this.flowFilesOut = flowFilesOut;
this.contentSizeOut = contentSizeOut;
this.flowFilesRemoved = flowFilesRemoved;
this.contentSizeRemoved = contentSizeRemoved;
this.bytesRead = bytesRead;
this.bytesWritten = bytesWritten;
this.invocations = invocations;
this.flowFilesReceived = flowFilesReceived;
this.bytesReceived = bytesReceived;
this.flowFilesSent = flowFilesSent;
this.bytesSent = bytesSent;
this.aggregateLineageMillis = averageLineageMillis;
this.processingNanos = processingNanos;
}
public StandardFlowFileEvent(final FlowFileEvent other) {
this.componentId = other.getComponentIdentifier();
this.flowFilesIn = other.getFlowFilesIn();
this.contentSizeIn = other.getContentSizeIn();
this.flowFilesOut = other.getFlowFilesOut();
this.contentSizeOut = other.getContentSizeOut();
this.flowFilesRemoved = other.getFlowFilesRemoved();
this.contentSizeRemoved = other.getContentSizeRemoved();
this.bytesRead = other.getBytesRead();
this.bytesWritten = other.getBytesWritten();
this.invocations = other.getInvocations();
this.flowFilesReceived = other.getFlowFilesReceived();
this.bytesReceived = other.getBytesReceived();
this.flowFilesSent = other.getFlowFilesSent();
this.bytesSent = other.getBytesSent();
this.aggregateLineageMillis = other.getAggregateLineageMillis();
this.processingNanos = other.getProcessingNanoseconds();
}
@Override
public String getComponentIdentifier() {
return componentId;
@ -234,4 +194,12 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
return aggregateLineageMillis;
}
@Override
public Map<String, Long> getCounters() {
return counters;
}
public void setCounters(final Map<String, Long> counters) {
this.counters = counters;
}
}

View File

@ -33,9 +33,9 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.ProcessContext;
import org.apache.nifi.controller.repository.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@ -37,11 +38,25 @@ public class StatusHistoryUtil {
final Set<MetricDescriptor<?>> metricDescriptors = new LinkedHashSet<>();
final LinkedHashMap<String, String> componentDetails = new LinkedHashMap<>(statusHistory.getComponentDetails());
final Set<String> metricNames = new HashSet<>();
for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
snapshotDtos.add(StatusHistoryUtil.createStatusSnapshotDto(snapshot));
final StatusSnapshotDTO snapshotDto = StatusHistoryUtil.createStatusSnapshotDto(snapshot);
snapshotDtos.add(snapshotDto);
metricNames.addAll(snapshotDto.getStatusMetrics().keySet());
metricDescriptors.addAll(snapshot.getStatusMetrics().keySet());
}
// We need to ensure that the 'aggregate snapshot' has an entry for every metric, including counters.
// So for any metric that has is not in the aggregate snapshot, add it with a value of 0
for (final StatusSnapshotDTO snapshotDto : snapshotDtos) {
final Map<String, Long> metrics = snapshotDto.getStatusMetrics();
for (final String metricName : metricNames) {
if (!metrics.containsKey(metricName)) {
metrics.put(metricName, 0L);
}
}
}
final StatusHistoryDTO dto = new StatusHistoryDTO();
dto.setGenerated(new Date());
dto.setComponentDetails(componentDetails);

View File

@ -20,6 +20,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import org.apache.nifi.util.ComponentStatusReport;
import org.apache.nifi.util.ComponentStatusReport.ComponentType;
import org.apache.nifi.util.NiFiProperties;
@ -29,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Map;
public class VolatileComponentStatusRepository implements ComponentStatusRepository {
@ -72,7 +74,7 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
}
@Override
public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints) {
public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) {
final StandardStatusHistory history = new StandardStatusHistory();
history.setComponentDetail(COMPONENT_DETAIL_ID, processorId);
@ -98,6 +100,21 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
}
}
if (includeCounters) {
final Map<String, Long> counters = status.getCounters();
if (counters != null) {
for (final Map.Entry<String, Long> entry : counters.entrySet()) {
final String counterName = entry.getKey();
final String label = entry.getKey() + " (5 mins)";
final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(entry.getKey(), label, label, Formatter.COUNT,
s -> s.getCounters() == null ? null : s.getCounters().get(counterName));
snapshot.addStatusMetric(metricDescriptor, entry.getValue());
}
}
}
history.addStatusSnapshot(snapshot);
return true;
}

View File

@ -25,9 +25,9 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.ProcessContext;
import org.apache.nifi.controller.repository.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent;

View File

@ -16,8 +16,7 @@
*/
package org.apache.nifi.spring;
import org.apache.nifi.controller.repository.RingBufferEventRepository;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.springframework.beans.factory.FactoryBean;
public class RingBufferEventRepositoryBean implements FactoryBean<RingBufferEventRepository> {

View File

@ -16,10 +16,12 @@
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.RingBufferEventRepository;
import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
@ -133,6 +135,11 @@ public class TestRingBufferEventRepository {
public long getBytesSent() {
return 0;
}
@Override
public Map<String, Long> getCounters() {
return Collections.emptyMap();
}
};
}
}

View File

@ -289,10 +289,12 @@ public class ControllerFacade implements Authorizable {
throw new ResourceNotFoundException(String.format("Unable to locate processor with id '%s'.", processorId));
}
final StatusHistoryDTO statusHistory = flowController.getProcessorStatusHistory(processorId);
final boolean authorized = processor.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
final StatusHistoryDTO statusHistory = flowController.getProcessorStatusHistory(processorId, authorized);
// if not authorized
if (!processor.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser())) {
if (!authorized) {
statusHistory.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, processorId);
statusHistory.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, "Processor");
}