diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml index d4571840db..43181fde14 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml @@ -78,6 +78,12 @@ nifi-mock test + + com.icegreen + greenmail + 1.5.2 + test + diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java index 7e6193dc97..dbcdd0ef92 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.email; import java.io.IOException; -import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.ArrayList; @@ -44,14 +43,12 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.support.StaticListableBeanFactory; import org.springframework.integration.mail.AbstractMailReceiver; import org.springframework.util.Assert; -import org.springframework.util.StreamUtils; /** * Base processor for implementing processors to consume messages from Email @@ -348,14 +345,11 @@ abstract class AbstractEmailProcessor extends Ab long start = System.nanoTime(); FlowFile flowFile = processSession.create(); - flowFile = processSession.append(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try { - StreamUtils.copy(emailMessage.getInputStream(), out); - } catch (MessagingException e) { - throw new IOException(e); - } + flowFile = processSession.append(flowFile, out -> { + try { + emailMessage.writeTo(out); + } catch (MessagingException e) { + throw new IOException(e); } }); diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java deleted file mode 100644 index dabd88176b..0000000000 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.email; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayInputStream; -import java.lang.reflect.Field; -import java.nio.charset.StandardCharsets; -import java.util.List; - -import javax.mail.Message; - -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; -import org.springframework.integration.mail.AbstractMailReceiver; -import org.springframework.integration.mail.ImapMailReceiver; - -public class ConsumeEmailTest { - - @Test - public void validateProtocol() { - AbstractEmailProcessor consume = new ConsumeIMAP(); - TestRunner runner = TestRunners.newTestRunner(consume); - runner.setProperty(ConsumeIMAP.USE_SSL, "false"); - - assertEquals("imap", consume.getProtocol(runner.getProcessContext())); - - runner = TestRunners.newTestRunner(consume); - runner.setProperty(ConsumeIMAP.USE_SSL, "true"); - - assertEquals("imaps", consume.getProtocol(runner.getProcessContext())); - - consume = new ConsumePOP3(); - - assertEquals("pop3", consume.getProtocol(runner.getProcessContext())); - } - - @Test - public void validateUrl() throws Exception { - Field displayUrlField = AbstractEmailProcessor.class.getDeclaredField("displayUrl"); - displayUrlField.setAccessible(true); - - AbstractEmailProcessor consume = new ConsumeIMAP(); - TestRunner runner = TestRunners.newTestRunner(consume); - runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); - runner.setProperty(ConsumeIMAP.PORT, "1234"); - runner.setProperty(ConsumeIMAP.USER, "jon"); - runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); - runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); - runner.setProperty(ConsumeIMAP.USE_SSL, "false"); - - assertEquals("imap://jon:qhgwjgehr@foo.bar.com:1234/MYBOX", consume.buildUrl(runner.getProcessContext())); - assertEquals("imap://jon:[password]@foo.bar.com:1234/MYBOX", displayUrlField.get(consume)); - } - - @Test - public void validateConsumeIMAP() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new TestImapProcessor(0)); - runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); - runner.setProperty(ConsumeIMAP.PORT, "1234"); - runner.setProperty(ConsumeIMAP.USER, "jon"); - runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); - runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); - runner.setProperty(ConsumeIMAP.USE_SSL, "false"); - runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false"); - - runner.run(); - List flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS); - assertTrue(flowFiles.isEmpty()); - - runner = TestRunners.newTestRunner(new TestImapProcessor(2)); - runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); - runner.setProperty(ConsumeIMAP.PORT, "1234"); - runner.setProperty(ConsumeIMAP.USER, "jon"); - runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); - runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); - runner.setProperty(ConsumeIMAP.USE_SSL, "false"); - runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false"); - runner.setProperty(ConsumeIMAP.CONNECTION_TIMEOUT, "130 ms"); - - runner.run(2); - flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS); - assertTrue(flowFiles.size() == 2); - MockFlowFile ff = flowFiles.get(0); - ff.assertContentEquals("You've Got Mail - 0".getBytes(StandardCharsets.UTF_8)); - ff = flowFiles.get(1); - ff.assertContentEquals("You've Got Mail - 1".getBytes(StandardCharsets.UTF_8)); - } - - @Test - public void validateConsumeIMAPWithTimeout() throws Exception { - TestRunner runner = TestRunners.newTestRunner(new TestImapProcessor(1)); - runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); - runner.setProperty(ConsumeIMAP.PORT, "1234"); - runner.setProperty(ConsumeIMAP.USER, "jon"); - runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); - runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); - runner.setProperty(ConsumeIMAP.USE_SSL, "false"); - runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false"); - runner.setProperty(ConsumeIMAP.CONNECTION_TIMEOUT, "${random():mod(10):plus(1)} secs"); - - runner.run(1); - List flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS); - assertEquals(1, flowFiles.size()); - MockFlowFile ff = flowFiles.get(0); - ff.assertContentEquals("You've Got Mail - 0".getBytes(StandardCharsets.UTF_8)); - } - - public static class TestImapProcessor extends ConsumeIMAP { - - private final int messagesToGenerate; - - TestImapProcessor(int messagesToGenerate) { - this.messagesToGenerate = messagesToGenerate; - } - - @Override - protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) { - ImapMailReceiver receiver = mock(ImapMailReceiver.class); - try { - Message[] messages = new Message[this.messagesToGenerate]; - for (int i = 0; i < this.messagesToGenerate; i++) { - Message message = mock(Message.class); - when(message.getInputStream()).thenReturn( - new ByteArrayInputStream(("You've Got Mail - " + i).getBytes(StandardCharsets.UTF_8))); - messages[i] = message; - } - when(receiver.receive()).thenReturn(messages); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - return receiver; - } - } -} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java new file mode 100644 index 0000000000..726920057a --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.email; + +import com.icegreen.greenmail.user.GreenMailUser; +import com.icegreen.greenmail.util.GreenMail; +import com.icegreen.greenmail.util.ServerSetupTest; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.Assert; +import org.springframework.integration.mail.AbstractMailReceiver; + +import javax.mail.Message; +import javax.mail.MessagingException; +import javax.mail.Session; +import javax.mail.internet.InternetAddress; +import javax.mail.internet.MimeMessage; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + + +public class TestConsumeEmail { + + private GreenMail mockIMAP4Server; + private GreenMail mockPOP3Server; + private GreenMailUser imapUser; + private GreenMailUser popUser; + + // Setup mock imap server + @Before + public void setUp() { + mockIMAP4Server = new GreenMail(ServerSetupTest.IMAP); + mockIMAP4Server.start(); + mockPOP3Server = new GreenMail(ServerSetupTest.POP3); + mockPOP3Server.start(); + + imapUser = mockIMAP4Server.setUser("test@nifi.org", "nifiUserImap", "nifiPassword"); + popUser = mockPOP3Server.setUser("test@nifi.org", "nifiUserPop", "nifiPassword"); + } + + @After + public void cleanUp() { + mockIMAP4Server.stop(); + mockPOP3Server.stop(); + } + + public void addMessage(String testName, GreenMailUser user) throws MessagingException { + Properties prop = new Properties(); + Session session = Session.getDefaultInstance(prop); + MimeMessage message = new MimeMessage(session); + message.setFrom(new InternetAddress("alice@nifi.org")); + message.addRecipient(Message.RecipientType.TO, new InternetAddress("test@nifi.org")); + message.setSubject("Test email" + testName); + message.setText("test test test chocolate"); + user.deliver(message); + } + + // Start the testing units + @Test + public void testConsumeIMAP4() throws Exception { + + final TestRunner runner = TestRunners.newTestRunner(new ConsumeIMAP()); + runner.setProperty(ConsumeIMAP.HOST, ServerSetupTest.IMAP.getBindAddress()); + runner.setProperty(ConsumeIMAP.PORT, String.valueOf(ServerSetupTest.IMAP.getPort())); + runner.setProperty(ConsumeIMAP.USER, "nifiUserImap"); + runner.setProperty(ConsumeIMAP.PASSWORD, "nifiPassword"); + runner.setProperty(ConsumeIMAP.FOLDER, "INBOX"); + runner.setProperty(ConsumeIMAP.USE_SSL, "false"); + + addMessage("testConsumeImap1", imapUser); + addMessage("testConsumeImap2", imapUser); + + runner.run(); + + runner.assertTransferCount(ConsumeIMAP.REL_SUCCESS, 2); + final List messages = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS); + String result = new String(runner.getContentAsByteArray(messages.get(0))); + + // Verify body + Assert.assertTrue(result.contains("test test test chocolate")); + + // Verify sender + Assert.assertTrue(result.contains("alice@nifi.org")); + + // Verify subject + Assert.assertTrue(result.contains("testConsumeImap1")); + + } + + @Test + public void testConsumePOP3() throws Exception { + + final TestRunner runner = TestRunners.newTestRunner(new ConsumePOP3()); + runner.setProperty(ConsumeIMAP.HOST, ServerSetupTest.POP3.getBindAddress()); + runner.setProperty(ConsumeIMAP.PORT, String.valueOf(ServerSetupTest.POP3.getPort())); + runner.setProperty(ConsumeIMAP.USER, "nifiUserPop"); + runner.setProperty(ConsumeIMAP.PASSWORD, "nifiPassword"); + runner.setProperty(ConsumeIMAP.FOLDER, "INBOX"); + runner.setProperty(ConsumeIMAP.USE_SSL, "false"); + + addMessage("testConsumePop1", popUser); + addMessage("testConsumePop2", popUser); + + runner.run(); + + runner.assertTransferCount(ConsumePOP3.REL_SUCCESS, 2); + final List messages = runner.getFlowFilesForRelationship(ConsumePOP3.REL_SUCCESS); + String result = new String(runner.getContentAsByteArray(messages.get(0))); + + // Verify body + Assert.assertTrue(result.contains("test test test chocolate")); + + // Verify sender + Assert.assertTrue(result.contains("alice@nifi.org")); + + // Verify subject + Assert.assertTrue(result.contains("Pop1")); + + } + + @Test + public void validateProtocol() { + AbstractEmailProcessor consume = new ConsumeIMAP(); + TestRunner runner = TestRunners.newTestRunner(consume); + runner.setProperty(ConsumeIMAP.USE_SSL, "false"); + + assertEquals("imap", consume.getProtocol(runner.getProcessContext())); + + runner = TestRunners.newTestRunner(consume); + runner.setProperty(ConsumeIMAP.USE_SSL, "true"); + + assertEquals("imaps", consume.getProtocol(runner.getProcessContext())); + + consume = new ConsumePOP3(); + + assertEquals("pop3", consume.getProtocol(runner.getProcessContext())); + } + + @Test + public void validateUrl() throws Exception { + Field displayUrlField = AbstractEmailProcessor.class.getDeclaredField("displayUrl"); + displayUrlField.setAccessible(true); + + AbstractEmailProcessor consume = new ConsumeIMAP(); + TestRunner runner = TestRunners.newTestRunner(consume); + runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); + runner.setProperty(ConsumeIMAP.PORT, "1234"); + runner.setProperty(ConsumeIMAP.USER, "jon"); + runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); + runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); + runner.setProperty(ConsumeIMAP.USE_SSL, "false"); + + assertEquals("imap://jon:qhgwjgehr@foo.bar.com:1234/MYBOX", consume.buildUrl(runner.getProcessContext())); + assertEquals("imap://jon:[password]@foo.bar.com:1234/MYBOX", displayUrlField.get(consume)); + } + + +}