mirror of https://github.com/apache/nifi.git
NIFI-730: Do not require a Read Lock in order to obtain backpressure configuration values for FlowFileQueue's
This commit is contained in:
parent
570202eb30
commit
edf238e004
|
@ -82,18 +82,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
|
||||||
private PriorityQueue<FlowFileRecord> activeQueue = null;
|
private PriorityQueue<FlowFileRecord> activeQueue = null;
|
||||||
private ArrayList<FlowFileRecord> swapQueue = null;
|
private ArrayList<FlowFileRecord> swapQueue = null;
|
||||||
|
|
||||||
// private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
|
|
||||||
// private long activeQueueContentSize = 0L;
|
|
||||||
// private int swappedRecordCount = 0;
|
|
||||||
// private long swappedContentSize = 0L;
|
|
||||||
// private final AtomicReference<QueueSize> unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L));
|
|
||||||
|
|
||||||
private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
|
private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
|
||||||
|
|
||||||
private String maximumQueueDataSize;
|
|
||||||
private long maximumQueueByteCount;
|
|
||||||
private boolean swapMode = false;
|
private boolean swapMode = false;
|
||||||
private long maximumQueueObjectCount;
|
private volatile String maximumQueueDataSize;
|
||||||
|
private volatile long maximumQueueByteCount;
|
||||||
|
private volatile long maximumQueueObjectCount;
|
||||||
|
|
||||||
private final EventReporter eventReporter;
|
private final EventReporter eventReporter;
|
||||||
private final AtomicLong flowFileExpirationMillis;
|
private final AtomicLong flowFileExpirationMillis;
|
||||||
|
@ -104,6 +98,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
|
||||||
private final int swapThreshold;
|
private final int swapThreshold;
|
||||||
private final FlowFileSwapManager swapManager;
|
private final FlowFileSwapManager swapManager;
|
||||||
private final List<String> swapLocations = new ArrayList<>();
|
private final List<String> swapLocations = new ArrayList<>();
|
||||||
|
@SuppressWarnings("unused")
|
||||||
private final TimedLock readLock;
|
private final TimedLock readLock;
|
||||||
private final TimedLock writeLock;
|
private final TimedLock writeLock;
|
||||||
private final String identifier;
|
private final String identifier;
|
||||||
|
@ -178,12 +173,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getBackPressureObjectThreshold() {
|
public long getBackPressureObjectThreshold() {
|
||||||
readLock.lock();
|
|
||||||
try {
|
|
||||||
return maximumQueueObjectCount;
|
return maximumQueueObjectCount;
|
||||||
} finally {
|
|
||||||
readLock.unlock("getBackPressureObjectThreshold");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -200,12 +190,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getBackPressureDataSizeThreshold() {
|
public String getBackPressureDataSizeThreshold() {
|
||||||
readLock.lock();
|
|
||||||
try {
|
|
||||||
return maximumQueueDataSize;
|
return maximumQueueDataSize;
|
||||||
} finally {
|
|
||||||
readLock.unlock("getBackPressureDataSizeThreshold");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue