NIFI-1899 reworking bi-directional Email processing in ListenSMTP

This closes #483
This commit is contained in:
jpercivall 2016-07-16 18:29:41 -04:00
parent 4f672832c0
commit 4e224c283f
5 changed files with 218 additions and 181 deletions

View File

@ -20,7 +20,7 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
@ -44,7 +44,6 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@ -64,7 +63,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
@ -356,87 +354,125 @@ public class ListenSMTP extends AbstractProcessor {
while (!incomingMessages.isEmpty()) {
SmtpEvent message = incomingMessages.poll();
if (message == null) {
return;
}
synchronized (message) {
if (resultCodeSetAndIsError(message)) {
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
getLogger().warn("Message failed before onTrigger processing message was: " + resultCode.getLogMessage());
continue;
}
FlowFile flowfile = session.create();
try {
FlowFile flowfile = session.create();
if (message.getMessageData() != null) {
ByteArrayOutputStream messageData = message.getMessageData();
flowfile = session.write(flowfile, new OutputStreamCallback() {
if (message.getMessageData() != null) {
flowfile = session.write(flowfile, out -> {
InputStream inputStream = message.getMessageData();
byte [] buffer = new byte[1024];
// Write the messageData to flowfile content
@Override
public void process(OutputStream out) throws IOException {
out.write(messageData.toByteArray());
int rd;
long totalBytesRead =0;
while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) {
totalBytesRead += rd;
if (totalBytesRead > server.getMaxMessageSize() ) {
message.setReturnCode(500);
message.setProcessed();
break;
}
out.write(buffer, 0, rd);
}
out.flush();
});
} else {
getLogger().debug("Message body was null");
message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode());
message.setProcessed();
}
if (!message.getProcessed()) {
HashMap<String, String> attributes = new HashMap<>();
// Gather message attributes
attributes.put(SMTP_HELO, message.getHelo());
attributes.put(SMTP_SRC_IP, message.getHelo());
attributes.put(SMTP_FROM, message.getFrom());
attributes.put(SMTP_TO, message.getTo());
List<Map<String, String>> details = message.getCertifcateDetails();
int c = 0;
// Add a selection of each X509 certificates to the already gathered attributes
for (Map<String, String> detail : details) {
attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null));
attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null));
c++;
}
});
// Set Mime-Type
attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
// Add the attributes. to flowfile
flowfile = session.putAllAttributes(flowfile, attributes);
session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/");
session.transfer(flowfile, REL_SUCCESS);
getLogger().info("Transferring {} to success", new Object[]{flowfile});
}
} catch (Exception e) {
message.setProcessed();
message.setReturnCode(SMTPResultCode.UNEXPECTED_ERROR.getCode());
}
HashMap<String, String> attributes = new HashMap<>();
// Gather message attributes
attributes.put(SMTP_HELO, message.getHelo());
attributes.put(SMTP_SRC_IP, message.getHelo());
attributes.put(SMTP_FROM, message.getFrom());
attributes.put(SMTP_TO, message.getTo());
List<Map<String, String>> details = message.getCertifcateDetails();
int c = 0;
// Add a selection of each X509 certificates to the already gathered attributes
for (Map<String, String> detail : details) {
attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null));
attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null));
c++;
// Check to see if it failed when creating the FlowFile
if (resultCodeSetAndIsError(message)) {
session.rollback();
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage());
message.notifyAll();
continue;
}
// Set Mime-Type
attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
// Add the attributes. to flowfile
flowfile = session.putAllAttributes(flowfile, attributes);
session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/");
session.transfer(flowfile, REL_SUCCESS);
getLogger().info("Transferring {} to success", new Object[]{flowfile});
// Finished processing,
message.setProcessed();
// update the latch so data() can process the rest of the method
message.updateProcessedLatch();
// notify on the message so data() can process the rest of the method
message.notifyAll();
// End of synchronized block
}
// Wait for SMTPMessageHandler data() and done() to complete
// their side of the work (i.e. acknowledgement)
while (!message.getAcknowledged()) {
// Busy wait
// Wait for data() to tell sender we received the message and double check we didn't timeout
final long serverTimeout = context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
try {
message.wait(serverTimeout);
} catch (InterruptedException e) {
getLogger().info("Interrupted while waiting for Message Handler to acknowledge message.");
}
// Lock one last time
synchronized (message) {
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
switch (resultCode) {
case UNEXPECTED_ERROR:
case TIMEOUT_ERROR:
session.rollback();
getLogger().warn(resultCode.getLogMessage());
case SUCCESS:
getLogger().info(resultCode.getLogMessage());
break;
default:
getLogger().error(resultCode.getLogMessage());
// Check to see if the sender was correctly notified
if (resultCodeSetAndIsError(message)) {
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
session.rollback();
getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage());
} else {
// Need to commit because if we didn't and a following message needed to be rolled back, this message would be too, causing data loss.
session.commit();
}
}
}
}
private boolean resultCodeSetAndIsError(SmtpEvent message){
if (message.getReturnCode() != null ) {
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
if (resultCode.isError()) {
return true;
}
}
return false;
}
// Same old... same old... used for testing to access the random port that was selected
protected int getPort() {
return server == null ? 0 : server.getPort();

View File

@ -18,14 +18,13 @@
package org.apache.nifi.processors.email.smtp.event;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -38,19 +37,15 @@ public class SmtpEvent{
private final String helo;
private final String from;
private final String to;
private final ByteArrayOutputStream messageData;
private final InputStream messageData;
private List<Map<String, String>> certificatesDetails;
private AtomicBoolean processed = new AtomicBoolean(false);
private AtomicBoolean acknowledged = new AtomicBoolean(false);
private AtomicInteger returnCode = new AtomicInteger();
private CountDownLatch processedLatch;
public SmtpEvent(
final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates,
final ByteArrayOutputStream messageData,
CountDownLatch processedLatch) {
this.processedLatch = processedLatch;
final InputStream messageData) {
this.remoteIP = remoteIP;
this.helo = helo;
@ -86,7 +81,7 @@ public class SmtpEvent{
return helo;
}
public synchronized ByteArrayOutputStream getMessageData() {
public synchronized InputStream getMessageData() {
return messageData;
}
@ -118,15 +113,11 @@ public class SmtpEvent{
return this.acknowledged.get();
}
public synchronized void updateProcessedLatch() {
this.processedLatch.countDown();
}
public synchronized void setReturnCode(int code) {
this.returnCode.set(code);
}
public synchronized int getReturnCode() {
public synchronized Integer getReturnCode() {
return this.returnCode.get();
}

View File

@ -20,12 +20,10 @@ package org.apache.nifi.processors.email.smtp.handler;
import java.io.IOException;
import java.io.InputStream;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.StopWatch;
import org.subethamail.smtp.DropConnectionException;
import org.subethamail.smtp.MessageContext;
@ -33,7 +31,6 @@ import org.subethamail.smtp.MessageHandler;
import org.subethamail.smtp.MessageHandlerFactory;
import org.subethamail.smtp.RejectException;
import org.subethamail.smtp.TooMuchDataException;
import org.subethamail.smtp.server.SMTPServer;
import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
@ -57,13 +54,9 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
final MessageContext messageContext;
String from;
String recipient;
ByteArrayOutputStream messageData;
private CountDownLatch latch;
public Handler(MessageContext messageContext, LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger){
this.messageContext = messageContext;
this.latch = new CountDownLatch(1);
}
@Override
@ -82,31 +75,9 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException {
// Start counting the timer...
StopWatch watch = new StopWatch(true);
long elapsed;
SMTPServer server = messageContext.getSMTPServer();
final long serverTimeout = TimeUnit.MILLISECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(), TimeUnit.MILLISECONDS);
this.messageData = new ByteArrayOutputStream();
byte [] buffer = new byte[1024];
int rd;
while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) {
messageData.write(buffer, 0, rd);
if (messageData.getBufferLength() > server.getMaxMessageSize() ) {
// NOTE: Setting processed at this stage is not desirable as message object will only be created
// if this test (i.e. message size) passes.
final SMTPResultCode returnCode = SMTPResultCode.fromCode(500);
logger.warn(returnCode.getLogMessage());
throw new TooMuchDataException(returnCode.getErrorMessage());
}
}
messageData.flush();
X509Certificate[] certificates = new X509Certificate[]{};
final String remoteIP = messageContext.getRemoteAddress().toString();
@ -116,67 +87,79 @@ public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone();
}
SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, messageData, latch);
SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, inputStream);
// / Try to queue the message back to the NiFi session
try {
synchronized (message) {
// / Try to queue the message back to the NiFi session
try {
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
logger.trace(returnCode.getLogMessage());
// NOTE: Setting acknowledged at this stage is redundant as this catch deals with the inability of
// adding message to the processing queue. Yet, for the sake of consistency the message is
// updated nonetheless
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
// Once message has been sent to the queue, it should be processed by NiFi onTrigger,
// a flowfile created and its processed status updated before an acknowledgment is
// given back to the SMTP client
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
logger.trace(returnCode.getLogMessage());
try {
message.wait(serverTimeout - elapsed);
} catch (InterruptedException e) {
// Interrupted while waiting for the message to process. Will return error and request onTrigger to rollback
logger.trace("Interrupted while waiting for processor to process data. Returned error to SMTP client as precautionary measure");
incomingMessages.remove(message);
// NOTE: Setting processed at this stage is redundant as this catch deals with the inability of
// adding message to the processing queue. Yet, for the sake of consistency the message is
// updated nonetheless
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
// Set the final values so onTrigger can figure out what happened to message
final SMTPResultCode returnCode = SMTPResultCode.fromCode(423);
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
// Inform client
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
// Check if message is processed
if (!message.getProcessed()) {
incomingMessages.remove(message);
final SMTPResultCode returnCode = SMTPResultCode.fromCode(451);
logger.trace("Did not receive the onTrigger response within the acceptable timeframe.");
// Set the final values so onTrigger can figure out what happened to message
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
} else if(message.getReturnCode() != null) {
// No need to check if over server timeout because we already processed the data. Might as well use the status code returned by onTrigger.
final SMTPResultCode returnCode = SMTPResultCode.fromCode(message.getReturnCode());
if(returnCode.isError()){
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
} else {
// onTrigger successfully processed the data.
// No need to check if over server timeout because we already processed the data. Might as well finalize it.
// Set the final values so onTrigger can figure out what happened to message
message.setReturnCode(250);
message.setAcknowledged();
}
// Exit, allowing Handler to acknowledge the message
message.notifyAll();
}
// Once message has been sent to the queue, it should be processed by NiFi onTrigger,
// a flowfile created and its processed status updated before an acknowledgment is
// given back to the SMTP client
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
try {
latch.await(serverTimeout - elapsed, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Latch open unexpectedly. Will return error and requestonTrigger to rollback
logger.trace("Latch opened unexpectedly and processor indicates data wasn't processed. Returned error to SMTP client as precautionary measure");
incomingMessages.remove(message);
// Set the final values so onTrigger can figure out what happened to message
final SMTPResultCode returnCode = SMTPResultCode.fromCode(423);
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
// Inform client
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
// Remove the message from the queue.
incomingMessages.remove(message);
// Check if message is processed and if yes, check if it was received on time and wraps it up.
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
if (!message.getProcessed() || (elapsed >= serverTimeout)) {
final SMTPResultCode returnCode = SMTPResultCode.fromCode(451);
logger.trace("Did not receive the onTrigger response within the acceptable timeframes. Data duplication may have occurred.");
// Set the final values so onTrigger can figure out what happened to message
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
// Set the final values so onTrigger can figure out what happened to message
message.setReturnCode(250);
message.setAcknowledged();
// Exit, allowing Handler to acknowledge the message
}
}
@Override
public void done() {
logger.trace("Called the last method of message handler. Exiting");
// Notifying the ontrigger that the message was handled.
}
}
}

View File

@ -79,5 +79,15 @@ public enum SMTPResultCode {
return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode;
}
public boolean isError(){
switch (this) {
case MESSAGE_TOO_LARGE:
case UNEXPECTED_ERROR:
case QUEUE_ERROR:
case TIMEOUT_ERROR:
return true;
default:
return false;
}
}
}

View File

@ -32,6 +32,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestListenSMTP {
@ -251,7 +252,7 @@ public class TestListenSMTP {
listenSmtp.startShutdown();
}
@Test(timeout=15000, expected=EmailException.class)
@Test(timeout=15000)
public void emailTooLarge() throws Exception {
ListenSMTP listenSmtp = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
@ -272,31 +273,47 @@ public class TestListenSMTP {
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort();
Email email = new SimpleEmail();
email.setHostName("127.0.0.1");
email.setSmtpPort(port);
email.setStartTLSEnabled(false);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Test test test chocolate");
email.addTo("bob@nifi.apache.org");
email.send();
Thread.sleep(100);
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0);
runner.assertQueueEmpty();
AtomicBoolean finished = new AtomicBoolean(false);;
AtomicBoolean failed = new AtomicBoolean(false);
try {
listenSmtp.startShutdown();
} catch (InterruptedException e) {
e.printStackTrace();
Assert.assertFalse(e.toString(), true);
final Thread clientThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Email email = new SimpleEmail();
email.setHostName("127.0.0.1");
email.setSmtpPort(port);
email.setStartTLSEnabled(false);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Test test test chocolate");
email.addTo("bob@nifi.apache.org");
email.send();
} catch (final EmailException t) {
failed.set(true);
}
finished.set(true);
}
});
clientThread.start();
while (!finished.get()) {
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
Thread.sleep(10);
}
clientThread.stop();
Assert.assertTrue("Sending email succeeded when it should have failed", failed.get());
runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0);
runner.assertQueueEmpty();
} finally {
// shut down the server
listenSmtp.startShutdown();
}
}
}