NIFI-9758 Added Dynamic Properties to PutEmail

This closes #6204

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Emilio Setiadarma 2022-07-12 16:48:37 -07:00 committed by exceptionfactory
parent 047b3611bf
commit cb0753b03c
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 110 additions and 17 deletions

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import jakarta.activation.DataHandler; import jakarta.activation.DataHandler;
@ -51,9 +52,11 @@ import jakarta.mail.internet.PreencodedMimeBodyPart;
import jakarta.mail.util.ByteArrayDataSource; import jakarta.mail.util.ByteArrayDataSource;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.behavior.SystemResource; import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -62,6 +65,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -79,10 +83,18 @@ import org.apache.nifi.stream.io.StreamUtils;
@Tags({"email", "put", "notify", "smtp"}) @Tags({"email", "put", "notify", "smtp"})
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Sends an e-mail to configured recipients for each incoming FlowFile") @CapabilityDescription("Sends an e-mail to configured recipients for each incoming FlowFile")
@SupportsSensitiveDynamicProperties
@DynamicProperty(name = "mail.propertyName",
value = "Value for a specific property to be set in the JavaMail Session object",
description = "Dynamic property names that will be passed to the Mail session. " +
"Possible properties can be found in: https://javaee.github.io/javamail/docs/api/com/sun/mail/smtp/package-summary.html.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content (as a String object) " @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content (as a String object) "
+ "will be read into memory in case the property to use the flow file content as the email body is set to true.") + "will be read into memory in case the property to use the flow file content as the email body is set to true.")
public class PutEmail extends AbstractProcessor { public class PutEmail extends AbstractProcessor {
private static final Pattern MAIL_PROPERTY_PATTERN = Pattern.compile("^mail\\.smtps?\\.([a-z0-9\\.]+)$");
public static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
.name("SMTP Hostname") .name("SMTP Hostname")
.description("The hostname of the SMTP host") .description("The hostname of the SMTP host")
@ -150,8 +162,8 @@ public class PutEmail extends AbstractProcessor {
.name("attribute-name-regex") .name("attribute-name-regex")
.displayName("Attributes to Send as Headers (Regex)") .displayName("Attributes to Send as Headers (Regex)")
.description("A Regular Expression that is matched against all FlowFile attribute names. " .description("A Regular Expression that is matched against all FlowFile attribute names. "
+ "Any attribute whose name matches the regex will be added to the Email messages as a Header. " + "Any attribute whose name matches the regex will be added to the Email messages as a Header. "
+ "If not specified, no FlowFile attributes will be added as headers.") + "If not specified, no FlowFile attributes will be added as headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.required(false) .required(false)
.build(); .build();
@ -305,6 +317,18 @@ public class PutEmail extends AbstractProcessor {
return properties; return properties;
} }
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("SMTP property passed to the Mail Session")
.required(false)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new DynamicMailPropertyValidator())
.build();
}
@Override @Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) { protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context)); final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
@ -464,6 +488,16 @@ public class PutEmail extends AbstractProcessor {
} }
} }
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (descriptor.isDynamic()) {
final String mailPropertyValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
// Nullable values are not allowed, so filter out
if (null != mailPropertyValue) {
properties.setProperty(descriptor.getName(), mailPropertyValue);
}
}
}
return properties; return properties;
} }
@ -493,7 +527,7 @@ public class PutEmail extends AbstractProcessor {
* @throws AddressException if the property cannot be parsed to a valid InternetAddress[] * @throws AddressException if the property cannot be parsed to a valid InternetAddress[]
*/ */
private InternetAddress[] toInetAddresses(final ProcessContext context, final FlowFile flowFile, private InternetAddress[] toInetAddresses(final ProcessContext context, final FlowFile flowFile,
PropertyDescriptor propertyDescriptor) throws AddressException { PropertyDescriptor propertyDescriptor) throws AddressException {
InternetAddress[] parse; InternetAddress[] parse;
final String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue(); final String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();
if (value == null || value.isEmpty()){ if (value == null || value.isEmpty()){
@ -523,4 +557,35 @@ public class PutEmail extends AbstractProcessor {
protected void send(final Message msg) throws MessagingException { protected void send(final Message msg) throws MessagingException {
Transport.send(msg); Transport.send(msg);
} }
private static class DynamicMailPropertyValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
final Matcher matcher = MAIL_PROPERTY_PATTERN.matcher(subject);
if (!matcher.matches()) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation(String.format("[%s] does not start with mail.smtp", subject))
.build();
}
if (propertyToContext.containsKey(subject)) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation(String.format("[%s] overwrites standard properties", subject))
.build();
}
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(true)
.explanation("Valid mail.smtp property found")
.build();
}
}
} }

View File

