From c1b99d584d93bb6f26ba0986f2fcaf663b0caef3 Mon Sep 17 00:00:00 2001
From: Pierre Villard
Date: Thu, 3 Aug 2017 22:56:27 +0200
Subject: [PATCH] NIFI-4028 - fix cache update when Wait releases flow files
NIFI-4028: Refactored Wait processor.
- Consolidated implementation for the cases of releasableFlowCount is 1 or more, in order to reduce complexity and behavior differences
- Added 'consumed' counter when total counter is used to release incoming FlowFiles
- Fixed method name typo, releaseCandidates
This closes #2055.
Signed-off-by: Koji Kawamura
.../apache/nifi/processors/standard/ | 29 ++---
.../standard/ | 36 ++++--
.../nifi/processors/standard/ | 60 +++++++++-
.../standard/ | 109 +++++++++++++++++-
4 files changed, 203 insertions(+), 31 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/
index fccd443817..4e5ae5dd32 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/
@@ -305,6 +305,8 @@ public class Wait extends AbstractProcessor {
final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
final AtomicReference signalRef = new AtomicReference<>();
+ // This map contains original counts before those are consumed to release incoming FlowFiles.
+ final HashMap originalSignalCounts = new HashMap<>();
final Consumer transferToFailure = flowFile -> {
flowFile = session.penalize(flowFile);
@@ -324,7 +326,7 @@ public class Wait extends AbstractProcessor {
final List flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
- .map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList());
+ .map(f -> copySignalAttributes(session, f, signalRef.get(), originalSignalCounts, replaceOriginalAttributes)).collect(Collectors.toList());
session.transfer(flowFilesWithSignalAttributes, relationship);
@@ -349,6 +351,9 @@ public class Wait extends AbstractProcessor {
// get notifying signal
try {
signal = protocol.getSignal(signalId);
+ if (signal != null) {
+ originalSignalCounts.putAll(signal.getCounts());
+ }
} catch (final IOException e) {
throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
@@ -423,29 +428,20 @@ public class Wait extends AbstractProcessor {
boolean waitProgressed = false;
if (signal != null && !candidates.isEmpty()) {
- if (releasableFlowFileCount > 1) {
- signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates,
+ if (releasableFlowFileCount > 0) {
+ signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates,
released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
+ waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0;
waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();
} else {
- // releasableFlowFileCount = 0 or 1
boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
? signal.isTotalCountReached(targetCount)
: signal.isCountReached(targetCounterName, targetCount);
if (reachedTargetCount) {
- if (releasableFlowFileCount == 0) {
- getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
- } else {
- // releasableFlowFileCount = 1
- getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0));
- getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
- // If releasableFlowFileCount == 0, leave signal as it is,
- // so that any number of FlowFile can be released as long as target count condition matches.
- waitCompleted = true;
- }
+ getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
} else {
@@ -470,7 +466,7 @@ public class Wait extends AbstractProcessor {
- private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
+ private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map originalCount, final boolean replaceOriginal) {
if (signal == null) {
return flowFile;
@@ -488,8 +484,7 @@ public class Wait extends AbstractProcessor {
// Copy counter attributes
- final Map counts = signal.getCounts();
- final long totalCount = counts.entrySet().stream().mapToLong(e -> {
+ final long totalCount = originalCount.entrySet().stream().mapToLong(e -> {
final Long count = e.getValue();
attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
return count;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/
index 61834553ff..2c9c9fdfcc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/
@@ -16,6 +16,7 @@
package org.apache.nifi.processors.standard;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
@@ -45,6 +46,7 @@ public class WaitNotifyProtocol {
private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);
public static final String DEFAULT_COUNT_NAME = "default";
+ public static final String CONSUMED_COUNT_NAME = "consumed";
private static final int MAX_REPLACE_RETRY_COUNT = 5;
private static final int REPLACE_RETRY_WAIT_MILLIS = 10;
@@ -86,9 +88,13 @@ public class WaitNotifyProtocol {
this.attributes = attributes;
+ @JsonIgnore
+ public long getTotalCount() {
+ return counts.values().stream().mapToLong(Long::longValue).sum();
+ }
public boolean isTotalCountReached(final long targetCount) {
- final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum();
- return totalCount >= targetCount;
+ return getTotalCount() >= targetCount;
public boolean isCountReached(final String counterName, final long targetCount) {
@@ -96,6 +102,10 @@ public class WaitNotifyProtocol {
public long getCount(final String counterName) {
+ if (counterName == null || counterName.isEmpty()) {
+ return getTotalCount();
+ }
final Long count = counts.get(counterName);
return count != null ? count : 0;
@@ -115,7 +125,7 @@ public class WaitNotifyProtocol {
* Caller of this method is responsible for updating cache storage after processing released and waiting candidates
* by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.
- * @param _counterName signal counter name to consume from.
+ * @param counterName signal counter name to consume from. If not specified, total counter is used, and 'consumed' counter is added to subtract consumed counters from total counter.
* @param requiredCountForPass number of required signals to acquire a pass.
* @param releasableCandidateCountPerPass number of releasable candidate per pass.
* @param candidates candidates waiting for being allowed to pass.
@@ -123,12 +133,9 @@ public class WaitNotifyProtocol {
* @param waiting function to process candidates those should remain in waiting queue.
* @param Type of candidate
- public void releaseCandidatese(final String _counterName, final long requiredCountForPass,
- final int releasableCandidateCountPerPass, final List candidates,
- final Consumer> released, final Consumer> waiting) {
- // counterName is mandatory otherwise, we can't decide which counter to convert into pass count.
- final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName;
+ public void releaseCandidates(final String counterName, final long requiredCountForPass,
+ final int releasableCandidateCountPerPass, final List candidates,
+ final Consumer> released, final Consumer> waiting) {
final int candidateSize = candidates.size();
if (releasableCount < candidateSize) {
@@ -137,11 +144,18 @@ public class WaitNotifyProtocol {
final long signalCount = getCount(counterName);
releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass;
final long reducedSignalCount = signalCount % requiredCountForPass;
- counts.put(counterName, reducedSignalCount);
+ if (counterName != null && !counterName.isEmpty()) {
+ // Update target counter with reduced count.
+ counts.put(counterName, reducedSignalCount);
+ } else {
+ // If target counter name is not specified, add consumed count to subtract from accumulated total count.
+ Long consumedCount = counts.getOrDefault(CONSUMED_COUNT_NAME, 0L);
+ consumedCount -= signalCount - reducedSignalCount;
+ counts.put(CONSUMED_COUNT_NAME, consumedCount);
+ }
int releaseCount = Math.min(releasableCount, candidateSize);
released.accept(candidates.subList(0, releaseCount));
waiting.accept(candidates.subList(releaseCount, candidateSize));
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/
index 71187b6def..a4df2f37e9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/
@@ -25,6 +25,7 @@ import static;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -459,8 +460,65 @@ public class TestWait {
- assertNull("The key no longer exist", protocol.getSignal("key"));
+ }
+ @Test
+ public void testDecrementCache() throws ConcurrentModificationException, IOException {
+ Map cachedAttributes = new HashMap<>();
+ cachedAttributes.put("both", "notifyValue");
+ cachedAttributes.put("uuid", "notifyUuid");
+ cachedAttributes.put("notify.only", "notifyValue");
+ // Setup existing cache entry.
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+ // A flow file comes in Notify and increment the counter
+ protocol.notify("key", "counter", 1, cachedAttributes);
+ // another flow files comes in Notify and increment the counter
+ protocol.notify("key", "counter", 1, cachedAttributes);
+ runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+ runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
+ runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "1");
+ runner.assertValid();
+ final Map waitAttributes = new HashMap<>();
+ waitAttributes.put("releaseSignalAttribute", "key");
+ waitAttributes.put("signalCounterName", "counter");
+ waitAttributes.put("wait.only", "waitValue");
+ waitAttributes.put("both", "waitValue");
+ waitAttributes.put("uuid", UUID.randomUUID().toString());
+ String flowFileContent = "content";
+ runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
+ /*
+ * 1st iteration
+ */
+ runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
+ MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
+ // expect counter to be decremented to 0 and releasable count remains 1.
+ assertEquals("0", Long.toString(protocol.getSignal("key").getCount("counter")));
+ assertEquals("1", Long.toString(protocol.getSignal("key").getReleasableCount()));
+ // introduce a second flow file with the same signal attribute
+ runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
+ /*
+ * 2nd iteration
+ */
+ runner.clearTransferState();
+ runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
+ outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // All counters are consumed.
+ outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");
+ assertNull("The key no longer exist", protocol.getSignal("key"));
+ runner.clearTransferState();
private class TestIteration {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/
index e3f982c8e1..01983d5230 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/
@@ -37,6 +37,7 @@ import java.util.function.BiConsumer;
+import static org.apache.nifi.processors.standard.WaitNotifyProtocol.CONSUMED_COUNT_NAME;
import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -267,12 +268,12 @@ public class TestWaitNotifyProtocol {
final List waiting = new ArrayList<>();
// Test default name.
- final String counterName = null;
+ final String counterName = DEFAULT_COUNT_NAME;
final BiConsumer releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
- signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
+ signal.releaseCandidates(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
r -> released.addAll(r), w -> waiting.addAll(w));
@@ -336,4 +337,108 @@ public class TestWaitNotifyProtocol {
+ @Test
+ public void testReleaseCandidateTotal() throws Exception {
+ final List candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
+ final Signal signal = new Signal();
+ final List released = new ArrayList<>();
+ final List waiting = new ArrayList<>();
+ // Test empty counter name, should use total counters.
+ final String emptyCounterName = null;
+ final BiConsumer releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
+ released.clear();
+ waiting.clear();
+ signal.releaseCandidates(emptyCounterName, requiredCountForPass, releasableCandidatePerPass, candidates,
+ r -> released.addAll(r), w -> waiting.addAll(w));
+ };
+ final String counterA = "counterA";
+ final String counterB = "counterB";
+ final String counterC = "counterC";
+ final Field releasableCount = Signal.class.getDeclaredField("releasableCount");
+ releasableCount.setAccessible(true);
+ // No counter, should wait.
+ releaseCandidate.accept(3L, 1);
+ assertEquals(0, released.size());
+ assertEquals(10, waiting.size());
+ assertEquals(0, signal.getCount(emptyCounterName));
+ assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
+ assertEquals(0, releasableCount.getInt(signal));
+ // Counter is not enough yet.
+ signal.getCounts().put(counterA, 1L);
+ signal.getCounts().remove(CONSUMED_COUNT_NAME);
+ releaseCandidate.accept(3L, 1);
+ assertEquals(0, released.size());
+ assertEquals(10, waiting.size());
+ assertEquals(1, signal.getCount(emptyCounterName)); // Counter incremented, but not enough
+ assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
+ assertEquals(0, releasableCount.getInt(signal));
+ // Counter reached the target.
+ signal.getCounts().put(counterA, 1L);
+ signal.getCounts().put(counterB, 1L);
+ signal.getCounts().put(counterC, 1L);
+ signal.getCounts().remove(CONSUMED_COUNT_NAME);
+ releaseCandidate.accept(3L, 1);
+ assertEquals(1, released.size());
+ assertEquals(9, waiting.size());
+ assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release
+ assertEquals(-3, signal.getCount(CONSUMED_COUNT_NAME));
+ assertEquals(0, releasableCount.getInt(signal));
+ // Counter reached the target for two candidates.
+ signal.getCounts().put(counterA, 1L);
+ signal.getCounts().put(counterB, 2L);
+ signal.getCounts().put(counterC, 3L);
+ signal.getCounts().remove(CONSUMED_COUNT_NAME);
+ releaseCandidate.accept(3L, 1);
+ assertEquals(2, released.size());
+ assertEquals(8, waiting.size());
+ assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release
+ assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
+ assertEquals(0, releasableCount.getInt(signal));
+ // Counter reached the target for two candidates, and reminder is 2.
+ signal.getCounts().put(counterA, 3L);
+ signal.getCounts().put(counterB, 3L);
+ signal.getCounts().put(counterC, 5L);
+ signal.getCounts().remove(CONSUMED_COUNT_NAME);
+ releaseCandidate.accept(3L, 1);
+ assertEquals(3, released.size()); // 11 / 3 = 3
+ assertEquals(7, waiting.size());
+ assertEquals(2, signal.getCount(emptyCounterName));
+ assertEquals(-9, signal.getCount(CONSUMED_COUNT_NAME));
+ assertEquals(0, releasableCount.getInt(signal));
+ // Counter reached the target for two pass count and each pass can release 2 candidates.
+ signal.getCounts().put(counterA, 1L);
+ signal.getCounts().put(counterB, 2L);
+ signal.getCounts().put(counterC, 3L);
+ signal.getCounts().remove(CONSUMED_COUNT_NAME);
+ releaseCandidate.accept(3L, 2);
+ assertEquals(4, released.size()); // (6 / 3) * 2 = 4
+ assertEquals(6, waiting.size());
+ assertEquals(0, signal.getCount(emptyCounterName));
+ assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
+ assertEquals(0, releasableCount.getInt(signal));
+ // If there are counts more than enough to release current candidates, unused releasableCount should remain.
+ signal.getCounts().put(counterA, 10L);
+ signal.getCounts().put(counterB, 20L);
+ signal.getCounts().put(counterC, 20L);
+ signal.getCounts().remove(CONSUMED_COUNT_NAME);
+ releaseCandidate.accept(3L, 2);
+ assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10.
+ assertEquals(0, waiting.size());
+ assertEquals(2, signal.getCount(emptyCounterName)); // 50 % 3 = 2.
+ assertEquals(-48, signal.getCount(CONSUMED_COUNT_NAME)); // 50 % 3 = 2.
+ assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
+ }
\ No newline at end of file