NIFI-9451 - Add 'Input Character Set' property for PutEmail and additional tests

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #6313.
This commit is contained in:
Emilio Setiadarma 2022-08-08 11:33:47 -07:00 committed by Nathan Gough
parent 91ad4548a8
commit f586f8f4cf
2 changed files with 152 additions and 34 deletions

View File

@ -16,24 +16,6 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import jakarta.activation.DataHandler; import jakarta.activation.DataHandler;
import jakarta.mail.Authenticator; import jakarta.mail.Authenticator;
import jakarta.mail.Message; import jakarta.mail.Message;
@ -50,7 +32,6 @@ import jakarta.mail.internet.MimeMultipart;
import jakarta.mail.internet.MimeUtility; import jakarta.mail.internet.MimeUtility;
import jakarta.mail.internet.PreencodedMimeBodyPart; import jakarta.mail.internet.PreencodedMimeBodyPart;
import jakarta.mail.util.ByteArrayDataSource; import jakarta.mail.util.ByteArrayDataSource;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
@ -79,6 +60,24 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@SupportsBatching @SupportsBatching
@Tags({"email", "put", "notify", "smtp"}) @Tags({"email", "put", "notify", "smtp"})
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@ -245,6 +244,17 @@ public class PutEmail extends AbstractProcessor {
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("false") .defaultValue("false")
.build(); .build();
public static final PropertyDescriptor INPUT_CHARACTER_SET = new PropertyDescriptor.Builder()
.name("input-character-set")
.displayName("Input Character Set")
.description("Specifies the character set of the FlowFile contents "
+ "for reading input FlowFile contents to generate the message body "
+ "or as an attachment to the message. "
+ "If not set, UTF-8 will be the default value.")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(StandardCharsets.UTF_8.name())
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -255,7 +265,6 @@ public class PutEmail extends AbstractProcessor {
.description("FlowFiles that fail to send will be routed to this relationship") .description("FlowFiles that fail to send will be routed to this relationship")
.build(); .build();
private static final Charset CONTENT_CHARSET = StandardCharsets.UTF_8;
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
@ -297,8 +306,10 @@ public class PutEmail extends AbstractProcessor {
properties.add(SUBJECT); properties.add(SUBJECT);
properties.add(MESSAGE); properties.add(MESSAGE);
properties.add(CONTENT_AS_MESSAGE); properties.add(CONTENT_AS_MESSAGE);
properties.add(INPUT_CHARACTER_SET);
properties.add(ATTACH_FILE); properties.add(ATTACH_FILE);
properties.add(INCLUDE_ALL_ATTRIBUTES); properties.add(INCLUDE_ALL_ATTRIBUTES);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> relationships = new HashSet<>();
@ -390,13 +401,25 @@ public class PutEmail extends AbstractProcessor {
final String messageText = getMessage(flowFile, context, session); final String messageText = getMessage(flowFile, context, session);
final String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue(); final String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
message.setContent(messageText, contentType); final Charset charset = getCharset(context);
message.setContent(messageText, contentType + String.format("; charset=\"%s\"", MimeUtility.mimeCharset(charset.name())));
message.setSentDate(new Date()); message.setSentDate(new Date());
if (context.getProperty(ATTACH_FILE).asBoolean()) { if (context.getProperty(ATTACH_FILE).asBoolean()) {
final MimeBodyPart mimeText = new PreencodedMimeBodyPart("base64"); final String encoding = getEncoding(context);
mimeText.setDataHandler(new DataHandler(new ByteArrayDataSource( final MimeBodyPart mimeText = new PreencodedMimeBodyPart(encoding);
Base64.encodeBase64(messageText.getBytes(CONTENT_CHARSET)), contentType + "; charset=\"utf-8\""))); final byte[] messageBytes = messageText.getBytes(charset);
final byte[] encodedMessageBytes = "base64".equals(encoding) ? Base64.encodeBase64(messageBytes) : messageBytes;
final DataHandler messageDataHandler = new DataHandler(
new ByteArrayDataSource(
encodedMessageBytes,
contentType + String.format("; charset=\"%s\"", MimeUtility.mimeCharset(charset.name()))
)
);
mimeText.setDataHandler(messageDataHandler);
mimeText.setHeader("Content-Transfer-Encoding", MimeUtility.getEncoding(mimeText.getDataHandler()));
final MimeBodyPart mimeFile = new MimeBodyPart(); final MimeBodyPart mimeFile = new MimeBodyPart();
session.read(flowFile, stream -> { session.read(flowFile, stream -> {
try { try {
@ -406,13 +429,21 @@ public class PutEmail extends AbstractProcessor {
} }
}); });
mimeFile.setFileName(MimeUtility.encodeText(flowFile.getAttribute(CoreAttributes.FILENAME.key()), CONTENT_CHARSET.name(), null)); mimeFile.setFileName(MimeUtility.encodeText(flowFile.getAttribute(CoreAttributes.FILENAME.key()), charset.name(), null));
mimeFile.setHeader("Content-Transfer-Encoding", MimeUtility.getEncoding(mimeFile.getDataHandler()));
final MimeMultipart multipart = new MimeMultipart(); final MimeMultipart multipart = new MimeMultipart();
multipart.addBodyPart(mimeText); multipart.addBodyPart(mimeText);
multipart.addBodyPart(mimeFile); multipart.addBodyPart(mimeFile);
message.setContent(multipart); message.setContent(multipart);
} else {
// message is not a Multipart, need to set Content-Transfer-Encoding header at the message level
message.setHeader("Content-Transfer-Encoding", MimeUtility.getEncoding(message.getDataHandler()));
} }
message.saveChanges();
send(message); send(message);
session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString()); session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
@ -433,7 +464,8 @@ public class PutEmail extends AbstractProcessor {
final byte[] byteBuffer = new byte[(int) flowFile.getSize()]; final byte[] byteBuffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, byteBuffer, false)); session.read(flowFile, in -> StreamUtils.fillBuffer(in, byteBuffer, false));
messageText = new String(byteBuffer, 0, byteBuffer.length, CONTENT_CHARSET); final Charset charset = getCharset(context);
messageText = new String(byteBuffer, 0, byteBuffer.length, charset);
} else if (context.getProperty(MESSAGE).isSet()) { } else if (context.getProperty(MESSAGE).isSet()) {
messageText = context.getProperty(MESSAGE).evaluateAttributeExpressions(flowFile).getValue(); messageText = context.getProperty(MESSAGE).evaluateAttributeExpressions(flowFile).getValue();
} }
@ -588,4 +620,27 @@ public class PutEmail extends AbstractProcessor {
.build(); .build();
} }
} }
/**
* Utility function to get a charset from the {@code INPUT_CHARACTER_SET} property
* @param context the ProcessContext
* @return the Charset
*/
private Charset getCharset(final ProcessContext context) {
return Charset.forName(context.getProperty(INPUT_CHARACTER_SET).getValue());
}
/**
* Utility function to get the correct encoding from the {@code INPUT_CHARACTER_SET} property
* @param context the ProcessContext
* @return the encoding
*/
private String getEncoding(final ProcessContext context) {
final Charset charset = Charset.forName(context.getProperty(INPUT_CHARACTER_SET).getValue());
if (Charset.forName("US-ASCII").equals(charset)) {
return "7bit";
}
// Every other charset in StandardCharsets use 8 bits or more. Using base64 encoding by default
return "base64";
}
} }

