From 000414e7eaa4c4f459779e73c331f3021f9a5049 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Thu, 2 Mar 2017 22:54:46 +0900 Subject: [PATCH] NIFI-3545: Release M FlowFilews once N signals arrive - Support multiplle incoming FlowFiles to Wait processor, up to Wait Buffer Count - Added Releasable FlowFile Count, which controls how many FlowFiles can be released when wait condition is met - Added special meaning to Notify delta Zero(0) to clear a signal counter back to zero This closes #1554 Signed-off-by: Aldrin Piri --- .../nifi/processors/standard/Notify.java | 8 +- .../apache/nifi/processors/standard/Wait.java | 270 +++++++++++++----- .../standard/WaitNotifyProtocol.java | 86 +++++- .../nifi/processors/standard/TestWait.java | 246 +++++++++++++++- .../standard/TestWaitNotifyProtocol.java | 106 ++++++- 5 files changed, 618 insertions(+), 98 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java index 155d131644..346f1fb04b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java @@ -97,7 +97,10 @@ public class Notify extends AbstractProcessor { "be evaluated against a FlowFile in order to determine the signal counter delta. " + "Specify how much the counter should increase. " + "For example, if multiple signal events are processed at upstream flow in batch oriented way, " + - "the number of events processed can be notified with this property at once.") + "the number of events processed can be notified with this property at once. " + + "Zero (0) has a special meaning, it clears target count back to 0, which is especially useful when used with Wait " + + Wait.RELEASABLE_FLOWFILE_COUNT.getDisplayName() + " = Zero (0) mode, to provide 'open-close-gate' type of flow control. " + + "One (1) can open a corresponding Wait processor, and Zero (0) can negate it as if closing a gate.") .required(true) .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) .expressionLanguageSupported(true) @@ -171,7 +174,8 @@ public class Notify extends AbstractProcessor { int incrementDelta(final String counterName, final int delta) { int current = deltas.containsKey(counterName) ? deltas.get(counterName) : 0; - int updated = current + delta; + // Zero (0) clears count. + int updated = delta == 0 ? 0 : current + delta; deltas.put(counterName, updated); return updated; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index 8b76ce54ed..fccd443817 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -26,6 +26,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -40,11 +45,13 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -52,6 +59,10 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE; +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE; +import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE; + @EventDriven @SupportsBatching @Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"}) @@ -63,7 +74,7 @@ import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; + "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. " + "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. " - + "This is particularly useful with processors that split a source flow file into multiple fragments, such as SplitText. " + + "This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. " + "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to " + "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value " + "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor." @@ -125,11 +136,36 @@ public class Wait extends AbstractProcessor { .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor WAIT_BUFFER_COUNT = new PropertyDescriptor.Builder() + .name("wait-buffer-count") + .displayName("Wait Buffer Count") + .description("Specify the maximum number of incoming FlowFiles that can be buffered to check whether it can move forward. " + + "The more buffer can provide the better performance, as it reduces the number of interactions with cache service " + + "by grouping FlowFiles by signal identifier. " + + "Only a signal identifier can be processed at a processor execution.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + + public static final PropertyDescriptor RELEASABLE_FLOWFILE_COUNT = new PropertyDescriptor.Builder() + .name("releasable-flowfile-count") + .displayName("Releasable FlowFile Count") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the releasable FlowFile count. " + + "This specifies how many FlowFiles can be released when a target count reaches target signal count. " + + "Zero (0) has a special meaning, any number of FlowFiles can be released as long as signal count matches target.") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("1") + .build(); + // Selects the FlowFile attribute or expression, whose value is used as cache key public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder() .name("expiration-duration") .displayName("Expiration Duration") - .description("Indicates the duration after which waiting flow files will be routed to the 'expired' relationship") + .description("Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship") .required(true) .defaultValue("10 min") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) @@ -145,7 +181,7 @@ public class Wait extends AbstractProcessor { public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder() .name("attribute-copy-mode") .displayName("Attribute Copy Mode") - .description("Specifies how to handle attributes copied from flow files entering the Notify processor") + .description("Specifies how to handle attributes copied from FlowFiles entering the Notify processor") .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue()) .required(true) .allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL) @@ -208,6 +244,8 @@ public class Wait extends AbstractProcessor { descriptors.add(RELEASE_SIGNAL_IDENTIFIER); descriptors.add(TARGET_SIGNAL_COUNT); descriptors.add(SIGNAL_COUNTER_NAME); + descriptors.add(WAIT_BUFFER_COUNT); + descriptors.add(RELEASABLE_FLOWFILE_COUNT); descriptors.add(EXPIRATION_DURATION); descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(ATTRIBUTE_COPY_MODE); @@ -223,21 +261,81 @@ public class Wait extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - final ComponentLog logger = getLogger(); // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support - final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); + final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER); + final Integer bufferCount = context.getProperty(WAIT_BUFFER_COUNT).asInteger(); - // if the computed value is null, or empty, we transfer the flow file to failure relationship - if (StringUtils.isBlank(signalId)) { - logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile}); + final Map> processedFlowFiles = new HashMap<>(); + final Function> getFlowFilesFor = r -> processedFlowFiles.computeIfAbsent(r, k -> new ArrayList<>()); + + final AtomicReference targetSignalId = new AtomicReference<>(); + final AtomicInteger bufferedCount = new AtomicInteger(0); + final List failedFilteringFlowFiles = new ArrayList<>(); + final Supplier acceptResultSupplier = + () -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE; + final List flowFiles = session.get(f -> { + + final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue(); + + // if the computed value is null, or empty, we transfer the FlowFile to failure relationship + if (StringUtils.isBlank(fSignalId)) { + // We can't penalize f before getting it from session, so keep it in a temporal list. + logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {f}); + failedFilteringFlowFiles.add(f); + return ACCEPT_AND_CONTINUE; + } + + final String targetSignalIdStr = targetSignalId.get(); + if (targetSignalIdStr == null) { + // This is the first one. + targetSignalId.set(fSignalId); + return acceptResultSupplier.get(); + } + + if (targetSignalIdStr.equals(fSignalId)) { + return acceptResultSupplier.get(); + } + + return REJECT_AND_CONTINUE; + + }); + + final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue(); + final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode); + final AtomicReference signalRef = new AtomicReference<>(); + + final Consumer transferToFailure = flowFile -> { flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); + getFlowFilesFor.apply(REL_FAILURE).add(flowFile); + }; + + final Consumer>> transferFlowFiles = routedFlowFiles -> { + Relationship relationship = routedFlowFiles.getKey(); + + if (REL_WAIT.equals(relationship)) { + final String waitMode = context.getProperty(WAIT_MODE).getValue(); + + if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) { + // Transfer to self. + relationship = Relationship.SELF; + } + } + + final List flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream() + .map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList()); + session.transfer(flowFilesWithSignalAttributes, relationship); + }; + + failedFilteringFlowFiles.forEach(f -> { + flowFiles.remove(f); + transferToFailure.accept(f); + }); + + if (flowFiles.isEmpty()) { + // If there was nothing but failed FlowFiles while filtering, transfer those and end immediately. + processedFlowFiles.entrySet().forEach(transferFlowFiles); return; } @@ -245,95 +343,131 @@ public class Wait extends AbstractProcessor { final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); - String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue(); - final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode); + final String signalId = targetSignalId.get(); + final Signal signal; - Signal signal = null; + // get notifying signal try { - // get notifying signal signal = protocol.getSignal(signalId); + signalRef.set(signal); + } catch (final IOException e) { + throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e); + } - // check for expiration + String targetCounterName = null; + long targetCount = 1; + int releasableFlowFileCount = 1; + + final List candidates = new ArrayList<>(); + + for (FlowFile flowFile : flowFiles) { + // Set wait start timestamp if it's not set yet String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP); if (waitStartTimestamp == null) { waitStartTimestamp = String.valueOf(System.currentTimeMillis()); flowFile = session.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStartTimestamp); } - long lWaitStartTimestamp = 0L; + long lWaitStartTimestamp; try { lWaitStartTimestamp = Long.parseLong(waitStartTimestamp); } catch (NumberFormatException nfe) { logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile}); - flowFile = session.penalize(flowFile); - - flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); - session.transfer(flowFile, REL_FAILURE); - return; + transferToFailure.accept(flowFile); + continue; } + + // check for expiration long expirationDuration = context.getProperty(EXPIRATION_DURATION) .asTimePeriod(TimeUnit.MILLISECONDS); long now = System.currentTimeMillis(); if (now > (lWaitStartTimestamp + expirationDuration)) { logger.info("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)}); - flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); - session.transfer(flowFile, REL_EXPIRED); - return; + getFlowFilesFor.apply(REL_EXPIRED).add(flowFile); + continue; } + // If there's no signal yet, then we don't have to evaluate target counts. Return immediately. if (signal == null) { - // If there's no signal yet, then we don't have to evaluate target counts. Return immediately. if (logger.isDebugEnabled()) { - logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile}); + logger.debug("No release signal found for {} on FlowFile {} yet", new Object[] {signalId, flowFile}); } + getFlowFilesFor.apply(REL_WAIT).add(flowFile); + continue; + } + // Fix target counter name and count from current FlowFile, if those are not set yet. + if (candidates.isEmpty()) { + targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue(); + try { + targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue()); + } catch (final NumberFormatException e) { + transferToFailure.accept(flowFile); + logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e}, e); + continue; + } + try { + releasableFlowFileCount = Integer.valueOf(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue()); + } catch (final NumberFormatException e) { + transferToFailure.accept(flowFile); + logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", new Object[] {flowFile, e}, e); + continue; + } + } - final String waitMode = context.getProperty(WAIT_MODE).getValue(); - if (WAIT_MODE_TRANSFER_TO_WAIT.getValue().equals(waitMode)) { - session.transfer(flowFile, REL_WAIT); - } else if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) { - // Transfer to self. - session.transfer(flowFile); + // FlowFile is now validated and added to candidates. + candidates.add(flowFile); + } + + boolean waitCompleted = false; + boolean waitProgressed = false; + if (signal != null && !candidates.isEmpty()) { + + if (releasableFlowFileCount > 1) { + signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates, + released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released), + waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting)); + waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty(); + + } else { + // releasableFlowFileCount = 0 or 1 + boolean reachedTargetCount = StringUtils.isBlank(targetCounterName) + ? signal.isTotalCountReached(targetCount) + : signal.isCountReached(targetCounterName, targetCount); + + if (reachedTargetCount) { + if (releasableFlowFileCount == 0) { + getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates); + } else { + // releasableFlowFileCount = 1 + getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0)); + getFlowFilesFor.apply(REL_WAIT).addAll(candidates); + // If releasableFlowFileCount == 0, leave signal as it is, + // so that any number of FlowFile can be released as long as target count condition matches. + waitCompleted = true; + } } else { - throw new ProcessException("Unsupported wait mode " + waitMode + " was specified."); + getFlowFilesFor.apply(REL_WAIT).addAll(candidates); } - return; } + } - final String targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final Long targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue()); - final boolean reachedToTargetCount = StringUtils.isBlank(targetCounterName) - ? signal.isTotalCountReached(targetCount) - : signal.isCountReached(targetCounterName, targetCount); + // Transfer FlowFiles. + processedFlowFiles.entrySet().forEach(transferFlowFiles); - if (!reachedToTargetCount) { - if (logger.isDebugEnabled()) { - logger.debug("Release signal count {} hasn't reached {} for {} on FlowFile {}", - new Object[] {targetCounterName, targetCount, signalId, flowFile}); - } - flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); - session.transfer(flowFile, REL_WAIT); - return; + // Update signal if needed. + try { + if (waitCompleted) { + protocol.complete(signalId); + } else if (waitProgressed) { + protocol.replace(signal); } - - flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); - session.transfer(flowFile, REL_SUCCESS); - - protocol.complete(signalId); - - } catch (final NumberFormatException e) { - flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e}); - } catch (final IOException e) { - flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e}); + session.rollback(); + throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e); } + } private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) { @@ -341,13 +475,13 @@ public class Wait extends AbstractProcessor { return flowFile; } - // copy over attributes from release signal flow file, if provided + // copy over attributes from release signal FlowFile, if provided final Map attributesToCopy; if (replaceOriginal) { attributesToCopy = new HashMap<>(signal.getAttributes()); attributesToCopy.remove("uuid"); } else { - // if the current flow file does *not* have the cached attribute, copy it + // if the current FlowFile does *not* have the cached attribute, copy it attributesToCopy = signal.getAttributes().entrySet().stream() .filter(e -> flowFile.getAttribute(e.getKey()) == null) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java index ae5cbbd055..1c89108960 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java @@ -31,7 +31,9 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ConcurrentModificationException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Consumer; /** * This class provide a protocol for Wait and Notify processors to work together. @@ -51,8 +53,16 @@ public class WaitNotifyProtocol { private final Deserializer stringDeserializer = input -> new String(input, StandardCharsets.UTF_8); public static class Signal { + + /* + * Getter and Setter methods are needed to (de)serialize JSON even if it's not used from app code. + */ + + transient private String identifier; + transient private long revision = -1; private Map counts = new HashMap<>(); private Map attributes = new HashMap<>(); + private int releasableCount = 0; public Map getCounts() { return counts; @@ -84,6 +94,54 @@ public class WaitNotifyProtocol { return count != null ? count : 0; } + public int getReleasableCount() { + return releasableCount; + } + + public void setReleasableCount(int releasableCount) { + this.releasableCount = releasableCount; + } + + /** + *

