From 4e224c283f84e25c08ad4371f6ab2c323c02dd25 Mon Sep 17 00:00:00 2001 From: jpercivall Date: Sat, 16 Jul 2016 18:29:41 -0400 Subject: [PATCH] NIFI-1899 reworking bi-directional Email processing in ListenSMTP This closes #483 --- .../nifi/processors/email/ListenSMTP.java | 156 +++++++++++------- .../email/smtp/event/SmtpEvent.java | 19 +-- .../handler/SMTPMessageHandlerFactory.java | 147 ++++++++--------- .../email/smtp/handler/SMTPResultCode.java | 12 +- .../nifi/processors/email/TestListenSMTP.java | 65 +++++--- 5 files changed, 218 insertions(+), 181 deletions(-) diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java index 51b1d2d3fb..0f17838837 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java @@ -20,7 +20,7 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; import java.io.IOException; -import java.io.OutputStream; +import java.io.InputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; @@ -44,7 +44,6 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; @@ -64,7 +63,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.processors.email.smtp.event.SmtpEvent; @@ -356,87 +354,125 @@ public class ListenSMTP extends AbstractProcessor { while (!incomingMessages.isEmpty()) { SmtpEvent message = incomingMessages.poll(); - if (message == null) { return; } synchronized (message) { + if (resultCodeSetAndIsError(message)) { + SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); + getLogger().warn("Message failed before onTrigger processing message was: " + resultCode.getLogMessage()); + continue; + } - FlowFile flowfile = session.create(); + try { + FlowFile flowfile = session.create(); - if (message.getMessageData() != null) { - ByteArrayOutputStream messageData = message.getMessageData(); - flowfile = session.write(flowfile, new OutputStreamCallback() { + if (message.getMessageData() != null) { + flowfile = session.write(flowfile, out -> { + InputStream inputStream = message.getMessageData(); + byte [] buffer = new byte[1024]; - // Write the messageData to flowfile content - @Override - public void process(OutputStream out) throws IOException { - out.write(messageData.toByteArray()); + int rd; + long totalBytesRead =0; + + while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) { + totalBytesRead += rd; + if (totalBytesRead > server.getMaxMessageSize() ) { + message.setReturnCode(500); + message.setProcessed(); + break; + } + out.write(buffer, 0, rd); + } + out.flush(); + }); + } else { + getLogger().debug("Message body was null"); + message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode()); + message.setProcessed(); + } + + if (!message.getProcessed()) { + HashMap attributes = new HashMap<>(); + // Gather message attributes + attributes.put(SMTP_HELO, message.getHelo()); + attributes.put(SMTP_SRC_IP, message.getHelo()); + attributes.put(SMTP_FROM, message.getFrom()); + attributes.put(SMTP_TO, message.getTo()); + + List> details = message.getCertifcateDetails(); + int c = 0; + + // Add a selection of each X509 certificates to the already gathered attributes + + for (Map detail : details) { + attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null)); + attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null)); + c++; } - }); + + // Set Mime-Type + attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE); + + // Add the attributes. to flowfile + flowfile = session.putAllAttributes(flowfile, attributes); + session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/"); + session.transfer(flowfile, REL_SUCCESS); + + getLogger().info("Transferring {} to success", new Object[]{flowfile}); + } + } catch (Exception e) { + message.setProcessed(); + message.setReturnCode(SMTPResultCode.UNEXPECTED_ERROR.getCode()); } - HashMap attributes = new HashMap<>(); - // Gather message attributes - attributes.put(SMTP_HELO, message.getHelo()); - attributes.put(SMTP_SRC_IP, message.getHelo()); - attributes.put(SMTP_FROM, message.getFrom()); - attributes.put(SMTP_TO, message.getTo()); - - List> details = message.getCertifcateDetails(); - int c = 0; - - // Add a selection of each X509 certificates to the already gathered attributes - - for (Map detail : details) { - attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null)); - attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null)); - c++; + // Check to see if it failed when creating the FlowFile + if (resultCodeSetAndIsError(message)) { + session.rollback(); + SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); + getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage()); + message.notifyAll(); + continue; } - // Set Mime-Type - attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE); - - // Add the attributes. to flowfile - flowfile = session.putAllAttributes(flowfile, attributes); - session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/"); - session.transfer(flowfile, REL_SUCCESS); - getLogger().info("Transferring {} to success", new Object[]{flowfile}); - // Finished processing, message.setProcessed(); - // update the latch so data() can process the rest of the method - message.updateProcessedLatch(); + // notify on the message so data() can process the rest of the method + message.notifyAll(); - // End of synchronized block - } - - // Wait for SMTPMessageHandler data() and done() to complete - // their side of the work (i.e. acknowledgement) - while (!message.getAcknowledged()) { - // Busy wait + // Wait for data() to tell sender we received the message and double check we didn't timeout + final long serverTimeout = context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + try { + message.wait(serverTimeout); + } catch (InterruptedException e) { + getLogger().info("Interrupted while waiting for Message Handler to acknowledge message."); } - // Lock one last time - synchronized (message) { - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - switch (resultCode) { - case UNEXPECTED_ERROR: - case TIMEOUT_ERROR: - session.rollback(); - getLogger().warn(resultCode.getLogMessage()); - case SUCCESS: - getLogger().info(resultCode.getLogMessage()); - break; - default: - getLogger().error(resultCode.getLogMessage()); + // Check to see if the sender was correctly notified + if (resultCodeSetAndIsError(message)) { + SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); + session.rollback(); + getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage()); + } else { + // Need to commit because if we didn't and a following message needed to be rolled back, this message would be too, causing data loss. + session.commit(); } } } } + private boolean resultCodeSetAndIsError(SmtpEvent message){ + if (message.getReturnCode() != null ) { + SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); + if (resultCode.isError()) { + return true; + } + } + return false; + } + // Same old... same old... used for testing to access the random port that was selected protected int getPort() { return server == null ? 0 : server.getPort(); diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java index eaded4a86f..e1c36c53a3 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java @@ -18,14 +18,13 @@ package org.apache.nifi.processors.email.smtp.event; -import org.apache.nifi.stream.io.ByteArrayOutputStream; +import java.io.InputStream; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -38,19 +37,15 @@ public class SmtpEvent{ private final String helo; private final String from; private final String to; - private final ByteArrayOutputStream messageData; + private final InputStream messageData; private List> certificatesDetails; private AtomicBoolean processed = new AtomicBoolean(false); private AtomicBoolean acknowledged = new AtomicBoolean(false); private AtomicInteger returnCode = new AtomicInteger(); - private CountDownLatch processedLatch; public SmtpEvent( final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates, - final ByteArrayOutputStream messageData, - CountDownLatch processedLatch) { - - this.processedLatch = processedLatch; + final InputStream messageData) { this.remoteIP = remoteIP; this.helo = helo; @@ -86,7 +81,7 @@ public class SmtpEvent{ return helo; } - public synchronized ByteArrayOutputStream getMessageData() { + public synchronized InputStream getMessageData() { return messageData; } @@ -118,15 +113,11 @@ public class SmtpEvent{ return this.acknowledged.get(); } - public synchronized void updateProcessedLatch() { - this.processedLatch.countDown(); - } - public synchronized void setReturnCode(int code) { this.returnCode.set(code); } - public synchronized int getReturnCode() { + public synchronized Integer getReturnCode() { return this.returnCode.get(); } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java index 0ac4127c0c..6b647bf872 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java @@ -20,12 +20,10 @@ package org.apache.nifi.processors.email.smtp.handler; import java.io.IOException; import java.io.InputStream; import java.security.cert.X509Certificate; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.StopWatch; import org.subethamail.smtp.DropConnectionException; import org.subethamail.smtp.MessageContext; @@ -33,7 +31,6 @@ import org.subethamail.smtp.MessageHandler; import org.subethamail.smtp.MessageHandlerFactory; import org.subethamail.smtp.RejectException; import org.subethamail.smtp.TooMuchDataException; -import org.subethamail.smtp.server.SMTPServer; import org.apache.nifi.processors.email.smtp.event.SmtpEvent; @@ -57,13 +54,9 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory { final MessageContext messageContext; String from; String recipient; - ByteArrayOutputStream messageData; - - private CountDownLatch latch; public Handler(MessageContext messageContext, LinkedBlockingQueue incomingMessages, ComponentLog logger){ this.messageContext = messageContext; - this.latch = new CountDownLatch(1); } @Override @@ -82,31 +75,9 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory { public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException { // Start counting the timer... StopWatch watch = new StopWatch(true); - long elapsed; - - SMTPServer server = messageContext.getSMTPServer(); - final long serverTimeout = TimeUnit.MILLISECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(), TimeUnit.MILLISECONDS); - this.messageData = new ByteArrayOutputStream(); - - byte [] buffer = new byte[1024]; - - int rd; - - while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) { - messageData.write(buffer, 0, rd); - if (messageData.getBufferLength() > server.getMaxMessageSize() ) { - // NOTE: Setting processed at this stage is not desirable as message object will only be created - // if this test (i.e. message size) passes. - final SMTPResultCode returnCode = SMTPResultCode.fromCode(500); - logger.warn(returnCode.getLogMessage()); - throw new TooMuchDataException(returnCode.getErrorMessage()); - } - } - messageData.flush(); - X509Certificate[] certificates = new X509Certificate[]{}; final String remoteIP = messageContext.getRemoteAddress().toString(); @@ -116,67 +87,79 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory { certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone(); } - SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, messageData, latch); + SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, inputStream); - // / Try to queue the message back to the NiFi session - try { + synchronized (message) { + // / Try to queue the message back to the NiFi session + try { + elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); + incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + final SMTPResultCode returnCode = SMTPResultCode.fromCode(421); + logger.trace(returnCode.getLogMessage()); + + // NOTE: Setting acknowledged at this stage is redundant as this catch deals with the inability of + // adding message to the processing queue. Yet, for the sake of consistency the message is + // updated nonetheless + message.setReturnCode(returnCode.getCode()); + message.setAcknowledged(); + throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); + } + + // Once message has been sent to the queue, it should be processed by NiFi onTrigger, + // a flowfile created and its processed status updated before an acknowledgment is + // given back to the SMTP client elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); - incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - final SMTPResultCode returnCode = SMTPResultCode.fromCode(421); - logger.trace(returnCode.getLogMessage()); + try { + message.wait(serverTimeout - elapsed); + } catch (InterruptedException e) { + // Interrupted while waiting for the message to process. Will return error and request onTrigger to rollback + logger.trace("Interrupted while waiting for processor to process data. Returned error to SMTP client as precautionary measure"); + incomingMessages.remove(message); - // NOTE: Setting processed at this stage is redundant as this catch deals with the inability of - // adding message to the processing queue. Yet, for the sake of consistency the message is - // updated nonetheless - message.setReturnCode(returnCode.getCode()); - message.setAcknowledged(); - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); + // Set the final values so onTrigger can figure out what happened to message + final SMTPResultCode returnCode = SMTPResultCode.fromCode(423); + message.setReturnCode(returnCode.getCode()); + message.setAcknowledged(); + + // Inform client + throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); + } + + // Check if message is processed + if (!message.getProcessed()) { + incomingMessages.remove(message); + final SMTPResultCode returnCode = SMTPResultCode.fromCode(451); + logger.trace("Did not receive the onTrigger response within the acceptable timeframe."); + + // Set the final values so onTrigger can figure out what happened to message + message.setReturnCode(returnCode.getCode()); + message.setAcknowledged(); + throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); + } else if(message.getReturnCode() != null) { + // No need to check if over server timeout because we already processed the data. Might as well use the status code returned by onTrigger. + final SMTPResultCode returnCode = SMTPResultCode.fromCode(message.getReturnCode()); + + if(returnCode.isError()){ + message.setAcknowledged(); + throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); + } + } else { + // onTrigger successfully processed the data. + // No need to check if over server timeout because we already processed the data. Might as well finalize it. + // Set the final values so onTrigger can figure out what happened to message + message.setReturnCode(250); + message.setAcknowledged(); + } + // Exit, allowing Handler to acknowledge the message + message.notifyAll(); } - - // Once message has been sent to the queue, it should be processed by NiFi onTrigger, - // a flowfile created and its processed status updated before an acknowledgment is - // given back to the SMTP client - elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); - try { - latch.await(serverTimeout - elapsed, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // Latch open unexpectedly. Will return error and requestonTrigger to rollback - logger.trace("Latch opened unexpectedly and processor indicates data wasn't processed. Returned error to SMTP client as precautionary measure"); - incomingMessages.remove(message); - - // Set the final values so onTrigger can figure out what happened to message - final SMTPResultCode returnCode = SMTPResultCode.fromCode(423); - message.setReturnCode(returnCode.getCode()); - message.setAcknowledged(); - - // Inform client - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } - - // Remove the message from the queue. - incomingMessages.remove(message); - // Check if message is processed and if yes, check if it was received on time and wraps it up. - elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); - if (!message.getProcessed() || (elapsed >= serverTimeout)) { - final SMTPResultCode returnCode = SMTPResultCode.fromCode(451); - logger.trace("Did not receive the onTrigger response within the acceptable timeframes. Data duplication may have occurred."); - - // Set the final values so onTrigger can figure out what happened to message - message.setReturnCode(returnCode.getCode()); - message.setAcknowledged(); - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } - - // Set the final values so onTrigger can figure out what happened to message - message.setReturnCode(250); - message.setAcknowledged(); - // Exit, allowing Handler to acknowledge the message - } + } @Override public void done() { logger.trace("Called the last method of message handler. Exiting"); + // Notifying the ontrigger that the message was handled. } } } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java index 5328a0d7ff..b9c0d60e8e 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java @@ -79,5 +79,15 @@ public enum SMTPResultCode { return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode; } - + public boolean isError(){ + switch (this) { + case MESSAGE_TOO_LARGE: + case UNEXPECTED_ERROR: + case QUEUE_ERROR: + case TIMEOUT_ERROR: + return true; + default: + return false; + } + } } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java index 983ac4a8b4..1fd7628732 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java @@ -32,6 +32,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; public class TestListenSMTP { @@ -251,7 +252,7 @@ public class TestListenSMTP { listenSmtp.startShutdown(); } - @Test(timeout=15000, expected=EmailException.class) + @Test(timeout=15000) public void emailTooLarge() throws Exception { ListenSMTP listenSmtp = new ListenSMTP(); final TestRunner runner = TestRunners.newTestRunner(listenSmtp); @@ -272,31 +273,47 @@ public class TestListenSMTP { listenSmtp.initializeSMTPServer(context); final int port = listenSmtp.getPort(); - - Email email = new SimpleEmail(); - email.setHostName("127.0.0.1"); - email.setSmtpPort(port); - email.setStartTLSEnabled(false); - email.setFrom("alice@nifi.apache.org"); - email.setSubject("This is a test"); - email.setMsg("Test test test chocolate"); - email.addTo("bob@nifi.apache.org"); - email.send(); - - Thread.sleep(100); - - - // process the request. - listenSmtp.onTrigger(context, processSessionFactory); - - runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0); - runner.assertQueueEmpty(); + AtomicBoolean finished = new AtomicBoolean(false);; + AtomicBoolean failed = new AtomicBoolean(false); try { - listenSmtp.startShutdown(); - } catch (InterruptedException e) { - e.printStackTrace(); - Assert.assertFalse(e.toString(), true); + final Thread clientThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Email email = new SimpleEmail(); + email.setHostName("127.0.0.1"); + email.setSmtpPort(port); + email.setStartTLSEnabled(false); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("Test test test chocolate"); + email.addTo("bob@nifi.apache.org"); + email.send(); + + } catch (final EmailException t) { + failed.set(true); + } + finished.set(true); + } + }); + clientThread.start(); + + while (!finished.get()) { + // process the request. + listenSmtp.onTrigger(context, processSessionFactory); + Thread.sleep(10); + } + clientThread.stop(); + + Assert.assertTrue("Sending email succeeded when it should have failed", failed.get()); + + runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0); + + runner.assertQueueEmpty(); + } finally { + // shut down the server + listenSmtp.startShutdown(); } } } \ No newline at end of file