NIFI-1420 Fixing bug where a FlowFile should route to failure when PutSplunk can't createa connection, defaulting PutSplunk to TCP

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Bryan Bende 2016-03-16 16:17:23 -04:00 committed by joewitt
parent a7b97419e5
commit b188b0abd6
3 changed files with 32 additions and 1 deletions

View File

@ -104,7 +104,7 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
.description("The protocol for communication.") .description("The protocol for communication.")
.required(true) .required(true)
.allowableValues(TCP_VALUE, UDP_VALUE) .allowableValues(TCP_VALUE, UDP_VALUE)
.defaultValue(UDP_VALUE.getValue()) .defaultValue(TCP_VALUE.getValue())
.build(); .build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter") .name("Message Delimiter")

View File

@ -160,6 +160,7 @@ public class PutSplunk extends AbstractPutEventProcessor {
getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", getLogger().error("No available connections, and unable to create a new one, transferring {} to failure",
new Object[]{flowFile}, e); new Object[]{flowFile}, e);
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
session.commit();
context.yield(); context.yield();
return; return;
} }

View File

@ -53,6 +53,7 @@ public class TestPutSplunk {
@Test @Test
public void testUDPSendWholeFlowFile() { public void testUDPSendWholeFlowFile() {
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
final String message = "This is one message, should send the whole FlowFile"; final String message = "This is one message, should send the whole FlowFile";
runner.enqueue(message); runner.enqueue(message);
@ -102,6 +103,8 @@ public class TestPutSplunk {
@Test @Test
public void testUDPSendDelimitedMessages() { public void testUDPSendDelimitedMessages() {
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
final String delimiter = "DD"; final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter); runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
@ -286,6 +289,8 @@ public class TestPutSplunk {
@Test @Test
public void testCompletingPreviousBatchOnNextExecution() { public void testCompletingPreviousBatchOnNextExecution() {
runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
final String message = "This is one message, should send the whole FlowFile"; final String message = "This is one message, should send the whole FlowFile";
runner.enqueue(message); runner.enqueue(message);
@ -299,6 +304,30 @@ public class TestPutSplunk {
Assert.assertEquals(message, sender.getMessages().get(0)); Assert.assertEquals(message, sender.getMessages().get(0));
} }
@Test
public void testUnableToCreateConnectionShouldRouteToFailure() {
PutSplunk proc = new UnableToConnectPutSplunk();
runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutSplunk.PORT, "12345");
final String message = "This is one message, should send the whole FlowFile";
runner.enqueue(message);
runner.run();
runner.assertAllFlowFilesTransferred(PutSplunk.REL_FAILURE, 1);
}
/**
* Extend PutSplunk to use a CapturingChannelSender.
*/
private static class UnableToConnectPutSplunk extends PutSplunk {
@Override
protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException {
throw new IOException("Unable to create connection");
}
}
/** /**
* Extend PutSplunk to use a CapturingChannelSender. * Extend PutSplunk to use a CapturingChannelSender.
*/ */
@ -316,6 +345,7 @@ public class TestPutSplunk {
} }
} }
/** /**
* A ChannelSender that captures each message that was sent. * A ChannelSender that captures each message that was sent.
*/ */