NIFI-3545: Release M FlowFilews once N signals arrive

- Support multiplle incoming FlowFiles to Wait processor, up to Wait
  Buffer Count
- Added Releasable FlowFile Count, which controls how many FlowFiles can
  be released when wait condition is met
- Added special meaning to Notify delta Zero(0) to clear a signal
  counter back to zero

  This closes #1554

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Koji Kawamura 2017-03-02 22:54:46 +09:00 committed by Aldrin Piri
parent 7f5c0dfb54
commit 000414e7ea
No known key found for this signature in database
GPG Key ID: 531AEBAA4CFE5D00
5 changed files with 618 additions and 98 deletions

View File

@ -97,7 +97,10 @@ public class Notify extends AbstractProcessor {
"be evaluated against a FlowFile in order to determine the signal counter delta. " +
"Specify how much the counter should increase. " +
"For example, if multiple signal events are processed at upstream flow in batch oriented way, " +
"the number of events processed can be notified with this property at once.")
"the number of events processed can be notified with this property at once. " +
"Zero (0) has a special meaning, it clears target count back to 0, which is especially useful when used with Wait " +
Wait.RELEASABLE_FLOWFILE_COUNT.getDisplayName() + " = Zero (0) mode, to provide 'open-close-gate' type of flow control. " +
"One (1) can open a corresponding Wait processor, and Zero (0) can negate it as if closing a gate.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
@ -171,7 +174,8 @@ public class Notify extends AbstractProcessor {
int incrementDelta(final String counterName, final int delta) {
int current = deltas.containsKey(counterName) ? deltas.get(counterName) : 0;
int updated = current + delta;
// Zero (0) clears count.
int updated = delta == 0 ? 0 : current + delta;
deltas.put(counterName, updated);
return updated;
}

View File

@ -26,6 +26,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@ -40,11 +45,13 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@ -52,6 +59,10 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@ -63,7 +74,7 @@ import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
+ "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+ "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
+ "This is particularly useful with processors that split a source flow file into multiple fragments, such as SplitText. "
+ "This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. "
+ "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to "
+ "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value "
+ "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor."
@ -125,11 +136,36 @@ public class Wait extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor WAIT_BUFFER_COUNT = new PropertyDescriptor.Builder()
.name("wait-buffer-count")
.displayName("Wait Buffer Count")
.description("Specify the maximum number of incoming FlowFiles that can be buffered to check whether it can move forward. " +
"The more buffer can provide the better performance, as it reduces the number of interactions with cache service " +
"by grouping FlowFiles by signal identifier. " +
"Only a signal identifier can be processed at a processor execution.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.build();
public static final PropertyDescriptor RELEASABLE_FLOWFILE_COUNT = new PropertyDescriptor.Builder()
.name("releasable-flowfile-count")
.displayName("Releasable FlowFile Count")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the releasable FlowFile count. " +
"This specifies how many FlowFiles can be released when a target count reaches target signal count. " +
"Zero (0) has a special meaning, any number of FlowFiles can be released as long as signal count matches target.")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("1")
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
.name("expiration-duration")
.displayName("Expiration Duration")
.description("Indicates the duration after which waiting flow files will be routed to the 'expired' relationship")
.description("Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship")
.required(true)
.defaultValue("10 min")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@ -145,7 +181,7 @@ public class Wait extends AbstractProcessor {
public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder()
.name("attribute-copy-mode")
.displayName("Attribute Copy Mode")
.description("Specifies how to handle attributes copied from flow files entering the Notify processor")
.description("Specifies how to handle attributes copied from FlowFiles entering the Notify processor")
.defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
.required(true)
.allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL)
@ -208,6 +244,8 @@ public class Wait extends AbstractProcessor {
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
descriptors.add(TARGET_SIGNAL_COUNT);
descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(WAIT_BUFFER_COUNT);
descriptors.add(RELEASABLE_FLOWFILE_COUNT);
descriptors.add(EXPIRATION_DURATION);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_COPY_MODE);
@ -223,21 +261,81 @@ public class Wait extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
// Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
final Integer bufferCount = context.getProperty(WAIT_BUFFER_COUNT).asInteger();
// if the computed value is null, or empty, we transfer the flow file to failure relationship
if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
final Map<Relationship, List<FlowFile>> processedFlowFiles = new HashMap<>();
final Function<Relationship, List<FlowFile>> getFlowFilesFor = r -> processedFlowFiles.computeIfAbsent(r, k -> new ArrayList<>());
final AtomicReference<String> targetSignalId = new AtomicReference<>();
final AtomicInteger bufferedCount = new AtomicInteger(0);
final List<FlowFile> failedFilteringFlowFiles = new ArrayList<>();
final Supplier<FlowFileFilter.FlowFileFilterResult> acceptResultSupplier =
() -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE;
final List<FlowFile> flowFiles = session.get(f -> {
final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue();
// if the computed value is null, or empty, we transfer the FlowFile to failure relationship
if (StringUtils.isBlank(fSignalId)) {
// We can't penalize f before getting it from session, so keep it in a temporal list.
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {f});
failedFilteringFlowFiles.add(f);
return ACCEPT_AND_CONTINUE;
}
final String targetSignalIdStr = targetSignalId.get();
if (targetSignalIdStr == null) {
// This is the first one.
targetSignalId.set(fSignalId);
return acceptResultSupplier.get();
}
if (targetSignalIdStr.equals(fSignalId)) {
return acceptResultSupplier.get();
}
return REJECT_AND_CONTINUE;
});
final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
final AtomicReference<Signal> signalRef = new AtomicReference<>();
final Consumer<FlowFile> transferToFailure = flowFile -> {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
};
final Consumer<Entry<Relationship, List<FlowFile>>> transferFlowFiles = routedFlowFiles -> {
Relationship relationship = routedFlowFiles.getKey();
if (REL_WAIT.equals(relationship)) {
final String waitMode = context.getProperty(WAIT_MODE).getValue();
if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
// Transfer to self.
relationship = Relationship.SELF;
}
}
final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
.map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList());
session.transfer(flowFilesWithSignalAttributes, relationship);
};
failedFilteringFlowFiles.forEach(f -> {
flowFiles.remove(f);
transferToFailure.accept(f);
});
if (flowFiles.isEmpty()) {
// If there was nothing but failed FlowFiles while filtering, transfer those and end immediately.
processedFlowFiles.entrySet().forEach(transferFlowFiles);
return;
}
@ -245,95 +343,131 @@ public class Wait extends AbstractProcessor {
final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
final String signalId = targetSignalId.get();
final Signal signal;
Signal signal = null;
// get notifying signal
try {
// get notifying signal
signal = protocol.getSignal(signalId);
signalRef.set(signal);
} catch (final IOException e) {
throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
}
// check for expiration
String targetCounterName = null;
long targetCount = 1;
int releasableFlowFileCount = 1;
final List<FlowFile> candidates = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
// Set wait start timestamp if it's not set yet
String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
if (waitStartTimestamp == null) {
waitStartTimestamp = String.valueOf(System.currentTimeMillis());
flowFile = session.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStartTimestamp);
}
long lWaitStartTimestamp = 0L;
long lWaitStartTimestamp;
try {
lWaitStartTimestamp = Long.parseLong(waitStartTimestamp);
} catch (NumberFormatException nfe) {
logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
flowFile = session.penalize(flowFile);
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_FAILURE);
return;
transferToFailure.accept(flowFile);
continue;
}
// check for expiration
long expirationDuration = context.getProperty(EXPIRATION_DURATION)
.asTimePeriod(TimeUnit.MILLISECONDS);
long now = System.currentTimeMillis();
if (now > (lWaitStartTimestamp + expirationDuration)) {
logger.info("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)});
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_EXPIRED);
return;
getFlowFilesFor.apply(REL_EXPIRED).add(flowFile);
continue;
}
// If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
if (signal == null) {
// If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
if (logger.isDebugEnabled()) {
logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile});
logger.debug("No release signal found for {} on FlowFile {} yet", new Object[] {signalId, flowFile});
}
getFlowFilesFor.apply(REL_WAIT).add(flowFile);
continue;
}
// Fix target counter name and count from current FlowFile, if those are not set yet.
if (candidates.isEmpty()) {
targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
try {
targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
} catch (final NumberFormatException e) {
transferToFailure.accept(flowFile);
logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e}, e);
continue;
}
try {
releasableFlowFileCount = Integer.valueOf(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue());
} catch (final NumberFormatException e) {
transferToFailure.accept(flowFile);
logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", new Object[] {flowFile, e}, e);
continue;
}
}
final String waitMode = context.getProperty(WAIT_MODE).getValue();
if (WAIT_MODE_TRANSFER_TO_WAIT.getValue().equals(waitMode)) {
session.transfer(flowFile, REL_WAIT);
} else if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
// Transfer to self.
session.transfer(flowFile);
// FlowFile is now validated and added to candidates.
candidates.add(flowFile);
}
boolean waitCompleted = false;
boolean waitProgressed = false;
if (signal != null && !candidates.isEmpty()) {
if (releasableFlowFileCount > 1) {
signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates,
released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();
} else {
// releasableFlowFileCount = 0 or 1
boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
? signal.isTotalCountReached(targetCount)
: signal.isCountReached(targetCounterName, targetCount);
if (reachedTargetCount) {
if (releasableFlowFileCount == 0) {
getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
} else {
// releasableFlowFileCount = 1
getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0));
getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
// If releasableFlowFileCount == 0, leave signal as it is,
// so that any number of FlowFile can be released as long as target count condition matches.
waitCompleted = true;
}
} else {
throw new ProcessException("Unsupported wait mode " + waitMode + " was specified.");
getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
}
return;
}
}
final String targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
final Long targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
final boolean reachedToTargetCount = StringUtils.isBlank(targetCounterName)
? signal.isTotalCountReached(targetCount)
: signal.isCountReached(targetCounterName, targetCount);
// Transfer FlowFiles.
processedFlowFiles.entrySet().forEach(transferFlowFiles);
if (!reachedToTargetCount) {
if (logger.isDebugEnabled()) {
logger.debug("Release signal count {} hasn't reached {} for {} on FlowFile {}",
new Object[] {targetCounterName, targetCount, signalId, flowFile});
}
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_WAIT);
return;
// Update signal if needed.
try {
if (waitCompleted) {
protocol.complete(signalId);
} else if (waitProgressed) {
protocol.replace(signal);
}
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
session.transfer(flowFile, REL_SUCCESS);
protocol.complete(signalId);
} catch (final NumberFormatException e) {
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e});
} catch (final IOException e) {
flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
session.rollback();
throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
}
}
private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
@ -341,13 +475,13 @@ public class Wait extends AbstractProcessor {
return flowFile;
}
// copy over attributes from release signal flow file, if provided
// copy over attributes from release signal FlowFile, if provided
final Map<String, String> attributesToCopy;
if (replaceOriginal) {
attributesToCopy = new HashMap<>(signal.getAttributes());
attributesToCopy.remove("uuid");
} else {
// if the current flow file does *not* have the cached attribute, copy it
// if the current FlowFile does *not* have the cached attribute, copy it
attributesToCopy = signal.getAttributes().entrySet().stream()
.filter(e -> flowFile.getAttribute(e.getKey()) == null)
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));

