From 9c7668948c9a1541b9b317ab5352570910037d20 Mon Sep 17 00:00:00 2001 From: joewitt Date: Sun, 14 Aug 2016 14:35:13 -0400 Subject: [PATCH] NIFI-2560 This closes #851. fixed property handling and added tests using it --- .../email/AbstractEmailProcessor.java | 23 ++++++++----------- .../processors/email/ConsumeEmailTest.java | 23 +++++++++++++++++++ 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java index 292d42b1b1..62f4a88bb4 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java @@ -46,7 +46,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.FormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.support.StaticListableBeanFactory; @@ -58,8 +57,7 @@ import org.springframework.util.StreamUtils; * Base processor for implementing processors to consume messages from Email * servers using Spring Integration libraries. * - * @param - * the type of {@link AbstractMailReceiver}. + * @param the type of {@link AbstractMailReceiver}. */ abstract class AbstractEmailProcessor extends AbstractProcessor { @@ -133,15 +131,14 @@ abstract class AbstractEmailProcessor extends Ab .defaultValue("30 sec") .build(); - static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All messages that are the are successfully received from Email server and converted to FlowFiles are routed to this relationship") .build(); - static List SHARED_DESCRIPTORS = new ArrayList<>(); + final static List SHARED_DESCRIPTORS = new ArrayList<>(); - static Set SHARED_RELATIONSHIPS = new HashSet<>(); + final static Set SHARED_RELATIONSHIPS = new HashSet<>(); /* * Will ensure that list of PropertyDescriptors is build only once, since @@ -219,8 +216,7 @@ abstract class AbstractEmailProcessor extends Ab * Delegates to sub-classes to build the target receiver as * {@link AbstractMailReceiver} * - * @param context - * instance of {@link ProcessContext} + * @param context instance of {@link ProcessContext} * @return new instance of {@link AbstractMailReceiver} */ protected abstract T buildMessageReceiver(ProcessContext context); @@ -299,8 +295,8 @@ abstract class AbstractEmailProcessor extends Ab /** * Extracts dynamic properties which typically represent the Java Mail - * properties from the {@link ProcessContext} returning them as instance - * of {@link Properties} + * properties from the {@link ProcessContext} returning them as instance of + * {@link Properties} */ private Properties buildJavaMailProperties(ProcessContext context) { Properties javaMailProperties = new Properties(); @@ -313,9 +309,8 @@ abstract class AbstractEmailProcessor extends Ab } } String propertyName = this.getProtocol(context).equals("pop3") ? "mail.pop3.timeout" : "mail.imap.timeout"; - - javaMailProperties.setProperty(propertyName, String.valueOf(FormatUtils - .getTimeDuration(context.getProperty(CONNECTION_TIMEOUT).getValue().trim(), TimeUnit.MILLISECONDS))); + final String timeoutInMillis = String.valueOf(context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)); + javaMailProperties.setProperty(propertyName, timeoutInMillis); return javaMailProperties; } @@ -377,7 +372,7 @@ abstract class AbstractEmailProcessor extends Ab } processSession.getProvenanceReporter().receive(flowFile, this.displayUrl, "Received message from " + fromAddressesString, executionDuration); - this.getLogger().info("Successfully received {} from {} in {} millis", new Object[] { flowFile, fromAddressesString, executionDuration }); + this.getLogger().info("Successfully received {} from {} in {} millis", new Object[]{flowFile, fromAddressesString, executionDuration}); processSession.transfer(flowFile, REL_SUCCESS); try { diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java index 84a78f5601..dabd88176b 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java @@ -98,6 +98,7 @@ public class ConsumeEmailTest { runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); runner.setProperty(ConsumeIMAP.USE_SSL, "false"); runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false"); + runner.setProperty(ConsumeIMAP.CONNECTION_TIMEOUT, "130 ms"); runner.run(2); flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS); @@ -108,11 +109,33 @@ public class ConsumeEmailTest { ff.assertContentEquals("You've Got Mail - 1".getBytes(StandardCharsets.UTF_8)); } + @Test + public void validateConsumeIMAPWithTimeout() throws Exception { + TestRunner runner = TestRunners.newTestRunner(new TestImapProcessor(1)); + runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); + runner.setProperty(ConsumeIMAP.PORT, "1234"); + runner.setProperty(ConsumeIMAP.USER, "jon"); + runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); + runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); + runner.setProperty(ConsumeIMAP.USE_SSL, "false"); + runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false"); + runner.setProperty(ConsumeIMAP.CONNECTION_TIMEOUT, "${random():mod(10):plus(1)} secs"); + + runner.run(1); + List flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + MockFlowFile ff = flowFiles.get(0); + ff.assertContentEquals("You've Got Mail - 0".getBytes(StandardCharsets.UTF_8)); + } + public static class TestImapProcessor extends ConsumeIMAP { + private final int messagesToGenerate; + TestImapProcessor(int messagesToGenerate) { this.messagesToGenerate = messagesToGenerate; } + @Override protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) { ImapMailReceiver receiver = mock(ImapMailReceiver.class);