mirror of
https://github.com/apache/nifi.git
synced 2025-03-01 15:09:11 +00:00
NIFI-8126
- Added totalActiveQueuedDuration and maxActiveQueuedDuration to the ConnectionStatus object - Updated FlowFileQueue implementations and supporting code to properly calculate and provide the totalActiveQueuedDuration and maxActiveQueuedDuration for their active queues - Fixing failing unit test. In examining this it appears that the unit test only accidentally passed in the past and that the object mocked to always throw an exception was not actually being used in the test. - Adding UI component via ConnectionStatusDescriptor along with updates based on PR comments. Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
f812dfdfc0
commit
2309d75d3d
@ -42,6 +42,8 @@ public class ConnectionStatus implements Cloneable {
|
||||
private long outputBytes;
|
||||
private int maxQueuedCount;
|
||||
private long maxQueuedBytes;
|
||||
private long totalQueuedDuration;
|
||||
private long maxQueuedDuration;
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
@ -196,6 +198,22 @@ public class ConnectionStatus implements Cloneable {
|
||||
this.backPressureBytesThreshold = backPressureBytesThreshold;
|
||||
}
|
||||
|
||||
public long getTotalQueuedDuration() {
|
||||
return totalQueuedDuration;
|
||||
}
|
||||
|
||||
public void setTotalQueuedDuration(long totalQueuedDuration) {
|
||||
this.totalQueuedDuration = totalQueuedDuration;
|
||||
}
|
||||
|
||||
public long getMaxQueuedDuration() {
|
||||
return maxQueuedDuration;
|
||||
}
|
||||
|
||||
public void setMaxQueuedDuration(long maxQueuedDuration) {
|
||||
this.maxQueuedDuration = maxQueuedDuration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionStatus clone() {
|
||||
final ConnectionStatus clonedObj = new ConnectionStatus();
|
||||
@ -221,6 +239,8 @@ public class ConnectionStatus implements Cloneable {
|
||||
clonedObj.backPressureObjectThreshold = backPressureObjectThreshold;
|
||||
clonedObj.maxQueuedBytes = maxQueuedBytes;
|
||||
clonedObj.maxQueuedCount = maxQueuedCount;
|
||||
clonedObj.totalQueuedDuration = totalQueuedDuration;
|
||||
clonedObj.maxQueuedDuration = maxQueuedDuration;
|
||||
return clonedObj;
|
||||
}
|
||||
|
||||
@ -261,6 +281,10 @@ public class ConnectionStatus implements Cloneable {
|
||||
builder.append(maxQueuedCount);
|
||||
builder.append(", maxQueueBytes=");
|
||||
builder.append(maxQueuedBytes);
|
||||
builder.append(", totalActiveQueuedDuration=");
|
||||
builder.append(totalQueuedDuration);
|
||||
builder.append(", maxActiveQueuedDuration=");
|
||||
builder.append(maxQueuedDuration);
|
||||
builder.append("]");
|
||||
return builder.toString();
|
||||
}
|
||||
|
@ -93,6 +93,17 @@ public interface FlowFileQueue {
|
||||
|
||||
QueueSize size();
|
||||
|
||||
/**
|
||||
* @param fromTimestamp The timestamp in milliseconds from which to calculate durations. This will typically be the current timestamp.
|
||||
* @return the sum in milliseconds of how long all FlowFiles within this queue have currently been in this queue.
|
||||
*/
|
||||
long getTotalQueuedDuration(long fromTimestamp);
|
||||
|
||||
/**
|
||||
* @return The minimum lastQueueDate in milliseconds of all FlowFiles currently enqueued. If no FlowFile is enqueued, this returns 0.
|
||||
*/
|
||||
long getMinLastQueueDate();
|
||||
|
||||
/**
|
||||
* @return true if no items queue; false otherwise
|
||||
*/
|
||||
|
@ -47,4 +47,14 @@ public interface SwapSummary {
|
||||
* @return a List of all ResourceClaims that are referenced by the FlowFiles in the swap file
|
||||
*/
|
||||
List<ResourceClaim> getResourceClaims();
|
||||
|
||||
/**
|
||||
* @return The minimum lastQueueDate of all FlowFiles that are currently stored in this queue.
|
||||
*/
|
||||
Long getMinLastQueueDate();
|
||||
|
||||
/**
|
||||
* @return The sum of all the lastQueueDates of all FlowFiles that are currently stored in this queue.
|
||||
*/
|
||||
Long getTotalLastQueueDate();
|
||||
}
|
||||
|
@ -227,6 +227,8 @@ public abstract class AbstractEventAccess implements EventAccess {
|
||||
final Collection<ConnectionStatus> connectionStatusCollection = new ArrayList<>();
|
||||
status.setConnectionStatus(connectionStatusCollection);
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
// get the connection and remote port status
|
||||
for (final Connection conn : group.getConnections()) {
|
||||
final boolean isConnectionAuthorized = isAuthorized.test(conn);
|
||||
@ -242,6 +244,9 @@ public abstract class AbstractEventAccess implements EventAccess {
|
||||
connStatus.setDestinationName(isDestinationAuthorized ? conn.getDestination().getName() : conn.getDestination().getIdentifier());
|
||||
connStatus.setBackPressureDataSizeThreshold(conn.getFlowFileQueue().getBackPressureDataSizeThreshold());
|
||||
connStatus.setBackPressureObjectThreshold(conn.getFlowFileQueue().getBackPressureObjectThreshold());
|
||||
connStatus.setTotalQueuedDuration(conn.getFlowFileQueue().getTotalQueuedDuration(now));
|
||||
long minLastQueueDate = conn.getFlowFileQueue().getMinLastQueueDate();
|
||||
connStatus.setMaxQueuedDuration(minLastQueueDate == 0 ? 0 : now - minLastQueueDate);
|
||||
|
||||
final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier());
|
||||
if (connectionStatusReport != null) {
|
||||
|
@ -167,7 +167,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
||||
warn("Cannot swap in FlowFiles from location " + swapLocation + " because the FlowFile Repository does not know about this Swap Location. " +
|
||||
"This file should be manually removed. This typically occurs when a Swap File is written but the FlowFile Repository is not updated yet to reflect this. " +
|
||||
"This is generally not a cause for concern, but may be indicative of a failure to update the FlowFile Repository.");
|
||||
final SwapSummary swapSummary = new StandardSwapSummary(new QueueSize(0, 0), 0L, Collections.emptyList());
|
||||
final SwapSummary swapSummary = new StandardSwapSummary(new QueueSize(0, 0), 0L, Collections.emptyList(), 0L, 0L);
|
||||
return new StandardSwapContents(swapSummary, Collections.emptyList());
|
||||
}
|
||||
|
||||
|
@ -161,6 +161,16 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalQueuedDuration(long fromTimestamp) {
|
||||
return queue.getTotalQueuedDuration(fromTimestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinLastQueueDate() {
|
||||
return queue.getMinLastQueueDate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return queue.getFlowFileQueueSize().isEmpty();
|
||||
|
@ -40,9 +40,11 @@ import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
@ -83,6 +85,11 @@ public class SwappablePriorityQueue {
|
||||
private ArrayList<FlowFileRecord> swapQueue;
|
||||
private boolean swapMode = false;
|
||||
|
||||
// The following members are used to keep metrics in memory for reporting purposes so that we don't have to constantly
|
||||
// read these values from swap files on disk.
|
||||
private final Map<String, Long> minQueueDateInSwapLocation = new HashMap<>();
|
||||
private final Map<String, Long> totalQueueDateInSwapLocation = new HashMap<>();
|
||||
|
||||
public SwappablePriorityQueue(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter, final FlowFileQueue flowFileQueue,
|
||||
final DropFlowFileAction dropAction, final String swapPartitionName) {
|
||||
this.swapManager = swapManager;
|
||||
@ -186,6 +193,8 @@ public class SwappablePriorityQueue {
|
||||
final List<String> swapLocations = new ArrayList<>(numSwapFiles);
|
||||
for (int i = 0; i < numSwapFiles; i++) {
|
||||
long bytesSwappedThisIteration = 0L;
|
||||
long totalSwapQueueDatesThisIteration = 0L;
|
||||
long minQueueDateThisIteration = Long.MAX_VALUE;
|
||||
|
||||
// Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
|
||||
final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
|
||||
@ -193,6 +202,8 @@ public class SwappablePriorityQueue {
|
||||
final FlowFileRecord flowFile = tempQueue.poll();
|
||||
toSwap.add(flowFile);
|
||||
bytesSwappedThisIteration += flowFile.getSize();
|
||||
totalSwapQueueDatesThisIteration += flowFile.getLastQueueDate();
|
||||
minQueueDateThisIteration = minQueueDateThisIteration < flowFile.getLastQueueDate() ? minQueueDateThisIteration : flowFile.getLastQueueDate();
|
||||
}
|
||||
|
||||
try {
|
||||
@ -204,6 +215,8 @@ public class SwappablePriorityQueue {
|
||||
|
||||
bytesSwappedOut += bytesSwappedThisIteration;
|
||||
flowFilesSwappedOut += toSwap.size();
|
||||
minQueueDateInSwapLocation.put(swapLocation, minQueueDateThisIteration);
|
||||
totalQueueDateInSwapLocation.put(swapLocation, totalSwapQueueDatesThisIteration);
|
||||
} catch (final IOException ioe) {
|
||||
tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue.
|
||||
|
||||
@ -350,12 +363,16 @@ public class SwappablePriorityQueue {
|
||||
logger.debug("Attempting to swap in {}; all swap locations = {}", swapLocation, swapLocations);
|
||||
swapContents = swapManager.swapIn(swapLocation, flowFileQueue);
|
||||
swapLocations.remove(0);
|
||||
minQueueDateInSwapLocation.remove(swapLocation);
|
||||
totalQueueDateInSwapLocation.remove(swapLocation);
|
||||
} catch (final IncompleteSwapFileException isfe) {
|
||||
logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", swapLocation);
|
||||
logger.error("", isfe);
|
||||
swapContents = isfe.getPartialContents();
|
||||
partialContents = true;
|
||||
swapLocations.remove(0);
|
||||
minQueueDateInSwapLocation.remove(swapLocation);
|
||||
totalQueueDateInSwapLocation.remove(swapLocation);
|
||||
} catch (final FileNotFoundException fnfe) {
|
||||
logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation);
|
||||
if (eventReporter != null) {
|
||||
@ -363,6 +380,8 @@ public class SwappablePriorityQueue {
|
||||
}
|
||||
|
||||
swapLocations.remove(0);
|
||||
minQueueDateInSwapLocation.remove(swapLocation);
|
||||
totalQueueDateInSwapLocation.remove(swapLocation);
|
||||
return;
|
||||
} catch (final IOException ioe) {
|
||||
logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation);
|
||||
@ -829,6 +848,8 @@ public class SwappablePriorityQueue {
|
||||
|
||||
dropRequest.setCurrentSize(size());
|
||||
swapLocationItr.remove();
|
||||
minQueueDateInSwapLocation.remove(swapLocation);
|
||||
totalQueueDateInSwapLocation.remove(swapLocation);
|
||||
logger.debug("For DropFlowFileRequest {}, dropped {} for Swap File {}", requestIdentifier, droppedSize, swapLocation);
|
||||
}
|
||||
|
||||
@ -851,6 +872,8 @@ public class SwappablePriorityQueue {
|
||||
public SwapSummary recoverSwappedFlowFiles() {
|
||||
int swapFlowFileCount = 0;
|
||||
long swapByteCount = 0L;
|
||||
long totalSwappedQueueDate = 0L;
|
||||
Long minSwappedQueueDate = null;
|
||||
Long maxId = null;
|
||||
List<ResourceClaim> resourceClaims = new ArrayList<>();
|
||||
final long startNanos = System.nanoTime();
|
||||
@ -893,6 +916,20 @@ public class SwappablePriorityQueue {
|
||||
swapFlowFileCount += queueSize.getObjectCount();
|
||||
swapByteCount += queueSize.getByteCount();
|
||||
resourceClaims.addAll(summary.getResourceClaims());
|
||||
|
||||
// Update class member metrics
|
||||
minQueueDateInSwapLocation.put(swapLocation, summary.getMinLastQueueDate());
|
||||
totalQueueDateInSwapLocation.put(swapLocation, summary.getTotalLastQueueDate());
|
||||
|
||||
// Update metrics for this method's return value
|
||||
if(minSwappedQueueDate == null) {
|
||||
minSwappedQueueDate = summary.getMinLastQueueDate();
|
||||
} else {
|
||||
if(summary.getMinLastQueueDate() != null) {
|
||||
minSwappedQueueDate = Long.min(minSwappedQueueDate, summary.getMinLastQueueDate());
|
||||
}
|
||||
}
|
||||
totalSwappedQueueDate += summary.getTotalLastQueueDate();
|
||||
} catch (final IOException ioe) {
|
||||
failures++;
|
||||
logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation);
|
||||
@ -917,10 +954,63 @@ public class SwappablePriorityQueue {
|
||||
logger.info("Recovered {} swap files for {} in {} millis", swapLocations.size() - failures, this, millis);
|
||||
}
|
||||
|
||||
return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
|
||||
// minSwappedQueueDate and totalSwappedQueueDate within this particular StandardSwapSummary are not ultimately used by the FlowController. However,
|
||||
// it can't hurt to set them here accurately in case they ever are.
|
||||
return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims, minSwappedQueueDate, totalSwappedQueueDate);
|
||||
}
|
||||
|
||||
public long getMinLastQueueDate() {
|
||||
readLock.lock();
|
||||
try {
|
||||
// We want the oldest timestamp, which will be the min
|
||||
long min = getMinLastQueueDate(activeQueue, 0L);
|
||||
min = Long.min(min, getMinLastQueueDate(swapQueue, min));
|
||||
|
||||
for(Long minSwapQueueDate: minQueueDateInSwapLocation.values()) {
|
||||
min = min == 0 ? minSwapQueueDate : Long.min(min, minSwapQueueDate);
|
||||
}
|
||||
|
||||
return min;
|
||||
} finally {
|
||||
readLock.unlock("Get Min Last Queue Date");
|
||||
}
|
||||
}
|
||||
|
||||
private long getMinLastQueueDate(Iterable<FlowFileRecord> iterable, long defaultMin) {
|
||||
long min = 0;
|
||||
|
||||
for (FlowFileRecord flowFileRecord : iterable) {
|
||||
min = min == 0 ? flowFileRecord.getLastQueueDate() : Long.min(flowFileRecord.getLastQueueDate(), min);
|
||||
}
|
||||
|
||||
return min == 0 ? defaultMin : min;
|
||||
}
|
||||
|
||||
public long getTotalQueuedDuration(long fromTimestamp) {
|
||||
readLock.lock();
|
||||
try {
|
||||
long sum = 0L;
|
||||
for (FlowFileRecord flowFileRecord : activeQueue) {
|
||||
sum += (fromTimestamp - flowFileRecord.getLastQueueDate());
|
||||
}
|
||||
|
||||
for (FlowFileRecord flowFileRecord : swapQueue) {
|
||||
sum += (fromTimestamp - flowFileRecord.getLastQueueDate());
|
||||
}
|
||||
|
||||
long totalSwappedQueueDate = 0L;
|
||||
for(Long totalQueueDate: totalQueueDateInSwapLocation.values()) {
|
||||
totalSwappedQueueDate += totalQueueDate;
|
||||
}
|
||||
|
||||
// We are only considering FlowFiles that have been swapped to disk in this calculation since we took care of the
|
||||
// in-memory swapQueue previously.
|
||||
sum += ((getFlowFileQueueSize().getSwappedCount() - swapQueue.size()) * fromTimestamp) - totalSwappedQueueDate;
|
||||
return sum;
|
||||
} finally {
|
||||
readLock.unlock("Get Total Queued Duration");
|
||||
}
|
||||
}
|
||||
|
||||
protected void incrementActiveQueueSize(final int count, final long bytes) {
|
||||
boolean updated = false;
|
||||
|
@ -482,6 +482,8 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
||||
Long maxId = null;
|
||||
QueueSize totalQueueSize = new QueueSize(0, 0L);
|
||||
final List<ResourceClaim> resourceClaims = new ArrayList<>();
|
||||
Long minLastQueueDate = null;
|
||||
long totalLastQueueDate = 0L;
|
||||
|
||||
for (final SwapSummary summary : summaries) {
|
||||
Long summaryMaxId = summary.getMaxFlowFileId();
|
||||
@ -494,11 +496,21 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
||||
|
||||
final List<ResourceClaim> summaryResourceClaims = summary.getResourceClaims();
|
||||
resourceClaims.addAll(summaryResourceClaims);
|
||||
|
||||
if(minLastQueueDate == null) {
|
||||
minLastQueueDate = summary.getMinLastQueueDate();
|
||||
} else {
|
||||
if(summary.getMinLastQueueDate() != null) {
|
||||
minLastQueueDate = Long.min(minLastQueueDate, summary.getMinLastQueueDate());
|
||||
}
|
||||
}
|
||||
|
||||
totalLastQueueDate += summary.getTotalLastQueueDate();
|
||||
}
|
||||
|
||||
adjustSize(totalQueueSize.getObjectCount(), totalQueueSize.getByteCount());
|
||||
|
||||
return new StandardSwapSummary(totalQueueSize, maxId, resourceClaims);
|
||||
return new StandardSwapSummary(totalQueueSize, maxId, resourceClaims, minLastQueueDate, totalLastQueueDate);
|
||||
} finally {
|
||||
partitionReadLock.unlock();
|
||||
}
|
||||
@ -514,6 +526,25 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
|
||||
return totalSize.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalQueuedDuration(long fromTimestamp) {
|
||||
long sum = 0L;
|
||||
for (QueuePartition queuePartition : queuePartitions) {
|
||||
long totalActiveQueuedDuration = queuePartition.getTotalActiveQueuedDuration(fromTimestamp);
|
||||
sum += totalActiveQueuedDuration;
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinLastQueueDate() {
|
||||
long min = 0;
|
||||
for (QueuePartition queuePartition : queuePartitions) {
|
||||
min = min == 0 ? queuePartition.getMinLastQueueDate() : Long.min(min, queuePartition.getMinLastQueueDate());
|
||||
}
|
||||
return min;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return size().getObjectCount() == 0;
|
||||
|
@ -99,4 +99,15 @@ public interface QueuePartition {
|
||||
* @return the current size of the partition's queue
|
||||
*/
|
||||
QueueSize size();
|
||||
|
||||
/**
|
||||
* @param fromTimestamp The timestamp in miliiseconds from which to calculate durations. This will typically be the current timestamp.
|
||||
* @return the sum in milliseconds of how long all FlowFiles within this queue have currently been in this queue.
|
||||
*/
|
||||
long getTotalActiveQueuedDuration(long fromTimestamp);
|
||||
|
||||
/**
|
||||
* @return The minimum lastQueueDate in milliseconds of all FlowFiles currently enqueued. If no FlowFile is enqueued, this returns 0.
|
||||
*/
|
||||
long getMinLastQueueDate();
|
||||
}
|
||||
|
@ -98,6 +98,16 @@ public class RemoteQueuePartition implements QueuePartition {
|
||||
return priorityQueue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalActiveQueuedDuration(long fromTimestamp) {
|
||||
return priorityQueue.getTotalQueuedDuration(fromTimestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinLastQueueDate() {
|
||||
return priorityQueue.getMinLastQueueDate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSwapPartitionName() {
|
||||
return nodeIdentifier.getId();
|
||||
|
@ -67,6 +67,16 @@ public class StandardRebalancingPartition implements RebalancingPartition {
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalActiveQueuedDuration(long fromTimestamp) {
|
||||
return queue.getTotalQueuedDuration(fromTimestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinLastQueueDate() {
|
||||
return queue.getMinLastQueueDate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SwapSummary recoverSwappedFlowFiles() {
|
||||
return this.queue.recoverSwappedFlowFiles();
|
||||
|
@ -66,6 +66,16 @@ public class SwappablePriorityQueueLocalPartition implements LocalQueuePartition
|
||||
return priorityQueue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalActiveQueuedDuration(long fromTimestamp) {
|
||||
return priorityQueue.getTotalQueuedDuration(fromTimestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinLastQueueDate() {
|
||||
return priorityQueue.getMinLastQueueDate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUnacknowledgedFlowFile() {
|
||||
return priorityQueue.isUnacknowledgedFlowFile();
|
||||
|
@ -61,7 +61,21 @@ public enum ConnectionStatusDescriptor {
|
||||
"Queued Count",
|
||||
"The number of FlowFiles queued in this Connection",
|
||||
Formatter.COUNT,
|
||||
s -> Long.valueOf(s.getQueuedCount()));
|
||||
s -> Long.valueOf(s.getQueuedCount())),
|
||||
|
||||
TOTAL_QUEUED_DURATION(
|
||||
"totalQueuedDuration",
|
||||
"Total Queued Duration (5 mins)",
|
||||
"The cumulative queued duration, in milliseconds, of all FlowFiles that were transferred to this Connection in the past 5 minutes",
|
||||
Formatter.COUNT,
|
||||
s -> Long.valueOf(s.getTotalQueuedDuration())),
|
||||
|
||||
MAX_QUEUED_DURATION(
|
||||
"maxQueuedDuration",
|
||||
"Max Queued Duration (5 mins)",
|
||||
"The max queued duration, in milliseconds, of any FlowFile that was transferred to this Connection in the past 5 minutes",
|
||||
Formatter.COUNT,
|
||||
s -> Long.valueOf(s.getMaxQueuedDuration()));
|
||||
|
||||
|
||||
private MetricDescriptor<ConnectionStatus> descriptor;
|
||||
|
@ -46,7 +46,7 @@ import org.apache.nifi.repository.schema.SimpleRecordField;
|
||||
public class SchemaSwapSerializer implements SwapSerializer {
|
||||
static final String SERIALIZATION_NAME = "Schema Swap Serialization";
|
||||
|
||||
private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V2;
|
||||
private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V3;
|
||||
private final RecordSchema flowFileSchema = new RecordSchema(schema.getField(SwapSchema.FLOWFILE_CONTENTS).getSubFields());
|
||||
|
||||
@Override
|
||||
@ -55,6 +55,8 @@ public class SchemaSwapSerializer implements SwapSerializer {
|
||||
|
||||
long contentSize = 0L;
|
||||
long maxFlowFileId = -1L;
|
||||
Long minLastQueueDate = null;
|
||||
long totalLastQueuedate = 0L;
|
||||
final List<ResourceClaim> resourceClaims = new ArrayList<>();
|
||||
for (final FlowFileRecord flowFile : toSwap) {
|
||||
contentSize += flowFile.getSize();
|
||||
@ -62,6 +64,10 @@ public class SchemaSwapSerializer implements SwapSerializer {
|
||||
maxFlowFileId = flowFile.getId();
|
||||
}
|
||||
|
||||
totalLastQueuedate += flowFile.getLastQueueDate();
|
||||
|
||||
minLastQueueDate = minLastQueueDate == null ? flowFile.getLastQueueDate() : Long.min(minLastQueueDate, flowFile.getLastQueueDate());
|
||||
|
||||
final ContentClaim contentClaim = flowFile.getContentClaim();
|
||||
if (contentClaim != null) {
|
||||
resourceClaims.add(contentClaim.getResourceClaim());
|
||||
@ -69,8 +75,8 @@ public class SchemaSwapSerializer implements SwapSerializer {
|
||||
}
|
||||
|
||||
final QueueSize queueSize = new QueueSize(toSwap.size(), contentSize);
|
||||
final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxFlowFileId, resourceClaims);
|
||||
final Record summaryRecord = new SwapSummaryFieldMap(swapSummary, queue.getIdentifier(), SwapSchema.SWAP_SUMMARY_SCHEMA_V1);
|
||||
final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxFlowFileId, resourceClaims, minLastQueueDate, totalLastQueuedate);
|
||||
final Record summaryRecord = new SwapSummaryFieldMap(swapSummary, queue.getIdentifier(), SwapSchema.SWAP_SUMMARY_SCHEMA_V3);
|
||||
|
||||
final List<Record> flowFileRecords = toSwap.stream()
|
||||
.map(flowFile -> new FlowFileRecordFieldMap(flowFile, flowFileSchema))
|
||||
|
@ -104,7 +104,7 @@ public class SimpleSwapDeserializer implements SwapDeserializer {
|
||||
}
|
||||
} catch (final EOFException eof) {
|
||||
final QueueSize queueSize = new QueueSize(numRecords, contentSize);
|
||||
final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections.emptyList());
|
||||
final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections.emptyList(), 0L, 0L);
|
||||
final SwapContents partialContents = new StandardSwapContents(summary, Collections.emptyList());
|
||||
throw new IncompleteSwapFileException(swapLocation, partialContents);
|
||||
}
|
||||
@ -247,13 +247,13 @@ public class SimpleSwapDeserializer implements SwapDeserializer {
|
||||
|
||||
flowFiles.add(record);
|
||||
} catch (final EOFException eof) {
|
||||
final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
|
||||
final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims, 0L, 0L);
|
||||
final SwapContents partialContents = new StandardSwapContents(swapSummary, flowFiles);
|
||||
throw new IncompleteSwapFileException(location, partialContents);
|
||||
}
|
||||
}
|
||||
|
||||
final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
|
||||
final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims, 0L, 0L);
|
||||
return new StandardSwapContents(swapSummary, flowFiles);
|
||||
}
|
||||
|
||||
|
@ -25,16 +25,20 @@ import org.apache.nifi.controller.repository.SwapSummary;
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaim;
|
||||
|
||||
public class StandardSwapSummary implements SwapSummary {
|
||||
public static final SwapSummary EMPTY_SUMMARY = new StandardSwapSummary(new QueueSize(0, 0L), null, Collections.<ResourceClaim> emptyList());
|
||||
public static final SwapSummary EMPTY_SUMMARY = new StandardSwapSummary(new QueueSize(0, 0L), null, Collections.<ResourceClaim> emptyList(), 0L, 0L);
|
||||
|
||||
private final QueueSize queueSize;
|
||||
private final Long maxFlowFileId;
|
||||
private final List<ResourceClaim> resourceClaims;
|
||||
private final Long minLastQueueDate;
|
||||
private final Long totalLastQueueDate;
|
||||
|
||||
public StandardSwapSummary(final QueueSize queueSize, final Long maxFlowFileId, final List<ResourceClaim> resourceClaims) {
|
||||
public StandardSwapSummary(final QueueSize queueSize, final Long maxFlowFileId, final List<ResourceClaim> resourceClaims, final Long minLastQueueDate, final Long totalLastQueueDate) {
|
||||
this.queueSize = queueSize;
|
||||
this.maxFlowFileId = maxFlowFileId;
|
||||
this.resourceClaims = Collections.unmodifiableList(resourceClaims);
|
||||
this.minLastQueueDate = minLastQueueDate;
|
||||
this.totalLastQueueDate = totalLastQueueDate;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -51,4 +55,16 @@ public class StandardSwapSummary implements SwapSummary {
|
||||
public List<ResourceClaim> getResourceClaims() {
|
||||
return resourceClaims;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getMinLastQueueDate() {
|
||||
return minLastQueueDate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getTotalLastQueueDate() {
|
||||
return totalLastQueueDate;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -40,6 +40,9 @@ public class SwapSchema {
|
||||
public static final RecordSchema SWAP_CONTENTS_SCHEMA_V2;
|
||||
public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V2;
|
||||
|
||||
public static final RecordSchema SWAP_SUMMARY_SCHEMA_V3;
|
||||
public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V3;
|
||||
|
||||
public static final String RESOURCE_CLAIMS = "Resource Claims";
|
||||
public static final String RESOURCE_CLAIM = "Resource Claim";
|
||||
public static final String RESOURCE_CLAIM_COUNT = "Claim Count";
|
||||
@ -48,6 +51,8 @@ public class SwapSchema {
|
||||
public static final String FLOWFILE_COUNT = "FlowFile Count";
|
||||
public static final String FLOWFILE_SIZE = "FlowFile Size";
|
||||
public static final String MAX_RECORD_ID = "Max Record ID";
|
||||
public static final String MIN_LAST_QUEUE_DATE = "Min Last Queue Date";
|
||||
public static final String TOTAL_LAST_QUEUE_DATE = "Total Last Queue Date";
|
||||
public static final String SWAP_SUMMARY = "Swap Summary";
|
||||
public static final String FLOWFILE_CONTENTS = "FlowFiles";
|
||||
|
||||
@ -106,4 +111,32 @@ public class SwapSchema {
|
||||
fullSchemaFields.add(new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields()));
|
||||
FULL_SWAP_FILE_SCHEMA_V2 = new RecordSchema(fullSchemaFields);
|
||||
}
|
||||
|
||||
static {
|
||||
final RecordField queueIdentifier = new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE);
|
||||
final RecordField flowFileCount = new SimpleRecordField(FLOWFILE_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
|
||||
final RecordField flowFileSize = new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE);
|
||||
final RecordField maxRecordId = new SimpleRecordField(MAX_RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE);
|
||||
final RecordField minLastQueueDate = new SimpleRecordField(MIN_LAST_QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE);
|
||||
final RecordField totalLastQueueDate = new SimpleRecordField(TOTAL_LAST_QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE);
|
||||
|
||||
final RecordField resourceClaimField = new ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, ContentClaimSchema.RESOURCE_CLAIM_SCHEMA_V1.getFields());
|
||||
final RecordField claimCountField = new SimpleRecordField(RESOURCE_CLAIM_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
|
||||
final RecordField resourceClaims = new MapRecordField(RESOURCE_CLAIMS, resourceClaimField, claimCountField, Repetition.EXACTLY_ONE);
|
||||
|
||||
final List<RecordField> summaryFields = new ArrayList<>();
|
||||
summaryFields.add(queueIdentifier);
|
||||
summaryFields.add(flowFileCount);
|
||||
summaryFields.add(flowFileSize);
|
||||
summaryFields.add(maxRecordId);
|
||||
summaryFields.add(minLastQueueDate);
|
||||
summaryFields.add(totalLastQueueDate);
|
||||
summaryFields.add(resourceClaims);
|
||||
SWAP_SUMMARY_SCHEMA_V3 = new RecordSchema(summaryFields);
|
||||
|
||||
final List<RecordField> fullSchemaFields = new ArrayList<>();
|
||||
fullSchemaFields.add(new ComplexRecordField(SWAP_SUMMARY, Repetition.EXACTLY_ONE, summaryFields));
|
||||
fullSchemaFields.add(new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields()));
|
||||
FULL_SWAP_FILE_SCHEMA_V3 = new RecordSchema(fullSchemaFields);
|
||||
}
|
||||
}
|
||||
|
@ -77,6 +77,10 @@ public class SwapSummaryFieldMap implements Record {
|
||||
return queueIdentifier;
|
||||
case SwapSchema.RESOURCE_CLAIMS:
|
||||
return claimCounts;
|
||||
case SwapSchema.MIN_LAST_QUEUE_DATE:
|
||||
return swapSummary.getMinLastQueueDate();
|
||||
case SwapSchema.TOTAL_LAST_QUEUE_DATE:
|
||||
return swapSummary.getTotalLastQueueDate();
|
||||
}
|
||||
|
||||
return null;
|
||||
@ -86,8 +90,21 @@ public class SwapSummaryFieldMap implements Record {
|
||||
public static SwapSummary getSwapSummary(final Record record, final ResourceClaimManager claimManager) {
|
||||
final int flowFileCount = (Integer) record.getFieldValue(SwapSchema.FLOWFILE_COUNT);
|
||||
final long flowFileSize = (Long) record.getFieldValue(SwapSchema.FLOWFILE_SIZE);
|
||||
final QueueSize queueSize = new QueueSize(flowFileCount, flowFileSize);
|
||||
|
||||
// In the event that min and totalLastQueueDate are null, set them to neutral values based on
|
||||
// the current time.
|
||||
Long minLastQueueDate = (Long) record.getFieldValue(SwapSchema.MIN_LAST_QUEUE_DATE);
|
||||
long now = System.currentTimeMillis();
|
||||
if(minLastQueueDate == null) {
|
||||
minLastQueueDate = now;
|
||||
}
|
||||
|
||||
Long totalLastQueueDate = (Long) record.getFieldValue(SwapSchema.TOTAL_LAST_QUEUE_DATE);
|
||||
if(totalLastQueueDate == null) {
|
||||
totalLastQueueDate = now * flowFileCount;
|
||||
}
|
||||
|
||||
final QueueSize queueSize = new QueueSize(flowFileCount, flowFileSize);
|
||||
final long maxFlowFileId = (Long) record.getFieldValue(SwapSchema.MAX_RECORD_ID);
|
||||
|
||||
final Map<Record, Integer> resourceClaimRecords = (Map<Record, Integer>) record.getFieldValue(SwapSchema.RESOURCE_CLAIMS);
|
||||
@ -101,6 +118,6 @@ public class SwapSummaryFieldMap implements Record {
|
||||
}
|
||||
}
|
||||
|
||||
return new StandardSwapSummary(queueSize, maxFlowFileId, resourceClaims);
|
||||
return new StandardSwapSummary(queueSize, maxFlowFileId, resourceClaims, minLastQueueDate, totalLastQueueDate);
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ public class MockFlowFileRecord implements FlowFileRecord {
|
||||
private final Map<String, String> attributes;
|
||||
private final long size;
|
||||
private final ContentClaim contentClaim;
|
||||
private long lastQueuedDate = System.currentTimeMillis() + 1;
|
||||
|
||||
public MockFlowFileRecord() {
|
||||
this(1L);
|
||||
@ -82,11 +83,6 @@ public class MockFlowFileRecord implements FlowFileRecord {
|
||||
return entryDate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getLastQueueDate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPenalized() {
|
||||
return false;
|
||||
@ -132,8 +128,17 @@ public class MockFlowFileRecord implements FlowFileRecord {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getLastQueueDate() {
|
||||
return lastQueuedDate;
|
||||
}
|
||||
|
||||
public void setLastQueuedDate(long lastQueuedDate) {
|
||||
this.lastQueuedDate = lastQueuedDate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getQueueDateIndex() {
|
||||
return 0;
|
||||
return lastQueuedDate;
|
||||
}
|
||||
}
|
||||
|
@ -146,6 +146,8 @@ public class MockSwapManager implements FlowFileSwapManager {
|
||||
int count = 0;
|
||||
long size = 0L;
|
||||
Long max = null;
|
||||
Long minLastQueueDate = null;
|
||||
Long totalLastQueueDate = 0L;
|
||||
final List<ResourceClaim> resourceClaims = new ArrayList<>();
|
||||
for (final FlowFileRecord flowFile : flowFiles) {
|
||||
count++;
|
||||
@ -154,12 +156,16 @@ public class MockSwapManager implements FlowFileSwapManager {
|
||||
max = flowFile.getId();
|
||||
}
|
||||
|
||||
minLastQueueDate = minLastQueueDate == null ? flowFile.getLastQueueDate() : Long.min(minLastQueueDate, flowFile.getLastQueueDate());
|
||||
|
||||
totalLastQueueDate += flowFile.getLastQueueDate();
|
||||
|
||||
if (flowFile.getContentClaim() != null) {
|
||||
resourceClaims.add(flowFile.getContentClaim().getResourceClaim());
|
||||
}
|
||||
}
|
||||
|
||||
return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims);
|
||||
return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims, minLastQueueDate, totalLastQueueDate);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -26,7 +26,9 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
@ -44,7 +46,7 @@ import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.ArgumentMatchers.anyCollection;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ -79,9 +81,9 @@ public class TestFileSystemSwapManager {
|
||||
|
||||
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
|
||||
Mockito.doThrow(new IOException("Intentional IOException for unit test"))
|
||||
.when(flowFileRepo).updateRepository(anyCollection());
|
||||
.when(flowFileRepo).swapFlowFilesOut(any(), any(), any());
|
||||
|
||||
final FileSystemSwapManager swapManager = createSwapManager();
|
||||
final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo);
|
||||
|
||||
final List<FlowFileRecord> flowFileRecords = new ArrayList<>();
|
||||
for (int i=0; i < 10000; i++) {
|
||||
@ -146,13 +148,16 @@ public class TestFileSystemSwapManager {
|
||||
assertEquals(10000, contents.getFlowFiles().size());
|
||||
}
|
||||
|
||||
private FileSystemSwapManager createSwapManager() {
|
||||
private FileSystemSwapManager createSwapManager() throws IOException {
|
||||
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
|
||||
return createSwapManager(flowFileRepo);
|
||||
}
|
||||
|
||||
private FileSystemSwapManager createSwapManager(final FlowFileRepository flowFileRepo) {
|
||||
final FileSystemSwapManager swapManager = new FileSystemSwapManager();
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
private FileSystemSwapManager createSwapManager(final FlowFileRepository flowFileRepo) throws IOException {
|
||||
final FileSystemSwapManager swapManager = new FileSystemSwapManager(temporaryFolder.newFolder().toPath());
|
||||
final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
|
||||
swapManager.initialize(new SwapManagerInitializationContext() {
|
||||
@Override
|
||||
|
@ -604,6 +604,40 @@ public class TestStandardFlowFileQueue {
|
||||
assertTrue(swapManager.swappedOut.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTotalActiveQueuedDuration() {
|
||||
long now = System.currentTimeMillis();
|
||||
MockFlowFileRecord testFlowfile1 = new MockFlowFileRecord();
|
||||
testFlowfile1.setLastQueuedDate(now - 500);
|
||||
MockFlowFileRecord testFlowfile2 = new MockFlowFileRecord();
|
||||
testFlowfile2.setLastQueuedDate(now - 1000);
|
||||
|
||||
queue.put(testFlowfile1);
|
||||
queue.put(testFlowfile2);
|
||||
|
||||
assertEquals(1500, queue.getTotalQueuedDuration(now));
|
||||
queue.poll(1, Collections.emptySet());
|
||||
|
||||
assertEquals(1000, queue.getTotalQueuedDuration(now));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMinLastQueueDate() {
|
||||
long now = System.currentTimeMillis();
|
||||
MockFlowFileRecord testFlowfile1 = new MockFlowFileRecord();
|
||||
testFlowfile1.setLastQueuedDate(now - 1000);
|
||||
MockFlowFileRecord testFlowfile2 = new MockFlowFileRecord();
|
||||
testFlowfile2.setLastQueuedDate(now - 500);
|
||||
|
||||
queue.put(testFlowfile1);
|
||||
queue.put(testFlowfile2);
|
||||
|
||||
assertEquals(1000, now - queue.getMinLastQueueDate());
|
||||
queue.poll(1, Collections.emptySet());
|
||||
|
||||
assertEquals(500, now - queue.getMinLastQueueDate());
|
||||
}
|
||||
|
||||
|
||||
private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
|
||||
@Override
|
||||
|
@ -361,12 +361,18 @@ public class TestSocketLoadBalancedFlowFileQueue {
|
||||
|
||||
@Test
|
||||
public void testRecoverSwapFiles() throws IOException {
|
||||
long expectedMinLastQueueDate = Long.MAX_VALUE;
|
||||
long expectedTotalLastQueueDate = 0L;
|
||||
|
||||
for (int partitionIndex = 0; partitionIndex < 3; partitionIndex++) {
|
||||
final String partitionName = queue.getPartition(partitionIndex).getSwapPartitionName();
|
||||
|
||||
final List<FlowFileRecord> flowFiles = new ArrayList<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
flowFiles.add(new MockFlowFileRecord(100L));
|
||||
FlowFileRecord newMockFlowFilerecord = new MockFlowFileRecord(100L);
|
||||
flowFiles.add(newMockFlowFilerecord);
|
||||
expectedMinLastQueueDate = Long.min(expectedMinLastQueueDate, newMockFlowFilerecord.getLastQueueDate());
|
||||
expectedTotalLastQueueDate += newMockFlowFilerecord.getLastQueueDate();
|
||||
}
|
||||
|
||||
swapManager.swapOut(flowFiles, queue, partitionName);
|
||||
@ -374,7 +380,10 @@ public class TestSocketLoadBalancedFlowFileQueue {
|
||||
|
||||
final List<FlowFileRecord> flowFiles = new ArrayList<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
flowFiles.add(new MockFlowFileRecord(100L));
|
||||
FlowFileRecord newMockFlowFilerecord = new MockFlowFileRecord(100L);
|
||||
flowFiles.add(newMockFlowFilerecord);
|
||||
expectedMinLastQueueDate = Long.min(expectedMinLastQueueDate, newMockFlowFilerecord.getLastQueueDate());
|
||||
expectedTotalLastQueueDate += newMockFlowFilerecord.getLastQueueDate();
|
||||
}
|
||||
|
||||
swapManager.swapOut(flowFiles, queue, "other-partition");
|
||||
@ -383,6 +392,8 @@ public class TestSocketLoadBalancedFlowFileQueue {
|
||||
assertEquals(399L, swapSummary.getMaxFlowFileId().longValue());
|
||||
assertEquals(400, swapSummary.getQueueSize().getObjectCount());
|
||||
assertEquals(400 * 100L, swapSummary.getQueueSize().getByteCount());
|
||||
assertEquals(expectedTotalLastQueueDate, swapSummary.getTotalLastQueueDate().longValue());
|
||||
assertEquals(expectedMinLastQueueDate, swapSummary.getMinLastQueueDate().longValue());
|
||||
}
|
||||
|
||||
|
||||
|
@ -36,6 +36,7 @@ import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
@ -716,4 +717,52 @@ public class TestSwappablePriorityQueue {
|
||||
|
||||
assertTrue(swapManager.swappedOut.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
// The purpose of this test is to populate a SwappablePriorityQueue and to ensure that the minLastQueueDate and TotalQueueDuration are correct.
|
||||
// To truly test this we need to get both the in-memory swap queue and swap "on disk" involved.
|
||||
public void testLastQueueDateMetrics() throws IOException {
|
||||
Set<FlowFileRecord> flowFileRecords = new HashSet<>(11001);
|
||||
queue = new SwappablePriorityQueue(swapManager, 1000, eventReporter, flowFileQueue, dropAction, "testGetMinLastQueueDate");
|
||||
long minQueueDate = Long.MAX_VALUE;
|
||||
long totalQueueDate = 0L;
|
||||
// Put enough files in the queue to swap to disk
|
||||
for (int i = 1; i <= 11001; i++) {
|
||||
FlowFileRecord flowFileRecord = new MockFlowFileRecord();
|
||||
queue.put(flowFileRecord);
|
||||
flowFileRecords.add(flowFileRecord);
|
||||
totalQueueDate += flowFileRecord.getLastQueueDate();
|
||||
minQueueDate = Long.min(minQueueDate, flowFileRecord.getLastQueueDate());
|
||||
}
|
||||
|
||||
// Assert the queue has a max of active, in-memory swap, and on-disk swap
|
||||
assertEquals(1000, queue.getActiveFlowFiles().size());
|
||||
assertEquals(10001, queue.getFlowFileQueueSize().getSwappedCount());
|
||||
assertEquals(1, queue.getFlowFileQueueSize().getSwapFileCount());
|
||||
assertEquals(10000, swapManager.getSwapSummary(swapManager.recoverSwapLocations(flowFileQueue, "testGetMinLastQueueDate").get(0)).getQueueSize().getObjectCount());
|
||||
|
||||
// Ensure that the min and totals are correct
|
||||
long now = System.currentTimeMillis();
|
||||
long totalNow = now * flowFileRecords.size();
|
||||
assertEquals(totalNow - totalQueueDate, queue.getTotalQueuedDuration(now));
|
||||
assertEquals(minQueueDate, queue.getMinLastQueueDate());
|
||||
|
||||
List<FlowFileRecord> polledRecords = queue.poll(1000, Collections.emptySet(), -1);
|
||||
polledRecords.addAll(queue.poll(2, Collections.emptySet(), -1));
|
||||
|
||||
// Assert that the lone swap file was recovered into memory and that all numbers are as we still expect them to be.
|
||||
assertEquals(9998, queue.getActiveFlowFiles().size());
|
||||
assertEquals(1, queue.getFlowFileQueueSize().getSwappedCount());
|
||||
assertEquals(0, queue.getFlowFileQueueSize().getSwapFileCount());
|
||||
assert(swapManager.recoverSwapLocations(flowFileQueue, "testGetMinLastQueueDate").isEmpty());
|
||||
|
||||
// Ensure that the min and total are still correct
|
||||
flowFileRecords.removeAll(polledRecords);
|
||||
totalQueueDate = flowFileRecords.stream().mapToLong(FlowFileRecord::getLastQueueDate).sum();
|
||||
minQueueDate = flowFileRecords.stream().mapToLong(FlowFileRecord::getLastQueueDate).min().getAsLong();
|
||||
now = System.currentTimeMillis();
|
||||
totalNow = now * flowFileRecords.size();
|
||||
assertEquals(totalNow - totalQueueDate, queue.getTotalQueuedDuration(now));
|
||||
assertEquals(minQueueDate, queue.getMinLastQueueDate());
|
||||
}
|
||||
}
|
||||
|
@ -786,7 +786,7 @@ public class TestRocksDBFlowFileRepository {
|
||||
}
|
||||
}
|
||||
|
||||
return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims);
|
||||
return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims, 0L, 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -172,6 +172,16 @@ public class TestWriteAheadFlowFileRepository {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalQueuedDuration(long fromTimestamp) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinLastQueueDate() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return false;
|
||||
@ -763,7 +773,7 @@ public class TestWriteAheadFlowFileRepository {
|
||||
}
|
||||
}
|
||||
|
||||
return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims);
|
||||
return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims, 0L, 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -34,7 +34,7 @@ public class MockFlowFile implements FlowFileRecord {
|
||||
|
||||
private final long id;
|
||||
private final long entryDate = System.currentTimeMillis();
|
||||
private final long lastQueueDate = System.currentTimeMillis();
|
||||
private final long lastQueueDate = System.currentTimeMillis() + 1;
|
||||
private final Map<String, String> attributes;
|
||||
private final long size;
|
||||
private final ContentClaim contentClaim;
|
||||
|
@ -47,6 +47,7 @@ import org.apache.nifi.controller.repository.claim.ContentClaim;
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaim;
|
||||
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
|
||||
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.stream.io.NullOutputStream;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
@ -108,6 +109,9 @@ public class TestSchemaSwapSerializerDeserializer {
|
||||
|
||||
final Set<ResourceClaim> uniqueClaims = new HashSet<>(resourceClaims);
|
||||
assertEquals(9999, uniqueClaims.size());
|
||||
|
||||
assertEquals((Long)toSwap.stream().mapToLong(FlowFile::getLastQueueDate).sum(), swapSummary.getTotalLastQueueDate());
|
||||
assertEquals((Long)toSwap.stream().mapToLong(FlowFile::getLastQueueDate).min().getAsLong(), swapSummary.getMinLastQueueDate());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -98,6 +98,25 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
|
||||
return new QueueSize(flowFiles.size() + unacknowledgedCount.get(), totalBytes.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalQueuedDuration(long fromTimestamp) {
|
||||
long sum = 0L;
|
||||
for (FlowFileRecord flowFileRecord : flowFiles) {
|
||||
long l = fromTimestamp - flowFileRecord.getLastQueueDate();
|
||||
sum += l;
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinLastQueueDate() {
|
||||
long min = 0;
|
||||
for (FlowFileRecord flowFile : flowFiles) {
|
||||
min = min == 0 ? flowFile.getLastQueueDate() : Long.min(min, flowFile.getLastQueueDate());
|
||||
}
|
||||
return min;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return flowFiles.isEmpty() && unacknowledgedCount.get() == 0;
|
||||
|
Loading…
x
Reference in New Issue
Block a user