@ -180,11 +180,9 @@ public class TestPutEmail {
@Test @Test
public void testInvalidAddress() { public void testInvalidAddress() {
// verifies that unparsable addresses lead to the flow file being routed to failure // verifies that unparsable addresses lead to the flow file being routed to failure
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); setRequiredProperties(runner);
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
runner.setProperty(PutEmail.FROM, "test@apache.org <invalid"); runner.setProperty(PutEmail.FROM, "test@apache.org <invalid");
runner.setProperty(PutEmail.MESSAGE, "Message Body"); runner.setProperty(PutEmail.MESSAGE, "Message Body");
runner.setProperty(PutEmail.TO, "recipient@apache.org");
runner.enqueue("Some Text".getBytes()); runner.enqueue("Some Text".getBytes());
@ -200,11 +198,9 @@ public class TestPutEmail {
public void testEmptyFrom() { public void testEmptyFrom() {
// verifies that if the FROM property evaluates to an empty string at // verifies that if the FROM property evaluates to an empty string at
// runtime the flow file is transferred to failure. // runtime the flow file is transferred to failure.
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); setRequiredProperties(runner);
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
runner.setProperty(PutEmail.FROM, "${MISSING_PROPERTY}"); runner.setProperty(PutEmail.FROM, "${MISSING_PROPERTY}");
runner.setProperty(PutEmail.MESSAGE, "Message Body"); runner.setProperty(PutEmail.MESSAGE, "Message Body");
runner.setProperty(PutEmail.TO, "recipient@apache.org");
runner.enqueue("Some Text".getBytes()); runner.enqueue("Some Text".getBytes());
@ -220,13 +216,10 @@ public class TestPutEmail {
@Test @Test
public void testOutgoingMessageAttachment() throws Exception { public void testOutgoingMessageAttachment() throws Exception {
// verifies that are set on the outgoing Message correctly // verifies that are set on the outgoing Message correctly
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); setRequiredProperties(runner);
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
runner.setProperty(PutEmail.FROM, "test@apache.org");
runner.setProperty(PutEmail.MESSAGE, "Message Body"); runner.setProperty(PutEmail.MESSAGE, "Message Body");
runner.setProperty(PutEmail.ATTACH_FILE, "true"); runner.setProperty(PutEmail.ATTACH_FILE, "true");
runner.setProperty(PutEmail.CONTENT_TYPE, "text/html"); runner.setProperty(PutEmail.CONTENT_TYPE, "text/html");
runner.setProperty(PutEmail.TO, "recipient@apache.org");
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "test한的ほу́.pdf"); attributes.put(CoreAttributes.FILENAME.key(), "test한的ほу́.pdf");
@ -265,11 +258,8 @@ public class TestPutEmail {
@Test @Test
public void testOutgoingMessageWithFlowfileContent() throws Exception { public void testOutgoingMessageWithFlowfileContent() throws Exception {
// verifies that are set on the outgoing Message correctly // verifies that are set on the outgoing Message correctly
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); setRequiredProperties(runner);
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
runner.setProperty(PutEmail.FROM, "test@apache.org,from@apache.org");
runner.setProperty(PutEmail.MESSAGE, "${body}"); runner.setProperty(PutEmail.MESSAGE, "${body}");
runner.setProperty(PutEmail.TO, "recipient@apache.org,another@apache.org");
runner.setProperty(PutEmail.CC, "recipientcc@apache.org,anothercc@apache.org"); runner.setProperty(PutEmail.CC, "recipientcc@apache.org,anothercc@apache.org");
runner.setProperty(PutEmail.BCC, "recipientbcc@apache.org,anotherbcc@apache.org"); runner.setProperty(PutEmail.BCC, "recipientbcc@apache.org,anotherbcc@apache.org");
runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}"); runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}");
@ -299,4 +289,42 @@ public class TestPutEmail {
assertEquals("anotherbcc@apache.org", message.getRecipients(RecipientType.BCC)[1].toString()); assertEquals("anotherbcc@apache.org", message.getRecipients(RecipientType.BCC)[1].toString());
} }
@Test
public void testValidDynamicMailProperties() {
setRequiredProperties(runner);
runner.setProperty(PutEmail.MESSAGE, "${body}");
runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}");
runner.setProperty("mail.smtp.timeout", "sample dynamic smtp property");
runner.setProperty("mail.smtps.timeout", "sample dynamic smtps property");
runner.assertValid();
}
@Test
public void testInvalidDynamicMailPropertyName() {
setRequiredProperties(runner);
runner.setProperty(PutEmail.MESSAGE, "${body}");
runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}");
runner.setProperty("mail.", "sample_value");
runner.assertNotValid();
}
@Test
public void testOverwritingDynamicMailProperty() {
setRequiredProperties(runner);
runner.setProperty(PutEmail.MESSAGE, "${body}");
runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}");
runner.setProperty("mail.smtp.user", "test-user-value");
runner.assertNotValid();
}
private void setRequiredProperties(final TestRunner runner) {
// values here may be overridden in some tests
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
runner.setProperty(PutEmail.FROM, "test@apache.org,from@apache.org");
runner.setProperty(PutEmail.TO, "recipient@apache.org,another@apache.org");
}
} }