diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index f65565990d..888533b71c 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -337,6 +337,11 @@ language governing permissions and limitations under the License. --> nifi-evtx-nar nar + + org.apache.nifi + nifi-slack-nar + nar + diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml new file mode 100644 index 0000000000..7755a3e6c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml @@ -0,0 +1,41 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-slack-bundle + 1.0.0-SNAPSHOT + + + nifi-slack-nar + 1.0.0-SNAPSHOT + nar + + true + true + + + + + org.apache.nifi + nifi-slack-processors + 1.0.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..759452ac2d --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,18 @@ +nifi-slack-nar +Copyright 2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net) + +The following binary components are provided under the Common Development and Distribution License v1.0. See project link for details. + + (CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net) diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml new file mode 100644 index 0000000000..4e75faafe6 --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml @@ -0,0 +1,87 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-slack-bundle + 1.0.0-SNAPSHOT + + + nifi-slack-processors + jar + + + + javax.json + javax.json-api + 1.0 + + + org.glassfish + javax.json + 1.0.4 + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + 4.11 + test + + + org.eclipse.jetty + jetty-server + 9.2.11.v20150529 + test + + + org.eclipse.jetty + jetty-servlet + 9.2.11.v20150529 + test + + + javax.ws.rs + javax.ws.rs-api + 2.0.1 + test + + + org.apache.nifi + nifi-ssl-context-service + test + + + diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PutSlack.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PutSlack.java new file mode 100644 index 0000000000..e5e2c03bb3 --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PutSlack.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.slack; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.DataOutputStream; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import javax.json.JsonWriter; +import java.io.IOException; +import java.io.StringWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Tags({"put", "slack", "notify"}) +@CapabilityDescription("Sends a message to your team on slack.com") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class PutSlack extends AbstractProcessor { + + public static final PropertyDescriptor WEBHOOK_URL = new PropertyDescriptor + .Builder() + .name("webhook-url") + .displayName("Webhook URL") + .description("The POST URL provided by Slack to send messages into a channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.URL_VALIDATOR) + .sensitive(true) + .build(); + + public static final PropertyDescriptor WEBHOOK_TEXT = new PropertyDescriptor + .Builder() + .name("webhook-text") + .displayName("Webhook Text") + .description("The text sent in the webhook message") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CHANNEL = new PropertyDescriptor + .Builder() + .name("channel") + .displayName("Channel") + .description("A public channel using #channel or direct message using @username. If not specified, " + + "the default webhook channel as specified in Slack's Incoming Webhooks web interface is used.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USERNAME = new PropertyDescriptor + .Builder() + .name("username") + .displayName("Username") + .description("The displayed Slack username") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor ICON_URL = new PropertyDescriptor + .Builder() + .name("icon-url") + .displayName("Icon URL") + .description("Icon URL to be used for the message") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + public static final PropertyDescriptor ICON_EMOJI = new PropertyDescriptor + .Builder() + .name("icon-emoji") + .displayName("Icon Emoji") + .description("Icon Emoji to be used for the message. Must begin and end with a colon, e.g. :ghost:") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(new EmojiValidator()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to success after being successfully sent to Slack") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to failure if unable to be sent to Slack") + .build(); + + public static final List descriptors = Collections.unmodifiableList( + Arrays.asList(WEBHOOK_URL, WEBHOOK_TEXT, CHANNEL, USERNAME, ICON_URL, ICON_EMOJI)); + + public static final Set relationships = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + // Validate the channel (or username for a direct message) + private String validateChannel(String channel) { + if ((channel.startsWith("#") || channel.startsWith("@")) && channel.length() > 1) { + return null; + } + return "Channel must begin with '#' or '@'"; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + JsonObjectBuilder builder = Json.createObjectBuilder(); + String text = context.getProperty(WEBHOOK_TEXT).evaluateAttributeExpressions(flowFile).getValue(); + if (text != null && !text.isEmpty()) { + builder.add("text", text); + } else { + // Slack requires the 'text' attribute + getLogger().error("FlowFile should have non-empty " + WEBHOOK_TEXT.getName()); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + String channel = context.getProperty(CHANNEL).evaluateAttributeExpressions(flowFile).getValue(); + if (channel != null && !channel.isEmpty()) { + String error = validateChannel(channel); + if (error == null) { + builder.add("channel", channel); + } else { + getLogger().error("Invalid channel '{}': {}", new Object[]{channel, error}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(); + if (username != null && !username.isEmpty()) { + builder.add("username", username); + } + + String iconUrl = context.getProperty(ICON_URL).evaluateAttributeExpressions(flowFile).getValue(); + if (iconUrl != null && !iconUrl.isEmpty()) { + builder.add("icon_url", iconUrl); + } + + String iconEmoji = context.getProperty(ICON_EMOJI).evaluateAttributeExpressions(flowFile).getValue(); + if (iconEmoji != null && !iconEmoji.isEmpty()) { + builder.add("icon_emoji", iconEmoji); + } + + JsonObject jsonObject = builder.build(); + StringWriter stringWriter = new StringWriter(); + JsonWriter jsonWriter = Json.createWriter(stringWriter); + jsonWriter.writeObject(jsonObject); + jsonWriter.close(); + + try { + URL url = new URL(context.getProperty(WEBHOOK_URL).getValue()); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("POST"); + conn.setDoOutput(true); + DataOutputStream outputStream = new DataOutputStream(conn.getOutputStream()); + String payload = "payload=" + URLEncoder.encode(stringWriter.getBuffer().toString(), "UTF-8"); + outputStream.writeBytes(payload); + outputStream.close(); + + int responseCode = conn.getResponseCode(); + if (responseCode >= 200 && responseCode < 300) { + getLogger().info("Successfully posted message to Slack"); + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, context.getProperty(WEBHOOK_URL).getValue()); + } else { + getLogger().error("Failed to post message to Slack with response code {}", new Object[]{responseCode}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } catch (IOException e) { + getLogger().error("Failed to open connection", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + + private static class EmojiValidator implements Validator { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (input.startsWith(":") && input.endsWith(":") && input.length() > 2) { + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } + + return new ValidationResult.Builder().input(input).subject(subject).valid(false) + .explanation("Must begin and end with a colon") + .build(); + } + } +} diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..9a861ee379 --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.slack.PutSlack \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/docs/org.apache.nifi.processors.slack.PutSlack/additionalDetails.html b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/docs/org.apache.nifi.processors.slack.PutSlack/additionalDetails.html new file mode 100644 index 0000000000..53f2dc55ed --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/docs/org.apache.nifi.processors.slack.PutSlack/additionalDetails.html @@ -0,0 +1,48 @@ + + + + + + PutSlack + + + + + +

Description:

+

+ The PutSlack processor sends messages to Slack, + a team-oriented messaging service. +

+

+ This processor uses Slack's incoming webhooks + custom integration to post messages to a specific channel. Before using PutSlack, your Slack team should be + configured for the incoming webhooks custom integration, and you'll need to configure at least one incoming + webhook. +

+

+ To configure PutSlack, set the following mandatory properties: +

    +
  • + Webhook URL: The URL received from Slack that allows the processor to send messages to your team. +
  • +
  • + Webhook Text: The text of the message to send to Slack. +
  • +
+

+ + diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/CaptureServlet.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/CaptureServlet.java new file mode 100644 index 0000000000..a605130a7d --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/CaptureServlet.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.slack; + +import java.io.IOException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Response.Status; + +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.file.FileUtils; + +public class CaptureServlet extends HttpServlet { + + private static final long serialVersionUID = 8402271018449653919L; + + private volatile byte[] lastPost; + private volatile Map lastPostHeaders; + + public byte[] getLastPost() { + return lastPost; + } + + public Map getLastPostHeaders() { + return lastPostHeaders; + } + + @Override + protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + // Capture all the headers for reference. Intentionally choosing to not special handling for headers with multiple values for clarity + final Enumeration headerNames = request.getHeaderNames(); + lastPostHeaders = new HashMap<>(); + while (headerNames.hasMoreElements()) { + final String nextHeader = headerNames.nextElement(); + lastPostHeaders.put(nextHeader, request.getHeader(nextHeader)); + } + + try { + StreamUtils.copy(request.getInputStream(), baos); + this.lastPost = baos.toByteArray(); + } finally { + FileUtils.closeQuietly(baos); + } + response.setStatus(Status.OK.getStatusCode()); + } +} diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/PutSlackTest.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/PutSlackTest.java new file mode 100644 index 0000000000..bef2e5ed1f --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/PutSlackTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.slack; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.eclipse.jetty.servlet.ServletHandler; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class PutSlackTest { + + private TestRunner testRunner; + private TestServer server; + private CaptureServlet servlet; + public static final String WEBHOOK_TEST_TEXT = "Hello From Apache NiFi"; + + @Before + public void init() throws Exception { + testRunner = TestRunners.newTestRunner(PutSlack.class); + + // set up web service + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(CaptureServlet.class, "/*"); + servlet = (CaptureServlet) handler.getServlets()[0].getServlet(); + + // create the service + server = new TestServer(); + server.addHandler(handler); + server.startServer(); + } + + @Test(expected = AssertionError.class) + public void testBlankText() { + testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); + testRunner.setProperty(PutSlack.WEBHOOK_TEXT, ""); + + testRunner.enqueue(new byte[0]); + testRunner.run(1); + } + + @Test + public void testBlankTextViaExpression() { + testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); + testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "${invalid-attr}"); // Create a blank webhook text + + testRunner.enqueue(new byte[0]); + testRunner.run(1); + testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE); + } + + @Test + public void testInvalidChannel() { + testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); + testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); + testRunner.setProperty(PutSlack.CHANNEL, "invalid"); + + testRunner.enqueue(new byte[0]); + testRunner.run(1); + testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE); + } + + @Test(expected = AssertionError.class) + public void testInvalidIconUrl() { + testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); + testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); + testRunner.setProperty(PutSlack.ICON_URL, "invalid"); + + testRunner.enqueue(new byte[0]); + testRunner.run(1); + } + + @Test(expected = AssertionError.class) + public void testInvalidIconEmoji() { + testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); + testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT); + testRunner.setProperty(PutSlack.ICON_EMOJI, "invalid"); + + testRunner.enqueue(new byte[0]); + testRunner.run(1); + } + + @Test + public void testGetPropertyDescriptors() throws Exception { + PutSlack processor = new PutSlack(); + List pd = processor.getSupportedPropertyDescriptors(); + assertEquals("size should be eq", 6, pd.size()); + assertTrue(pd.contains(PutSlack.WEBHOOK_TEXT)); + assertTrue(pd.contains(PutSlack.WEBHOOK_URL)); + assertTrue(pd.contains(PutSlack.CHANNEL)); + assertTrue(pd.contains(PutSlack.USERNAME)); + assertTrue(pd.contains(PutSlack.ICON_URL)); + assertTrue(pd.contains(PutSlack.ICON_EMOJI)); + } + + @Test + public void testSimplePut() { + testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); + testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT); + + testRunner.enqueue(new byte[0]); + testRunner.run(1); + testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1); + + byte[] expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%7D".getBytes(); + assertTrue(Arrays.equals(expected, servlet.getLastPost())); + } + + @Test + public void testSimplePutWithAttributes() { + testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); + testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT); + testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes"); + testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook"); + testRunner.setProperty(PutSlack.ICON_EMOJI, ":smile:"); + + testRunner.enqueue(new byte[0]); + testRunner.run(1); + testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1); + + final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes%22%2C%22username%22%3A%22" + + "integration-test-webhook%22%2C%22icon_emoji%22%3A%22%3Asmile%3A%22%7D"; + assertTrue(Arrays.equals(expected.getBytes(), servlet.getLastPost())); + } + + @Test + public void testSimplePutWithAttributesIconURL() { + testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl()); + testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT); + testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes-url"); + testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook"); + testRunner.setProperty(PutSlack.ICON_URL, "http://lorempixel.com/48/48/"); + + testRunner.enqueue(new byte[0]); + testRunner.run(1); + testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1); + + final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes-url%22%2C%22username%22%3A%22" + + "integration-test-webhook%22%2C%22icon_url%22%3A%22http%3A%2F%2Florempixel.com%2F48%2F48%2F%22%7D"; + assertTrue(Arrays.equals(expected.getBytes(), servlet.getLastPost())); + } +} diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java new file mode 100644 index 0000000000..6506fd8afe --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.slack; + +import org.apache.nifi.ssl.StandardSSLContextService; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.util.ssl.SslContextFactory; + +import java.util.Map; + +/** + * Test server to assist with unit tests that requires a server to be stood up. + */ +public class TestServer { + + public static final String NEED_CLIENT_AUTH = "clientAuth"; + + private Server jetty; + private boolean secure = false; + + /** + * Creates the test server. + */ + public TestServer() { + createServer(null); + } + + /** + * Creates the test server. + * + * @param sslProperties SSLProps to be used in the secure connection. The keys should should use the StandardSSLContextService properties. + */ + public TestServer(final Map sslProperties) { + createServer(sslProperties); + } + + private void createServer(final Map sslProperties) { + jetty = new Server(); + + // create the unsecure connector + createConnector(); + + // create the secure connector if sslProperties are specified + if (sslProperties != null) { + createSecureConnector(sslProperties); + } + + jetty.setHandler(new HandlerCollection(true)); + } + + /** + * Creates the http connection + */ + private void createConnector() { + final ServerConnector http = new ServerConnector(jetty); + http.setPort(0); + // Severely taxed environments may have significant delays when executing. + http.setIdleTimeout(30000L); + jetty.addConnector(http); + } + + private void createSecureConnector(final Map sslProperties) { + SslContextFactory ssl = new SslContextFactory(); + + if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) { + ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName())); + ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName())); + ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName())); + } + + if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) { + ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName())); + ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName())); + ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName())); + } + + final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH); + if (clientAuth == null) { + ssl.setNeedClientAuth(true); + } else { + ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth)); + } + + // build the connector + final ServerConnector https = new ServerConnector(jetty, ssl); + + // set host and port + https.setPort(0); + // Severely taxed environments may have significant delays when executing. + https.setIdleTimeout(30000L); + + // add the connector + jetty.addConnector(https); + + // mark secure as enabled + secure = true; + } + + public void clearHandlers() { + HandlerCollection hc = (HandlerCollection) jetty.getHandler(); + Handler[] ha = hc.getHandlers(); + if (ha != null) { + for (Handler h : ha) { + hc.removeHandler(h); + } + } + } + + public void addHandler(Handler handler) { + ((HandlerCollection) jetty.getHandler()).addHandler(handler); + } + + public void startServer() throws Exception { + jetty.start(); + } + + public void shutdownServer() throws Exception { + jetty.stop(); + jetty.destroy(); + } + + private int getPort() { + if (!jetty.isStarted()) { + throw new IllegalStateException("Jetty server not started"); + } + return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort(); + } + + private int getSecurePort() { + if (!jetty.isStarted()) { + throw new IllegalStateException("Jetty server not started"); + } + return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort(); + } + + public String getUrl() { + return "http://localhost:" + getPort(); + } + + public String getSecureUrl() { + String url = null; + if (secure) { + url = "https://localhost:" + getSecurePort(); + } + return url; + } +} diff --git a/nifi-nar-bundles/nifi-slack-bundle/pom.xml b/nifi-nar-bundles/nifi-slack-bundle/pom.xml new file mode 100644 index 0000000000..89f3c34496 --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.0.0-SNAPSHOT + + + org.apache.nifi + nifi-slack-bundle + 1.0.0-SNAPSHOT + pom + + + nifi-slack-processors + nifi-slack-nar + + + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java new file mode 100644 index 0000000000..1037ca6f91 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/slack/TestServer.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.slack; + +import java.util.Map; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.util.ssl.SslContextFactory; + +/** + * Test server to assist with unit tests that requires a server to be stood up. + */ +public class TestServer { + + public static final String NEED_CLIENT_AUTH = "clientAuth"; + + private Server jetty; + private boolean secure = false; + + /** + * Creates the test server. + */ + public TestServer() { + createServer(null); + } + + /** + * Creates the test server. + * + * @param sslProperties SSLProps to be used in the secure connection. The keys should should use the StandardSSLContextService properties. + */ + public TestServer(final Map sslProperties) { + createServer(sslProperties); + } + + private void createServer(final Map sslProperties) { + jetty = new Server(); + + // create the unsecure connector + createConnector(); + + // create the secure connector if sslProperties are specified + if (sslProperties != null) { + createSecureConnector(sslProperties); + } + + jetty.setHandler(new HandlerCollection(true)); + } + + /** + * Creates the http connection + */ + private void createConnector() { + final ServerConnector http = new ServerConnector(jetty); + http.setPort(0); + // Severely taxed environments may have significant delays when executing. + http.setIdleTimeout(30000L); + jetty.addConnector(http); + } + + private void createSecureConnector(final Map sslProperties) { + SslContextFactory ssl = new SslContextFactory(); + + if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) { + ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName())); + ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName())); + ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName())); + } + + if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) { + ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName())); + ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName())); + ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName())); + } + + final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH); + if (clientAuth == null) { + ssl.setNeedClientAuth(true); + } else { + ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth)); + } + + // build the connector + final ServerConnector https = new ServerConnector(jetty, ssl); + + // set host and port + https.setPort(0); + // Severely taxed environments may have significant delays when executing. + https.setIdleTimeout(30000L); + + // add the connector + jetty.addConnector(https); + + // mark secure as enabled + secure = true; + } + + public void clearHandlers() { + HandlerCollection hc = (HandlerCollection) jetty.getHandler(); + Handler[] ha = hc.getHandlers(); + if (ha != null) { + for (Handler h : ha) { + hc.removeHandler(h); + } + } + } + + public void addHandler(Handler handler) { + ((HandlerCollection) jetty.getHandler()).addHandler(handler); + } + + public void startServer() throws Exception { + jetty.start(); + } + + public void shutdownServer() throws Exception { + jetty.stop(); + jetty.destroy(); + } + + private int getPort() { + if (!jetty.isStarted()) { + throw new IllegalStateException("Jetty server not started"); + } + return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort(); + } + + private int getSecurePort() { + if (!jetty.isStarted()) { + throw new IllegalStateException("Jetty server not started"); + } + return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort(); + } + + public String getUrl() { + return "http://localhost:" + getPort(); + } + + public String getSecureUrl() { + String url = null; + if (secure) { + url = "https://localhost:" + getSecurePort(); + } + return url; + } +} diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 998ced8841..250822a2ff 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -64,6 +64,7 @@ nifi-site-to-site-reporting-bundle nifi-mqtt-bundle nifi-evtx-bundle + nifi-slack-bundle diff --git a/pom.xml b/pom.xml index df4e70cd31..25ac4a33e8 100644 --- a/pom.xml +++ b/pom.xml @@ -1102,6 +1102,12 @@ language governing permissions and limitations under the License. --> 1.0.0-SNAPSHOT nar + + org.apache.nifi + nifi-slack-nar + 1.0.0-SNAPSHOT + nar + org.apache.nifi nifi-elasticsearch-nar