mirror of https://github.com/apache/nifi.git
NIFI-4017: Emit provenance event from Notify.
This closes #1890. Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
parent
f54e146561
commit
5c755c006b
|
@ -30,6 +30,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
@ -53,10 +54,14 @@ import org.apache.nifi.processor.util.StandardValidators;
|
||||||
@CapabilityDescription("Caches a release signal identifier in the distributed cache, optionally along with "
|
@CapabilityDescription("Caches a release signal identifier in the distributed cache, optionally along with "
|
||||||
+ "the FlowFile's attributes. Any flow files held at a corresponding Wait processor will be "
|
+ "the FlowFile's attributes. Any flow files held at a corresponding Wait processor will be "
|
||||||
+ "released once this signal in the cache is discovered.")
|
+ "released once this signal in the cache is discovered.")
|
||||||
|
@WritesAttribute(attribute = "notified", description = "All FlowFiles will have an attribute 'notified'. The value of this " +
|
||||||
|
"attribute is true, is the FlowFile is notified, otherwise false.")
|
||||||
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
|
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
|
||||||
"org.apache.nifi.processors.standard.Wait"})
|
"org.apache.nifi.processors.standard.Wait"})
|
||||||
public class Notify extends AbstractProcessor {
|
public class Notify extends AbstractProcessor {
|
||||||
|
|
||||||
|
public static final String NOTIFIED_ATTRIBUTE_NAME = "notified";
|
||||||
|
|
||||||
// Identifies the distributed map cache client
|
// Identifies the distributed map cache client
|
||||||
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
|
||||||
.name("distributed-cache-service")
|
.name("distributed-cache-service")
|
||||||
|
@ -210,7 +215,8 @@ public class Notify extends AbstractProcessor {
|
||||||
// if the computed value is null, or empty, we transfer the flow file to failure relationship
|
// if the computed value is null, or empty, we transfer the flow file to failure relationship
|
||||||
if (StringUtils.isBlank(signalId)) {
|
if (StringUtils.isBlank(signalId)) {
|
||||||
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
|
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
// set 'notified' attribute
|
||||||
|
session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,7 +232,7 @@ public class Notify extends AbstractProcessor {
|
||||||
delta = Integer.parseInt(deltaStr);
|
delta = Integer.parseInt(deltaStr);
|
||||||
} catch (final NumberFormatException e) {
|
} catch (final NumberFormatException e) {
|
||||||
logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[] {flowFile, e}, e);
|
logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[] {flowFile, e}, e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -256,7 +262,8 @@ public class Notify extends AbstractProcessor {
|
||||||
// retry after yielding for a while.
|
// retry after yielding for a while.
|
||||||
try {
|
try {
|
||||||
protocol.notify(signalId, signalBuffer.deltas, signalBuffer.attributesToCache);
|
protocol.notify(signalId, signalBuffer.deltas, signalBuffer.attributesToCache);
|
||||||
session.transfer(signalBuffer.flowFiles, REL_SUCCESS);
|
signalBuffer.flowFiles.forEach(flowFile ->
|
||||||
|
session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(true)), REL_SUCCESS));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(String.format("Unable to communicate with cache when processing %s due to %s", signalId, e), e);
|
throw new RuntimeException(String.format("Unable to communicate with cache when processing %s due to %s", signalId, e), e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,6 +72,7 @@ public class TestNotify {
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
|
||||||
|
runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).get(0).assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true");
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
|
||||||
final Signal signal = new WaitNotifyProtocol(service).getSignal("1");
|
final Signal signal = new WaitNotifyProtocol(service).getSignal("1");
|
||||||
|
@ -107,6 +108,7 @@ public class TestNotify {
|
||||||
runner.run(3);
|
runner.run(3);
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
|
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
|
||||||
|
runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
|
||||||
final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
||||||
|
@ -146,6 +148,7 @@ public class TestNotify {
|
||||||
|
|
||||||
// Limited by the buffer count
|
// Limited by the buffer count
|
||||||
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 2);
|
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 2);
|
||||||
|
runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
|
||||||
Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
||||||
|
@ -158,6 +161,7 @@ public class TestNotify {
|
||||||
// Run it again, and it should process remaining one flow file.
|
// Run it again, and it should process remaining one flow file.
|
||||||
runner.run();
|
runner.run();
|
||||||
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
|
||||||
|
runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
|
||||||
signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
||||||
|
@ -201,6 +205,7 @@ public class TestNotify {
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
|
runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
|
||||||
|
runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
|
||||||
final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
||||||
|
@ -244,7 +249,9 @@ public class TestNotify {
|
||||||
|
|
||||||
// Only failed records should be transferred to failure.
|
// Only failed records should be transferred to failure.
|
||||||
runner.assertTransferCount(Notify.REL_SUCCESS, 2);
|
runner.assertTransferCount(Notify.REL_SUCCESS, 2);
|
||||||
|
runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
|
||||||
runner.assertTransferCount(Notify.REL_FAILURE, 1);
|
runner.assertTransferCount(Notify.REL_FAILURE, 1);
|
||||||
|
runner.getFlowFilesForRelationship(Notify.REL_FAILURE).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "false"));
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
|
||||||
final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
|
||||||
|
|
Loading…
Reference in New Issue