View File

@ -31,7 +31,9 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
/**
* This class provide a protocol for Wait and Notify processors to work together.
@ -51,8 +53,16 @@ public class WaitNotifyProtocol {
private final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
public static class Signal {
/*
* Getter and Setter methods are needed to (de)serialize JSON even if it's not used from app code.
*/
transient private String identifier;
transient private long revision = -1;
private Map<String, Long> counts = new HashMap<>();
private Map<String, String> attributes = new HashMap<>();
private int releasableCount = 0;
public Map<String, Long> getCounts() {
return counts;
@ -84,6 +94,54 @@ public class WaitNotifyProtocol {
return count != null ? count : 0;
}
public int getReleasableCount() {
return releasableCount;
}
public void setReleasableCount(int releasableCount) {
this.releasableCount = releasableCount;
}
/**
* <p>Consume accumulated notification signals to let some waiting candidates get released.</p>
*
* <p>This method updates state of this instance, but does not update cache storage.
* Caller of this method is responsible for updating cache storage after processing released and waiting candidates
* by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.</p>
*
* @param _counterName signal counter name to consume from.
* @param requiredCountForPass number of required signals to acquire a pass.
* @param releasableCandidateCountPerPass number of releasable candidate per pass.
* @param candidates candidates waiting for being allowed to pass.
* @param released function to process allowed candidates to pass.
* @param waiting function to process candidates those should remain in waiting queue.
* @param <E> Type of candidate
*/
public <E> void releaseCandidatese(final String _counterName, final long requiredCountForPass,
final int releasableCandidateCountPerPass, final List<E> candidates,
final Consumer<List<E>> released, final Consumer<List<E>> waiting) {
// counterName is mandatory otherwise, we can't decide which counter to convert into pass count.
final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName;
final int candidateSize = candidates.size();
if (releasableCount < candidateSize) {
// If current passCount is not enough for the candidate size, then try to get more.
// Convert notification signals to pass ticket.
final long signalCount = getCount(counterName);
releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass;
final long reducedSignalCount = signalCount % requiredCountForPass;
counts.put(counterName, reducedSignalCount);
}
int releaseCount = Math.min(releasableCount, candidateSize);
released.accept(candidates.subList(0, releaseCount));
waiting.accept(candidates.subList(releaseCount, candidateSize));
releasableCount -= releaseCount;
}
}
private final AtomicDistributedMapCacheClient cache;
@ -95,7 +153,7 @@ public class WaitNotifyProtocol {
/**
* Notify a signal to increase a counter.
* @param signalId a key in the underlying cache engine
* @param deltas a map containing counterName and delta entries
* @param deltas a map containing counterName and delta entries, 0 has special meaning, clears the counter back to 0
* @param attributes attributes to save in the cache entry
* @return A Signal instance, merged with an existing signal if any
* @throws IOException thrown when it failed interacting with the cache engine
@ -106,10 +164,9 @@ public class WaitNotifyProtocol {
for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) {
final CacheEntry<String, String> existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer);
final Signal existingSignal = getSignal(signalId);
final Signal signal = existingSignal != null ? existingSignal : new Signal();
signal.identifier = signalId;
if (attributes != null) {
signal.attributes.putAll(attributes);
@ -117,15 +174,11 @@ public class WaitNotifyProtocol {
deltas.forEach((counterName, delta) -> {
long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0;
count += delta;
count = delta == 0 ? 0 : count + delta;
signal.counts.put(counterName, count);
});
final String signalJson = objectMapper.writeValueAsString(signal);
final long revision = existingEntry != null ? existingEntry.getRevision() : -1;
if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) {
if (replace(signal)) {
return signal;
}
@ -148,7 +201,7 @@ public class WaitNotifyProtocol {
* Notify a signal to increase a counter.
* @param signalId a key in the underlying cache engine
* @param counterName specify count to update
* @param delta delta to update a counter
* @param delta delta to update a counter, 0 has special meaning, clears the counter back to 0
* @param attributes attributes to save in the cache entry
* @return A Signal instance, merged with an existing signal if any
* @throws IOException thrown when it failed interacting with the cache engine
@ -184,12 +237,16 @@ public class WaitNotifyProtocol {
final String value = entry.getValue();
try {
return objectMapper.readValue(value, Signal.class);
final Signal signal = objectMapper.readValue(value, Signal.class);
signal.identifier = signalId;
signal.revision = entry.getRevision();
return signal;
} catch (final JsonParseException jsonE) {
// Try to read it as FlowFileAttributes for backward compatibility.
try {
final Map<String, String> attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8));
final Signal signal = new Signal();
signal.identifier = signalId;
signal.setAttributes(attributes);
signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
return signal;
@ -209,4 +266,11 @@ public class WaitNotifyProtocol {
public void complete(final String signalId) throws IOException {
cache.remove(signalId, stringSerializer);
}
public boolean replace(final Signal signal) throws IOException {
final String signalJson = objectMapper.writeValueAsString(signal);
return cache.replace(signal.identifier, signalJson, stringSerializer, stringSerializer, signal.revision);
}
}

View File

@ -19,12 +19,22 @@ package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.TestNotify.MockCacheClient;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@ -180,14 +190,15 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "2");
runner.enqueue(new byte[]{}, props);
runner.run();
//Expect the processor to receive an IO exception from the cache service and route to failure
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
runner.assertTransferCount(Wait.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
service.setFailOnCalls(false);
try {
runner.run();
fail("Expect the processor to receive an IO exception from the cache service and throws ProcessException.");
} catch (final AssertionError e) {
assertTrue(e.getCause() instanceof ProcessException);
assertTrue(e.getCause().getCause() instanceof IOException);
} finally {
service.setFailOnCalls(false);
}
}
@Test
@ -360,7 +371,6 @@ public class TestWait {
assertNull("The key no longer exist", protocol.getSignal("key"));
}
@Test
public void testWaitForSpecificCount() throws InitializationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();
@ -455,4 +465,222 @@ public class TestWait {
}
private class TestIteration {
final List<MockFlowFile> released = new ArrayList<>();
final List<MockFlowFile> waiting = new ArrayList<>();
final List<MockFlowFile> failed = new ArrayList<>();
final List<String> expectedReleased = new ArrayList<>();
final List<String> expectedWaiting = new ArrayList<>();
final List<String> expectedFailed = new ArrayList<>();
void run() {
released.clear();
waiting.clear();
failed.clear();
runner.run();
released.addAll(runner.getFlowFilesForRelationship(Wait.REL_SUCCESS));
waiting.addAll(runner.getFlowFilesForRelationship(Wait.REL_WAIT));
failed.addAll(runner.getFlowFilesForRelationship(Wait.REL_FAILURE));
assertEquals(expectedReleased.size(), released.size());
assertEquals(expectedWaiting.size(), waiting.size());
assertEquals(expectedFailed.size(), failed.size());
final BiConsumer<List<String>, List<MockFlowFile>> assertContents = (expected, actual) -> {
for (int i = 0; i < expected.size(); i++) {
actual.get(i).assertContentEquals(expected.get(i));
}
};
assertContents.accept(expectedReleased, released);
assertContents.accept(expectedWaiting, waiting);
assertContents.accept(expectedFailed, failed);
runner.clearTransferState();
expectedReleased.clear();
expectedWaiting.clear();
expectedFailed.clear();
}
}
@Test
public void testWaitBufferCount() throws InitializationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();
cachedAttributes.put("notified", "notified-value");
// Setup existing cache entry.
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
final Map<String, String> waitAttributesA = new HashMap<>();
waitAttributesA.put("releaseSignalAttribute", "key-A");
waitAttributesA.put("targetSignalCount", "1");
waitAttributesA.put("signalCounterName", "counter");
final Map<String, String> waitAttributesB = new HashMap<>();
waitAttributesB.put("releaseSignalAttribute", "key-B");
waitAttributesB.put("targetSignalCount", "3");
waitAttributesB.put("signalCounterName", "counter");
final Map<String, String> waitAttributesBInvalid = new HashMap<>();
waitAttributesBInvalid.putAll(waitAttributesB);
waitAttributesBInvalid.remove("releaseSignalAttribute");
final TestIteration testIteration = new TestIteration();
// Enqueue multiple wait FlowFiles.
runner.enqueue("1".getBytes(), waitAttributesB); // Should be picked at 1st and 2nd itr
runner.enqueue("2".getBytes(), waitAttributesA); // Should be picked at 3rd itr
runner.enqueue("3".getBytes(), waitAttributesBInvalid);
runner.enqueue("4".getBytes(), waitAttributesA); // Should be picked at 3rd itr
runner.enqueue("5".getBytes(), waitAttributesB); // Should be picked at 1st
runner.enqueue("6".getBytes(), waitAttributesB); // Should be picked at 2nd itr
/*
* 1st run:
* pick 1 key-B
* skip 2 cause key-A
* skip 3 cause invalid
* skip 4 cause key-A
* pick 5 key-B
*/
testIteration.expectedWaiting.addAll(Arrays.asList("1", "5")); // Picked, but not enough counter.
testIteration.expectedFailed.add("3"); // invalid.
testIteration.run();
/*
* 2nd run:
* pick 6 key-B
* pick 1 key-B
*/
protocol.notify("key-B", "counter", 3, cachedAttributes);
testIteration.expectedReleased.add("6");
testIteration.expectedWaiting.add("1"); // Picked but only one FlowFile can be released.
// enqueue waiting, simulating wait relationship back to self
testIteration.waiting.forEach(f -> runner.enqueue(f));
testIteration.run();
}
@Test
public void testReleaseMultipleFlowFiles() throws InitializationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();
cachedAttributes.put("notified", "notified-value");
// Setup existing cache entry.
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "${fragmentCount}");
final Map<String, String> waitAttributes = new HashMap<>();
waitAttributes.put("releaseSignalAttribute", "key");
waitAttributes.put("targetSignalCount", "3");
waitAttributes.put("signalCounterName", "counter");
waitAttributes.put("fragmentCount", "6");
final TestIteration testIteration = new TestIteration();
// Enqueue 6 wait FlowFiles. 1,2,3,4,5,6
IntStream.range(1, 7).forEach(i -> runner.enqueue(String.valueOf(i).getBytes(), waitAttributes));
/*
* 1st run
*/
testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
testIteration.run();
WaitNotifyProtocol.Signal signal = protocol.getSignal("key");
assertNull(signal);
/*
* 2nd run
*/
protocol.notify("key", "counter", 3, cachedAttributes);
testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
testIteration.waiting.forEach(f -> runner.enqueue(f));
testIteration.run();
signal = protocol.getSignal("key");
assertEquals(0, signal.getCount("count"));
assertEquals(4, signal.getReleasableCount());
/*
* 3rd run
*/
testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
testIteration.waiting.forEach(f -> runner.enqueue(f));
testIteration.run();
signal = protocol.getSignal("key");
assertEquals(0, signal.getCount("count"));
assertEquals(2, signal.getReleasableCount());
}
@Test
public void testOpenGate() throws InitializationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();
cachedAttributes.put("notified", "notified-value");
// Setup existing cache entry.
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "0"); // Leave gate open
final Map<String, String> waitAttributes = new HashMap<>();
waitAttributes.put("releaseSignalAttribute", "key");
waitAttributes.put("targetSignalCount", "3");
waitAttributes.put("signalCounterName", "counter");
final TestIteration testIteration = new TestIteration();
// Enqueue 6 wait FlowFiles. 1,2,3,4,5,6
IntStream.range(1, 7).forEach(i -> runner.enqueue(String.valueOf(i).getBytes(), waitAttributes));
/*
* 1st run
*/
testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
testIteration.run();
WaitNotifyProtocol.Signal signal = protocol.getSignal("key");
assertNull(signal);
/*
* 2nd run
*/
protocol.notify("key", "counter", 3, cachedAttributes);
testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
testIteration.waiting.forEach(f -> runner.enqueue(f));
testIteration.run();
signal = protocol.getSignal("key");
assertEquals(3, signal.getCount("counter"));
assertEquals(0, signal.getReleasableCount());
/*
* 3rd run
*/
testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
testIteration.waiting.forEach(f -> runner.enqueue(f));
testIteration.run();
signal = protocol.getSignal("key");
assertEquals(3, signal.getCount("counter"));
assertEquals(0, signal.getReleasableCount());
}
}