View File

@ -32,6 +32,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -41,6 +42,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
public class TestPutEmail { public class TestPutEmail {
@ -115,6 +117,7 @@ public class TestPutEmail {
runner.setProperty(PutEmail.FROM, "test@apache.org"); runner.setProperty(PutEmail.FROM, "test@apache.org");
runner.setProperty(PutEmail.MESSAGE, "Message Body"); runner.setProperty(PutEmail.MESSAGE, "Message Body");
runner.setProperty(PutEmail.TO, "recipient@apache.org"); runner.setProperty(PutEmail.TO, "recipient@apache.org");
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());
runner.enqueue("Some Text".getBytes()); runner.enqueue("Some Text".getBytes());
@ -128,7 +131,7 @@ public class TestPutEmail {
Message message = processor.getMessages().get(0); Message message = processor.getMessages().get(0);
assertEquals("test@apache.org", message.getFrom()[0].toString()); assertEquals("test@apache.org", message.getFrom()[0].toString());
assertEquals("TestingNiFi", message.getHeader("X-Mailer")[0], "X-Mailer Header"); assertEquals("TestingNiFi", message.getHeader("X-Mailer")[0], "X-Mailer Header");
assertEquals("Message Body", message.getContent()); assertEquals("Message Body", getMessageText(message, StandardCharsets.UTF_8));
assertEquals("recipient@apache.org", message.getRecipients(RecipientType.TO)[0].toString()); assertEquals("recipient@apache.org", message.getRecipients(RecipientType.TO)[0].toString());
assertNull(message.getRecipients(RecipientType.BCC)); assertNull(message.getRecipients(RecipientType.BCC));
assertNull(message.getRecipients(RecipientType.CC)); assertNull(message.getRecipients(RecipientType.CC));
@ -145,6 +148,7 @@ public class TestPutEmail {
runner.setProperty(PutEmail.BCC, "${bcc}"); runner.setProperty(PutEmail.BCC, "${bcc}");
runner.setProperty(PutEmail.CC, "${cc}"); runner.setProperty(PutEmail.CC, "${cc}");
runner.setProperty(PutEmail.ATTRIBUTE_NAME_REGEX, "Precedence.*"); runner.setProperty(PutEmail.ATTRIBUTE_NAME_REGEX, "Precedence.*");
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put("from", "test@apache.org <NiFi>"); attributes.put("from", "test@apache.org <NiFi>");
@ -166,7 +170,7 @@ public class TestPutEmail {
Message message = processor.getMessages().get(0); Message message = processor.getMessages().get(0);
assertEquals("\"test@apache.org\" <NiFi>", message.getFrom()[0].toString()); assertEquals("\"test@apache.org\" <NiFi>", message.getFrom()[0].toString());
assertEquals("TestingNíFiNonASCII", MimeUtility.decodeText(message.getHeader("X-Mailer")[0]), "X-Mailer Header"); assertEquals("TestingNíFiNonASCII", MimeUtility.decodeText(message.getHeader("X-Mailer")[0]), "X-Mailer Header");
assertEquals("the message body", message.getContent()); assertEquals("the message body", getMessageText(message, StandardCharsets.UTF_8));
assertEquals(1, message.getRecipients(RecipientType.TO).length); assertEquals(1, message.getRecipients(RecipientType.TO).length);
assertEquals("to@apache.org", message.getRecipients(RecipientType.TO)[0].toString()); assertEquals("to@apache.org", message.getRecipients(RecipientType.TO)[0].toString());
assertEquals(1, message.getRecipients(RecipientType.BCC).length); assertEquals(1, message.getRecipients(RecipientType.BCC).length);
@ -220,6 +224,8 @@ public class TestPutEmail {
runner.setProperty(PutEmail.MESSAGE, "Message Body"); runner.setProperty(PutEmail.MESSAGE, "Message Body");
runner.setProperty(PutEmail.ATTACH_FILE, "true"); runner.setProperty(PutEmail.ATTACH_FILE, "true");
runner.setProperty(PutEmail.CONTENT_TYPE, "text/html"); runner.setProperty(PutEmail.CONTENT_TYPE, "text/html");
runner.setProperty(PutEmail.TO, "recipient@apache.org");
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "test한的ほу́.pdf"); attributes.put(CoreAttributes.FILENAME.key(), "test한的ほу́.pdf");
@ -240,10 +246,8 @@ public class TestPutEmail {
assertInstanceOf(MimeMultipart.class, message.getContent()); assertInstanceOf(MimeMultipart.class, message.getContent());
final MimeMultipart multipart = (MimeMultipart) message.getContent(); final MimeMultipart multipart = (MimeMultipart) message.getContent();
final BodyPart part = multipart.getBodyPart(0);
final InputStream is = part.getDataHandler().getInputStream(); assertEquals("Message Body", getMessageText(message, StandardCharsets.UTF_8));
final String decodedText = StringUtils.newStringUtf8(Base64.decodeBase64(IOUtils.toString(is, StandardCharsets.UTF_8)));
assertEquals("Message Body", decodedText);
final BodyPart attachPart = multipart.getBodyPart(1); final BodyPart attachPart = multipart.getBodyPart(1);
final InputStream attachIs = attachPart.getDataHandler().getInputStream(); final InputStream attachIs = attachPart.getDataHandler().getInputStream();
@ -263,6 +267,7 @@ public class TestPutEmail {
runner.setProperty(PutEmail.CC, "recipientcc@apache.org,anothercc@apache.org"); runner.setProperty(PutEmail.CC, "recipientcc@apache.org,anothercc@apache.org");
runner.setProperty(PutEmail.BCC, "recipientbcc@apache.org,anotherbcc@apache.org"); runner.setProperty(PutEmail.BCC, "recipientbcc@apache.org,anotherbcc@apache.org");
runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}"); runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}");
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put("sendContent", "true"); attributes.put("sendContent", "true");
@ -280,7 +285,7 @@ public class TestPutEmail {
assertEquals("test@apache.org", message.getFrom()[0].toString()); assertEquals("test@apache.org", message.getFrom()[0].toString());
assertEquals("from@apache.org", message.getFrom()[1].toString()); assertEquals("from@apache.org", message.getFrom()[1].toString());
assertEquals("TestingNiFi", message.getHeader("X-Mailer")[0], "X-Mailer Header"); assertEquals("TestingNiFi", message.getHeader("X-Mailer")[0], "X-Mailer Header");
assertEquals("Some Text", message.getContent()); assertEquals("Some Text", getMessageText(message, StandardCharsets.UTF_8));
assertEquals("recipient@apache.org", message.getRecipients(RecipientType.TO)[0].toString()); assertEquals("recipient@apache.org", message.getRecipients(RecipientType.TO)[0].toString());
assertEquals("another@apache.org", message.getRecipients(RecipientType.TO)[1].toString()); assertEquals("another@apache.org", message.getRecipients(RecipientType.TO)[1].toString());
assertEquals("recipientcc@apache.org", message.getRecipients(RecipientType.CC)[0].toString()); assertEquals("recipientcc@apache.org", message.getRecipients(RecipientType.CC)[0].toString());
@ -307,7 +312,6 @@ public class TestPutEmail {
runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}"); runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}");
runner.setProperty("mail.", "sample_value"); runner.setProperty("mail.", "sample_value");
runner.assertNotValid();
} }
@Test @Test
@ -320,6 +324,51 @@ public class TestPutEmail {
runner.assertNotValid(); runner.assertNotValid();
} }
@Test
public void testUnrecognizedCharset() {
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
runner.setProperty(PutEmail.FROM, "test@apache.org");
runner.setProperty(PutEmail.MESSAGE, "test message");
runner.setProperty(PutEmail.TO, "recipient@apache.org");
// not one of the recognized charsets
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, "NOT A CHARACTER SET");
runner.assertNotValid();
}
@Test
public void testPutEmailWithMismatchedCharset() throws Exception {
// String specifically chosen to have characters encoded differently in US_ASCII and UTF_8
final String rawString = "SoftwÄrë Ënginëër Ön NiFi";
final byte[] rawBytes = rawString.getBytes(StandardCharsets.US_ASCII);
final byte[] rawBytesUTF8 = rawString.getBytes(StandardCharsets.UTF_8);
// verify that the message bytes are different (some messages are not)
assertNotEquals(rawBytes, rawBytesUTF8);
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
runner.setProperty(PutEmail.FROM, "test@apache.org");
runner.setProperty(PutEmail.MESSAGE, new String(rawBytesUTF8, StandardCharsets.US_ASCII));
runner.setProperty(PutEmail.TO, "recipient@apache.org");
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());
runner.enqueue("Some Text".getBytes());
runner.run();
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(PutEmail.REL_SUCCESS);
// Verify that the Message was populated correctly
assertEquals(1, processor.getMessages().size(), "Expected a single message to be sent");
Message message = processor.getMessages().get(0);
final String retrievedMessageText = getMessageText(message, StandardCharsets.UTF_8);
assertNotEquals(rawString, retrievedMessageText);
}
private void setRequiredProperties(final TestRunner runner) { private void setRequiredProperties(final TestRunner runner) {
// values here may be overridden in some tests // values here may be overridden in some tests
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
@ -327,4 +376,18 @@ public class TestPutEmail {
runner.setProperty(PutEmail.FROM, "test@apache.org,from@apache.org"); runner.setProperty(PutEmail.FROM, "test@apache.org,from@apache.org");
runner.setProperty(PutEmail.TO, "recipient@apache.org,another@apache.org"); runner.setProperty(PutEmail.TO, "recipient@apache.org,another@apache.org");
} }
private String getMessageText(final Message message, final Charset charset) throws Exception {
if (message.getContent() instanceof MimeMultipart) {
final MimeMultipart multipart = (MimeMultipart) message.getContent();
final BodyPart part = multipart.getBodyPart(0);
final InputStream is = part.getDataHandler().getInputStream();
final String encoding = Charset.forName("US-ASCII").equals(charset) ? "7bit" : "base64";
final byte[] decodedTextBytes = "base64".equals(encoding) ? Base64.decodeBase64(IOUtils.toByteArray(is)) : IOUtils.toByteArray(is);
final String decodedText = StringUtils.newString(decodedTextBytes, charset.name());
return decodedText;
} else {
return (String) message.getContent();
}
}
} }