NIFI-3855 Support PutSlack attachments as dynamic properties

NIFI-3855 Refactor deprecated API

NIFI-3855 Log separate error messages in PutSlack

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1774
This commit is contained in:
Tim Reardon 2017-05-09 13:42:42 -04:00 committed by Matt Burgess
parent d092551211
commit 196ca237e6
3 changed files with 146 additions and 7 deletions

View File

@ -16,26 +16,34 @@
*/
package org.apache.nifi.processors.slack;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.DataOutputStream;
import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonReader;
import javax.json.JsonWriter;
import javax.json.stream.JsonParsingException;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.HttpURLConnection;
import java.net.URL;
@ -44,11 +52,17 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@Tags({"put", "slack", "notify"})
@CapabilityDescription("Sends a message to your team on slack.com")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "A JSON object to add to Slack's \"attachments\" JSON payload.", value = "JSON-formatted string to add to Slack's payload JSON appended to the \"attachments\" JSON array.",
supportsExpressionLanguage = true,
description = "Converts the contents of each value specified by the Dynamic Property's value to JSON and appends it to the payload being sent to Slack.")
public class PutSlack extends AbstractProcessor {
public static final PropertyDescriptor WEBHOOK_URL = new PropertyDescriptor
@ -124,6 +138,8 @@ public class PutSlack extends AbstractProcessor {
.description("FlowFiles are routed to failure if unable to be sent to Slack")
.build();
private final SortedSet<PropertyDescriptor> attachments = Collections.synchronizedSortedSet(new TreeSet<PropertyDescriptor>());
public static final List<PropertyDescriptor> descriptors = Collections.unmodifiableList(
Arrays.asList(WEBHOOK_URL, WEBHOOK_TEXT, CHANNEL, USERNAME, ICON_URL, ICON_EMOJI));
@ -140,6 +156,18 @@ public class PutSlack extends AbstractProcessor {
return descriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
// Validate the channel (or username for a direct message)
private String validateChannel(String channel) {
if ((channel.startsWith("#") || channel.startsWith("@")) && channel.length() > 1) {
@ -148,6 +176,17 @@ public class PutSlack extends AbstractProcessor {
return "Channel must begin with '#' or '@'";
}
@OnScheduled
public void initialize(final ProcessContext context) {
attachments.clear();
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
PropertyDescriptor descriptor = property.getKey();
if (descriptor.isDynamic()) {
attachments.add(descriptor);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
@ -195,13 +234,25 @@ public class PutSlack extends AbstractProcessor {
builder.add("icon_emoji", iconEmoji);
}
JsonObject jsonObject = builder.build();
StringWriter stringWriter = new StringWriter();
JsonWriter jsonWriter = Json.createWriter(stringWriter);
jsonWriter.writeObject(jsonObject);
jsonWriter.close();
try {
// Get Attachments Array
if (!attachments.isEmpty()) {
JsonArrayBuilder jsonArrayBuiler = Json.createArrayBuilder();
for (PropertyDescriptor attachment : attachments) {
String s = context.getProperty(attachment).evaluateAttributeExpressions(flowFile).getValue();
JsonReader reader = Json.createReader(new StringReader(s));
JsonObject attachmentJson = reader.readObject();
jsonArrayBuiler.add(attachmentJson);
}
builder.add("attachments", jsonArrayBuiler);
}
JsonObject jsonObject = builder.build();
StringWriter stringWriter = new StringWriter();
JsonWriter jsonWriter = Json.createWriter(stringWriter);
jsonWriter.writeObject(jsonObject);
jsonWriter.close();
URL url = new URL(context.getProperty(WEBHOOK_URL).getValue());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
@ -222,6 +273,10 @@ public class PutSlack extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
} catch (JsonParsingException e) {
getLogger().error("Failed to parse JSON", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} catch (IOException e) {
getLogger().error("Failed to open connection", e);
flowFile = session.penalize(flowFile);

View File

@ -44,5 +44,10 @@
</li>
</ul>
</p>
<p>
Dynamic properties can be used to append items to the "attachments" branch of the JSON payload. Each dynamic property will be processed and added
as an item within the array. The keys are not used by the processor. Instead, for each flowfile, the values of the dynamic properties is converted to JSON and added
to the the attachments key of the JSON payload sent to Slack. For information on the attachment data structure, see <a href="https://api.slack.com/docs/message-attachments">https://api.slack.com/docs/message-attachments</a>
</p>
</body>
</html>

View File

@ -17,6 +17,8 @@
package org.apache.nifi.processors.slack;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.servlet.ServletHandler;
@ -24,7 +26,9 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -101,6 +105,81 @@ public class PutSlackTest {
testRunner.run(1);
}
@Test
public void testInvalidDynamicProperties() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true)
.name("foo")
.build();
testRunner.setProperty(dynamicProp, "{\"a\": a}");
testRunner.enqueue("{}".getBytes());
testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_FAILURE, 1);
}
@Test
public void testValidDynamicProperties() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true)
.name("foo")
.build();
testRunner.setProperty(dynamicProp, "{\"a\": \"a\"}");
testRunner.enqueue("{}".getBytes());
testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_FAILURE, 0);
}
@Test
public void testValidDynamicPropertiesWithExpressionLanguage() {
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
Map<String, String> props = new HashMap<>();
props.put("foo", "\"bar\"");
props.put("ping", "pong");
ff = session.putAllAttributes(ff, props);
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true)
.name("foo")
.build();
testRunner.setProperty(dynamicProp, "{\"foo\": ${foo}, \"ping\":\"${ping}\"}");
testRunner.enqueue(ff);
testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_SUCCESS, 1);
}
@Test
public void testInvalidDynamicPropertiesWithExpressionLanguage() {
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
Map<String, String> props = new HashMap<>();
props.put("foo", "\"\"bar\"");
props.put("ping", "\"pong");
ff = session.putAllAttributes(ff, props);
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true)
.name("foo")
.build();
testRunner.setProperty(dynamicProp, "{\"foo\": ${foo}, \"ping\":\"${ping}\"}");
testRunner.enqueue(ff);
testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_SUCCESS, 0);
testRunner.assertTransferCount(PutSlack.REL_FAILURE, 1);
}
@Test
public void testGetPropertyDescriptors() throws Exception {
PutSlack processor = new PutSlack();