View File

@ -27,11 +27,18 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -102,7 +109,7 @@ public class TestWaitNotifyProtocol {
final CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
assertEquals(0, cacheEntry.getRevision());
assertEquals("{\"counts\":{\"a\":1},\"attributes\":{}}", cacheEntry.getValue());
assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
}
@Test
@ -119,20 +126,20 @@ public class TestWaitNotifyProtocol {
CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
assertEquals(1, cacheEntry.getRevision());
assertEquals("{\"counts\":{\"a\":2},\"attributes\":{}}", cacheEntry.getValue());
assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
protocol.notify(signalId, "a", 10, null);
cacheEntry = cacheEntries.get("signal-id");
assertEquals(2, cacheEntry.getRevision());
assertEquals("{\"counts\":{\"a\":12},\"attributes\":{}}", cacheEntry.getValue());
assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
protocol.notify(signalId, "b", 2, null);
protocol.notify(signalId, "c", 3, null);
cacheEntry = cacheEntries.get("signal-id");
assertEquals(4, cacheEntry.getRevision());
assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
final Map<String, Integer> deltas = new HashMap<>();
deltas.put("a", 10);
@ -141,7 +148,13 @@ public class TestWaitNotifyProtocol {
cacheEntry = cacheEntries.get("signal-id");
assertEquals(5, cacheEntry.getRevision());
assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
// Zero clear 'b'.
protocol.notify("signal-id", "b", 0, null);
cacheEntry = cacheEntries.get("signal-id");
assertEquals(6, cacheEntry.getRevision());
assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
}
@ -161,7 +174,7 @@ public class TestWaitNotifyProtocol {
CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
assertEquals(0, cacheEntry.getRevision());
assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"}}", cacheEntry.getValue());
assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue());
final Map<String, String> attributeA2 = new HashMap<>();
attributeA2.put("p2", "a2"); // Update p2
@ -173,7 +186,7 @@ public class TestWaitNotifyProtocol {
cacheEntry = cacheEntries.get("signal-id");
assertEquals(1, cacheEntry.getRevision());
assertEquals("Updated attributes should be merged correctly",
"{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"}}", cacheEntry.getValue());
"{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue());
}
@ -237,7 +250,7 @@ public class TestWaitNotifyProtocol {
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
final Signal signal = protocol.getSignal(signalId);
assertEquals(1, signal.getCount(WaitNotifyProtocol.DEFAULT_COUNT_NAME));
assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME));
assertEquals("value1", signal.getAttributes().get("key1"));
assertEquals("value2", signal.getAttributes().get("key2"));
assertEquals("value3", signal.getAttributes().get("key3"));
@ -251,4 +264,81 @@ public class TestWaitNotifyProtocol {
}
@Test
public void testReleaseCandidate() throws Exception {
final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
final Signal signal = new Signal();
final List<Integer> released = new ArrayList<>();
final List<Integer> waiting = new ArrayList<>();
// Test default name.
final String counterName = null;
final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
released.clear();
waiting.clear();
signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
r -> released.addAll(r), w -> waiting.addAll(w));
};
final Field releasableCount = Signal.class.getDeclaredField("releasableCount");
releasableCount.setAccessible(true);
// No counter, should wait.
releaseCandidate.accept(3L, 1);
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));
// Counter is not enough yet.
signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
releaseCandidate.accept(3L, 1);
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME)); // Counter incremented, but not enough
assertEquals(0, releasableCount.getInt(signal));
// Counter reached the target.
signal.getCounts().put(DEFAULT_COUNT_NAME, 3L);
releaseCandidate.accept(3L, 1);
assertEquals(1, released.size());
assertEquals(9, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release
assertEquals(0, releasableCount.getInt(signal));
// Counter reached the target for two candidates.
signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
releaseCandidate.accept(3L, 1);
assertEquals(2, released.size());
assertEquals(8, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release
assertEquals(0, releasableCount.getInt(signal));
// Counter reached the target for two candidates, and reminder is 2.
signal.getCounts().put(DEFAULT_COUNT_NAME, 11L);
releaseCandidate.accept(3L, 1);
assertEquals(3, released.size()); // 11 / 3 = 3
assertEquals(7, waiting.size());
assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 11 % 3 = 2
assertEquals(0, releasableCount.getInt(signal));
// Counter reached the target for two pass count and each pass can release 2 candidates.
signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
releaseCandidate.accept(3L, 2);
assertEquals(4, released.size()); // (6 / 3) * 2 = 4
assertEquals(6, waiting.size());
assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // 6 % 3 = 0
assertEquals(0, releasableCount.getInt(signal));
// If there are counts more than enough to release current candidates, unused releasableCount should remain.
signal.getCounts().put(DEFAULT_COUNT_NAME, 50L);
releaseCandidate.accept(3L, 2);
assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10.
assertEquals(0, waiting.size());
assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 50 % 3 = 2.
assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
}
}