NIFI-4028 - fix cache update when Wait releases flow files

NIFI-4028: Refactored Wait processor.

- Consolidated implementation for the cases of releasableFlowCount is 1 or more, in order to reduce complexity and behavior differences
- Added 'consumed' counter when total counter is used to release incoming FlowFiles
- Fixed method name typo, releaseCandidates

This closes #2055.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Pierre Villard 2017-08-03 22:56:27 +02:00 committed by Koji Kawamura
parent 9f23ec360a
commit c1b99d584d
4 changed files with 203 additions and 31 deletions

View File

@ -305,6 +305,8 @@ public class Wait extends AbstractProcessor {
final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue(); final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode); final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
final AtomicReference<Signal> signalRef = new AtomicReference<>(); final AtomicReference<Signal> signalRef = new AtomicReference<>();
// This map contains original counts before those are consumed to release incoming FlowFiles.
final HashMap<String, Long> originalSignalCounts = new HashMap<>();
final Consumer<FlowFile> transferToFailure = flowFile -> { final Consumer<FlowFile> transferToFailure = flowFile -> {
flowFile = session.penalize(flowFile); flowFile = session.penalize(flowFile);
@ -324,7 +326,7 @@ public class Wait extends AbstractProcessor {
} }
final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream() final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
.map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList()); .map(f -> copySignalAttributes(session, f, signalRef.get(), originalSignalCounts, replaceOriginalAttributes)).collect(Collectors.toList());
session.transfer(flowFilesWithSignalAttributes, relationship); session.transfer(flowFilesWithSignalAttributes, relationship);
}; };
@ -349,6 +351,9 @@ public class Wait extends AbstractProcessor {
// get notifying signal // get notifying signal
try { try {
signal = protocol.getSignal(signalId); signal = protocol.getSignal(signalId);
if (signal != null) {
originalSignalCounts.putAll(signal.getCounts());
}
signalRef.set(signal); signalRef.set(signal);
} catch (final IOException e) { } catch (final IOException e) {
throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e); throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
@ -423,29 +428,20 @@ public class Wait extends AbstractProcessor {
boolean waitProgressed = false; boolean waitProgressed = false;
if (signal != null && !candidates.isEmpty()) { if (signal != null && !candidates.isEmpty()) {
if (releasableFlowFileCount > 1) { if (releasableFlowFileCount > 0) {
signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates, signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates,
released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released), released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting)); waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0;
waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty(); waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();
} else { } else {
// releasableFlowFileCount = 0 or 1
boolean reachedTargetCount = StringUtils.isBlank(targetCounterName) boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
? signal.isTotalCountReached(targetCount) ? signal.isTotalCountReached(targetCount)
: signal.isCountReached(targetCounterName, targetCount); : signal.isCountReached(targetCounterName, targetCount);
if (reachedTargetCount) { if (reachedTargetCount) {
if (releasableFlowFileCount == 0) {
getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates); getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
} else {
// releasableFlowFileCount = 1
getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0));
getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
// If releasableFlowFileCount == 0, leave signal as it is,
// so that any number of FlowFile can be released as long as target count condition matches.
waitCompleted = true;
}
} else { } else {
getFlowFilesFor.apply(REL_WAIT).addAll(candidates); getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
} }
@ -470,7 +466,7 @@ public class Wait extends AbstractProcessor {
} }
private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) { private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) {
if (signal == null) { if (signal == null) {
return flowFile; return flowFile;
} }
@ -488,8 +484,7 @@ public class Wait extends AbstractProcessor {
} }
// Copy counter attributes // Copy counter attributes
final Map<String, Long> counts = signal.getCounts(); final long totalCount = originalCount.entrySet().stream().mapToLong(e -> {
final long totalCount = counts.entrySet().stream().mapToLong(e -> {
final Long count = e.getValue(); final Long count = e.getValue();
attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count)); attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
return count; return count;

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
@ -45,6 +46,7 @@ public class WaitNotifyProtocol {
private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class); private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);
public static final String DEFAULT_COUNT_NAME = "default"; public static final String DEFAULT_COUNT_NAME = "default";
public static final String CONSUMED_COUNT_NAME = "consumed";
private static final int MAX_REPLACE_RETRY_COUNT = 5; private static final int MAX_REPLACE_RETRY_COUNT = 5;
private static final int REPLACE_RETRY_WAIT_MILLIS = 10; private static final int REPLACE_RETRY_WAIT_MILLIS = 10;
@ -86,9 +88,13 @@ public class WaitNotifyProtocol {
this.attributes = attributes; this.attributes = attributes;
} }
@JsonIgnore
public long getTotalCount() {
return counts.values().stream().mapToLong(Long::longValue).sum();
}
public boolean isTotalCountReached(final long targetCount) { public boolean isTotalCountReached(final long targetCount) {
final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum(); return getTotalCount() >= targetCount;
return totalCount >= targetCount;
} }
public boolean isCountReached(final String counterName, final long targetCount) { public boolean isCountReached(final String counterName, final long targetCount) {
@ -96,6 +102,10 @@ public class WaitNotifyProtocol {
} }
public long getCount(final String counterName) { public long getCount(final String counterName) {
if (counterName == null || counterName.isEmpty()) {
return getTotalCount();
}
final Long count = counts.get(counterName); final Long count = counts.get(counterName);
return count != null ? count : 0; return count != null ? count : 0;
} }
@ -115,7 +125,7 @@ public class WaitNotifyProtocol {
* Caller of this method is responsible for updating cache storage after processing released and waiting candidates * Caller of this method is responsible for updating cache storage after processing released and waiting candidates
* by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.</p> * by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.</p>
* *
* @param _counterName signal counter name to consume from. * @param counterName signal counter name to consume from. If not specified, total counter is used, and 'consumed' counter is added to subtract consumed counters from total counter.
* @param requiredCountForPass number of required signals to acquire a pass. * @param requiredCountForPass number of required signals to acquire a pass.
* @param releasableCandidateCountPerPass number of releasable candidate per pass. * @param releasableCandidateCountPerPass number of releasable candidate per pass.
* @param candidates candidates waiting for being allowed to pass. * @param candidates candidates waiting for being allowed to pass.
@ -123,13 +133,10 @@ public class WaitNotifyProtocol {
* @param waiting function to process candidates those should remain in waiting queue. * @param waiting function to process candidates those should remain in waiting queue.
* @param <E> Type of candidate * @param <E> Type of candidate
*/ */
public <E> void releaseCandidatese(final String _counterName, final long requiredCountForPass, public <E> void releaseCandidates(final String counterName, final long requiredCountForPass,
final int releasableCandidateCountPerPass, final List<E> candidates, final int releasableCandidateCountPerPass, final List<E> candidates,
final Consumer<List<E>> released, final Consumer<List<E>> waiting) { final Consumer<List<E>> released, final Consumer<List<E>> waiting) {
// counterName is mandatory otherwise, we can't decide which counter to convert into pass count.
final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName;
final int candidateSize = candidates.size(); final int candidateSize = candidates.size();
if (releasableCount < candidateSize) { if (releasableCount < candidateSize) {
// If current passCount is not enough for the candidate size, then try to get more. // If current passCount is not enough for the candidate size, then try to get more.
@ -137,11 +144,18 @@ public class WaitNotifyProtocol {
final long signalCount = getCount(counterName); final long signalCount = getCount(counterName);
releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass; releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass;
final long reducedSignalCount = signalCount % requiredCountForPass; final long reducedSignalCount = signalCount % requiredCountForPass;
if (counterName != null && !counterName.isEmpty()) {
// Update target counter with reduced count.
counts.put(counterName, reducedSignalCount); counts.put(counterName, reducedSignalCount);
} else {
// If target counter name is not specified, add consumed count to subtract from accumulated total count.
Long consumedCount = counts.getOrDefault(CONSUMED_COUNT_NAME, 0L);
consumedCount -= signalCount - reducedSignalCount;
counts.put(CONSUMED_COUNT_NAME, consumedCount);
}
} }
int releaseCount = Math.min(releasableCount, candidateSize); int releaseCount = Math.min(releasableCount, candidateSize);
released.accept(candidates.subList(0, releaseCount)); released.accept(candidates.subList(0, releaseCount));
waiting.accept(candidates.subList(releaseCount, candidateSize)); waiting.accept(candidates.subList(releaseCount, candidateSize));

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -459,8 +460,65 @@ public class TestWait {
runner.clearTransferState(); runner.clearTransferState();
assertNull("The key no longer exist", protocol.getSignal("key")); }
@Test
public void testDecrementCache() throws ConcurrentModificationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();
cachedAttributes.put("both", "notifyValue");
cachedAttributes.put("uuid", "notifyUuid");
cachedAttributes.put("notify.only", "notifyValue");
// Setup existing cache entry.
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
// A flow file comes in Notify and increment the counter
protocol.notify("key", "counter", 1, cachedAttributes);
// another flow files comes in Notify and increment the counter
protocol.notify("key", "counter", 1, cachedAttributes);
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "1");
runner.assertValid();
final Map<String, String> waitAttributes = new HashMap<>();
waitAttributes.put("releaseSignalAttribute", "key");
waitAttributes.put("signalCounterName", "counter");
waitAttributes.put("wait.only", "waitValue");
waitAttributes.put("both", "waitValue");
waitAttributes.put("uuid", UUID.randomUUID().toString());
String flowFileContent = "content";
runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
/*
* 1st iteration
*/
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
// expect counter to be decremented to 0 and releasable count remains 1.
assertEquals("0", Long.toString(protocol.getSignal("key").getCount("counter")));
assertEquals("1", Long.toString(protocol.getSignal("key").getReleasableCount()));
// introduce a second flow file with the same signal attribute
runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
/*
* 2nd iteration
*/
runner.clearTransferState();
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// All counters are consumed.
outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");
assertNull("The key no longer exist", protocol.getSignal("key"));
runner.clearTransferState();
} }
private class TestIteration { private class TestIteration {

View File

@ -37,6 +37,7 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import static org.apache.nifi.processors.standard.WaitNotifyProtocol.CONSUMED_COUNT_NAME;
import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME; import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -267,12 +268,12 @@ public class TestWaitNotifyProtocol {
final List<Integer> waiting = new ArrayList<>(); final List<Integer> waiting = new ArrayList<>();
// Test default name. // Test default name.
final String counterName = null; final String counterName = DEFAULT_COUNT_NAME;
final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> { final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
released.clear(); released.clear();
waiting.clear(); waiting.clear();
signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates, signal.releaseCandidates(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
r -> released.addAll(r), w -> waiting.addAll(w)); r -> released.addAll(r), w -> waiting.addAll(w));
}; };
@ -336,4 +337,108 @@ public class TestWaitNotifyProtocol {
} }
@Test
public void testReleaseCandidateTotal() throws Exception {
final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
final Signal signal = new Signal();
final List<Integer> released = new ArrayList<>();
final List<Integer> waiting = new ArrayList<>();
// Test empty counter name, should use total counters.
final String emptyCounterName = null;
final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
released.clear();
waiting.clear();
signal.releaseCandidates(emptyCounterName, requiredCountForPass, releasableCandidatePerPass, candidates,
r -> released.addAll(r), w -> waiting.addAll(w));
};
final String counterA = "counterA";
final String counterB = "counterB";
final String counterC = "counterC";
final Field releasableCount = Signal.class.getDeclaredField("releasableCount");
releasableCount.setAccessible(true);
// No counter, should wait.
releaseCandidate.accept(3L, 1);
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName));
assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
// Counter is not enough yet.
signal.getCounts().put(counterA, 1L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 1);
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(1, signal.getCount(emptyCounterName)); // Counter incremented, but not enough
assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
// Counter reached the target.
signal.getCounts().put(counterA, 1L);
signal.getCounts().put(counterB, 1L);
signal.getCounts().put(counterC, 1L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 1);
assertEquals(1, released.size());
assertEquals(9, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release
assertEquals(-3, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
// Counter reached the target for two candidates.
signal.getCounts().put(counterA, 1L);
signal.getCounts().put(counterB, 2L);
signal.getCounts().put(counterC, 3L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 1);
assertEquals(2, released.size());
assertEquals(8, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release
assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
// Counter reached the target for two candidates, and reminder is 2.
signal.getCounts().put(counterA, 3L);
signal.getCounts().put(counterB, 3L);
signal.getCounts().put(counterC, 5L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 1);
assertEquals(3, released.size()); // 11 / 3 = 3
assertEquals(7, waiting.size());
assertEquals(2, signal.getCount(emptyCounterName));
assertEquals(-9, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
// Counter reached the target for two pass count and each pass can release 2 candidates.
signal.getCounts().put(counterA, 1L);
signal.getCounts().put(counterB, 2L);
signal.getCounts().put(counterC, 3L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 2);
assertEquals(4, released.size()); // (6 / 3) * 2 = 4
assertEquals(6, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName));
assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
// If there are counts more than enough to release current candidates, unused releasableCount should remain.
signal.getCounts().put(counterA, 10L);
signal.getCounts().put(counterB, 20L);
signal.getCounts().put(counterC, 20L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 2);
assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10.
assertEquals(0, waiting.size());
assertEquals(2, signal.getCount(emptyCounterName)); // 50 % 3 = 2.
assertEquals(-48, signal.getCount(CONSUMED_COUNT_NAME)); // 50 % 3 = 2.
assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
}
} }