diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java index 346f1fb04b..44933906c2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java @@ -30,6 +30,7 @@ import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -53,10 +54,14 @@ import org.apache.nifi.processor.util.StandardValidators; @CapabilityDescription("Caches a release signal identifier in the distributed cache, optionally along with " + "the FlowFile's attributes. Any flow files held at a corresponding Wait processor will be " + "released once this signal in the cache is discovered.") +@WritesAttribute(attribute = "notified", description = "All FlowFiles will have an attribute 'notified'. The value of this " + + "attribute is true, is the FlowFile is notified, otherwise false.") @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.processors.standard.Wait"}) public class Notify extends AbstractProcessor { + public static final String NOTIFIED_ATTRIBUTE_NAME = "notified"; + // Identifies the distributed map cache client public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() .name("distributed-cache-service") @@ -210,7 +215,8 @@ public class Notify extends AbstractProcessor { // if the computed value is null, or empty, we transfer the flow file to failure relationship if (StringUtils.isBlank(signalId)) { logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile}); - session.transfer(flowFile, REL_FAILURE); + // set 'notified' attribute + session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE); continue; } @@ -226,7 +232,7 @@ public class Notify extends AbstractProcessor { delta = Integer.parseInt(deltaStr); } catch (final NumberFormatException e) { logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[] {flowFile, e}, e); - session.transfer(flowFile, REL_FAILURE); + session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE); continue; } } @@ -256,7 +262,8 @@ public class Notify extends AbstractProcessor { // retry after yielding for a while. try { protocol.notify(signalId, signalBuffer.deltas, signalBuffer.attributesToCache); - session.transfer(signalBuffer.flowFiles, REL_SUCCESS); + signalBuffer.flowFiles.forEach(flowFile -> + session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(true)), REL_SUCCESS)); } catch (IOException e) { throw new RuntimeException(String.format("Unable to communicate with cache when processing %s due to %s", signalId, e), e); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java index 3e0cd689d6..ab99af9316 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java @@ -72,6 +72,7 @@ public class TestNotify { runner.run(); runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).get(0).assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"); runner.clearTransferState(); final Signal signal = new WaitNotifyProtocol(service).getSignal("1"); @@ -107,6 +108,7 @@ public class TestNotify { runner.run(3); runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3); + runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true")); runner.clearTransferState(); final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing"); @@ -146,6 +148,7 @@ public class TestNotify { // Limited by the buffer count runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 2); + runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true")); runner.clearTransferState(); Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing"); @@ -158,6 +161,7 @@ public class TestNotify { // Run it again, and it should process remaining one flow file. runner.run(); runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true")); runner.clearTransferState(); signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing"); @@ -201,6 +205,7 @@ public class TestNotify { runner.run(); runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3); + runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true")); runner.clearTransferState(); final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing"); @@ -244,7 +249,9 @@ public class TestNotify { // Only failed records should be transferred to failure. runner.assertTransferCount(Notify.REL_SUCCESS, 2); + runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true")); runner.assertTransferCount(Notify.REL_FAILURE, 1); + runner.getFlowFilesForRelationship(Notify.REL_FAILURE).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "false")); runner.clearTransferState(); final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");