Consume accumulated notification signals to let some waiting candidates get released.

+ * + *

This method updates state of this instance, but does not update cache storage. + * Caller of this method is responsible for updating cache storage after processing released and waiting candidates + * by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.

+ * + * @param _counterName signal counter name to consume from. + * @param requiredCountForPass number of required signals to acquire a pass. + * @param releasableCandidateCountPerPass number of releasable candidate per pass. + * @param candidates candidates waiting for being allowed to pass. + * @param released function to process allowed candidates to pass. + * @param waiting function to process candidates those should remain in waiting queue. + * @param Type of candidate + */ + public void releaseCandidatese(final String _counterName, final long requiredCountForPass, + final int releasableCandidateCountPerPass, final List candidates, + final Consumer> released, final Consumer> waiting) { + + // counterName is mandatory otherwise, we can't decide which counter to convert into pass count. + final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName; + + final int candidateSize = candidates.size(); + if (releasableCount < candidateSize) { + // If current passCount is not enough for the candidate size, then try to get more. + // Convert notification signals to pass ticket. + final long signalCount = getCount(counterName); + releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass; + final long reducedSignalCount = signalCount % requiredCountForPass; + counts.put(counterName, reducedSignalCount); + } + + int releaseCount = Math.min(releasableCount, candidateSize); + + released.accept(candidates.subList(0, releaseCount)); + waiting.accept(candidates.subList(releaseCount, candidateSize)); + + releasableCount -= releaseCount; + } + } private final AtomicDistributedMapCacheClient cache; @@ -95,7 +153,7 @@ public class WaitNotifyProtocol { /** * Notify a signal to increase a counter. * @param signalId a key in the underlying cache engine - * @param deltas a map containing counterName and delta entries + * @param deltas a map containing counterName and delta entries, 0 has special meaning, clears the counter back to 0 * @param attributes attributes to save in the cache entry * @return A Signal instance, merged with an existing signal if any * @throws IOException thrown when it failed interacting with the cache engine @@ -106,10 +164,9 @@ public class WaitNotifyProtocol { for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) { - final CacheEntry existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer); - final Signal existingSignal = getSignal(signalId); final Signal signal = existingSignal != null ? existingSignal : new Signal(); + signal.identifier = signalId; if (attributes != null) { signal.attributes.putAll(attributes); @@ -117,15 +174,11 @@ public class WaitNotifyProtocol { deltas.forEach((counterName, delta) -> { long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0; - count += delta; + count = delta == 0 ? 0 : count + delta; signal.counts.put(counterName, count); }); - final String signalJson = objectMapper.writeValueAsString(signal); - final long revision = existingEntry != null ? existingEntry.getRevision() : -1; - - - if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) { + if (replace(signal)) { return signal; } @@ -148,7 +201,7 @@ public class WaitNotifyProtocol { * Notify a signal to increase a counter. * @param signalId a key in the underlying cache engine * @param counterName specify count to update - * @param delta delta to update a counter + * @param delta delta to update a counter, 0 has special meaning, clears the counter back to 0 * @param attributes attributes to save in the cache entry * @return A Signal instance, merged with an existing signal if any * @throws IOException thrown when it failed interacting with the cache engine @@ -184,12 +237,16 @@ public class WaitNotifyProtocol { final String value = entry.getValue(); try { - return objectMapper.readValue(value, Signal.class); + final Signal signal = objectMapper.readValue(value, Signal.class); + signal.identifier = signalId; + signal.revision = entry.getRevision(); + return signal; } catch (final JsonParseException jsonE) { // Try to read it as FlowFileAttributes for backward compatibility. try { final Map attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8)); final Signal signal = new Signal(); + signal.identifier = signalId; signal.setAttributes(attributes); signal.getCounts().put(DEFAULT_COUNT_NAME, 1L); return signal; @@ -209,4 +266,11 @@ public class WaitNotifyProtocol { public void complete(final String signalId) throws IOException { cache.remove(signalId, stringSerializer); } + + public boolean replace(final Signal signal) throws IOException { + + final String signalJson = objectMapper.writeValueAsString(signal); + return cache.replace(signal.identifier, signalJson, stringSerializer, stringSerializer, signal.revision); + + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java index 0ce0045d08..eb196c9bd7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -19,12 +19,22 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.standard.TestNotify.MockCacheClient; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; @@ -180,14 +190,15 @@ public class TestWait { final Map props = new HashMap<>(); props.put("releaseSignalAttribute", "2"); runner.enqueue(new byte[]{}, props); - runner.run(); - - //Expect the processor to receive an IO exception from the cache service and route to failure - runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); - runner.assertTransferCount(Wait.REL_FAILURE, 1); - runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total"); - - service.setFailOnCalls(false); + try { + runner.run(); + fail("Expect the processor to receive an IO exception from the cache service and throws ProcessException."); + } catch (final AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + assertTrue(e.getCause().getCause() instanceof IOException); + } finally { + service.setFailOnCalls(false); + } } @Test @@ -360,7 +371,6 @@ public class TestWait { assertNull("The key no longer exist", protocol.getSignal("key")); } - @Test public void testWaitForSpecificCount() throws InitializationException, IOException { Map cachedAttributes = new HashMap<>(); @@ -455,4 +465,222 @@ public class TestWait { } + private class TestIteration { + final List released = new ArrayList<>(); + final List waiting = new ArrayList<>(); + final List failed = new ArrayList<>(); + + final List expectedReleased = new ArrayList<>(); + final List expectedWaiting = new ArrayList<>(); + final List expectedFailed = new ArrayList<>(); + + void run() { + released.clear(); + waiting.clear(); + failed.clear(); + + runner.run(); + + released.addAll(runner.getFlowFilesForRelationship(Wait.REL_SUCCESS)); + waiting.addAll(runner.getFlowFilesForRelationship(Wait.REL_WAIT)); + failed.addAll(runner.getFlowFilesForRelationship(Wait.REL_FAILURE)); + + assertEquals(expectedReleased.size(), released.size()); + assertEquals(expectedWaiting.size(), waiting.size()); + assertEquals(expectedFailed.size(), failed.size()); + + final BiConsumer, List> assertContents = (expected, actual) -> { + for (int i = 0; i < expected.size(); i++) { + actual.get(i).assertContentEquals(expected.get(i)); + } + }; + + assertContents.accept(expectedReleased, released); + assertContents.accept(expectedWaiting, waiting); + assertContents.accept(expectedFailed, failed); + + runner.clearTransferState(); + expectedReleased.clear(); + expectedWaiting.clear(); + expectedFailed.clear(); + } + } + + @Test + public void testWaitBufferCount() throws InitializationException, IOException { + Map cachedAttributes = new HashMap<>(); + cachedAttributes.put("notified", "notified-value"); + + // Setup existing cache entry. + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}"); + runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}"); + runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2"); + + final Map waitAttributesA = new HashMap<>(); + waitAttributesA.put("releaseSignalAttribute", "key-A"); + waitAttributesA.put("targetSignalCount", "1"); + waitAttributesA.put("signalCounterName", "counter"); + + final Map waitAttributesB = new HashMap<>(); + waitAttributesB.put("releaseSignalAttribute", "key-B"); + waitAttributesB.put("targetSignalCount", "3"); + waitAttributesB.put("signalCounterName", "counter"); + + final Map waitAttributesBInvalid = new HashMap<>(); + waitAttributesBInvalid.putAll(waitAttributesB); + waitAttributesBInvalid.remove("releaseSignalAttribute"); + + final TestIteration testIteration = new TestIteration(); + + // Enqueue multiple wait FlowFiles. + runner.enqueue("1".getBytes(), waitAttributesB); // Should be picked at 1st and 2nd itr + runner.enqueue("2".getBytes(), waitAttributesA); // Should be picked at 3rd itr + runner.enqueue("3".getBytes(), waitAttributesBInvalid); + runner.enqueue("4".getBytes(), waitAttributesA); // Should be picked at 3rd itr + runner.enqueue("5".getBytes(), waitAttributesB); // Should be picked at 1st + runner.enqueue("6".getBytes(), waitAttributesB); // Should be picked at 2nd itr + + /* + * 1st run: + * pick 1 key-B + * skip 2 cause key-A + * skip 3 cause invalid + * skip 4 cause key-A + * pick 5 key-B + */ + testIteration.expectedWaiting.addAll(Arrays.asList("1", "5")); // Picked, but not enough counter. + testIteration.expectedFailed.add("3"); // invalid. + testIteration.run(); + + /* + * 2nd run: + * pick 6 key-B + * pick 1 key-B + */ + protocol.notify("key-B", "counter", 3, cachedAttributes); + testIteration.expectedReleased.add("6"); + testIteration.expectedWaiting.add("1"); // Picked but only one FlowFile can be released. + // enqueue waiting, simulating wait relationship back to self + testIteration.waiting.forEach(f -> runner.enqueue(f)); + testIteration.run(); + + } + + @Test + public void testReleaseMultipleFlowFiles() throws InitializationException, IOException { + Map cachedAttributes = new HashMap<>(); + cachedAttributes.put("notified", "notified-value"); + + // Setup existing cache entry. + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}"); + runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}"); + runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2"); + runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "${fragmentCount}"); + + final Map waitAttributes = new HashMap<>(); + waitAttributes.put("releaseSignalAttribute", "key"); + waitAttributes.put("targetSignalCount", "3"); + waitAttributes.put("signalCounterName", "counter"); + waitAttributes.put("fragmentCount", "6"); + + final TestIteration testIteration = new TestIteration(); + + // Enqueue 6 wait FlowFiles. 1,2,3,4,5,6 + IntStream.range(1, 7).forEach(i -> runner.enqueue(String.valueOf(i).getBytes(), waitAttributes)); + + /* + * 1st run + */ + testIteration.expectedWaiting.addAll(Arrays.asList("1", "2")); + testIteration.run(); + + WaitNotifyProtocol.Signal signal = protocol.getSignal("key"); + assertNull(signal); + + /* + * 2nd run + */ + protocol.notify("key", "counter", 3, cachedAttributes); + testIteration.expectedReleased.addAll(Arrays.asList("3", "4")); + testIteration.waiting.forEach(f -> runner.enqueue(f)); + testIteration.run(); + + signal = protocol.getSignal("key"); + assertEquals(0, signal.getCount("count")); + assertEquals(4, signal.getReleasableCount()); + + /* + * 3rd run + */ + testIteration.expectedReleased.addAll(Arrays.asList("5", "6")); + testIteration.waiting.forEach(f -> runner.enqueue(f)); + testIteration.run(); + + signal = protocol.getSignal("key"); + assertEquals(0, signal.getCount("count")); + assertEquals(2, signal.getReleasableCount()); + } + + @Test + public void testOpenGate() throws InitializationException, IOException { + Map cachedAttributes = new HashMap<>(); + cachedAttributes.put("notified", "notified-value"); + + // Setup existing cache entry. + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}"); + runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}"); + runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2"); + runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "0"); // Leave gate open + + final Map waitAttributes = new HashMap<>(); + waitAttributes.put("releaseSignalAttribute", "key"); + waitAttributes.put("targetSignalCount", "3"); + waitAttributes.put("signalCounterName", "counter"); + + final TestIteration testIteration = new TestIteration(); + + // Enqueue 6 wait FlowFiles. 1,2,3,4,5,6 + IntStream.range(1, 7).forEach(i -> runner.enqueue(String.valueOf(i).getBytes(), waitAttributes)); + + /* + * 1st run + */ + testIteration.expectedWaiting.addAll(Arrays.asList("1", "2")); + testIteration.run(); + + WaitNotifyProtocol.Signal signal = protocol.getSignal("key"); + assertNull(signal); + + /* + * 2nd run + */ + protocol.notify("key", "counter", 3, cachedAttributes); + testIteration.expectedReleased.addAll(Arrays.asList("3", "4")); + testIteration.waiting.forEach(f -> runner.enqueue(f)); + testIteration.run(); + + signal = protocol.getSignal("key"); + assertEquals(3, signal.getCount("counter")); + assertEquals(0, signal.getReleasableCount()); + + /* + * 3rd run + */ + testIteration.expectedReleased.addAll(Arrays.asList("5", "6")); + testIteration.waiting.forEach(f -> runner.enqueue(f)); + testIteration.run(); + + signal = protocol.getSignal("key"); + assertEquals(3, signal.getCount("counter")); + assertEquals(0, signal.getReleasableCount()); + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java index bf7e1e65ab..13b4346859 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -27,11 +27,18 @@ import org.junit.Before; import org.junit.Test; import org.mockito.stubbing.Answer; +import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -102,7 +109,7 @@ public class TestWaitNotifyProtocol { final CacheEntry cacheEntry = cacheEntries.get("signal-id"); assertEquals(0, cacheEntry.getRevision()); - assertEquals("{\"counts\":{\"a\":1},\"attributes\":{}}", cacheEntry.getValue()); + assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); } @Test @@ -119,20 +126,20 @@ public class TestWaitNotifyProtocol { CacheEntry cacheEntry = cacheEntries.get("signal-id"); assertEquals(1, cacheEntry.getRevision()); - assertEquals("{\"counts\":{\"a\":2},\"attributes\":{}}", cacheEntry.getValue()); + assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); protocol.notify(signalId, "a", 10, null); cacheEntry = cacheEntries.get("signal-id"); assertEquals(2, cacheEntry.getRevision()); - assertEquals("{\"counts\":{\"a\":12},\"attributes\":{}}", cacheEntry.getValue()); + assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); protocol.notify(signalId, "b", 2, null); protocol.notify(signalId, "c", 3, null); cacheEntry = cacheEntries.get("signal-id"); assertEquals(4, cacheEntry.getRevision()); - assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{}}", cacheEntry.getValue()); + assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); final Map deltas = new HashMap<>(); deltas.put("a", 10); @@ -141,7 +148,13 @@ public class TestWaitNotifyProtocol { cacheEntry = cacheEntries.get("signal-id"); assertEquals(5, cacheEntry.getRevision()); - assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{}}", cacheEntry.getValue()); + assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); + + // Zero clear 'b'. + protocol.notify("signal-id", "b", 0, null); + cacheEntry = cacheEntries.get("signal-id"); + assertEquals(6, cacheEntry.getRevision()); + assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); } @@ -161,7 +174,7 @@ public class TestWaitNotifyProtocol { CacheEntry cacheEntry = cacheEntries.get("signal-id"); assertEquals(0, cacheEntry.getRevision()); - assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"}}", cacheEntry.getValue()); + assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue()); final Map attributeA2 = new HashMap<>(); attributeA2.put("p2", "a2"); // Update p2 @@ -173,7 +186,7 @@ public class TestWaitNotifyProtocol { cacheEntry = cacheEntries.get("signal-id"); assertEquals(1, cacheEntry.getRevision()); assertEquals("Updated attributes should be merged correctly", - "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"}}", cacheEntry.getValue()); + "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue()); } @@ -237,7 +250,7 @@ public class TestWaitNotifyProtocol { final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final Signal signal = protocol.getSignal(signalId); - assertEquals(1, signal.getCount(WaitNotifyProtocol.DEFAULT_COUNT_NAME)); + assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME)); assertEquals("value1", signal.getAttributes().get("key1")); assertEquals("value2", signal.getAttributes().get("key2")); assertEquals("value3", signal.getAttributes().get("key3")); @@ -251,4 +264,81 @@ public class TestWaitNotifyProtocol { } + @Test + public void testReleaseCandidate() throws Exception { + final List candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList()); + final Signal signal = new Signal(); + final List released = new ArrayList<>(); + final List waiting = new ArrayList<>(); + + // Test default name. + final String counterName = null; + + final BiConsumer releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> { + released.clear(); + waiting.clear(); + signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates, + r -> released.addAll(r), w -> waiting.addAll(w)); + }; + + final Field releasableCount = Signal.class.getDeclaredField("releasableCount"); + releasableCount.setAccessible(true); + + // No counter, should wait. + releaseCandidate.accept(3L, 1); + assertEquals(0, released.size()); + assertEquals(10, waiting.size()); + assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter is not enough yet. + signal.getCounts().put(DEFAULT_COUNT_NAME, 1L); + releaseCandidate.accept(3L, 1); + assertEquals(0, released.size()); + assertEquals(10, waiting.size()); + assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME)); // Counter incremented, but not enough + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target. + signal.getCounts().put(DEFAULT_COUNT_NAME, 3L); + releaseCandidate.accept(3L, 1); + assertEquals(1, released.size()); + assertEquals(9, waiting.size()); + assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target for two candidates. + signal.getCounts().put(DEFAULT_COUNT_NAME, 6L); + releaseCandidate.accept(3L, 1); + assertEquals(2, released.size()); + assertEquals(8, waiting.size()); + assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target for two candidates, and reminder is 2. + signal.getCounts().put(DEFAULT_COUNT_NAME, 11L); + releaseCandidate.accept(3L, 1); + assertEquals(3, released.size()); // 11 / 3 = 3 + assertEquals(7, waiting.size()); + assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 11 % 3 = 2 + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target for two pass count and each pass can release 2 candidates. + signal.getCounts().put(DEFAULT_COUNT_NAME, 6L); + releaseCandidate.accept(3L, 2); + assertEquals(4, released.size()); // (6 / 3) * 2 = 4 + assertEquals(6, waiting.size()); + assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // 6 % 3 = 0 + assertEquals(0, releasableCount.getInt(signal)); + + // If there are counts more than enough to release current candidates, unused releasableCount should remain. + signal.getCounts().put(DEFAULT_COUNT_NAME, 50L); + releaseCandidate.accept(3L, 2); + assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10. + assertEquals(0, waiting.size()); + assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 50 % 3 = 2. + assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22. + + } + } \ No newline at end of file