NIFI-1578: Create PutSlack processor

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #256
This commit is contained in:
Adam Lamar 2016-03-10 10:17:09 -07:00 committed by Matt Burgess
parent c955ec1689
commit db9a79f79d
14 changed files with 1060 additions and 0 deletions

View File

@ -337,6 +337,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-evtx-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-slack-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<properties>

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-slack-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-slack-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-slack-processors</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,18 @@
nifi-slack-nar
Copyright 2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
************************
Common Development and Distribution License 1.1
************************
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net)
(CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net)
The following binary components are provided under the Common Development and Distribution License v1.0. See project link for details.
(CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)

View File

@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-slack-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-slack-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.2.11.v20150529</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.2.11.v20150529</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.DataOutputStream;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({"put", "slack", "notify"})
@CapabilityDescription("Sends a message to your team on slack.com")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class PutSlack extends AbstractProcessor {
public static final PropertyDescriptor WEBHOOK_URL = new PropertyDescriptor
.Builder()
.name("webhook-url")
.displayName("Webhook URL")
.description("The POST URL provided by Slack to send messages into a channel.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.URL_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor WEBHOOK_TEXT = new PropertyDescriptor
.Builder()
.name("webhook-text")
.displayName("Webhook Text")
.description("The text sent in the webhook message")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CHANNEL = new PropertyDescriptor
.Builder()
.name("channel")
.displayName("Channel")
.description("A public channel using #channel or direct message using @username. If not specified, " +
"the default webhook channel as specified in Slack's Incoming Webhooks web interface is used.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor
.Builder()
.name("username")
.displayName("Username")
.description("The displayed Slack username")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ICON_URL = new PropertyDescriptor
.Builder()
.name("icon-url")
.displayName("Icon URL")
.description("Icon URL to be used for the message")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
public static final PropertyDescriptor ICON_EMOJI = new PropertyDescriptor
.Builder()
.name("icon-emoji")
.displayName("Icon Emoji")
.description("Icon Emoji to be used for the message. Must begin and end with a colon, e.g. :ghost:")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(new EmojiValidator())
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to success after being successfully sent to Slack")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to failure if unable to be sent to Slack")
.build();
public static final List<PropertyDescriptor> descriptors = Collections.unmodifiableList(
Arrays.asList(WEBHOOK_URL, WEBHOOK_TEXT, CHANNEL, USERNAME, ICON_URL, ICON_EMOJI));
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
// Validate the channel (or username for a direct message)
private String validateChannel(String channel) {
if ((channel.startsWith("#") || channel.startsWith("@")) && channel.length() > 1) {
return null;
}
return "Channel must begin with '#' or '@'";
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
JsonObjectBuilder builder = Json.createObjectBuilder();
String text = context.getProperty(WEBHOOK_TEXT).evaluateAttributeExpressions(flowFile).getValue();
if (text != null && !text.isEmpty()) {
builder.add("text", text);
} else {
// Slack requires the 'text' attribute
getLogger().error("FlowFile should have non-empty " + WEBHOOK_TEXT.getName());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
String channel = context.getProperty(CHANNEL).evaluateAttributeExpressions(flowFile).getValue();
if (channel != null && !channel.isEmpty()) {
String error = validateChannel(channel);
if (error == null) {
builder.add("channel", channel);
} else {
getLogger().error("Invalid channel '{}': {}", new Object[]{channel, error});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
if (username != null && !username.isEmpty()) {
builder.add("username", username);
}
String iconUrl = context.getProperty(ICON_URL).evaluateAttributeExpressions(flowFile).getValue();
if (iconUrl != null && !iconUrl.isEmpty()) {
builder.add("icon_url", iconUrl);
}
String iconEmoji = context.getProperty(ICON_EMOJI).evaluateAttributeExpressions(flowFile).getValue();
if (iconEmoji != null && !iconEmoji.isEmpty()) {
builder.add("icon_emoji", iconEmoji);
}
JsonObject jsonObject = builder.build();
StringWriter stringWriter = new StringWriter();
JsonWriter jsonWriter = Json.createWriter(stringWriter);
jsonWriter.writeObject(jsonObject);
jsonWriter.close();
try {
URL url = new URL(context.getProperty(WEBHOOK_URL).getValue());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);
DataOutputStream outputStream = new DataOutputStream(conn.getOutputStream());
String payload = "payload=" + URLEncoder.encode(stringWriter.getBuffer().toString(), "UTF-8");
outputStream.writeBytes(payload);
outputStream.close();
int responseCode = conn.getResponseCode();
if (responseCode >= 200 && responseCode < 300) {
getLogger().info("Successfully posted message to Slack");
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, context.getProperty(WEBHOOK_URL).getValue());
} else {
getLogger().error("Failed to post message to Slack with response code {}", new Object[]{responseCode});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
} catch (IOException e) {
getLogger().error("Failed to open connection", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
private static class EmojiValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (input.startsWith(":") && input.endsWith(":") && input.length() > 2) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
return new ValidationResult.Builder().input(input).subject(subject).valid(false)
.explanation("Must begin and end with a colon")
.build();
}
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.slack.PutSlack

View File

@ -0,0 +1,48 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutSlack</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
The PutSlack processor sends messages to <a href="https://www.slack.com">Slack</a>,
a team-oriented messaging service.
</p>
<p>
This processor uses Slack's <a href="https://slack.com/apps/A0F7XDUAZ-incoming-webhooks">incoming webhooks</a>
custom integration to post messages to a specific channel. Before using PutSlack, your Slack team should be
configured for the incoming webhooks custom integration, and you'll need to configure at least one incoming
webhook.
</p>
<p>
To configure PutSlack, set the following mandatory properties:
<ul>
<li>
<code>Webhook URL</code>: The URL received from Slack that allows the processor to send messages to your team.
</li>
<li>
<code>Webhook Text</code>: The text of the message to send to Slack.
</li>
</ul>
</p>
</body>
</html>

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import java.io.IOException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.file.FileUtils;
public class CaptureServlet extends HttpServlet {
private static final long serialVersionUID = 8402271018449653919L;
private volatile byte[] lastPost;
private volatile Map<String, String> lastPostHeaders;
public byte[] getLastPost() {
return lastPost;
}
public Map<String, String> getLastPostHeaders() {
return lastPostHeaders;
}
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Capture all the headers for reference. Intentionally choosing to not special handling for headers with multiple values for clarity
final Enumeration<String> headerNames = request.getHeaderNames();
lastPostHeaders = new HashMap<>();
while (headerNames.hasMoreElements()) {
final String nextHeader = headerNames.nextElement();
lastPostHeaders.put(nextHeader, request.getHeader(nextHeader));
}
try {
StreamUtils.copy(request.getInputStream(), baos);
this.lastPost = baos.toByteArray();
} finally {
FileUtils.closeQuietly(baos);
}
response.setStatus(Status.OK.getStatusCode());
}
}

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.servlet.ServletHandler;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class PutSlackTest {
private TestRunner testRunner;
private TestServer server;
private CaptureServlet servlet;
public static final String WEBHOOK_TEST_TEXT = "Hello From Apache NiFi";
@Before
public void init() throws Exception {
testRunner = TestRunners.newTestRunner(PutSlack.class);
// set up web service
ServletHandler handler = new ServletHandler();
handler.addServletWithMapping(CaptureServlet.class, "/*");
servlet = (CaptureServlet) handler.getServlets()[0].getServlet();
// create the service
server = new TestServer();
server.addHandler(handler);
server.startServer();
}
@Test(expected = AssertionError.class)
public void testBlankText() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "");
testRunner.enqueue(new byte[0]);
testRunner.run(1);
}
@Test
public void testBlankTextViaExpression() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "${invalid-attr}"); // Create a blank webhook text
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test
public void testInvalidChannel() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.CHANNEL, "invalid");
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test(expected = AssertionError.class)
public void testInvalidIconUrl() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.ICON_URL, "invalid");
testRunner.enqueue(new byte[0]);
testRunner.run(1);
}
@Test(expected = AssertionError.class)
public void testInvalidIconEmoji() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.ICON_EMOJI, "invalid");
testRunner.enqueue(new byte[0]);
testRunner.run(1);
}
@Test
public void testGetPropertyDescriptors() throws Exception {
PutSlack processor = new PutSlack();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 6, pd.size());
assertTrue(pd.contains(PutSlack.WEBHOOK_TEXT));
assertTrue(pd.contains(PutSlack.WEBHOOK_URL));
assertTrue(pd.contains(PutSlack.CHANNEL));
assertTrue(pd.contains(PutSlack.USERNAME));
assertTrue(pd.contains(PutSlack.ICON_URL));
assertTrue(pd.contains(PutSlack.ICON_EMOJI));
}
@Test
public void testSimplePut() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
byte[] expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%7D".getBytes();
assertTrue(Arrays.equals(expected, servlet.getLastPost()));
}
@Test
public void testSimplePutWithAttributes() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes");
testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook");
testRunner.setProperty(PutSlack.ICON_EMOJI, ":smile:");
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes%22%2C%22username%22%3A%22" +
"integration-test-webhook%22%2C%22icon_emoji%22%3A%22%3Asmile%3A%22%7D";
assertTrue(Arrays.equals(expected.getBytes(), servlet.getLastPost()));
}
@Test
public void testSimplePutWithAttributesIconURL() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, server.getUrl());
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes-url");
testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook");
testRunner.setProperty(PutSlack.ICON_URL, "http://lorempixel.com/48/48/");
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes-url%22%2C%22username%22%3A%22"
+ "integration-test-webhook%22%2C%22icon_url%22%3A%22http%3A%2F%2Florempixel.com%2F48%2F48%2F%22%7D";
assertTrue(Arrays.equals(expected.getBytes(), servlet.getLastPost()));
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import java.util.Map;
/**
* Test server to assist with unit tests that requires a server to be stood up.
*/
public class TestServer {
public static final String NEED_CLIENT_AUTH = "clientAuth";
private Server jetty;
private boolean secure = false;
/**
* Creates the test server.
*/
public TestServer() {
createServer(null);
}
/**
* Creates the test server.
*
* @param sslProperties SSLProps to be used in the secure connection. The keys should should use the StandardSSLContextService properties.
*/
public TestServer(final Map<String, String> sslProperties) {
createServer(sslProperties);
}
private void createServer(final Map<String, String> sslProperties) {
jetty = new Server();
// create the unsecure connector
createConnector();
// create the secure connector if sslProperties are specified
if (sslProperties != null) {
createSecureConnector(sslProperties);
}
jetty.setHandler(new HandlerCollection(true));
}
/**
* Creates the http connection
*/
private void createConnector() {
final ServerConnector http = new ServerConnector(jetty);
http.setPort(0);
// Severely taxed environments may have significant delays when executing.
http.setIdleTimeout(30000L);
jetty.addConnector(http);
}
private void createSecureConnector(final Map<String, String> sslProperties) {
SslContextFactory ssl = new SslContextFactory();
if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
}
if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
}
final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
if (clientAuth == null) {
ssl.setNeedClientAuth(true);
} else {
ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth));
}
// build the connector
final ServerConnector https = new ServerConnector(jetty, ssl);
// set host and port
https.setPort(0);
// Severely taxed environments may have significant delays when executing.
https.setIdleTimeout(30000L);
// add the connector
jetty.addConnector(https);
// mark secure as enabled
secure = true;
}
public void clearHandlers() {
HandlerCollection hc = (HandlerCollection) jetty.getHandler();
Handler[] ha = hc.getHandlers();
if (ha != null) {
for (Handler h : ha) {
hc.removeHandler(h);
}
}
}
public void addHandler(Handler handler) {
((HandlerCollection) jetty.getHandler()).addHandler(handler);
}
public void startServer() throws Exception {
jetty.start();
}
public void shutdownServer() throws Exception {
jetty.stop();
jetty.destroy();
}
private int getPort() {
if (!jetty.isStarted()) {
throw new IllegalStateException("Jetty server not started");
}
return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
}
private int getSecurePort() {
if (!jetty.isStarted()) {
throw new IllegalStateException("Jetty server not started");
}
return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort();
}
public String getUrl() {
return "http://localhost:" + getPort();
}
public String getSecureUrl() {
String url = null;
if (secure) {
url = "https://localhost:" + getSecurePort();
}
return url;
}
}

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-slack-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-slack-processors</module>
<module>nifi-slack-nar</module>
</modules>
</project>

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import java.util.Map;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.util.ssl.SslContextFactory;
/**
* Test server to assist with unit tests that requires a server to be stood up.
*/
public class TestServer {
public static final String NEED_CLIENT_AUTH = "clientAuth";
private Server jetty;
private boolean secure = false;
/**
* Creates the test server.
*/
public TestServer() {
createServer(null);
}
/**
* Creates the test server.
*
* @param sslProperties SSLProps to be used in the secure connection. The keys should should use the StandardSSLContextService properties.
*/
public TestServer(final Map<String, String> sslProperties) {
createServer(sslProperties);
}
private void createServer(final Map<String, String> sslProperties) {
jetty = new Server();
// create the unsecure connector
createConnector();
// create the secure connector if sslProperties are specified
if (sslProperties != null) {
createSecureConnector(sslProperties);
}
jetty.setHandler(new HandlerCollection(true));
}
/**
* Creates the http connection
*/
private void createConnector() {
final ServerConnector http = new ServerConnector(jetty);
http.setPort(0);
// Severely taxed environments may have significant delays when executing.
http.setIdleTimeout(30000L);
jetty.addConnector(http);
}
private void createSecureConnector(final Map<String, String> sslProperties) {
SslContextFactory ssl = new SslContextFactory();
if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
}
if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
}
final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
if (clientAuth == null) {
ssl.setNeedClientAuth(true);
} else {
ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth));
}
// build the connector
final ServerConnector https = new ServerConnector(jetty, ssl);
// set host and port
https.setPort(0);
// Severely taxed environments may have significant delays when executing.
https.setIdleTimeout(30000L);
// add the connector
jetty.addConnector(https);
// mark secure as enabled
secure = true;
}
public void clearHandlers() {
HandlerCollection hc = (HandlerCollection) jetty.getHandler();
Handler[] ha = hc.getHandlers();
if (ha != null) {
for (Handler h : ha) {
hc.removeHandler(h);
}
}
}
public void addHandler(Handler handler) {
((HandlerCollection) jetty.getHandler()).addHandler(handler);
}
public void startServer() throws Exception {
jetty.start();
}
public void shutdownServer() throws Exception {
jetty.stop();
jetty.destroy();
}
private int getPort() {
if (!jetty.isStarted()) {
throw new IllegalStateException("Jetty server not started");
}
return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
}
private int getSecurePort() {
if (!jetty.isStarted()) {
throw new IllegalStateException("Jetty server not started");
}
return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort();
}
public String getUrl() {
return "http://localhost:" + getPort();
}
public String getSecureUrl() {
String url = null;
if (secure) {
url = "https://localhost:" + getSecurePort();
}
return url;
}
}

View File

@ -64,6 +64,7 @@
<module>nifi-site-to-site-reporting-bundle</module>
<module>nifi-mqtt-bundle</module>
<module>nifi-evtx-bundle</module>
<module>nifi-slack-bundle</module>
</modules>
<dependencyManagement>

View File

@ -1102,6 +1102,12 @@ language governing permissions and limitations under the License. -->
<version>1.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-slack-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-nar</artifactId>