From db9a79f79d730db70f5ce9df0191e2d9bebf552c Mon Sep 17 00:00:00 2001
From: Adam Lamar
Date: Thu, 10 Mar 2016 10:17:09 -0700
Subject: [PATCH] NIFI-1578: Create PutSlack processor
Signed-off-by: Matt Burgess
This closes #256
---
nifi-assembly/pom.xml | 5 +
.../nifi-slack-bundle/nifi-slack-nar/pom.xml | 41 +++
.../src/main/resources/META-INF/NOTICE | 18 ++
.../nifi-slack-processors/pom.xml | 87 +++++++
.../nifi/processors/slack/PutSlack.java | 245 ++++++++++++++++++
.../org.apache.nifi.processor.Processor | 15 ++
.../additionalDetails.html | 48 ++++
.../nifi/processors/slack/CaptureServlet.java | 69 +++++
.../nifi/processors/slack/PutSlackTest.java | 163 ++++++++++++
.../nifi/processors/slack/TestServer.java | 164 ++++++++++++
nifi-nar-bundles/nifi-slack-bundle/pom.xml | 35 +++
.../nifi/processors/slack/TestServer.java | 163 ++++++++++++
nifi-nar-bundles/pom.xml | 1 +
pom.xml | 6 +
14 files changed, 1060 insertions(+)
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/src/main/resources/META-INF/NOTICE
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PutSlack.java
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/docs/org.apache.nifi.processors.slack.PutSlack/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/CaptureServlet.java
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/PutSlackTest.java
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java
create mode 100644 nifi-nar-bundles/nifi-slack-bundle/pom.xml
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index f65565990d..888533b71c 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -337,6 +337,11 @@ language governing permissions and limitations under the License. -->
nifi-evtx-narnar
+
+ org.apache.nifi
+ nifi-slack-nar
+ nar
+
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
new file mode 100644
index 0000000000..7755a3e6c9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
@@ -0,0 +1,41 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-slack-bundle
+ 1.0.0-SNAPSHOT
+
+
+ nifi-slack-nar
+ 1.0.0-SNAPSHOT
+ nar
+
+ true
+ true
+
+
+
+
+ org.apache.nifi
+ nifi-slack-processors
+ 1.0.0-SNAPSHOT
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..759452ac2d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,18 @@
+nifi-slack-nar
+Copyright 2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
+ (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
+
+The following binary components are provided under the Common Development and Distribution License v1.0. See project link for details.
+
+ (CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
new file mode 100644
index 0000000000..4e75faafe6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
@@ -0,0 +1,87 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-slack-bundle
+ 1.0.0-SNAPSHOT
+
+
+ nifi-slack-processors
+ jar
+
+
+
+ javax.json
+ javax.json-api
+ 1.0
+
+
+ org.glassfish
+ javax.json
+ 1.0.4
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-processor-utils
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+ org.slf4j
+ slf4j-simple
+ test
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+ org.eclipse.jetty
+ jetty-server
+ 9.2.11.v20150529
+ test
+
+
+ org.eclipse.jetty
+ jetty-servlet
+ 9.2.11.v20150529
+ test
+
+
+ javax.ws.rs
+ javax.ws.rs-api
+ 2.0.1
+ test
+
+
+ org.apache.nifi
+ nifi-ssl-context-service
+ test
+
+
+
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PutSlack.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PutSlack.java
new file mode 100644
index 0000000000..e5e2c03bb3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PutSlack.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.slack;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.DataOutputStream;
+
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonWriter;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"put", "slack", "notify"})
+@CapabilityDescription("Sends a message to your team on slack.com")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class PutSlack extends AbstractProcessor {
+
+ public static final PropertyDescriptor WEBHOOK_URL = new PropertyDescriptor
+ .Builder()
+ .name("webhook-url")
+ .displayName("Webhook URL")
+ .description("The POST URL provided by Slack to send messages into a channel.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .sensitive(true)
+ .build();
+
+ public static final PropertyDescriptor WEBHOOK_TEXT = new PropertyDescriptor
+ .Builder()
+ .name("webhook-text")
+ .displayName("Webhook Text")
+ .description("The text sent in the webhook message")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CHANNEL = new PropertyDescriptor
+ .Builder()
+ .name("channel")
+ .displayName("Channel")
+ .description("A public channel using #channel or direct message using @username. If not specified, " +
+ "the default webhook channel as specified in Slack's Incoming Webhooks web interface is used.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor USERNAME = new PropertyDescriptor
+ .Builder()
+ .name("username")
+ .displayName("Username")
+ .description("The displayed Slack username")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ICON_URL = new PropertyDescriptor
+ .Builder()
+ .name("icon-url")
+ .displayName("Icon URL")
+ .description("Icon URL to be used for the message")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ICON_EMOJI = new PropertyDescriptor
+ .Builder()
+ .name("icon-emoji")
+ .displayName("Icon Emoji")
+ .description("Icon Emoji to be used for the message. Must begin and end with a colon, e.g. :ghost:")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(new EmojiValidator())
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles are routed to success after being successfully sent to Slack")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles are routed to failure if unable to be sent to Slack")
+ .build();
+
+ public static final List descriptors = Collections.unmodifiableList(
+ Arrays.asList(WEBHOOK_URL, WEBHOOK_TEXT, CHANNEL, USERNAME, ICON_URL, ICON_EMOJI));
+
+ public static final Set relationships = Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+ @Override
+ public Set getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public final List getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ // Validate the channel (or username for a direct message)
+ private String validateChannel(String channel) {
+ if ((channel.startsWith("#") || channel.startsWith("@")) && channel.length() > 1) {
+ return null;
+ }
+ return "Channel must begin with '#' or '@'";
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ return;
+ }
+
+ JsonObjectBuilder builder = Json.createObjectBuilder();
+ String text = context.getProperty(WEBHOOK_TEXT).evaluateAttributeExpressions(flowFile).getValue();
+ if (text != null && !text.isEmpty()) {
+ builder.add("text", text);
+ } else {
+ // Slack requires the 'text' attribute
+ getLogger().error("FlowFile should have non-empty " + WEBHOOK_TEXT.getName());
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ String channel = context.getProperty(CHANNEL).evaluateAttributeExpressions(flowFile).getValue();
+ if (channel != null && !channel.isEmpty()) {
+ String error = validateChannel(channel);
+ if (error == null) {
+ builder.add("channel", channel);
+ } else {
+ getLogger().error("Invalid channel '{}': {}", new Object[]{channel, error});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
+ String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+ if (username != null && !username.isEmpty()) {
+ builder.add("username", username);
+ }
+
+ String iconUrl = context.getProperty(ICON_URL).evaluateAttributeExpressions(flowFile).getValue();
+ if (iconUrl != null && !iconUrl.isEmpty()) {
+ builder.add("icon_url", iconUrl);
+ }
+
+ String iconEmoji = context.getProperty(ICON_EMOJI).evaluateAttributeExpressions(flowFile).getValue();
+ if (iconEmoji != null && !iconEmoji.isEmpty()) {
+ builder.add("icon_emoji", iconEmoji);
+ }
+
+ JsonObject jsonObject = builder.build();
+ StringWriter stringWriter = new StringWriter();
+ JsonWriter jsonWriter = Json.createWriter(stringWriter);
+ jsonWriter.writeObject(jsonObject);
+ jsonWriter.close();
+
+ try {
+ URL url = new URL(context.getProperty(WEBHOOK_URL).getValue());
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("POST");
+ conn.setDoOutput(true);
+ DataOutputStream outputStream = new DataOutputStream(conn.getOutputStream());
+ String payload = "payload=" + URLEncoder.encode(stringWriter.getBuffer().toString(), "UTF-8");
+ outputStream.writeBytes(payload);
+ outputStream.close();
+
+ int responseCode = conn.getResponseCode();
+ if (responseCode >= 200 && responseCode < 300) {
+ getLogger().info("Successfully posted message to Slack");
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().send(flowFile, context.getProperty(WEBHOOK_URL).getValue());
+ } else {
+ getLogger().error("Failed to post message to Slack with response code {}", new Object[]{responseCode});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ context.yield();
+ }
+ } catch (IOException e) {
+ getLogger().error("Failed to open connection", e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ context.yield();
+ }
+ }
+
+ private static class EmojiValidator implements Validator {
+ @Override
+ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+ if (input.startsWith(":") && input.endsWith(":") && input.length() > 2) {
+ return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+ }
+
+ return new ValidationResult.Builder().input(input).subject(subject).valid(false)
+ .explanation("Must begin and end with a colon")
+ .build();
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..9a861ee379
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.slack.PutSlack
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/docs/org.apache.nifi.processors.slack.PutSlack/additionalDetails.html b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/docs/org.apache.nifi.processors.slack.PutSlack/additionalDetails.html
new file mode 100644
index 0000000000..53f2dc55ed
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/docs/org.apache.nifi.processors.slack.PutSlack/additionalDetails.html
@@ -0,0 +1,48 @@
+
+
+
+
+
+ PutSlack
+
+
+
+
+
+
Description:
+
+ The PutSlack processor sends messages to Slack,
+ a team-oriented messaging service.
+
+
+ This processor uses Slack's incoming webhooks
+ custom integration to post messages to a specific channel. Before using PutSlack, your Slack team should be
+ configured for the incoming webhooks custom integration, and you'll need to configure at least one incoming
+ webhook.
+
+
+ To configure PutSlack, set the following mandatory properties:
+
+
+ Webhook URL: The URL received from Slack that allows the processor to send messages to your team.
+
+
+ Webhook Text: The text of the message to send to Slack.
+
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/CaptureServlet.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/CaptureServlet.java
new file mode 100644
index 0000000000..a605130a7d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/CaptureServlet.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.slack;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.file.FileUtils;
+
+public class CaptureServlet extends HttpServlet {
+
+ private static final long serialVersionUID = 8402271018449653919L;
+
+ private volatile byte[] lastPost;
+ private volatile Map lastPostHeaders;
+
+ public byte[] getLastPost() {
+ return lastPost;
+ }
+
+ public Map getLastPostHeaders() {
+ return lastPostHeaders;
+ }
+
+ @Override
+ protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ // Capture all the headers for reference. Intentionally choosing to not special handling for headers with multiple values for clarity
+ final Enumeration headerNames = request.getHeaderNames();
+ lastPostHeaders = new HashMap<>();
+ while (headerNames.hasMoreElements()) {
+ final String nextHeader = headerNames.nextElement();
+ lastPostHeaders.put(nextHeader, request.getHeader(nextHeader));
+ }
+
+ try {
+ StreamUtils.copy(request.getInputStream(), baos);
+ this.lastPost = baos.toByteArray();
+ } finally {
+ FileUtils.closeQuietly(baos);
+ }
+ response.setStatus(Status.OK.getStatusCode());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/PutSlackTest.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/PutSlackTest.java
new file mode 100644
index 0000000000..bef2e5ed1f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/PutSlackTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.slack;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.eclipse.jetty.servlet.ServletHandler;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PutSlackTest {
+
+ private TestRunner testRunner;
+ private TestServer server;
+ private CaptureServlet servlet;
+ public static final String WEBHOOK_TEST_TEXT = "Hello From Apache NiFi";
+
+ @Before
+ public void init() throws Exception {
+ testRunner = TestRunners.newTestRunner(PutSlack.class);
+
+ // set up web service
+ ServletHandler handler = new ServletHandler();
+ handler.addServletWithMapping(CaptureServlet.class, "/*");
+ servlet = (CaptureServlet) handler.getServlets()[0].getServlet();
+
+ // create the service
+ server = new TestServer();
+ server.addHandler(handler);
+ server.startServer();
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testBlankText() {
+ testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
+ testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "");
+
+ testRunner.enqueue(new byte[0]);
+ testRunner.run(1);
+ }
+
+ @Test
+ public void testBlankTextViaExpression() {
+ testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
+ testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "${invalid-attr}"); // Create a blank webhook text
+
+ testRunner.enqueue(new byte[0]);
+ testRunner.run(1);
+ testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
+ }
+
+ @Test
+ public void testInvalidChannel() {
+ testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
+ testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
+ testRunner.setProperty(PutSlack.CHANNEL, "invalid");
+
+ testRunner.enqueue(new byte[0]);
+ testRunner.run(1);
+ testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testInvalidIconUrl() {
+ testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
+ testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
+ testRunner.setProperty(PutSlack.ICON_URL, "invalid");
+
+ testRunner.enqueue(new byte[0]);
+ testRunner.run(1);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testInvalidIconEmoji() {
+ testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
+ testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
+ testRunner.setProperty(PutSlack.ICON_EMOJI, "invalid");
+
+ testRunner.enqueue(new byte[0]);
+ testRunner.run(1);
+ }
+
+ @Test
+ public void testGetPropertyDescriptors() throws Exception {
+ PutSlack processor = new PutSlack();
+ List pd = processor.getSupportedPropertyDescriptors();
+ assertEquals("size should be eq", 6, pd.size());
+ assertTrue(pd.contains(PutSlack.WEBHOOK_TEXT));
+ assertTrue(pd.contains(PutSlack.WEBHOOK_URL));
+ assertTrue(pd.contains(PutSlack.CHANNEL));
+ assertTrue(pd.contains(PutSlack.USERNAME));
+ assertTrue(pd.contains(PutSlack.ICON_URL));
+ assertTrue(pd.contains(PutSlack.ICON_EMOJI));
+ }
+
+ @Test
+ public void testSimplePut() {
+ testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
+ testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
+
+ testRunner.enqueue(new byte[0]);
+ testRunner.run(1);
+ testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
+
+ byte[] expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%7D".getBytes();
+ assertTrue(Arrays.equals(expected, servlet.getLastPost()));
+ }
+
+ @Test
+ public void testSimplePutWithAttributes() {
+ testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
+ testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
+ testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes");
+ testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook");
+ testRunner.setProperty(PutSlack.ICON_EMOJI, ":smile:");
+
+ testRunner.enqueue(new byte[0]);
+ testRunner.run(1);
+ testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
+
+ final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes%22%2C%22username%22%3A%22" +
+ "integration-test-webhook%22%2C%22icon_emoji%22%3A%22%3Asmile%3A%22%7D";
+ assertTrue(Arrays.equals(expected.getBytes(), servlet.getLastPost()));
+ }
+
+ @Test
+ public void testSimplePutWithAttributesIconURL() {
+ testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
+ testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
+ testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes-url");
+ testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook");
+ testRunner.setProperty(PutSlack.ICON_URL, "http://lorempixel.com/48/48/");
+
+ testRunner.enqueue(new byte[0]);
+ testRunner.run(1);
+ testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
+
+ final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes-url%22%2C%22username%22%3A%22"
+ + "integration-test-webhook%22%2C%22icon_url%22%3A%22http%3A%2F%2Florempixel.com%2F48%2F48%2F%22%7D";
+ assertTrue(Arrays.equals(expected.getBytes(), servlet.getLastPost()));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java
new file mode 100644
index 0000000000..6506fd8afe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.slack;
+
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+import java.util.Map;
+
+/**
+ * Test server to assist with unit tests that requires a server to be stood up.
+ */
+public class TestServer {
+
+ public static final String NEED_CLIENT_AUTH = "clientAuth";
+
+ private Server jetty;
+ private boolean secure = false;
+
+ /**
+ * Creates the test server.
+ */
+ public TestServer() {
+ createServer(null);
+ }
+
+ /**
+ * Creates the test server.
+ *
+ * @param sslProperties SSLProps to be used in the secure connection. The keys should should use the StandardSSLContextService properties.
+ */
+ public TestServer(final Map sslProperties) {
+ createServer(sslProperties);
+ }
+
+ private void createServer(final Map sslProperties) {
+ jetty = new Server();
+
+ // create the unsecure connector
+ createConnector();
+
+ // create the secure connector if sslProperties are specified
+ if (sslProperties != null) {
+ createSecureConnector(sslProperties);
+ }
+
+ jetty.setHandler(new HandlerCollection(true));
+ }
+
+ /**
+ * Creates the http connection
+ */
+ private void createConnector() {
+ final ServerConnector http = new ServerConnector(jetty);
+ http.setPort(0);
+ // Severely taxed environments may have significant delays when executing.
+ http.setIdleTimeout(30000L);
+ jetty.addConnector(http);
+ }
+
+ private void createSecureConnector(final Map sslProperties) {
+ SslContextFactory ssl = new SslContextFactory();
+
+ if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
+ ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
+ ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
+ ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
+ }
+
+ if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
+ ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
+ ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
+ ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
+ }
+
+ final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
+ if (clientAuth == null) {
+ ssl.setNeedClientAuth(true);
+ } else {
+ ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth));
+ }
+
+ // build the connector
+ final ServerConnector https = new ServerConnector(jetty, ssl);
+
+ // set host and port
+ https.setPort(0);
+ // Severely taxed environments may have significant delays when executing.
+ https.setIdleTimeout(30000L);
+
+ // add the connector
+ jetty.addConnector(https);
+
+ // mark secure as enabled
+ secure = true;
+ }
+
+ public void clearHandlers() {
+ HandlerCollection hc = (HandlerCollection) jetty.getHandler();
+ Handler[] ha = hc.getHandlers();
+ if (ha != null) {
+ for (Handler h : ha) {
+ hc.removeHandler(h);
+ }
+ }
+ }
+
+ public void addHandler(Handler handler) {
+ ((HandlerCollection) jetty.getHandler()).addHandler(handler);
+ }
+
+ public void startServer() throws Exception {
+ jetty.start();
+ }
+
+ public void shutdownServer() throws Exception {
+ jetty.stop();
+ jetty.destroy();
+ }
+
+ private int getPort() {
+ if (!jetty.isStarted()) {
+ throw new IllegalStateException("Jetty server not started");
+ }
+ return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
+ }
+
+ private int getSecurePort() {
+ if (!jetty.isStarted()) {
+ throw new IllegalStateException("Jetty server not started");
+ }
+ return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort();
+ }
+
+ public String getUrl() {
+ return "http://localhost:" + getPort();
+ }
+
+ public String getSecureUrl() {
+ String url = null;
+ if (secure) {
+ url = "https://localhost:" + getSecurePort();
+ }
+ return url;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-slack-bundle/pom.xml b/nifi-nar-bundles/nifi-slack-bundle/pom.xml
new file mode 100644
index 0000000000..89f3c34496
--- /dev/null
+++ b/nifi-nar-bundles/nifi-slack-bundle/pom.xml
@@ -0,0 +1,35 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-nar-bundles
+ 1.0.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-slack-bundle
+ 1.0.0-SNAPSHOT
+ pom
+
+
+ nifi-slack-processors
+ nifi-slack-nar
+
+
+
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java
new file mode 100644
index 0000000000..1037ca6f91
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.slack;
+
+import java.util.Map;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+/**
+ * Test server to assist with unit tests that requires a server to be stood up.
+ */
+public class TestServer {
+
+ public static final String NEED_CLIENT_AUTH = "clientAuth";
+
+ private Server jetty;
+ private boolean secure = false;
+
+ /**
+ * Creates the test server.
+ */
+ public TestServer() {
+ createServer(null);
+ }
+
+ /**
+ * Creates the test server.
+ *
+ * @param sslProperties SSLProps to be used in the secure connection. The keys should should use the StandardSSLContextService properties.
+ */
+ public TestServer(final Map sslProperties) {
+ createServer(sslProperties);
+ }
+
+ private void createServer(final Map sslProperties) {
+ jetty = new Server();
+
+ // create the unsecure connector
+ createConnector();
+
+ // create the secure connector if sslProperties are specified
+ if (sslProperties != null) {
+ createSecureConnector(sslProperties);
+ }
+
+ jetty.setHandler(new HandlerCollection(true));
+ }
+
+ /**
+ * Creates the http connection
+ */
+ private void createConnector() {
+ final ServerConnector http = new ServerConnector(jetty);
+ http.setPort(0);
+ // Severely taxed environments may have significant delays when executing.
+ http.setIdleTimeout(30000L);
+ jetty.addConnector(http);
+ }
+
+ private void createSecureConnector(final Map sslProperties) {
+ SslContextFactory ssl = new SslContextFactory();
+
+ if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
+ ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
+ ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
+ ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
+ }
+
+ if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
+ ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
+ ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
+ ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
+ }
+
+ final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
+ if (clientAuth == null) {
+ ssl.setNeedClientAuth(true);
+ } else {
+ ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth));
+ }
+
+ // build the connector
+ final ServerConnector https = new ServerConnector(jetty, ssl);
+
+ // set host and port
+ https.setPort(0);
+ // Severely taxed environments may have significant delays when executing.
+ https.setIdleTimeout(30000L);
+
+ // add the connector
+ jetty.addConnector(https);
+
+ // mark secure as enabled
+ secure = true;
+ }
+
+ public void clearHandlers() {
+ HandlerCollection hc = (HandlerCollection) jetty.getHandler();
+ Handler[] ha = hc.getHandlers();
+ if (ha != null) {
+ for (Handler h : ha) {
+ hc.removeHandler(h);
+ }
+ }
+ }
+
+ public void addHandler(Handler handler) {
+ ((HandlerCollection) jetty.getHandler()).addHandler(handler);
+ }
+
+ public void startServer() throws Exception {
+ jetty.start();
+ }
+
+ public void shutdownServer() throws Exception {
+ jetty.stop();
+ jetty.destroy();
+ }
+
+ private int getPort() {
+ if (!jetty.isStarted()) {
+ throw new IllegalStateException("Jetty server not started");
+ }
+ return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
+ }
+
+ private int getSecurePort() {
+ if (!jetty.isStarted()) {
+ throw new IllegalStateException("Jetty server not started");
+ }
+ return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort();
+ }
+
+ public String getUrl() {
+ return "http://localhost:" + getPort();
+ }
+
+ public String getSecureUrl() {
+ String url = null;
+ if (secure) {
+ url = "https://localhost:" + getSecurePort();
+ }
+ return url;
+ }
+}
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 998ced8841..250822a2ff 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -64,6 +64,7 @@
nifi-site-to-site-reporting-bundlenifi-mqtt-bundlenifi-evtx-bundle
+ nifi-slack-bundle
diff --git a/pom.xml b/pom.xml
index df4e70cd31..25ac4a33e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1102,6 +1102,12 @@ language governing permissions and limitations under the License. -->
1.0.0-SNAPSHOTnar
+
+ org.apache.nifi
+ nifi-slack-nar
+ 1.0.0-SNAPSHOT
+ nar
+ org.apache.nifinifi-elasticsearch-nar