NIFI-5466: Keep a running total of stats for each component. Refactored FlowFileEvent and repository in order to provide more efficient storage of objects on Java heap by allowing the same 'EMPTY' object to be reused

- Refactored VolatileComponentStatusRepository to avoid holding on to ProcessorStatus objects, etc, and only keep what they need
 - Updated VolatileComponentStatusRepository to ensure that we are efficiently storing metrics for processors, etc. that are not running

This closes #2939

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Mark Payne 2018-08-02 14:17:36 -04:00
parent 410176ed22
commit 7bbb5a823a
38 changed files with 1594 additions and 612 deletions

View File

@ -23,13 +23,14 @@ package org.apache.nifi.controller.status.history;
*/
public interface MetricDescriptor<T> {
public enum Formatter {
enum Formatter {
COUNT,
DURATION,
DATA_SIZE
};
int getMetricIdentifier();
/**
* Specifies how the values should be formatted
*

View File

@ -17,7 +17,7 @@
package org.apache.nifi.controller.status.history;
import java.util.Date;
import java.util.Map;
import java.util.Set;
/**
* A StatusSnapshot represents a Component's status report at some point in time
@ -29,10 +29,16 @@ public interface StatusSnapshot {
*/
Date getTimestamp();
Set<MetricDescriptor<?>> getMetricDescriptors();
Long getStatusMetric(MetricDescriptor<?> descriptor);
/**
* @return a Map of MetricDescriptor to value
* Returns an instance of StatusSnapshot that has all the same information as {@code this} except for
* Counters. If {@code this} does not contain any counters, the object returned may (or may not) be {@code this}.
* @return a StatusSnapshot without counters
*/
Map<MetricDescriptor<?>, Long> getStatusMetrics();
StatusSnapshot withoutCounters();
/**
* @return a {@link ValueReducer} that is capable of merging multiple

View File

@ -41,6 +41,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@ -156,7 +157,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
return counters.getOrDefault(descriptorDto.getField(), 0L);
};
final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(descriptorDto.getField(),
final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(() -> 0, descriptorDto.getField(),
descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper);
metricDescriptors.put(fieldName, metricDescriptor);
@ -197,11 +198,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
final StatusSnapshot snapshot = createSnapshot(snapshotDto, metricDescriptors);
final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.get(normalizedDate);
if (nodeToSnapshotMap == null) {
nodeToSnapshotMap = new HashMap<>();
dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap);
}
Map<String, StatusSnapshot> nodeToSnapshotMap = dateToNodeSnapshots.computeIfAbsent(normalizedDate, k -> new HashMap<>());
nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), snapshot);
}
}
@ -220,7 +217,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
}
private StatusSnapshot createSnapshot(final StatusSnapshotDTO snapshotDto, final Map<String, MetricDescriptor<?>> metricDescriptors) {
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(new HashSet<>(metricDescriptors.values()));
snapshot.setTimestamp(snapshotDto.getTimestamp());
// Default all metrics to 0 so that if a counter has not yet been registered, it will have a value of 0 instead

View File

@ -20,8 +20,6 @@ import java.util.Map;
public interface FlowFileEvent {
String getComponentIdentifier();
int getFlowFilesIn();
int getFlowFilesOut();

View File

@ -25,15 +25,16 @@ public interface FlowFileEventRepository extends Closeable {
* Updates the repository to include a new FlowFile processing event
*
* @param event new event
* @param componentIdentifier the ID of the component that the event belongs to
* @throws java.io.IOException ioe
*/
void updateRepository(FlowFileEvent event) throws IOException;
void updateRepository(FlowFileEvent event, String componentIdentifier) throws IOException;
/**
* @param sinceEpochMillis age of report
* @param now the current time
* @return a report of processing activity since the given time
*/
RepositoryStatusReport reportTransferEvents(long sinceEpochMillis);
RepositoryStatusReport reportTransferEvents(long now);
/**
* Causes any flow file events of the given entry age in epoch milliseconds

View File

@ -20,7 +20,7 @@ import java.util.Map;
public interface RepositoryStatusReport {
void addReportEntry(FlowFileEvent entry);
void addReportEntry(FlowFileEvent entry, String componentId);
Map<String, FlowFileEvent> getReportEntries();

View File

@ -103,6 +103,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
@ -630,7 +631,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus());
try {
componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus());
} catch (final Exception e) {
LOG.error("Failed to capture component stats for Stats History", e);
}
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
@ -3333,18 +3338,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setType(isProcessorAuthorized ? procNode.getComponentType() : "Processor");
final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier());
if (entry == null) {
status.setInputBytes(0L);
status.setInputCount(0);
status.setOutputBytes(0L);
status.setOutputCount(0);
status.setBytesWritten(0L);
status.setBytesRead(0L);
status.setProcessingNanos(0);
status.setInvocations(0);
status.setAverageLineageDuration(0L);
status.setFlowFilesRemoved(0);
} else {
if (entry != null && entry != EmptyFlowFileEvent.INSTANCE) {
final int processedCount = entry.getFlowFilesOut();
final long numProcessedBytes = entry.getContentSizeOut();
status.setOutputBytes(numProcessedBytes);
@ -4117,13 +4111,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
private RepositoryStatusReport getProcessorStats() {
// processed in last 5 minutes
return getProcessorStats(System.currentTimeMillis() - 300000);
return flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
}
private RepositoryStatusReport getProcessorStats(final long since) {
return flowFileEventRepository.reportTransferEvents(since);
}
//
// Clustering methods

View File

@ -528,7 +528,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try {
// update event repository
final Connectable connectable = context.getConnectable();
final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
flowFileEvent.setBytesRead(checkpoint.bytesRead);
flowFileEvent.setBytesWritten(checkpoint.bytesWritten);
flowFileEvent.setContentSizeIn(checkpoint.contentSizeIn);
@ -553,10 +553,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Map<String, Long> counters = combineCounters(checkpoint.countersOnCommit, checkpoint.immediateCounters);
flowFileEvent.setCounters(counters);
context.getFlowFileEventRepository().updateRepository(flowFileEvent);
context.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier());
for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
context.getFlowFileEventRepository().updateRepository(connectionEvent);
for (final Map.Entry<String, StandardFlowFileEvent> entry : checkpoint.connectionCounts.entrySet()) {
context.getFlowFileEventRepository().updateRepository(entry.getValue(), entry.getKey());
}
} catch (final IOException ioe) {
LOG.error("FlowFile Event Repository failed to update", ioe);
@ -1052,14 +1052,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
final Connectable connectable = context.getConnectable();
final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
flowFileEvent.setBytesRead(bytesRead);
flowFileEvent.setBytesWritten(bytesWritten);
flowFileEvent.setCounters(immediateCounters);
// update event repository
try {
context.getFlowFileEventRepository().updateRepository(flowFileEvent);
context.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier());
} catch (final Exception e) {
LOG.error("Failed to update FlowFileEvent Repository due to " + e);
if (LOG.isDebugEnabled()) {
@ -1458,7 +1458,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private void incrementConnectionInputCounts(final String connectionId, final int flowFileCount, final long bytes) {
final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id));
final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent());
connectionEvent.setContentSizeIn(connectionEvent.getContentSizeIn() + bytes);
connectionEvent.setFlowFilesIn(connectionEvent.getFlowFilesIn() + flowFileCount);
}
@ -1468,7 +1468,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private void incrementConnectionOutputCounts(final String connectionId, final int flowFileCount, final long bytes) {
final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id));
final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent());
connectionEvent.setContentSizeOut(connectionEvent.getContentSizeOut() + bytes);
connectionEvent.setFlowFilesOut(connectionEvent.getFlowFilesOut() + flowFileCount);
}

View File

@ -54,13 +54,14 @@ public class StandardRepositoryStatusReport implements RepositoryStatusReport {
* Adds an entry to the report.
*
* @param entry an entry
* @param componentId the id of the component that the entry belongs to
*/
@Override
public void addReportEntry(FlowFileEvent entry) {
public void addReportEntry(FlowFileEvent entry, final String componentId) {
if (entry == null) {
throw new NullPointerException("report entry may not be null");
}
this.entries.put(entry.getComponentIdentifier(), entry);
this.entries.put(componentId, entry);
}
@Override
@ -69,7 +70,7 @@ public class StandardRepositoryStatusReport implements RepositoryStatusReport {
for (final String key : this.entries.keySet()) {
final FlowFileEvent entry = this.entries.get(key);
strb.append("[")
.append(entry.getComponentIdentifier()).append(", ")
.append(key).append(", ")
.append(entry.getFlowFilesIn()).append(", ")
.append(entry.getContentSizeIn()).append(", ")
.append(entry.getFlowFilesOut()).append(", ")

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
import org.apache.nifi.controller.repository.FlowFileEvent;
import java.util.Collections;
import java.util.Map;
public class EmptyFlowFileEvent implements FlowFileEvent {
public static final EmptyFlowFileEvent INSTANCE = new EmptyFlowFileEvent();
private EmptyFlowFileEvent() {
}
@Override
public int getFlowFilesIn() {
return 0;
}
@Override
public int getFlowFilesOut() {
return 0;
}
@Override
public int getFlowFilesRemoved() {
return 0;
}
@Override
public long getContentSizeIn() {
return 0;
}
@Override
public long getContentSizeOut() {
return 0;
}
@Override
public long getContentSizeRemoved() {
return 0;
}
@Override
public long getBytesRead() {
return 0;
}
@Override
public long getBytesWritten() {
return 0;
}
@Override
public long getProcessingNanoseconds() {
return 0;
}
@Override
public long getAverageLineageMillis() {
return 0;
}
@Override
public long getAggregateLineageMillis() {
return 0;
}
@Override
public int getFlowFilesReceived() {
return 0;
}
@Override
public long getBytesReceived() {
return 0;
}
@Override
public int getFlowFilesSent() {
return 0;
}
@Override
public long getBytesSent() {
return 0;
}
@Override
public int getInvocations() {
return 0;
}
@Override
public Map<String, Long> getCounters() {
return Collections.emptyMap();
}
}

View File

@ -20,9 +20,9 @@ package org.apache.nifi.controller.repository.metrics;
import org.apache.nifi.controller.repository.FlowFileEvent;
public interface EventContainer {
public void addEvent(FlowFileEvent event);
void addEvent(FlowFileEvent event);
public void purgeEvents(long cutoffEpochMillis);
void purgeEvents(long cutoffEpochMillis);
public FlowFileEvent generateReport(String componentId, long sinceEpochMillis);
FlowFileEvent generateReport(long sinceEpochMillis);
}

View File

@ -17,31 +17,30 @@
package org.apache.nifi.controller.repository.metrics;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.controller.repository.FlowFileEvent;
public class EventSum {
import java.util.concurrent.atomic.AtomicReference;
public class EventSum {
private final AtomicReference<EventSumValue> ref = new AtomicReference<>();
public EventSumValue getValue() {
final EventSumValue value = ref.get();
return value == null ? new EventSumValue() : value;
return value == null ? new EventSumValue(System.currentTimeMillis()) : value;
}
public void addOrReset(final FlowFileEvent event) {
final long expectedMinute = System.currentTimeMillis() / 60000;
public EventSumValue addOrReset(final FlowFileEvent event, final long timestamp) {
final long expectedSecond = timestamp / 1000;
EventSumValue curValue;
while (true) {
curValue = ref.get();
if (curValue == null || curValue.getMinuteTimestamp() != expectedMinute) {
final EventSumValue newValue = new EventSumValue();
if (curValue == null || (curValue.getTimestamp() / 1000) != expectedSecond) {
final EventSumValue newValue = new EventSumValue(timestamp);
final boolean replaced = ref.compareAndSet(curValue, newValue);
if (replaced) {
curValue = newValue;
break;
newValue.add(event);
return curValue;
}
} else {
break;
@ -49,5 +48,24 @@ public class EventSum {
}
curValue.add(event);
return null;
}
public EventSumValue reset(final long ifOlderThan) {
while (true) {
final EventSumValue curValue = ref.get();
if (curValue == null) {
return null;
}
if (curValue.getTimestamp() < ifOlderThan) {
if (ref.compareAndSet(curValue, null)) {
return curValue;
}
} else {
return null;
}
}
}
}

View File

@ -17,13 +17,14 @@
package org.apache.nifi.controller.repository.metrics;
import org.apache.nifi.controller.repository.FlowFileEvent;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.controller.repository.FlowFileEvent;
public class EventSumValue {
private volatile boolean empty = true;
private int flowFilesIn = 0;
private int flowFilesOut = 0;
@ -44,16 +45,16 @@ public class EventSumValue {
private int invocations = 0;
private Map<String, Long> counters;
private final long minuteTimestamp;
private final long millisecondTimestamp;
public EventSumValue() {
this.millisecondTimestamp = System.currentTimeMillis();
this.minuteTimestamp = millisecondTimestamp / 60000;
public EventSumValue(final long timestamp) {
this.millisecondTimestamp = timestamp;
}
public synchronized void add(final FlowFileEvent flowFileEvent) {
empty = false;
this.aggregateLineageMillis += flowFileEvent.getAggregateLineageMillis();
this.bytesRead += flowFileEvent.getBytesRead();
this.bytesReceived += flowFileEvent.getBytesReceived();
@ -84,8 +85,12 @@ public class EventSumValue {
}
}
public synchronized FlowFileEvent toFlowFileEvent(final String componentId) {
final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId);
public synchronized FlowFileEvent toFlowFileEvent() {
if (empty) {
return EmptyFlowFileEvent.INSTANCE;
}
final StandardFlowFileEvent event = new StandardFlowFileEvent();
event.setAggregateLineageMillis(aggregateLineageMillis);
event.setBytesRead(bytesRead);
event.setBytesReceived(bytesReceived);
@ -106,6 +111,10 @@ public class EventSumValue {
}
public synchronized void add(final EventSumValue other) {
if (other.empty) {
return;
}
synchronized (other) {
this.aggregateLineageMillis += other.aggregateLineageMillis;
this.bytesRead += other.bytesRead;
@ -139,8 +148,42 @@ public class EventSumValue {
}
}
public long getMinuteTimestamp() {
return minuteTimestamp;
public synchronized void subtract(final EventSumValue other) {
if (other.empty) {
return;
}
synchronized (other) {
this.aggregateLineageMillis -= other.aggregateLineageMillis;
this.bytesRead -= other.bytesRead;
this.bytesReceived -= other.bytesReceived;
this.bytesSent -= other.bytesSent;
this.bytesWritten -= other.bytesWritten;
this.contentSizeIn -= other.contentSizeIn;
this.contentSizeOut -= other.contentSizeOut;
this.contentSizeRemoved -= other.contentSizeRemoved;
this.flowFilesIn -= other.flowFilesIn;
this.flowFilesOut -= other.flowFilesOut;
this.flowFilesReceived -= other.flowFilesReceived;
this.flowFilesRemoved -= other.flowFilesRemoved;
this.flowFilesSent -= other.flowFilesSent;
this.invocations -= other.invocations;
this.processingNanos -= other.processingNanos;
final Map<String, Long> eventCounters = other.counters;
if (eventCounters != null) {
if (counters == null) {
counters = new HashMap<>();
}
for (final Map.Entry<String, Long> entry : eventCounters.entrySet()) {
final String counterName = entry.getKey();
final Long counterValue = entry.getValue();
counters.compute(counterName, (key, value) -> value == null ? counterValue : counterValue - value);
}
}
}
}
public long getTimestamp() {

View File

@ -16,14 +16,14 @@
*/
package org.apache.nifi.controller.repository.metrics;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class RingBufferEventRepository implements FlowFileEventRepository {
private final int numMinutes;
@ -38,8 +38,7 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
}
@Override
public void updateRepository(final FlowFileEvent event) {
final String componentId = event.getComponentIdentifier();
public void updateRepository(final FlowFileEvent event, final String componentId) {
final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes));
eventContainer.addEvent(event);
}
@ -48,10 +47,7 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) {
final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport();
componentEventMap.entrySet().stream()
.map(entry -> entry.getValue().generateReport(entry.getKey(), sinceEpochMillis))
.forEach(event -> report.addReportEntry(event));
componentEventMap.forEach((componentId, container) -> report.addReportEntry(container.generateReport(sinceEpochMillis), componentId));
return report;
}

View File

@ -18,13 +18,24 @@ e * Licensed to the Apache Software Foundation (ASF) under one or more
package org.apache.nifi.controller.repository.metrics;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
public class SecondPrecisionEventContainer implements EventContainer {
private static final Logger logger = LoggerFactory.getLogger(SecondPrecisionEventContainer.class);
private final int numBins;
private final EventSum[] sums;
private final EventSumValue aggregateValue = new EventSumValue(0);
private final AtomicLong lastUpdateSecond = new AtomicLong(0);
public SecondPrecisionEventContainer(final int numMinutes) {
numBins = 1 + numMinutes * 60;
// number of bins is number of seconds in 'numMinutes' plus 1. We add one because
// we want to have the 'current bin' that we are adding values to, in addition to the
// previous (X = numMinutes * 60) bins of values that have completed
numBins = numMinutes * 60 + 1;
sums = new EventSum[numBins];
for (int i = 0; i < numBins; i++) {
@ -34,11 +45,62 @@ public class SecondPrecisionEventContainer implements EventContainer {
@Override
public void addEvent(final FlowFileEvent event) {
final int second = (int) (System.currentTimeMillis() / 1000);
final int binIdx = second % numBins;
addEvent(event, System.currentTimeMillis());
}
protected void addEvent(final FlowFileEvent event, final long timestamp) {
final long second = timestamp / 1000;
final int binIdx = (int) (second % numBins);
final EventSum sum = sums[binIdx];
sum.addOrReset(event);
final EventSumValue replaced = sum.addOrReset(event, timestamp);
aggregateValue.add(event);
if (replaced == null) {
logger.debug("Updated bin {}. Did NOT replace.", binIdx);
} else {
logger.debug("Replaced bin {}", binIdx);
aggregateValue.subtract(replaced);
}
// If there are any buckets that have expired, we need to update our aggregate value to reflect that.
processExpiredBuckets(second);
}
private void processExpiredBuckets(final long currentSecond) {
final long lastUpdate = lastUpdateSecond.get();
if (currentSecond > lastUpdate) {
final boolean updated = lastUpdateSecond.compareAndSet(lastUpdate, currentSecond);
if (updated) {
if (lastUpdate == 0L) {
// First update, so nothing to expire
return;
}
final int secondsElapsed = (int) (currentSecond - lastUpdate);
int index = (int) (currentSecond % numBins);
final long expirationTimestamp = 1000 * (currentSecond - numBins);
int expired = 0;
for (int i=0; i < secondsElapsed; i++) {
index--;
if (index < 0) {
index = sums.length - 1;
}
final EventSum expiredSum = sums[index];
final EventSumValue expiredValue = expiredSum.reset(expirationTimestamp);
if (expiredValue != null) {
aggregateValue.subtract(expiredValue);
expired++;
}
}
logger.debug("Expired {} bins", expired);
}
}
}
@Override
@ -47,23 +109,17 @@ public class SecondPrecisionEventContainer implements EventContainer {
}
@Override
public FlowFileEvent generateReport(final String componentId, final long sinceEpochMillis) {
final EventSumValue eventSumValue = new EventSumValue();
final long second = sinceEpochMillis / 1000;
final int startBinIdx = (int) (second % numBins);
for (int i = 0; i < numBins; i++) {
int binIdx = (startBinIdx + i) % numBins;
final EventSum sum = sums[binIdx];
final EventSumValue sumValue = sum.getValue();
if (sumValue.getTimestamp() >= sinceEpochMillis) {
eventSumValue.add(sumValue);
}
public FlowFileEvent generateReport(final long now) {
final long second = now / 1000 + 1;
final long lastUpdate = lastUpdateSecond.get();
final long secondsSinceUpdate = second - lastUpdate;
if (secondsSinceUpdate > numBins) {
logger.debug("EventContainer hasn't been updated in {} seconds so will generate report as Empty FlowFile Event", secondsSinceUpdate);
return EmptyFlowFileEvent.INSTANCE;
}
final FlowFileEvent flowFileEvent = eventSumValue.toFlowFileEvent(componentId);
return flowFileEvent;
logger.debug("Will expire up to {} bins", secondsSinceUpdate);
processExpiredBuckets(second);
return aggregateValue.toFlowFileEvent();
}
}

View File

@ -16,13 +16,11 @@
*/
package org.apache.nifi.controller.repository.metrics;
import java.util.Map;
import org.apache.nifi.controller.repository.FlowFileEvent;
public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
import java.util.Map;
private final String componentId;
public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
private int flowFilesIn;
private int flowFilesOut;
@ -41,13 +39,7 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
private int invocations;
private Map<String, Long> counters;
public StandardFlowFileEvent(final String componentId) {
this.componentId = componentId;
}
@Override
public String getComponentIdentifier() {
return componentId;
public StandardFlowFileEvent() {
}
@Override

View File

@ -16,13 +16,6 @@
*/
package org.apache.nifi.controller.scheduling;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
@ -54,6 +47,13 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
@ -255,10 +255,10 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
try {
final long processingNanos = System.nanoTime() - startNanos;
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
procEvent.setProcessingNanos(processingNanos);
procEvent.setInvocations(invocationCount);
context.getFlowFileEventRepository().updateRepository(procEvent);
context.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
logger.error("", e);

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import java.util.HashMap;
import java.util.Map;
public class ComponentDetails {
private final String componentId;
private final String groupId;
private final String componentName;
private final String componentType;
private final String sourceName;
private final String destinationName;
private final String targetUri;
public ComponentDetails(final String id, final String groupId, final String componentName, final String componentType,
final String sourceName, final String destinationName, final String remoteUri) {
this.componentId = id;
this.groupId = groupId;
this.componentName = componentName;
this.componentType = componentType;
this.sourceName = sourceName;
this.destinationName = destinationName;
this.targetUri = remoteUri;
}
public static ComponentDetails forProcessor(final ProcessorStatus status) {
return forProcessor(status.getId(), status.getGroupId(), status.getName(), status.getType());
}
public static ComponentDetails forProcessor(final String id, final String groupId, final String processorName, final String processorType) {
return new ComponentDetails(id, groupId, processorName, processorType, null, null, null);
}
public static ComponentDetails forConnection(final ConnectionStatus status) {
return forConnection(status.getId(), status.getGroupId(), status.getName(), status.getSourceName(), status.getDestinationName());
}
public static ComponentDetails forConnection(final String id, final String groupId, final String connectionName, final String sourceName, final String destinationName) {
return new ComponentDetails(id, groupId, connectionName, sourceName, destinationName, null, null);
}
public static ComponentDetails forProcessGroup(final ProcessGroupStatus status) {
return forProcessGroup(status.getId(), status.getName());
}
public static ComponentDetails forProcessGroup(final String id, final String groupName) {
return new ComponentDetails(id,null, groupName, null, null, null, null);
}
public static ComponentDetails forRemoteProcessGroup(final RemoteProcessGroupStatus status) {
return forRemoteProcessGroup(status.getId(), status.getGroupId(), status.getName(), status.getTargetUri());
}
public static ComponentDetails forRemoteProcessGroup(final String id, final String parentGroupId, final String rpgName, final String remoteUri) {
return new ComponentDetails(id, parentGroupId, rpgName, null, null, null, remoteUri);
}
public String getComponentId() {
return componentId;
}
public String getGroupId() {
return groupId;
}
public String getComponentName() {
return componentName;
}
public String getComponentType() {
return componentType;
}
public String getSourceName() {
return sourceName;
}
public String getDestinationName() {
return destinationName;
}
public String getTargetUri() {
return targetUri;
}
/**
* Returns a {@Link Map} whose keys are those values defined by {@link ComponentStatusRepository#COMPONENT_DETAIL_GROUP_ID ComponentStatusRepository.COMPONENT_DETAIL_*}
* and values are the values that are populated for this ComponentDetails object.
*/
public Map<String, String> toMap() {
final Map<String, String> map = new HashMap<>();
map.put(ComponentStatusRepository.COMPONENT_DETAIL_ID, componentId);
map.put(ComponentStatusRepository.COMPONENT_DETAIL_GROUP_ID, groupId);
map.put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, componentName);
map.put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, componentType);
map.put(ComponentStatusRepository.COMPONENT_DETAIL_SOURCE_NAME, sourceName);
map.put(ComponentStatusRepository.COMPONENT_DETAIL_DESTINATION_NAME, destinationName);
map.put(ComponentStatusRepository.COMPONENT_DETAIL_URI, targetUri);
return map;
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class ComponentStatusHistory {
private final MetricRollingBuffer snapshots;
private ComponentDetails componentDetails;
public ComponentStatusHistory(final ComponentDetails details, final int maxCapacity) {
this.componentDetails = details;
snapshots = new MetricRollingBuffer(maxCapacity);
}
public void expireBefore(final Date timestamp) {
snapshots.expireBefore(timestamp);
}
public void update(final StatusSnapshot snapshot, final ComponentDetails details) {
if (snapshot == null) {
return;
}
snapshots.update(snapshot);
componentDetails = details;
}
public StatusHistory toStatusHistory(final List<Date> timestamps, final boolean includeCounters, final Set<MetricDescriptor<?>> defaultStatusMetrics) {
final Date dateGenerated = new Date();
final Map<String, String> componentDetailsMap = componentDetails.toMap();
final List<StatusSnapshot> snapshotList = snapshots.getSnapshots(timestamps, includeCounters, defaultStatusMetrics);
return new StandardStatusHistory(snapshotList, componentDetailsMap, dateGenerated);
}
}

View File

@ -21,53 +21,55 @@ import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
public enum ConnectionStatusDescriptor {
INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
INPUT_BYTES(
"inputBytes",
"Bytes In (5 mins)",
"The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getInputBytes())),
ConnectionStatus::getInputBytes),
INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
INPUT_COUNT(
"inputCount",
"FlowFiles In (5 mins)",
"The number of FlowFiles that were transferred to this Connection in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getInputCount()))),
s -> Long.valueOf(s.getInputCount())),
OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
OUTPUT_BYTES(
"outputBytes",
"Bytes Out (5 mins)",
"The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getOutputBytes())),
ConnectionStatus::getOutputBytes),
OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
OUTPUT_COUNT(
"outputCount",
"FlowFiles Out (5 mins)",
"The number of FlowFiles that were pulled from this Connection in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getOutputCount()))),
s -> Long.valueOf(s.getOutputCount())),
QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
QUEUED_BYTES(
"queuedBytes",
"Queued Bytes",
"The number of Bytes queued in this Connection",
Formatter.DATA_SIZE,
s -> s.getQueuedBytes())),
ConnectionStatus::getQueuedBytes),
QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
QUEUED_COUNT(
"queuedCount",
"Queued Count",
"The number of FlowFiles queued in this Connection",
Formatter.COUNT,
s -> Long.valueOf(s.getQueuedCount())));
s -> Long.valueOf(s.getQueuedCount()));
private MetricDescriptor<ConnectionStatus> descriptor;
private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) {
this.descriptor = descriptor;
ConnectionStatusDescriptor(final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<ConnectionStatus> valueFunction) {
this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
}
public String getField() {

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import java.util.Date;
import java.util.List;
import java.util.Set;
public class EmptyStatusSnapshot implements StatusSnapshot {
private static final ValueReducer<StatusSnapshot, StatusSnapshot> VALUE_REDUCER = new EmptyValueReducer();
private static final Long METRIC_VALUE = 0L;
private final Date timestamp;
private final Set<MetricDescriptor<?>> metricsDescriptors;
public EmptyStatusSnapshot(final Date timestamp, final Set<MetricDescriptor<?>> metricsDescriptors) {
this.timestamp = timestamp;
this.metricsDescriptors = metricsDescriptors;
}
@Override
public Date getTimestamp() {
return timestamp;
}
@Override
public Set<MetricDescriptor<?>> getMetricDescriptors() {
return metricsDescriptors;
}
@Override
public Long getStatusMetric(final MetricDescriptor<?> descriptor) {
return METRIC_VALUE;
}
@Override
public StatusSnapshot withoutCounters() {
return this;
}
@Override
public ValueReducer<StatusSnapshot, StatusSnapshot> getValueReducer() {
return VALUE_REDUCER;
}
private static class EmptyValueReducer implements ValueReducer<StatusSnapshot, StatusSnapshot> {
@Override
public StatusSnapshot reduce(final List<StatusSnapshot> values) {
return (values == null || values.isEmpty()) ? null : values.get(0);
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
public interface IndexableMetric {
int getIndex();
}

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
public class MetricRollingBuffer {
private final int capacity;
private StatusSnapshot[] snapshots;
private int writeIndex = 0;
private int readIndex;
private boolean readExhausted;
private int count = 0;
public MetricRollingBuffer(final int maxCapacity) {
this.capacity = maxCapacity;
}
public void update(final StatusSnapshot snapshot) {
if (snapshot == null) {
return;
}
if (snapshots == null) {
snapshots = new StatusSnapshot[Math.min(capacity, 16)];
}
if (snapshots[writeIndex] == null) {
count++;
}
snapshots[writeIndex++] = snapshot;
if (writeIndex >= snapshots.length) {
if (snapshots.length < capacity) {
grow();
} else {
writeIndex = 0;
}
}
}
public int size() {
return count;
}
public void expireBefore(final Date date) {
if (snapshots == null) {
return;
}
int readIndex = writeIndex;
for (int i=0; i < snapshots.length; i++) {
final StatusSnapshot snapshot = snapshots[readIndex];
if (snapshot == null) {
readIndex++;
if (readIndex >= snapshots.length) {
readIndex = 0;
}
continue;
}
final Date snapshotTimestamp = snapshot.getTimestamp();
if (snapshotTimestamp.after(date)) {
break;
}
snapshots[readIndex] = null;
count--;
readIndex++;
if (readIndex >= snapshots.length) {
readIndex = 0;
}
}
if (count < snapshots.length / 4 || snapshots.length - count > 128) {
// If we're using less than 1/4 of the array or we have at least 128 null entries, compact.
compact();
}
}
private void grow() {
final int initialSize = snapshots.length;
final int newSize = Math.min(capacity, snapshots.length + 64);
final StatusSnapshot[] newArray = new StatusSnapshot[newSize];
System.arraycopy(snapshots, 0, newArray, 0, snapshots.length);
snapshots = newArray;
writeIndex = initialSize;
}
private void compact() {
final StatusSnapshot[] newArray = new StatusSnapshot[count + 1];
int insertionIndex = 0;
int readIndex = writeIndex;
for (int i=0; i < snapshots.length; i++) {
final StatusSnapshot snapshot = snapshots[readIndex];
if (snapshot != null) {
newArray[insertionIndex++] = snapshot;
}
readIndex++;
if (readIndex >= snapshots.length) {
readIndex = 0;
}
}
snapshots = newArray;
writeIndex = count;
count = newArray.length - 1;
}
public List<StatusSnapshot> getSnapshots(final List<Date> timestamps, final boolean includeCounters, final Set<MetricDescriptor<?>> defaultStatusMetrics) {
if (snapshots == null) {
return Collections.emptyList();
}
final List<StatusSnapshot> list = new ArrayList<>(snapshots.length);
resetRead();
for (final Date timestamp : timestamps) {
final StatusSnapshot snapshot = getSnapshotForTimestamp(timestamp);
if (snapshot == null) {
list.add(new EmptyStatusSnapshot(timestamp, defaultStatusMetrics));
} else {
list.add(includeCounters ? snapshot : snapshot.withoutCounters());
}
}
return list;
}
private StatusSnapshot getSnapshotForTimestamp(final Date timestamp) {
while (!readExhausted) {
final StatusSnapshot snapshot = snapshots[readIndex];
if (snapshot == null) {
advanceRead();
continue;
}
final Date snapshotTimestamp = snapshot.getTimestamp();
if (snapshotTimestamp.before(timestamp)) {
advanceRead();
continue;
}
if (snapshotTimestamp.after(timestamp)) {
return null;
}
advanceRead();
return snapshot;
}
return null;
}
private void resetRead() {
readIndex = writeIndex;
readExhausted = false;
}
private void advanceRead() {
readIndex++;
if (readIndex >= snapshots.length) {
readIndex = 0;
}
if (readIndex == writeIndex) {
readExhausted = true;
}
}
}

View File

@ -17,79 +17,90 @@
package org.apache.nifi.controller.status.history;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import java.util.concurrent.TimeUnit;
public enum ProcessGroupStatusDescriptor {
BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>(
BYTES_READ(
"bytesRead",
"Bytes Read (5 mins)",
"The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getBytesRead())),
ProcessGroupStatus::getBytesRead),
BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten",
BYTES_WRITTEN(
"bytesWritten",
"Bytes Written (5 mins)",
"The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getBytesWritten())),
ProcessGroupStatus::getBytesWritten),
BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred",
BYTES_TRANSFERRED(
"bytesTransferred",
"Bytes Transferred (5 mins)",
"The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getBytesRead() + s.getBytesWritten())),
s -> s.getBytesRead() + s.getBytesWritten()),
INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes",
INPUT_BYTES("inputBytes",
"Bytes In (5 mins)",
"The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getInputContentSize())),
ProcessGroupStatus::getInputContentSize),
INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount",
INPUT_COUNT(
"inputCount",
"FlowFiles In (5 mins)",
"The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
Formatter.COUNT,
s -> s.getInputCount().longValue())),
s -> s.getInputCount().longValue()),
OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes",
OUTPUT_BYTES(
"outputBytes",
"Bytes Out (5 mins)",
"The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getOutputContentSize())),
ProcessGroupStatus::getOutputContentSize),
OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount",
OUTPUT_COUNT(
"outputCount",
"FlowFiles Out (5 mins)",
"The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
Formatter.COUNT,
s -> s.getOutputCount().longValue())),
s -> s.getOutputCount().longValue()),
QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes",
QUEUED_BYTES(
"queuedBytes",
"Queued Bytes",
"The cumulative size of all FlowFiles queued in all Connections of this Process Group",
Formatter.DATA_SIZE,
s -> s.getQueuedContentSize())),
ProcessGroupStatus::getQueuedContentSize),
QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount",
QUEUED_COUNT(
"queuedCount",
"Queued Count",
"The number of FlowFiles queued in all Connections of this Process Group",
Formatter.COUNT,
s -> s.getQueuedCount().longValue())),
s -> s.getQueuedCount().longValue()),
TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis",
TASK_MILLIS(
"taskMillis",
"Total Task Duration (5 mins)",
"The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
Formatter.DURATION,
s -> calculateTaskMillis(s)));
ProcessGroupStatusDescriptor::calculateTaskMillis);
private MetricDescriptor<ProcessGroupStatus> descriptor;
private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) {
this.descriptor = descriptor;
ProcessGroupStatusDescriptor(final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessGroupStatus> valueFunction) {
this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
}
public String getField() {

View File

@ -17,91 +17,92 @@
package org.apache.nifi.controller.status.history;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import java.util.List;
import java.util.concurrent.TimeUnit;
public enum ProcessorStatusDescriptor {
BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>(
BYTES_READ(
"bytesRead",
"Bytes Read (5 mins)",
"The total number of bytes read from the Content Repository by this Processor in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getBytesRead())),
ProcessorStatus::getBytesRead),
BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>(
BYTES_WRITTEN(
"bytesWritten",
"Bytes Written (5 mins)",
"The total number of bytes written to the Content Repository by this Processor in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getBytesWritten())),
ProcessorStatus::getBytesWritten),
BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>(
BYTES_TRANSFERRED(
"bytesTransferred",
"Bytes Transferred (5 mins)",
"The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getBytesRead() + s.getBytesWritten())),
s -> s.getBytesRead() + s.getBytesWritten()),
INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
INPUT_BYTES(
"inputBytes",
"Bytes In (5 mins)",
"The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getInputBytes())),
ProcessorStatus::getInputBytes),
INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
INPUT_COUNT(
"inputCount",
"FlowFiles In (5 mins)",
"The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getInputCount()))),
s -> Long.valueOf(s.getInputCount())),
OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
OUTPUT_BYTES(
"outputBytes",
"Bytes Out (5 mins)",
"The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getOutputBytes())),
ProcessorStatus::getOutputBytes),
OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
OUTPUT_COUNT(
"outputCount",
"FlowFiles Out (5 mins)",
"The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getOutputCount()))),
s -> Long.valueOf(s.getOutputCount())),
TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
TASK_COUNT(
"taskCount",
"Tasks (5 mins)",
"The number of tasks that this Processor has completed in the past 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getInvocations()))),
s -> Long.valueOf(s.getInvocations())),
TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
TASK_MILLIS(
"taskMillis",
"Total Task Duration (5 mins)",
"The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes",
Formatter.DURATION,
s -> TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS))),
s -> TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS)),
TASK_NANOS(new StandardMetricDescriptor<ProcessorStatus>(
TASK_NANOS(
"taskNanos",
"Total Task Time (nanos)",
"The total number of thread-nanoseconds that the Processor has used to complete its tasks in the past 5 minutes",
Formatter.COUNT,
ProcessorStatus::getProcessingNanos), false),
ProcessorStatus::getProcessingNanos,
false),
FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>(
FLOWFILES_REMOVED(
"flowFilesRemoved",
"FlowFiles Removed (5 mins)",
"The total number of FlowFiles removed by this Processor in the last 5 minutes",
Formatter.COUNT,
s -> Long.valueOf(s.getFlowFilesRemoved()))),
s -> Long.valueOf(s.getFlowFilesRemoved())),
AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
AVERAGE_LINEAGE_DURATION(
"averageLineageDuration",
"Average Lineage Duration (5 mins)",
"The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.",
@ -114,23 +115,24 @@ public enum ProcessorStatusDescriptor {
int count = 0;
for (final StatusSnapshot snapshot : values) {
final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue();
final long outputCount = snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue();
final long removed = snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor()).longValue();
final long outputCount = snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor()).longValue();
final long processed = removed + outputCount;
count += processed;
final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
final long totalMillis = avgMillis * processed;
millis += totalMillis;
}
return count == 0 ? 0 : millis / count;
}
}
)),
},
true
),
AVERAGE_TASK_NANOS(new StandardMetricDescriptor<ProcessorStatus>(
AVERAGE_TASK_NANOS(
"averageTaskNanos",
"Average Task Duration (nanoseconds)",
"The average number of nanoseconds it took this Processor to complete a task, over the past 5 minutes",
@ -143,12 +145,12 @@ public enum ProcessorStatusDescriptor {
int invocations = 0;
for (final StatusSnapshot snapshot : values) {
final Long taskNanos = snapshot.getStatusMetrics().get(TASK_NANOS.getDescriptor());
final Long taskNanos = snapshot.getStatusMetric(TASK_NANOS.getDescriptor());
if (taskNanos != null) {
procNanos += taskNanos.longValue();
}
final Long taskInvocations = snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor());
final Long taskInvocations = snapshot.getStatusMetric(TASK_COUNT.getDescriptor());
if (taskInvocations != null) {
invocations += taskInvocations.intValue();
}
@ -160,22 +162,38 @@ public enum ProcessorStatusDescriptor {
return procNanos / invocations;
}
}));
},
true
);
private final MetricDescriptor<ProcessorStatus> descriptor;
private final boolean visible;
private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) {
this(descriptor, true);
ProcessorStatusDescriptor(final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessorStatus> valueFunction) {
this(field, label, description, formatter, valueFunction, true);
}
private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor, final boolean visible) {
this.descriptor = descriptor;
ProcessorStatusDescriptor(final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessorStatus> valueFunction, final boolean visible) {
this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
this.visible = visible;
}
ProcessorStatusDescriptor(final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessorStatus> valueFunction,
final ValueReducer<StatusSnapshot, Long> reducer, final boolean visible) {
this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction, reducer);
this.visible = visible;
}
public String getField() {
return descriptor.getField();
}

View File

@ -24,43 +24,49 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
public enum RemoteProcessGroupStatusDescriptor {
SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes",
SENT_BYTES(
"sentBytes",
"Bytes Sent (5 mins)",
"The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getSentContentSize())),
RemoteProcessGroupStatus::getSentContentSize),
SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount",
SENT_COUNT(
"sentCount",
"FlowFiles Sent (5 mins)",
"The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes",
Formatter.COUNT,
s -> s.getSentCount().longValue())),
s -> s.getSentCount().longValue()),
RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes",
RECEIVED_BYTES(
"receivedBytes",
"Bytes Received (5 mins)",
"The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes",
Formatter.DATA_SIZE,
s -> s.getReceivedContentSize())),
RemoteProcessGroupStatus::getReceivedContentSize),
RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount",
RECEIVED_COUNT(
"receivedCount",
"FlowFiles Received (5 mins)",
"The number of FlowFiles that have been received from the remote system in the past 5 minutes",
Formatter.COUNT,
s -> s.getReceivedCount().longValue())),
s -> s.getReceivedCount().longValue()),
RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond",
RECEIVED_BYTES_PER_SECOND(
"receivedBytesPerSecond",
"Received Bytes Per Second",
"The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
s -> s.getReceivedContentSize().longValue() / 300L)),
s -> s.getReceivedContentSize().longValue() / 300L),
SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond",
SENT_BYTES_PER_SECOND(
"sentBytesPerSecond",
"Sent Bytes Per Second",
"The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
s -> s.getSentContentSize().longValue() / 300L)),
s -> s.getSentContentSize().longValue() / 300L),
TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond",
TOTAL_BYTES_PER_SECOND("totalBytesPerSecond",
"Total Bytes Per Second",
"The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
Formatter.DATA_SIZE,
@ -69,9 +75,9 @@ public enum RemoteProcessGroupStatusDescriptor {
public Long getValue(final RemoteProcessGroupStatus status) {
return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
}
})),
}),
AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
AVERAGE_LINEAGE_DURATION(
"averageLineageDuration",
"Average Lineage Duration (5 mins)",
"The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.",
@ -84,22 +90,29 @@ public enum RemoteProcessGroupStatusDescriptor {
int count = 0;
for (final StatusSnapshot snapshot : values) {
final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue();
final long sent = snapshot.getStatusMetric(SENT_COUNT.getDescriptor()).longValue();
count += sent;
final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
final long totalMillis = avgMillis * sent;
millis += totalMillis;
}
return count == 0 ? 0 : millis / count;
}
}));
});
private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
private RemoteProcessGroupStatusDescriptor(final MetricDescriptor<RemoteProcessGroupStatus> descriptor) {
this.descriptor = descriptor;
RemoteProcessGroupStatusDescriptor(final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<RemoteProcessGroupStatus> valueFunction) {
this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
}
RemoteProcessGroupStatusDescriptor(final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<RemoteProcessGroupStatus> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction, reducer);
}
public String getField() {

View File

@ -20,6 +20,7 @@ import java.util.List;
public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
private final IndexableMetric indexableMetric;
private final String field;
private final String label;
private final String description;
@ -27,12 +28,14 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
private final ValueMapper<T> valueMapper;
private final ValueReducer<StatusSnapshot, Long> reducer;
public StandardMetricDescriptor(final String field, final String label, final String description, final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction) {
this(field, label, description, formatter, valueFunction, null);
public StandardMetricDescriptor(final IndexableMetric indexableMetric, final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction) {
this(indexableMetric, field, label, description, formatter, valueFunction, null);
}
public StandardMetricDescriptor(final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
public StandardMetricDescriptor(final IndexableMetric indexableMetric, final String field, final String label, final String description,
final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
this.indexableMetric = indexableMetric;
this.field = field;
this.label = label;
this.description = description;
@ -41,6 +44,11 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
this.reducer = reducer == null ? new SumReducer() : reducer;
}
@Override
public int getMetricIdentifier() {
return indexableMetric.getIndex();
}
@Override
public String getField() {
return field;
@ -100,7 +108,7 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
public Long reduce(final List<StatusSnapshot> values) {
long sum = 0;
for (final StatusSnapshot snapshot : values) {
sum += snapshot.getStatusMetrics().get(StandardMetricDescriptor.this);
sum += snapshot.getStatusMetric(StandardMetricDescriptor.this);
}
return sum;

View File

@ -16,18 +16,25 @@
*/
package org.apache.nifi.controller.status.history;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class StandardStatusHistory implements StatusHistory {
private final List<StatusSnapshot> snapshots;
private final Date generated;
private final Map<String, String> componentDetails;
private final List<StatusSnapshot> snapshots = new ArrayList<>();
private final Date generated = new Date();
private final Map<String, String> componentDetails = new LinkedHashMap<>();
public StandardStatusHistory(final List<StatusSnapshot> snapshots, final Map<String, String> componentDetails, final Date generated) {
this.snapshots = snapshots;
this.generated = generated;
this.componentDetails = componentDetails;
}
@Override
public List<StatusSnapshot> getStatusSnapshots() {
return snapshots;
}
@Override
public Date getDateGenerated() {
@ -36,19 +43,6 @@ public class StandardStatusHistory implements StatusHistory {
@Override
public Map<String, String> getComponentDetails() {
return Collections.unmodifiableMap(componentDetails);
}
public void setComponentDetail(final String detailName, final String detailValue) {
componentDetails.put(detailName, detailValue);
}
@Override
public List<StatusSnapshot> getStatusSnapshots() {
return Collections.unmodifiableList(snapshots);
}
public void addStatusSnapshot(final StatusSnapshot snapshot) {
snapshots.add(snapshot);
return componentDetails;
}
}

View File

@ -17,7 +17,7 @@
package org.apache.nifi.controller.status.history;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@ -25,25 +25,63 @@ import java.util.Set;
public class StandardStatusSnapshot implements StatusSnapshot {
private final Map<MetricDescriptor<?>, Long> metricValues = new LinkedHashMap<>();
private final Set<MetricDescriptor<?>> metricDescriptors;
private final long[] values;
private Map<MetricDescriptor<?>, Long> counterValues = null;
private Date timestamp = new Date();
public StandardStatusSnapshot(final Set<MetricDescriptor<?>> metricDescriptors) {
this.metricDescriptors = metricDescriptors;
values = new long[metricDescriptors.size()];
}
private StandardStatusSnapshot(final Set<MetricDescriptor<?>> metricDescriptors, final long[] values) {
this.metricDescriptors = metricDescriptors;
this.values = values;
}
@Override
public Date getTimestamp() {
return timestamp;
}
@Override
public Set<MetricDescriptor<?>> getMetricDescriptors() {
return metricDescriptors;
}
@Override
public Long getStatusMetric(final MetricDescriptor<?> descriptor) {
return values[descriptor.getMetricIdentifier()];
}
public void setTimestamp(final Date timestamp) {
this.timestamp = timestamp;
}
@Override
public Map<MetricDescriptor<?>, Long> getStatusMetrics() {
return metricValues;
}
public void addStatusMetric(final MetricDescriptor<?> metric, final Long value) {
metricValues.put(metric, value);
values[metric.getMetricIdentifier()] = value;
}
public void addCounterStatusMetric(final MetricDescriptor<?> metric, final Long value) {
if (counterValues == null) {
counterValues = new HashMap<>();
}
counterValues.put(metric, value);
}
public StandardStatusSnapshot withoutCounters() {
if (counterValues == null) {
return this;
}
final StandardStatusSnapshot without = new StandardStatusSnapshot(metricDescriptors, values);
without.setTimestamp(timestamp);
return without;
}
@Override
@ -52,16 +90,16 @@ public class StandardStatusSnapshot implements StatusSnapshot {
@Override
public StatusSnapshot reduce(final List<StatusSnapshot> values) {
Date reducedTimestamp = null;
final Set<MetricDescriptor<?>> allDescriptors = new LinkedHashSet<>(metricValues.keySet());
final Set<MetricDescriptor<?>> allDescriptors = new LinkedHashSet<>(metricDescriptors);
for (final StatusSnapshot statusSnapshot : values) {
if (reducedTimestamp == null) {
reducedTimestamp = statusSnapshot.getTimestamp();
}
allDescriptors.addAll(statusSnapshot.getStatusMetrics().keySet());
allDescriptors.addAll(statusSnapshot.getMetricDescriptors());
}
final StandardStatusSnapshot reduced = new StandardStatusSnapshot();
final StandardStatusSnapshot reduced = new StandardStatusSnapshot(allDescriptors);
if (reducedTimestamp != null) {
reduced.setTimestamp(reducedTimestamp);
}

View File

@ -16,6 +16,10 @@
*/
package org.apache.nifi.controller.status.history;
import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@ -27,10 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
public class StatusHistoryUtil {
public static StatusHistoryDTO createStatusHistoryDTO(final StatusHistory statusHistory) {
@ -43,7 +43,7 @@ public class StatusHistoryUtil {
final StatusSnapshotDTO snapshotDto = StatusHistoryUtil.createStatusSnapshotDto(snapshot);
snapshotDtos.add(snapshotDto);
metricNames.addAll(snapshotDto.getStatusMetrics().keySet());
metricDescriptors.addAll(snapshot.getStatusMetrics().keySet());
metricDescriptors.addAll(snapshot.getMetricDescriptors());
}
// We need to ensure that the 'aggregate snapshot' has an entry for every metric, including counters.
@ -94,9 +94,7 @@ public class StatusHistoryUtil {
final Set<MetricDescriptor<?>> allDescriptors = new LinkedHashSet<>();
for (final StatusSnapshot statusSnapshot : statusHistory.getStatusSnapshots()) {
for (final MetricDescriptor<?> metricDescriptor : statusSnapshot.getStatusMetrics().keySet()) {
allDescriptors.add(metricDescriptor);
}
allDescriptors.addAll(statusSnapshot.getMetricDescriptors());
}
for (final MetricDescriptor<?> metricDescriptor : allDescriptors) {
@ -111,8 +109,8 @@ public class StatusHistoryUtil {
dto.setTimestamp(statusSnapshot.getTimestamp());
final Map<String, Long> statusMetrics = new HashMap<>();
for (final Map.Entry<MetricDescriptor<?>, Long> entry : statusSnapshot.getStatusMetrics().entrySet()) {
statusMetrics.put(entry.getKey().getField(), entry.getValue());
for (final MetricDescriptor<?> descriptor : statusSnapshot.getMetricDescriptors()) {
statusMetrics.put(descriptor.getField(), statusSnapshot.getStatusMetric(descriptor));
}
dto.setStatusMetrics(statusMetrics);

View File

@ -16,43 +16,64 @@
*/
package org.apache.nifi.controller.status.history;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import org.apache.nifi.util.ComponentStatusReport;
import org.apache.nifi.util.ComponentStatusReport.ComponentType;
import org.apache.nifi.util.ComponentMetrics;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class VolatileComponentStatusRepository implements ComponentStatusRepository {
private static final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class);
private static final Set<MetricDescriptor<?>> DEFAULT_PROCESSOR_METRICS = Arrays.stream(ProcessorStatusDescriptor.values())
.map(ProcessorStatusDescriptor::getDescriptor)
.collect(Collectors.toSet());
private static final Set<MetricDescriptor<?>> DEFAULT_CONNECTION_METRICS = Arrays.stream(ConnectionStatusDescriptor.values())
.map(ConnectionStatusDescriptor::getDescriptor)
.collect(Collectors.toSet());
private static final Set<MetricDescriptor<?>> DEFAULT_GROUP_METRICS = Arrays.stream(ProcessGroupStatusDescriptor.values())
.map(ProcessGroupStatusDescriptor::getDescriptor)
.collect(Collectors.toSet());
private static final Set<MetricDescriptor<?>> DEFAULT_RPG_METRICS = Arrays.stream(RemoteProcessGroupStatusDescriptor.values())
.map(RemoteProcessGroupStatusDescriptor::getDescriptor)
.collect(Collectors.toSet());
public static final String NUM_DATA_POINTS_PROPERTY = "nifi.components.status.repository.buffer.size";
public static final int DEFAULT_NUM_DATA_POINTS = 288; // 1 day worth of 5-minute snapshots
private final RingBuffer<Capture> captures;
private final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class);
private final Map<String, ComponentStatusHistory> componentStatusHistories = new HashMap<>();
private final RingBuffer<Date> timestamps;
private final RingBuffer<List<GarbageCollectionStatus>> gcStatuses;
private final int numDataPoints;
private volatile long lastCaptureTime = 0L;
/**
* Default no args constructor for service loading only
*/
public VolatileComponentStatusRepository(){
captures = null;
public VolatileComponentStatusRepository() {
numDataPoints = DEFAULT_NUM_DATA_POINTS;
gcStatuses = null;
timestamps = null;
}
public VolatileComponentStatusRepository(final NiFiProperties nifiProperties) {
final int numDataPoints = nifiProperties.getIntegerProperty(NUM_DATA_POINTS_PROPERTY, DEFAULT_NUM_DATA_POINTS);
captures = new RingBuffer<>(numDataPoints);
numDataPoints = nifiProperties.getIntegerProperty(NUM_DATA_POINTS_PROPERTY, DEFAULT_NUM_DATA_POINTS);
gcStatuses = new RingBuffer<>(numDataPoints);
timestamps = new RingBuffer<>(numDataPoints);
}
@Override
@ -62,14 +83,60 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
@Override
public synchronized void capture(final ProcessGroupStatus rootGroupStatus, final List<GarbageCollectionStatus> gcStatus, final Date timestamp) {
final ComponentStatusReport statusReport = ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR,
ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP);
final Date evicted = timestamps.add(timestamp);
if (evicted != null) {
componentStatusHistories.values().forEach(history -> history.expireBefore(evicted));
}
capture(rootGroupStatus, timestamp);
gcStatuses.add(gcStatus);
captures.add(new Capture(timestamp, statusReport, gcStatus));
logger.debug("Captured metrics for {}", this);
lastCaptureTime = Math.max(lastCaptureTime, timestamp.getTime());
}
private void capture(final ProcessGroupStatus groupStatus, final Date timestamp) {
// Capture status for the ProcessGroup
final ComponentDetails groupDetails = ComponentDetails.forProcessGroup(groupStatus);
final StatusSnapshot groupSnapshot = ComponentMetrics.createSnapshot(groupStatus, timestamp);
updateStatusHistory(groupSnapshot, groupDetails, timestamp);
// Capture statuses for the Processors
for (final ProcessorStatus processorStatus : groupStatus.getProcessorStatus()) {
final ComponentDetails componentDetails = ComponentDetails.forProcessor(processorStatus);
final StatusSnapshot snapshot = ComponentMetrics.createSnapshot(processorStatus, timestamp);
updateStatusHistory(snapshot, componentDetails, timestamp);
}
// Capture statuses for the Connections
for (final ConnectionStatus connectionStatus : groupStatus.getConnectionStatus()) {
final ComponentDetails componentDetails = ComponentDetails.forConnection(connectionStatus);
final StatusSnapshot snapshot = ComponentMetrics.createSnapshot(connectionStatus, timestamp);
updateStatusHistory(snapshot, componentDetails, timestamp);
}
// Capture statuses for the RPG's
for (final RemoteProcessGroupStatus rpgStatus : groupStatus.getRemoteProcessGroupStatus()) {
final ComponentDetails componentDetails = ComponentDetails.forRemoteProcessGroup(rpgStatus);
final StatusSnapshot snapshot = ComponentMetrics.createSnapshot(rpgStatus, timestamp);
updateStatusHistory(snapshot, componentDetails, timestamp);
}
// Capture statuses for the child groups
for (final ProcessGroupStatus childStatus : groupStatus.getProcessGroupStatus()) {
capture(childStatus, timestamp);
}
}
private void updateStatusHistory(final StatusSnapshot statusSnapshot, final ComponentDetails componentDetails, final Date timestamp) {
final String componentId = componentDetails.getComponentId();
final ComponentStatusHistory procHistory = componentStatusHistories.computeIfAbsent(componentId, id -> new ComponentStatusHistory(componentDetails, numDataPoints));
procHistory.update(statusSnapshot, componentDetails);
}
@Override
public Date getLastCaptureDate() {
return new Date(lastCaptureTime);
@ -77,199 +144,55 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
@Override
public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) {
final StandardStatusHistory history = new StandardStatusHistory();
history.setComponentDetail(COMPONENT_DETAIL_ID, processorId);
captures.forEach(new ForEachEvaluator<Capture>() {
@Override
public boolean evaluate(final Capture capture) {
final ComponentStatusReport statusReport = capture.getStatusReport();
final ProcessorStatus status = statusReport.getProcessorStatus(processorId);
if (status == null) {
return true;
}
history.setComponentDetail(COMPONENT_DETAIL_GROUP_ID, status.getGroupId());
history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName());
history.setComponentDetail(COMPONENT_DETAIL_TYPE, status.getType());
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
snapshot.setTimestamp(capture.getCaptureDate());
for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
if (descriptor.isVisible()) {
snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
}
}
if (includeCounters) {
final Map<String, Long> counters = status.getCounters();
if (counters != null) {
for (final Map.Entry<String, Long> entry : counters.entrySet()) {
final String counterName = entry.getKey();
final String label = entry.getKey() + " (5 mins)";
final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(entry.getKey(), label, label, Formatter.COUNT,
s -> s.getCounters() == null ? null : s.getCounters().get(counterName));
snapshot.addStatusMetric(metricDescriptor, entry.getValue());
}
}
}
history.addStatusSnapshot(snapshot);
return true;
}
});
return history;
return getStatusHistory(processorId, includeCounters, DEFAULT_PROCESSOR_METRICS);
}
@Override
public StatusHistory getConnectionStatusHistory(final String connectionId, final Date start, final Date end, final int preferredDataPoints) {
final StandardStatusHistory history = new StandardStatusHistory();
history.setComponentDetail(COMPONENT_DETAIL_ID, connectionId);
captures.forEach(new ForEachEvaluator<Capture>() {
@Override
public boolean evaluate(final Capture capture) {
final ComponentStatusReport statusReport = capture.getStatusReport();
final ConnectionStatus status = statusReport.getConnectionStatus(connectionId);
if (status == null) {
return true;
}
history.setComponentDetail(COMPONENT_DETAIL_GROUP_ID, status.getGroupId());
history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName());
history.setComponentDetail(COMPONENT_DETAIL_SOURCE_NAME, status.getSourceName());
history.setComponentDetail(COMPONENT_DETAIL_DESTINATION_NAME, status.getDestinationName());
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
snapshot.setTimestamp(capture.getCaptureDate());
for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
}
history.addStatusSnapshot(snapshot);
return true;
}
});
return history;
return getStatusHistory(connectionId, true, DEFAULT_CONNECTION_METRICS);
}
@Override
public StatusHistory getProcessGroupStatusHistory(final String processGroupId, final Date start, final Date end, final int preferredDataPoints) {
final StandardStatusHistory history = new StandardStatusHistory();
history.setComponentDetail(COMPONENT_DETAIL_ID, processGroupId);
captures.forEach(new ForEachEvaluator<Capture>() {
@Override
public boolean evaluate(final Capture capture) {
final ComponentStatusReport statusReport = capture.getStatusReport();
final ProcessGroupStatus status = statusReport.getProcessGroupStatus(processGroupId);
if (status == null) {
return true;
}
history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName());
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
snapshot.setTimestamp(capture.getCaptureDate());
for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
}
history.addStatusSnapshot(snapshot);
return true;
}
});
return history;
return getStatusHistory(processGroupId, true, DEFAULT_GROUP_METRICS);
}
@Override
public StatusHistory getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date start, final Date end, final int preferredDataPoints) {
final StandardStatusHistory history = new StandardStatusHistory();
history.setComponentDetail(COMPONENT_DETAIL_ID, remoteGroupId);
captures.forEach(new ForEachEvaluator<Capture>() {
@Override
public boolean evaluate(final Capture capture) {
final ComponentStatusReport statusReport = capture.getStatusReport();
final RemoteProcessGroupStatus status = statusReport.getRemoteProcessGroupStatus(remoteGroupId);
if (status == null) {
return true;
}
history.setComponentDetail(COMPONENT_DETAIL_GROUP_ID, status.getGroupId());
history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName());
history.setComponentDetail(COMPONENT_DETAIL_URI, status.getTargetUri());
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
snapshot.setTimestamp(capture.getCaptureDate());
for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
}
history.addStatusSnapshot(snapshot);
return true;
}
});
return history;
return getStatusHistory(remoteGroupId, true, DEFAULT_RPG_METRICS);
}
private synchronized StatusHistory getStatusHistory(final String componentId, final boolean includeCounters, final Set<MetricDescriptor<?>> defaultMetricDescriptors) {
final ComponentStatusHistory history = componentStatusHistories.get(componentId);
if (history == null) {
return null;
}
final List<Date> dates = timestamps.asList();
return history.toStatusHistory(dates, includeCounters, defaultMetricDescriptors);
}
@Override
public GarbageCollectionHistory getGarbageCollectionHistory(final Date start, final Date end) {
final StandardGarbageCollectionHistory history = new StandardGarbageCollectionHistory();
captures.forEach(new ForEachEvaluator<Capture>() {
@Override
public boolean evaluate(final Capture capture) {
if (capture.getCaptureDate().before(start)) {
return true;
gcStatuses.forEach(statusSet -> {
for (final GarbageCollectionStatus gcStatus : statusSet) {
if (gcStatus.getTimestamp().before(start)) {
continue;
}
if (capture.getCaptureDate().after(end)) {
return false;
if (gcStatus.getTimestamp().after(end)) {
continue;
}
final List<GarbageCollectionStatus> statuses = capture.getGarbageCollectionStatus();
if (statuses != null) {
statuses.stream().forEach(history::addGarbageCollectionStatus);
}
return true;
history.addGarbageCollectionStatus(gcStatus);
}
return true;
});
return history;
}
private static class Capture {
private final Date captureDate;
private final ComponentStatusReport statusReport;
private final List<GarbageCollectionStatus> gcStatus;
public Capture(final Date date, final ComponentStatusReport statusReport, final List<GarbageCollectionStatus> gcStatus) {
this.captureDate = date;
this.statusReport = statusReport;
this.gcStatus = gcStatus;
}
public Date getCaptureDate() {
return captureDate;
}
public ComponentStatusReport getStatusReport() {
return statusReport;
}
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
return gcStatus;
}
}
}

View File

@ -16,10 +16,6 @@
*/
package org.apache.nifi.controller.tasks;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@ -35,8 +31,8 @@ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.logging.ComponentLog;
@ -51,6 +47,10 @@ import org.apache.nifi.util.Connectables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Continually runs a <code>{@link Connectable}</code> component as long as the component has work to do.
* {@link #invoke()} ()} will return <code>{@link InvocationResult}</code> telling if the component should be yielded.
@ -267,10 +267,10 @@ public class ConnectableTask {
final long processingNanos = System.nanoTime() - startNanos;
try {
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
procEvent.setProcessingNanos(processingNanos);
procEvent.setInvocations(invocationCount);
repositoryContext.getFlowFileEventRepository().updateRepository(procEvent);
repositoryContext.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable.getRunnableComponent(), e.toString());
logger.error("", e);

View File

@ -957,7 +957,7 @@ public final class StandardProcessGroup implements ProcessGroup {
public Collection<ProcessorNode> getProcessors() {
readLock.lock();
try {
return processors.values();
return new HashSet<>(processors.values());
} finally {
readLock.unlock();
}

View File

@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor;
import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
import org.apache.nifi.controller.status.history.StatusSnapshot;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class ComponentMetrics {
private static final Set<MetricDescriptor<?>> PROCESSOR_METRICS;
private static final Set<MetricDescriptor<?>> CONNECTION_METRICS;
private static final Set<MetricDescriptor<?>> PROCESS_GROUP_METRICS;
private static final Set<MetricDescriptor<?>> RPG_METRICS;
static {
PROCESSOR_METRICS = new HashSet<>();
CONNECTION_METRICS = new HashSet<>();
PROCESS_GROUP_METRICS = new HashSet<>();
RPG_METRICS = new HashSet<>();
for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
PROCESSOR_METRICS.add(descriptor.getDescriptor());
}
for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
CONNECTION_METRICS.add(descriptor.getDescriptor());
}
for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
PROCESS_GROUP_METRICS.add(descriptor.getDescriptor());
}
for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
RPG_METRICS.add(descriptor.getDescriptor());
}
}
public static StatusSnapshot createSnapshot(final ProcessorStatus status, final Date timestamp) {
if (isEmpty(status)) {
return null;
}
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS);
snapshot.setTimestamp(timestamp);
for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
if (descriptor.isVisible()) {
snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
}
}
final Map<String, Long> counters = status.getCounters();
if (counters != null) {
for (final Map.Entry<String, Long> entry : counters.entrySet()) {
final String counterName = entry.getKey();
final String label = entry.getKey() + " (5 mins)";
final MetricDescriptor<ProcessorStatus> metricDescriptor = new StandardMetricDescriptor<>(() -> 0, entry.getKey(), label, label, MetricDescriptor.Formatter.COUNT,
s -> s.getCounters() == null ? null : s.getCounters().get(counterName));
snapshot.addCounterStatusMetric(metricDescriptor, entry.getValue());
}
}
return snapshot;
}
public static boolean isEmpty(final ProcessorStatus status) {
for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
if (descriptor.isVisible()) {
final Long value = descriptor.getDescriptor().getValueFunction().getValue(status);
if (value != null && value > 0) {
return false;
}
}
}
return true;
}
public static StatusSnapshot createSnapshot(final ConnectionStatus status, final Date timestamp) {
if (isEmpty(status)) {
return null;
}
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(CONNECTION_METRICS);
snapshot.setTimestamp(timestamp);
for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
}
return snapshot;
}
public static boolean isEmpty(final ConnectionStatus status) {
for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
final Long value = descriptor.getDescriptor().getValueFunction().getValue(status);
if (value != null && value > 0) {
return false;
}
}
return true;
}
public static StatusSnapshot createSnapshot(final ProcessGroupStatus status, final Date timestamp) {
if (isEmpty(status)) {
return null;
}
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESS_GROUP_METRICS);
snapshot.setTimestamp(timestamp);
for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
}
return snapshot;
}
private static boolean isEmpty(final ProcessGroupStatus status) {
for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
final Long value = descriptor.getDescriptor().getValueFunction().getValue(status);
if (value != null && value > 0) {
return false;
}
}
return true;
}
public static StatusSnapshot createSnapshot(final RemoteProcessGroupStatus status, final Date timestamp) {
if (isEmpty(status)) {
return null;
}
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(RPG_METRICS);
snapshot.setTimestamp(timestamp);
for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
}
return snapshot;
}
private static boolean isEmpty(final RemoteProcessGroupStatus status) {
for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
final Long value = descriptor.getDescriptor().getValueFunction().getValue(status);
if (value != null && value > 0) {
return false;
}
}
return true;
}
}

