From a90fa9c285cce2beb1fea8fb266df402f8926fcf Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Thu, 2 Feb 2017 15:18:01 +0900 Subject: [PATCH] NIFI-3431: Support batch update in Notify processor - Added Signal Counter Delta property - Added Signal Buffer Count property - Added processor property name and display name - Changed IOException handling from routing it to failure to throw RuntimeException, so that NiFi framework can yield the processor for a while and try again Signed-off-by: Pierre Villard This closes #1466. --- .../nifi/processors/standard/Notify.java | 219 ++++++++++++------ .../standard/WaitNotifyProtocol.java | 93 +++++--- .../nifi/processors/standard/TestNotify.java | 153 +++++++++++- .../standard/TestWaitNotifyProtocol.java | 9 + 4 files changed, 358 insertions(+), 116 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java index 9d809efde2..155d131644 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java @@ -24,7 +24,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; @@ -35,6 +34,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.expression.AttributeExpression.ResultType; import org.apache.nifi.flowfile.FlowFile; @@ -59,54 +59,83 @@ public class Notify extends AbstractProcessor { // Identifies the distributed map cache client public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() - .name("Distributed Cache Service") - .description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor") - .required(true) - .identifiesControllerService(AtomicDistributedMapCacheClient.class) - .build(); + .name("distributed-cache-service") + .displayName("Distributed Cache Service") + .description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor") + .required(true) + .identifiesControllerService(AtomicDistributedMapCacheClient.class) + .build(); // Selects the FlowFile attribute or expression, whose value is used as cache key public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder() - .name("Release Signal Identifier") - .description("A value, or the results of an Attribute Expression Language statement, which will " + - "be evaluated against a FlowFile in order to determine the release signal cache key") - .required(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) - .expressionLanguageSupported(true) - .build(); + .name("release-signal-id") + .displayName("Release Signal Identifier") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the release signal cache key") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder() - .name("Signal Counter Name") - .description("A value, or the results of an Attribute Expression Language statement, which will " + - "be evaluated against a FlowFile in order to determine the signal counter name. " + - "Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " + - "of different types of events, such as success or failure, or destination data source names, etc.") - .required(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) - .expressionLanguageSupported(true) - .defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME) - .build(); + .name("signal-counter-name") + .displayName("Signal Counter Name") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the signal counter name. " + + "Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " + + "of different types of events, such as success or failure, or destination data source names, etc.") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME) + .build(); + + public static final PropertyDescriptor SIGNAL_COUNTER_DELTA = new PropertyDescriptor.Builder() + .name("signal-counter-delta") + .displayName("Signal Counter Delta") + .description("A value, or the results of an Attribute Expression Language statement, which will " + + "be evaluated against a FlowFile in order to determine the signal counter delta. " + + "Specify how much the counter should increase. " + + "For example, if multiple signal events are processed at upstream flow in batch oriented way, " + + "the number of events processed can be notified with this property at once.") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .defaultValue("1") + .build(); + + public static final PropertyDescriptor SIGNAL_BUFFER_COUNT = new PropertyDescriptor.Builder() + .name("signal-buffer-count") + .displayName("Signal Buffer Count") + .description("Specify the maximum number of incoming flow files that can be buffered until signals are notified to cache service. " + + "The more buffer can provide the better performance, as it reduces the number of interactions with cache service " + + "by grouping signals by signal identifier when multiple incoming flow files share the same signal identifier.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); // Specifies an optional regex used to identify which attributes to cache public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder() - .name("Attribute Cache Regex") - .description("Any attributes whose names match this regex will be stored in the distributed cache to be " - + "copied to any FlowFiles released from a corresponding Wait processor. Note that the " - + "uuid attribute will not be cached regardless of this value. If blank, no attributes " - + "will be cached.") - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("attribute-cache-regex") + .displayName("Attribute Cache Regex") + .description("Any attributes whose names match this regex will be stored in the distributed cache to be " + + "copied to any FlowFiles released from a corresponding Wait processor. Note that the " + + "uuid attribute will not be cached regardless of this value. If blank, no attributes " + + "will be cached.") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles where the release signal has been successfully entered in the cache will be routed to this relationship") - .build(); + .name("success") + .description("All FlowFiles where the release signal has been successfully entered in the cache will be routed to this relationship") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship") - .build(); + .name("failure") + .description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship") + .build(); private final Set relationships; @@ -122,6 +151,8 @@ public class Notify extends AbstractProcessor { final List descriptors = new ArrayList<>(); descriptors.add(RELEASE_SIGNAL_IDENTIFIER); descriptors.add(SIGNAL_COUNTER_NAME); + descriptors.add(SIGNAL_COUNTER_DELTA); + descriptors.add(SIGNAL_BUFFER_COUNT); descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(ATTRIBUTE_CACHE_REGEX); return descriptors; @@ -132,58 +163,100 @@ public class Notify extends AbstractProcessor { return relationships; } + private class SignalBuffer { + + final Map deltas = new HashMap<>(); + final Map attributesToCache = new HashMap<>(); + final List flowFiles = new ArrayList<>(); + + int incrementDelta(final String counterName, final int delta) { + int current = deltas.containsKey(counterName) ? deltas.get(counterName) : 0; + int updated = current + delta; + deltas.put(counterName, updated); + return updated; + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - final ComponentLog logger = getLogger(); + final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER); + final PropertyValue counterNameProperty = context.getProperty(SIGNAL_COUNTER_NAME); + final PropertyValue deltaProperty = context.getProperty(SIGNAL_COUNTER_DELTA); + final String attributeCacheRegex = context.getProperty(ATTRIBUTE_CACHE_REGEX).getValue(); + final Integer bufferCount = context.getProperty(SIGNAL_BUFFER_COUNT).asInteger(); - // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support - final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); - final String counterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue(); - - // if the computed value is null, or empty, we transfer the flow file to failure relationship - if (StringUtils.isBlank(signalId)) { - logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - return; - } - - // the cache client used to interact with the distributed cache + // the cache client used to interact with the distributed cache. final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); - try { - final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet()) - ? context.getProperty(ATTRIBUTE_CACHE_REGEX).getValue() - : null; + final Map signalBuffers = new HashMap<>(); - Map attributesToCache = new HashMap<>(); - if (StringUtils.isNotEmpty(attributeCacheRegex)) { - attributesToCache = flowFile.getAttributes().entrySet() - .stream().filter(e -> (!e.getKey().equals("uuid") && e.getKey().matches(attributeCacheRegex))) - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + for (int i = 0; i < bufferCount; i++) { + + final FlowFile flowFile = session.get(); + if (flowFile == null) { + break; } + // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support + final String signalId = signalIdProperty.evaluateAttributeExpressions(flowFile).getValue(); + + // if the computed value is null, or empty, we transfer the flow file to failure relationship + if (StringUtils.isBlank(signalId)) { + logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile}); + session.transfer(flowFile, REL_FAILURE); + continue; + } + + String counterName = counterNameProperty.evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isEmpty(counterName)) { + counterName = WaitNotifyProtocol.DEFAULT_COUNT_NAME; + } + + int delta = 1; + if (deltaProperty.isSet()) { + final String deltaStr = deltaProperty.evaluateAttributeExpressions(flowFile).getValue(); + try { + delta = Integer.parseInt(deltaStr); + } catch (final NumberFormatException e) { + logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[] {flowFile, e}, e); + session.transfer(flowFile, REL_FAILURE); + continue; + } + } + + if (!signalBuffers.containsKey(signalId)) { + signalBuffers.put(signalId, new SignalBuffer()); + } + final SignalBuffer signalBuffer = signalBuffers.get(signalId); + + if (StringUtils.isNotEmpty(attributeCacheRegex)) { + flowFile.getAttributes().entrySet() + .stream().filter(e -> (!e.getKey().equals("uuid") && e.getKey().matches(attributeCacheRegex))) + .forEach(e -> signalBuffer.attributesToCache.put(e.getKey(), e.getValue())); + } + + signalBuffer.incrementDelta(counterName, delta); + signalBuffer.flowFiles.add(flowFile); + if (logger.isDebugEnabled()) { logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[] {signalId, counterName, flowFile}); } - // In case of ConcurrentModificationException, just throw the exception so that processor can - // retry after yielding for a while. - protocol.notify(signalId, counterName, 1, attributesToCache); - - session.transfer(flowFile, REL_SUCCESS); - } catch (final IOException e) { - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e}); } + + signalBuffers.forEach((signalId, signalBuffer) -> { + // In case of Exception, just throw the exception so that processor can + // retry after yielding for a while. + try { + protocol.notify(signalId, signalBuffer.deltas, signalBuffer.attributesToCache); + session.transfer(signalBuffer.flowFiles, REL_SUCCESS); + } catch (IOException e) { + throw new RuntimeException(String.format("Unable to communicate with cache when processing %s due to %s", signalId, e), e); + } + }); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java index a74590a200..ae5cbbd055 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java @@ -81,7 +81,7 @@ public class WaitNotifyProtocol { public long getCount(final String counterName) { final Long count = counts.get(counterName); - return count != null ? count : -1; + return count != null ? count : 0; } } @@ -92,6 +92,58 @@ public class WaitNotifyProtocol { this.cache = cache; } + /** + * Notify a signal to increase a counter. + * @param signalId a key in the underlying cache engine + * @param deltas a map containing counterName and delta entries + * @param attributes attributes to save in the cache entry + * @return A Signal instance, merged with an existing signal if any + * @throws IOException thrown when it failed interacting with the cache engine + * @throws ConcurrentModificationException thrown if other process is also updating the same signal and failed to update after few retry attempts + */ + public Signal notify(final String signalId, final Map deltas, final Map attributes) + throws IOException, ConcurrentModificationException { + + for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) { + + final CacheEntry existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer); + + final Signal existingSignal = getSignal(signalId); + final Signal signal = existingSignal != null ? existingSignal : new Signal(); + + if (attributes != null) { + signal.attributes.putAll(attributes); + } + + deltas.forEach((counterName, delta) -> { + long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0; + count += delta; + signal.counts.put(counterName, count); + }); + + final String signalJson = objectMapper.writeValueAsString(signal); + final long revision = existingEntry != null ? existingEntry.getRevision() : -1; + + + if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) { + return signal; + } + + long waitMillis = REPLACE_RETRY_WAIT_MILLIS * (i + 1); + logger.info("Waiting for {} ms to retry... {}.{}", waitMillis, signalId, deltas); + try { + Thread.sleep(waitMillis); + } catch (InterruptedException e) { + final String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, deltas); + throw new ConcurrentModificationException(msg, e); + } + } + + final String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, deltas, MAX_REPLACE_RETRY_COUNT); + throw new ConcurrentModificationException(msg); + } + + /** * Notify a signal to increase a counter. * @param signalId a key in the underlying cache engine @@ -105,43 +157,10 @@ public class WaitNotifyProtocol { public Signal notify(final String signalId, final String counterName, final int delta, final Map attributes) throws IOException, ConcurrentModificationException { - for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) { + final Map deltas = new HashMap<>(); + deltas.put(counterName, delta); + return notify(signalId, deltas, attributes); - final CacheEntry existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer); - - Signal signal = getSignal(signalId); - if (signal == null) { - signal = new Signal(); - } - - if (attributes != null) { - signal.attributes.putAll(attributes); - } - - long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0; - count += delta; - signal.counts.put(counterName, count); - - final String signalJson = objectMapper.writeValueAsString(signal); - final long revision = existingEntry != null ? existingEntry.getRevision() : -1; - - - if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) { - return signal; - } - - long waitMillis = REPLACE_RETRY_WAIT_MILLIS * (i + 1); - logger.info("Waiting for {} ms to retry... {}.{}", waitMillis, signalId, counterName); - try { - Thread.sleep(waitMillis); - } catch (InterruptedException e) { - final String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, counterName); - throw new ConcurrentModificationException(msg, e); - } - } - - final String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, counterName, MAX_REPLACE_RETRY_COUNT); - throw new ConcurrentModificationException(msg); } /** diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java index e5c183f923..0494b1830e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestNotify { @@ -112,6 +113,145 @@ public class TestNotify { assertEquals(1, signal.getCount("failure")); } + @Test + public void testNotifyCountersBatch() throws InitializationException, IOException { + runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); + runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); + runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "2"); + + final Map props1 = new HashMap<>(); + props1.put("releaseSignalAttribute", "someDataProcessing"); + props1.put("key", "data1"); + props1.put("status", "success"); + runner.enqueue(new byte[]{}, props1); + + final Map props2 = new HashMap<>(); + props2.put("releaseSignalAttribute", "someDataProcessing"); + props2.put("key", "data2"); + props2.put("status", "success"); + runner.enqueue(new byte[]{}, props2); + + final Map props3 = new HashMap<>(); + props3.put("releaseSignalAttribute", "someDataProcessing"); + props3.put("key", "data3"); + props3.put("status", "failure"); + runner.enqueue(new byte[]{}, props3); + + runner.run(); + + // Limited by the buffer count + runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 2); + runner.clearTransferState(); + + Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing"); + Map cachedAttributes = signal.getAttributes(); + assertEquals("Same attribute key will be overwritten by the latest signal", "data2", cachedAttributes.get("key")); + assertTrue(signal.isTotalCountReached(2)); + assertEquals(2, signal.getCount("success")); + assertEquals(0, signal.getCount("failure")); + + // Run it again, and it should process remaining one flow file. + runner.run(); + runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1); + runner.clearTransferState(); + + signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing"); + cachedAttributes = signal.getAttributes(); + assertEquals("Same attribute key will be overwritten by the latest signal", "data3", cachedAttributes.get("key")); + assertTrue(signal.isTotalCountReached(3)); + assertEquals(2, signal.getCount("success")); + assertEquals(1, signal.getCount("failure")); + + } + + @Test + public void testNotifyCountersUsingDelta() throws InitializationException, IOException { + runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); + runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); + runner.setProperty(Notify.SIGNAL_COUNTER_DELTA, "${record.count}"); + runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "10"); + + final Map props1 = new HashMap<>(); + props1.put("releaseSignalAttribute", "someDataProcessing"); + props1.put("key", "data1"); + props1.put("status", "success"); + props1.put("record.count", "1024"); + runner.enqueue(new byte[]{}, props1); + + final Map props2 = new HashMap<>(); + props2.put("releaseSignalAttribute", "someDataProcessing"); + props2.put("key", "data2"); + props2.put("status", "success"); + props2.put("record.count", "2048"); + runner.enqueue(new byte[]{}, props2); + + final Map props3 = new HashMap<>(); + props3.put("releaseSignalAttribute", "someDataProcessing"); + props3.put("key", "data3"); + props3.put("status", "failure"); + props3.put("record.count", "512"); + runner.enqueue(new byte[]{}, props3); + + runner.run(); + + runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3); + runner.clearTransferState(); + + final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing"); + Map cachedAttributes = signal.getAttributes(); + assertEquals("Same attribute key will be overwritten by the latest signal", "data3", cachedAttributes.get("key")); + assertTrue(signal.isTotalCountReached(3584)); + assertEquals(3072, signal.getCount("success")); + assertEquals(512, signal.getCount("failure")); + } + + @Test + public void testIllegalDelta() throws InitializationException, IOException { + runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); + runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); + runner.setProperty(Notify.SIGNAL_COUNTER_DELTA, "${record.count}"); + runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "10"); + + final Map props1 = new HashMap<>(); + props1.put("releaseSignalAttribute", "someDataProcessing"); + props1.put("key", "data1"); + props1.put("status", "success"); + props1.put("record.count", "1024"); + runner.enqueue(new byte[]{}, props1); + + final Map props2 = new HashMap<>(); + props2.put("releaseSignalAttribute", "someDataProcessing"); + props2.put("key", "data2"); + props2.put("status", "success"); + props2.put("record.count", "2048 records"); + runner.enqueue(new byte[]{}, props2); + + final Map props3 = new HashMap<>(); + props3.put("releaseSignalAttribute", "someDataProcessing"); + props3.put("key", "data3"); + props3.put("status", "failure"); + props3.put("record.count", "512"); + runner.enqueue(new byte[]{}, props3); + + runner.run(); + + // Only failed records should be transferred to failure. + runner.assertTransferCount(Notify.REL_SUCCESS, 2); + runner.assertTransferCount(Notify.REL_FAILURE, 1); + runner.clearTransferState(); + + final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing"); + Map cachedAttributes = signal.getAttributes(); + assertEquals("Same attribute key will be overwritten by the latest signal", "data3", cachedAttributes.get("key")); + assertTrue(signal.isTotalCountReached(1536)); + assertEquals(1024, signal.getCount("success")); + assertEquals(512, signal.getCount("failure")); + + } + @Test public void testRegex() throws InitializationException, IOException { runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); @@ -156,13 +296,14 @@ public class TestNotify { final Map props = new HashMap<>(); props.put("releaseSignalAttribute", "2"); runner.enqueue(new byte[] {}, props); - runner.run(); - - //Expect the processor to receive an IO exception from the cache service and route to failure - runner.assertAllFlowFilesTransferred(Notify.REL_FAILURE, 1); - runner.assertTransferCount(Notify.REL_FAILURE, 1); - + try { + runner.run(); + fail("Processor should throw RuntimeException in case it receives an IO exception from the cache service and yield for a while."); + } catch (final AssertionError e) { + assertTrue(e.getCause() instanceof RuntimeException); + } service.setFailOnCalls(false); + } static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java index d4bc783033..bf7e1e65ab 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -134,6 +134,15 @@ public class TestWaitNotifyProtocol { assertEquals(4, cacheEntry.getRevision()); assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{}}", cacheEntry.getValue()); + final Map deltas = new HashMap<>(); + deltas.put("a", 10); + deltas.put("b", 25); + protocol.notify("signal-id", deltas, null); + + cacheEntry = cacheEntries.get("signal-id"); + assertEquals(5, cacheEntry.getRevision()); + assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{}}", cacheEntry.getValue()); + } @Test