NIFI-2560 This closes #851. fixed property handling and added tests using it

This commit is contained in:
joewitt 2016-08-14 14:35:13 -04:00
parent b7b56b1cad
commit 9c7668948c
2 changed files with 32 additions and 14 deletions

View File

@ -46,7 +46,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.support.StaticListableBeanFactory; import org.springframework.beans.factory.support.StaticListableBeanFactory;
@ -58,8 +57,7 @@ import org.springframework.util.StreamUtils;
* Base processor for implementing processors to consume messages from Email * Base processor for implementing processors to consume messages from Email
* servers using Spring Integration libraries. * servers using Spring Integration libraries.
* *
* @param <T> * @param <T> the type of {@link AbstractMailReceiver}.
* the type of {@link AbstractMailReceiver}.
*/ */
abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends AbstractProcessor { abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends AbstractProcessor {
@ -133,15 +131,14 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
.defaultValue("30 sec") .defaultValue("30 sec")
.build(); .build();
static final Relationship REL_SUCCESS = new Relationship.Builder() static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("All messages that are the are successfully received from Email server and converted to FlowFiles are routed to this relationship") .description("All messages that are the are successfully received from Email server and converted to FlowFiles are routed to this relationship")
.build(); .build();
static List<PropertyDescriptor> SHARED_DESCRIPTORS = new ArrayList<>(); final static List<PropertyDescriptor> SHARED_DESCRIPTORS = new ArrayList<>();
static Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>(); final static Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>();
/* /*
* Will ensure that list of PropertyDescriptors is build only once, since * Will ensure that list of PropertyDescriptors is build only once, since
@ -219,8 +216,7 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
* Delegates to sub-classes to build the target receiver as * Delegates to sub-classes to build the target receiver as
* {@link AbstractMailReceiver} * {@link AbstractMailReceiver}
* *
* @param context * @param context instance of {@link ProcessContext}
* instance of {@link ProcessContext}
* @return new instance of {@link AbstractMailReceiver} * @return new instance of {@link AbstractMailReceiver}
*/ */
protected abstract T buildMessageReceiver(ProcessContext context); protected abstract T buildMessageReceiver(ProcessContext context);
@ -299,8 +295,8 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
/** /**
* Extracts dynamic properties which typically represent the Java Mail * Extracts dynamic properties which typically represent the Java Mail
* properties from the {@link ProcessContext} returning them as instance * properties from the {@link ProcessContext} returning them as instance of
* of {@link Properties} * {@link Properties}
*/ */
private Properties buildJavaMailProperties(ProcessContext context) { private Properties buildJavaMailProperties(ProcessContext context) {
Properties javaMailProperties = new Properties(); Properties javaMailProperties = new Properties();
@ -313,9 +309,8 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
} }
} }
String propertyName = this.getProtocol(context).equals("pop3") ? "mail.pop3.timeout" : "mail.imap.timeout"; String propertyName = this.getProtocol(context).equals("pop3") ? "mail.pop3.timeout" : "mail.imap.timeout";
final String timeoutInMillis = String.valueOf(context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS));
javaMailProperties.setProperty(propertyName, String.valueOf(FormatUtils javaMailProperties.setProperty(propertyName, timeoutInMillis);
.getTimeDuration(context.getProperty(CONNECTION_TIMEOUT).getValue().trim(), TimeUnit.MILLISECONDS)));
return javaMailProperties; return javaMailProperties;
} }

View File

@ -98,6 +98,7 @@ public class ConsumeEmailTest {
runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
runner.setProperty(ConsumeIMAP.USE_SSL, "false"); runner.setProperty(ConsumeIMAP.USE_SSL, "false");
runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false"); runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false");
runner.setProperty(ConsumeIMAP.CONNECTION_TIMEOUT, "130 ms");
runner.run(2); runner.run(2);
flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS); flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS);
@ -108,11 +109,33 @@ public class ConsumeEmailTest {
ff.assertContentEquals("You've Got Mail - 1".getBytes(StandardCharsets.UTF_8)); ff.assertContentEquals("You've Got Mail - 1".getBytes(StandardCharsets.UTF_8));
} }
@Test
public void validateConsumeIMAPWithTimeout() throws Exception {
TestRunner runner = TestRunners.newTestRunner(new TestImapProcessor(1));
runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com");
runner.setProperty(ConsumeIMAP.PORT, "1234");
runner.setProperty(ConsumeIMAP.USER, "jon");
runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr");
runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
runner.setProperty(ConsumeIMAP.USE_SSL, "false");
runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false");
runner.setProperty(ConsumeIMAP.CONNECTION_TIMEOUT, "${random():mod(10):plus(1)} secs");
runner.run(1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile ff = flowFiles.get(0);
ff.assertContentEquals("You've Got Mail - 0".getBytes(StandardCharsets.UTF_8));
}
public static class TestImapProcessor extends ConsumeIMAP { public static class TestImapProcessor extends ConsumeIMAP {
private final int messagesToGenerate; private final int messagesToGenerate;
TestImapProcessor(int messagesToGenerate) { TestImapProcessor(int messagesToGenerate) {
this.messagesToGenerate = messagesToGenerate; this.messagesToGenerate = messagesToGenerate;
} }
@Override @Override
protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) { protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) {
ImapMailReceiver receiver = mock(ImapMailReceiver.class); ImapMailReceiver receiver = mock(ImapMailReceiver.class);