View File

@ -1,137 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
/**
* ComponentStatusReport is a util class that can be used to "flatten" a ProcessGroupStatus into a collection of Map's so that retrieval of a Status for a particular component is very efficient
*/
public class ComponentStatusReport {
private final Map<String, ProcessGroupStatus> groupMap = new HashMap<>();
private final Map<String, ProcessorStatus> processorMap = new HashMap<>();
private final Map<String, ConnectionStatus> connectionMap = new HashMap<>();
private final Map<String, RemoteProcessGroupStatus> remoteGroupMap = new HashMap<>();
private final Map<String, PortStatus> inputPortMap = new HashMap<>();
private final Map<String, PortStatus> outputPortMap = new HashMap<>();
private ComponentStatusReport() {
}
public static ComponentStatusReport createEmpty() {
return new ComponentStatusReport();
}
public static ComponentStatusReport fromProcessGroupStatus(final ProcessGroupStatus status) {
return fromProcessGroupStatus(status, ComponentType.values());
}
public static ComponentStatusReport fromProcessGroupStatus(final ProcessGroupStatus status, final ComponentType... componentTypes) {
final Set<ComponentType> componentTypeSet = new HashSet<>();
for (final ComponentType type : componentTypes) {
componentTypeSet.add(type);
}
final ComponentStatusReport report = new ComponentStatusReport();
report.populate(status, componentTypeSet);
return report;
}
private void populate(final ProcessGroupStatus status, final Set<ComponentType> componentTypes) {
if (componentTypes.contains(ComponentType.PROCESS_GROUP)) {
groupMap.put(status.getId(), status);
}
if (componentTypes.contains(ComponentType.PROCESSOR)) {
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
processorMap.put(procStatus.getId(), procStatus);
}
}
if (componentTypes.contains(ComponentType.CONNECTION)) {
for (final ConnectionStatus connStatus : status.getConnectionStatus()) {
connectionMap.put(connStatus.getId(), connStatus);
}
}
if (componentTypes.contains(ComponentType.REMOTE_PROCESS_GROUP)) {
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
remoteGroupMap.put(rpgStatus.getId(), rpgStatus);
}
}
if (componentTypes.contains(ComponentType.INPUT_PORT)) {
for (final PortStatus portStatus : status.getInputPortStatus()) {
inputPortMap.put(portStatus.getId(), portStatus);
}
}
if (componentTypes.contains(ComponentType.OUTPUT_PORT)) {
for (final PortStatus portStatus : status.getOutputPortStatus()) {
outputPortMap.put(portStatus.getId(), portStatus);
}
}
for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
populate(childStatus, componentTypes);
}
}
public ProcessGroupStatus getProcessGroupStatus(final String groupId) {
return groupMap.get(groupId);
}
public ProcessorStatus getProcessorStatus(final String processorId) {
return processorMap.get(processorId);
}
public ConnectionStatus getConnectionStatus(final String connectionId) {
return connectionMap.get(connectionId);
}
public RemoteProcessGroupStatus getRemoteProcessGroupStatus(final String remoteGroupId) {
return remoteGroupMap.get(remoteGroupId);
}
public PortStatus getInputPortStatus(final String portId) {
return inputPortMap.get(portId);
}
public PortStatus getOutputPortStatus(final String portId) {
return outputPortMap.get(portId);
}
public static enum ComponentType {
PROCESSOR,
INPUT_PORT,
OUTPUT_PORT,
PROCESS_GROUP,
CONNECTION,
REMOTE_PROCESS_GROUP;
}
}

