From b188b0abd6901f702c58737b616fa7cd1874c1df Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Wed, 16 Mar 2016 16:17:23 -0400 Subject: [PATCH] NIFI-1420 Fixing bug where a FlowFile should route to failure when PutSplunk can't createa connection, defaulting PutSplunk to TCP Signed-off-by: joewitt --- .../util/put/AbstractPutEventProcessor.java | 2 +- .../nifi/processors/splunk/PutSplunk.java | 1 + .../nifi/processors/splunk/TestPutSplunk.java | 30 +++++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java index c7313dc328..961edf5dc0 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java @@ -104,7 +104,7 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr .description("The protocol for communication.") .required(true) .allowableValues(TCP_VALUE, UDP_VALUE) - .defaultValue(UDP_VALUE.getValue()) + .defaultValue(TCP_VALUE.getValue()) .build(); public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() .name("Message Delimiter") diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java index 482b85d94e..39c684322b 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -160,6 +160,7 @@ public class PutSplunk extends AbstractPutEventProcessor { getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", new Object[]{flowFile}, e); session.transfer(flowFile, REL_FAILURE); + session.commit(); context.yield(); return; } diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java index bb5537237f..d6bf3a1c71 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java @@ -53,6 +53,7 @@ public class TestPutSplunk { @Test public void testUDPSendWholeFlowFile() { + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue()); final String message = "This is one message, should send the whole FlowFile"; runner.enqueue(message); @@ -102,6 +103,8 @@ public class TestPutSplunk { @Test public void testUDPSendDelimitedMessages() { + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue()); + final String delimiter = "DD"; runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter); @@ -286,6 +289,8 @@ public class TestPutSplunk { @Test public void testCompletingPreviousBatchOnNextExecution() { + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue()); + final String message = "This is one message, should send the whole FlowFile"; runner.enqueue(message); @@ -299,6 +304,30 @@ public class TestPutSplunk { Assert.assertEquals(message, sender.getMessages().get(0)); } + @Test + public void testUnableToCreateConnectionShouldRouteToFailure() { + PutSplunk proc = new UnableToConnectPutSplunk(); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutSplunk.PORT, "12345"); + + final String message = "This is one message, should send the whole FlowFile"; + + runner.enqueue(message); + runner.run(); + runner.assertAllFlowFilesTransferred(PutSplunk.REL_FAILURE, 1); + } + + /** + * Extend PutSplunk to use a CapturingChannelSender. + */ + private static class UnableToConnectPutSplunk extends PutSplunk { + + @Override + protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException { + throw new IOException("Unable to create connection"); + } + } + /** * Extend PutSplunk to use a CapturingChannelSender. */ @@ -316,6 +345,7 @@ public class TestPutSplunk { } } + /** * A ChannelSender that captures each message that was sent. */