View File

@ -16,17 +16,15 @@
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.junit.Test;
import org.testng.Assert;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.testng.Assert;
public class TestRingBufferEventRepository {
@Test
@ -34,15 +32,15 @@ public class TestRingBufferEventRepository {
final RingBufferEventRepository repo = new RingBufferEventRepository(5);
long insertNanos = 0L;
for (int i = 0; i < 1000000; i++) {
final FlowFileEvent event = generateEvent("ABC");
final FlowFileEvent event = generateEvent();
final long insertStart = System.nanoTime();
repo.updateRepository(event);
repo.updateRepository(event, "ABC");
insertNanos += System.nanoTime() - insertStart;
}
final long queryStart = System.nanoTime();
final StandardRepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
final StandardRepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis());
final long queryNanos = System.nanoTime() - queryStart;
System.out.println(report);
System.out.println("Insert: " + TimeUnit.MILLISECONDS.convert(insertNanos, TimeUnit.NANOSECONDS));
@ -55,36 +53,31 @@ public class TestRingBufferEventRepository {
final FlowFileEventRepository repo = new RingBufferEventRepository(5);
String id1 = "component1";
String id2 = "component2";
repo.updateRepository(generateEvent(id1));
repo.updateRepository(generateEvent(id2));
RepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
repo.updateRepository(generateEvent(), id1);
repo.updateRepository(generateEvent(), id2);
RepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis());
FlowFileEvent entry = report.getReportEntry(id1);
Assert.assertNotNull(entry);
entry = report.getReportEntry(id2);
Assert.assertNotNull(entry);
repo.purgeTransferEvents(id1);
report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
report = repo.reportTransferEvents(System.currentTimeMillis());
entry = report.getReportEntry(id1);
Assert.assertNull(entry);
entry = report.getReportEntry(id2);
Assert.assertNotNull(entry);
repo.purgeTransferEvents(id2);
report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
report = repo.reportTransferEvents(System.currentTimeMillis());
entry = report.getReportEntry(id2);
Assert.assertNull(entry);
repo.close();
}
private FlowFileEvent generateEvent(final String id) {
private FlowFileEvent generateEvent() {
return new FlowFileEvent() {
@Override
public String getComponentIdentifier() {
return id;
}
@Override
public int getFlowFilesIn() {
return 1;

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestSecondPrecisionEventContainer {
@Test
public void testUpdateOncePerSecond() {
final SecondPrecisionEventContainer container = new SecondPrecisionEventContainer(5);
final long startTime = System.currentTimeMillis();
final StandardFlowFileEvent event = new StandardFlowFileEvent();
event.setBytesRead(100L);
event.setBytesWritten(100L);
for (int i=0; i < 5; i++) {
for (int j=0; j < 300; j++) {
container.addEvent(event, startTime + (i * 300_000) + (j * 1000));
}
final long timestamp = startTime + 300_000 * i + 300_000;
final FlowFileEvent result = container.generateReport(timestamp);
assertEquals("Failure at i=" + i, 300 * 100, result.getBytesRead());
assertEquals("Failure at i=" + i, 300 * 100, result.getBytesWritten());
}
}
@Test
public void testExpiresOnReportGeneration() {
final SecondPrecisionEventContainer container = new SecondPrecisionEventContainer(5);
final long startTime = System.currentTimeMillis();
final StandardFlowFileEvent event = new StandardFlowFileEvent();
event.setBytesRead(100L);
event.setBytesWritten(100L);
for (int j=0; j < 100; j++) {
container.addEvent(event, startTime + (j * 1000));
}
final FlowFileEvent resultAt5Mins = container.generateReport(startTime + 300_000);
assertEquals(100 * 100, resultAt5Mins.getBytesRead());
assertEquals(100 * 100, resultAt5Mins.getBytesWritten());
final FlowFileEvent resultAt5MinsPlus50Seconds = container.generateReport(startTime + 350_000);
assertEquals(50 * 100, resultAt5MinsPlus50Seconds.getBytesRead());
assertEquals(50 * 100, resultAt5MinsPlus50Seconds.getBytesWritten());
final FlowFileEvent resultAt5MinsPlus99Seconds = container.generateReport(startTime + 399_000);
assertEquals(100, resultAt5MinsPlus99Seconds.getBytesRead());
assertEquals(100, resultAt5MinsPlus99Seconds.getBytesWritten());
final FlowFileEvent resultAt5MinsPlus100Seconds = container.generateReport(startTime + 400_000);
assertEquals(0, resultAt5MinsPlus100Seconds.getBytesRead());
assertEquals(0, resultAt5MinsPlus100Seconds.getBytesWritten());
final FlowFileEvent resultAt5MinsPlus101Seconds = container.generateReport(startTime + 401_000);
assertEquals(0, resultAt5MinsPlus101Seconds.getBytesRead());
assertEquals(0, resultAt5MinsPlus101Seconds.getBytesWritten());
final FlowFileEvent resultsAt5MinsPlus300seconds = container.generateReport(startTime + 600_000);
assertEquals(0, resultsAt5MinsPlus300seconds.getBytesRead());
assertEquals(0, resultsAt5MinsPlus300seconds.getBytesWritten());
final FlowFileEvent resultsAt5MinsPlus600seconds = container.generateReport(startTime + 900_000);
assertEquals(0, resultsAt5MinsPlus600seconds.getBytesRead());
assertEquals(0, resultsAt5MinsPlus600seconds.getBytesWritten());
}
@Test
public void testExpiresOnReportGenerationWithSkipsBetweenUpdates() {
final SecondPrecisionEventContainer container = new SecondPrecisionEventContainer(5);
final long startTime = System.currentTimeMillis();
final StandardFlowFileEvent event = new StandardFlowFileEvent();
event.setBytesRead(100L);
event.setBytesWritten(100L);
for (int j=0; j < 20; j++) {
container.addEvent(event, startTime + (j * 5000));
}
final FlowFileEvent resultAt5Mins = container.generateReport(startTime + 300_000);
assertEquals(20 * 100, resultAt5Mins.getBytesRead());
assertEquals(20 * 100, resultAt5Mins.getBytesWritten());
final FlowFileEvent resultAt5MinsPlus50Seconds = container.generateReport(startTime + 350_000);
assertEquals(10 * 100, resultAt5MinsPlus50Seconds.getBytesRead());
assertEquals(10 * 100, resultAt5MinsPlus50Seconds.getBytesWritten());
final FlowFileEvent resultAt5MinsPlus94Seconds = container.generateReport(startTime + 394_000);
assertEquals(100, resultAt5MinsPlus94Seconds.getBytesRead());
assertEquals(100, resultAt5MinsPlus94Seconds.getBytesWritten());
final FlowFileEvent resultAt5MinsPlus95Seconds = container.generateReport(startTime + 395_000);
assertEquals(100, resultAt5MinsPlus95Seconds.getBytesRead());
assertEquals(100, resultAt5MinsPlus95Seconds.getBytesWritten());
final FlowFileEvent resultAt5MinsPlus100Seconds = container.generateReport(startTime + 400_000);
assertEquals(0, resultAt5MinsPlus100Seconds.getBytesRead());
assertEquals(0, resultAt5MinsPlus100Seconds.getBytesWritten());
final FlowFileEvent resultAt5MinsPlus101Seconds = container.generateReport(startTime + 401_000);
assertEquals(0, resultAt5MinsPlus101Seconds.getBytesRead());
assertEquals(0, resultAt5MinsPlus101Seconds.getBytesWritten());
final FlowFileEvent resultsAt5MinsPlus300seconds = container.generateReport(startTime + 600_000);
assertEquals(0, resultsAt5MinsPlus300seconds.getBytesRead());
assertEquals(0, resultsAt5MinsPlus300seconds.getBytesWritten());
final FlowFileEvent resultsAt5MinsPlus600seconds = container.generateReport(startTime + 900_000);
assertEquals(0, resultsAt5MinsPlus600seconds.getBytesRead());
assertEquals(0, resultsAt5MinsPlus600seconds.getBytesWritten());
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestMetricRollingBuffer {
private static final Set<MetricDescriptor<?>> PROCESSOR_METRICS = Arrays.stream(ProcessorStatusDescriptor.values())
.map(ProcessorStatusDescriptor::getDescriptor)
.collect(Collectors.toSet());
@Test
public void testBufferGrows() {
final int bufferCapacity = 1000;
final MetricRollingBuffer buffer = new MetricRollingBuffer(bufferCapacity);
final long startTime = System.currentTimeMillis();
final List<Date> timestamps = new ArrayList<>();
int iterations = 1440;
for (int i=0; i < iterations; i++) {
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS);
snapshot.setTimestamp(new Date(startTime + i * 1000));
timestamps.add(snapshot.getTimestamp());
snapshot.addStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor(), Long.valueOf(i));
buffer.update(snapshot);
}
assertEquals(bufferCapacity, buffer.size());
final List<StatusSnapshot> snapshots = buffer.getSnapshots(timestamps, true, PROCESSOR_METRICS);
assertEquals(iterations, snapshots.size());
final int expectedEmptyCount = iterations - bufferCapacity;
final long emptyCount = snapshots.stream().filter(snapshot -> snapshot instanceof EmptyStatusSnapshot).count();
assertEquals(expectedEmptyCount, emptyCount);
for (int i=0; i < iterations; i++) {
final StatusSnapshot snapshot = snapshots.get(i);
if (i < expectedEmptyCount) {
assertTrue("Snapshot at i=" + i + " is not an EmptyStatusSnapshot", snapshot instanceof EmptyStatusSnapshot);
} else {
assertEquals(Long.valueOf(i), snapshot.getStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor()));
assertFalse(snapshot instanceof EmptyStatusSnapshot);
}
}
}
@Test
public void testBufferShrinks() {
// Cause buffer to grow
final int bufferCapacity = 1000;
final MetricRollingBuffer buffer = new MetricRollingBuffer(bufferCapacity);
final long startTime = System.currentTimeMillis();
int iterations = 1440;
for (int i=0; i < iterations; i++) {
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS);
snapshot.setTimestamp(new Date(startTime + i * 1000));
snapshot.addStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor(), Long.valueOf(i));
buffer.update(snapshot);
}
assertEquals(bufferCapacity, buffer.size());
// Expire data ensure that the buffer shrinks
final long lastTimestamp = startTime + 1440 * 1000;
buffer.expireBefore(new Date(lastTimestamp - 144_001L));
assertEquals(144, buffer.size());
buffer.expireBefore(new Date(lastTimestamp - 16_001L));
assertEquals(16, buffer.size());
buffer.expireBefore(new Date(lastTimestamp));
assertEquals(0, buffer.size());
// Ensure that we can now properly add data again
long insertStart = lastTimestamp + 10_000L;
final List<Date> timestamps = new ArrayList<>();
for (int i=0; i < 4; i++) {
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS);
snapshot.setTimestamp(new Date(insertStart + i * 1000));
timestamps.add(snapshot.getTimestamp());
snapshot.addStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor(), Long.valueOf(i));
buffer.update(snapshot);
}
assertEquals(4, buffer.size());
final List<StatusSnapshot> snapshots = buffer.getSnapshots(timestamps, true, PROCESSOR_METRICS);
assertEquals(4, snapshots.size());
for (int i=0; i < 4; i++) {
final StatusSnapshot snapshot = snapshots.get(i);
assertEquals(Long.valueOf(i), snapshot.getStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor()));
}
}
}