NIFI-12331 Added PublishSlack Processor

- Removed deprecated PutSlack and PostSlack

This closes #8120

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-12-03 15:56:42 -05:00 committed by exceptionfactory
parent ab8a82b997
commit a21993ef72
No known key found for this signature in database
20 changed files with 789 additions and 2094 deletions

View File

@ -26,45 +26,26 @@
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.slack.api</groupId>
<artifactId>bolt-socket-mode</artifactId>
<version>1.32.1</version>
<version>1.36.1</version>
</dependency>
<!-- Required by bolt-socket-mode but the library itself doesn't have the dependency. -->
<dependency>
<groupId>org.glassfish.tyrus.bundles</groupId>
<artifactId>tyrus-standalone-client</artifactId>
<version>1.20</version>
</dependency>
<dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>${org.apache.httpcomponents.httpclient.version}</version>
<version>1.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
@ -82,17 +63,6 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
@ -146,6 +116,5 @@
<artifactId>nifi-proxy-configuration-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -57,15 +57,16 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.slack.consume.ConsumeChannel;
import org.apache.nifi.processors.slack.consume.ConsumeSlackClient;
import org.apache.nifi.processors.slack.consume.ConsumeSlackUtil;
import org.apache.nifi.processors.slack.consume.UsernameLookup;
import org.apache.nifi.processors.slack.util.RateLimit;
import org.apache.nifi.processors.slack.util.SlackResponseUtil;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -73,7 +74,6 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@PrimaryNodeOnly
@TriggerSerially
@ -86,7 +86,7 @@ import java.util.concurrent.atomic.AtomicLong;
@WritesAttribute(attribute = "slack.message.count", description = "The number of slack messages that are included in the FlowFile"),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json, as the output will always be in JSON format")
})
@SeeAlso({ListenSlack.class, PostSlack.class, PutSlack.class})
@SeeAlso({ListenSlack.class})
@Tags({"slack", "conversation", "conversation.history", "social media", "team", "text", "unstructured"})
@CapabilityDescription("Retrieves messages from one or more configured Slack channels. The messages are written out in JSON format. " +
"See Usage / Additional Details for more information about how to configure this Processor and enable it to retrieve messages from Slack.")
@ -182,7 +182,7 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
.build();
private final AtomicLong nextRequestTime = new AtomicLong(0L);
private final RateLimit rateLimit = new RateLimit(getLogger());
private final Queue<ConsumeChannel> channels = new LinkedBlockingQueue<>();
private volatile App slackApp;
@ -329,7 +329,8 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// Check to see if we are currently in a backoff period due to Slack's Rate Limit
if (isRateLimited()) {
if (rateLimit.isLimitReached()) {
getLogger().debug("Will not consume from Slack because rate limit has been reached");
context.yield();
return;
}
@ -355,32 +356,18 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
private void yieldOnException(final Throwable t, final String channelId, final ProcessContext context) {
if (ConsumeSlackUtil.isRateLimited(t)) {
if (SlackResponseUtil.isRateLimited(t)) {
getLogger().warn("Slack indicated that the Rate Limit has been exceeded when attempting to retrieve messages for channel {}", channelId);
} else {
getLogger().error("Failed to retrieve messages for channel {}", channelId, t);
}
final int retryAfterSeconds = ConsumeSlackUtil.getRetryAfterSeconds(t);
final long timeOfNextRequest = System.currentTimeMillis() + (retryAfterSeconds * 1000L);
nextRequestTime.getAndUpdate(currentTime -> Math.max(currentTime, timeOfNextRequest));
final int retryAfterSeconds = SlackResponseUtil.getRetryAfterSeconds(t);
rateLimit.retryAfter(Duration.ofSeconds(retryAfterSeconds));
context.yield();
}
private boolean isRateLimited() {
final long nextTime = nextRequestTime.get();
if (nextTime > 0 && System.currentTimeMillis() < nextTime) {
getLogger().debug("Will not retrieve any messages until {} due to Slack's Rate Limit", new Date(nextTime));
return true;
} else if (nextTime > 0) {
// Set nextRequestTime to 0 so that we no longer bother to make system calls to System.currentTimeMillis()
nextRequestTime.compareAndSet(nextTime, 0);
}
return false;
}
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> results = new ArrayList<>();
@ -466,7 +453,7 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
continue;
}
final String errorMessage = ConsumeSlackUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
throw new RuntimeException("Failed to determine Channel IDs: " + errorMessage);
}
}

View File

@ -71,7 +71,7 @@ import java.util.regex.Pattern;
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Set to application/json, as the output will always be in JSON format")
})
@SeeAlso({ConsumeSlack.class, PostSlack.class, PutSlack.class})
@SeeAlso({ConsumeSlack.class})
@Tags({"slack", "real-time", "event", "message", "command", "listen", "receive", "social media", "team", "text", "unstructured"})
@CapabilityDescription("Retrieves real-time messages or Slack commands from one or more Slack conversations. The messages are written out in JSON format. " +
"Note that this Processor should be used to obtain real-time messages and commands from Slack and does not provide a mechanism for obtaining historical messages. " +

View File

@ -1,501 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import java.io.IOException;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonString;
import javax.json.stream.JsonParsingException;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@Tags({"slack", "post", "notify", "upload", "message"})
@CapabilityDescription("Sends a message on Slack. The FlowFile content (e.g. an image) can be uploaded and attached to the message.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "<Arbitrary name>", value = "JSON snippet specifying a Slack message \"attachment\"",
description = "The property value will be converted to JSON and will be added to the array of attachments in the JSON payload being sent to Slack." +
" The property name will not be used by the processor.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttribute(attribute="slack.file.url", description = "The Slack URL of the uploaded file. It will be added if 'Upload FlowFile' has been set to 'Yes'.")
public class PostSlack extends AbstractProcessor {
private static final String SLACK_POST_MESSAGE_URL = "https://slack.com/api/chat.postMessage";
private static final String SLACK_FILE_UPLOAD_URL = "https://slack.com/api/files.upload";
public static final PropertyDescriptor POST_MESSAGE_URL = new PropertyDescriptor.Builder()
.name("post-message-url")
.displayName("Post Message URL")
.description("Slack Web API URL for posting text messages to channels." +
" It only needs to be changed if Slack changes its API URL.")
.required(true)
.defaultValue(SLACK_POST_MESSAGE_URL)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
public static final PropertyDescriptor FILE_UPLOAD_URL = new PropertyDescriptor.Builder()
.name("file-upload-url")
.displayName("File Upload URL")
.description("Slack Web API URL for uploading files to channels." +
" It only needs to be changed if Slack changes its API URL.")
.required(true)
.defaultValue(SLACK_FILE_UPLOAD_URL)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
.name("access-token")
.displayName("Access Token")
.description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi.")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder()
.name("channel")
.displayName("Channel")
.description("Slack channel, private group, or IM channel to send the message to.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
.name("text")
.displayName("Text")
.description("Text of the Slack message to send. Only required if no attachment has been specified and 'Upload File' has been set to 'No'.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final AllowableValue UPLOAD_FLOWFILE_YES = new AllowableValue(
"true",
"Yes",
"Upload and attach FlowFile content to the Slack message."
);
public static final AllowableValue UPLOAD_FLOWFILE_NO = new AllowableValue(
"false",
"No",
"Don't upload and attach FlowFile content to the Slack message."
);
public static final PropertyDescriptor UPLOAD_FLOWFILE = new PropertyDescriptor.Builder()
.name("upload-flowfile")
.displayName("Upload FlowFile")
.description("Whether or not to upload and attach the FlowFile content to the Slack message.")
.allowableValues(UPLOAD_FLOWFILE_YES, UPLOAD_FLOWFILE_NO)
.required(true)
.defaultValue("false")
.build();
public static final PropertyDescriptor FILE_TITLE = new PropertyDescriptor.Builder()
.name("file-title")
.displayName("File Title")
.description("Title of the file displayed in the Slack message." +
" The property value will only be used if 'Upload FlowFile' has been set to 'Yes'.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
.name("file-name")
.displayName("File Name")
.description("Name of the file to be uploaded." +
" The property value will only be used if 'Upload FlowFile' has been set to 'Yes'." +
" If the property evaluated to null or empty string, then the file name will be set to 'file' in the Slack message.")
.defaultValue("${" + CoreAttributes.FILENAME.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor FILE_MIME_TYPE = new PropertyDescriptor.Builder()
.name("file-mime-type")
.displayName("File Mime Type")
.description("Mime type of the file to be uploaded." +
" The property value will only be used if 'Upload FlowFile' has been set to 'Yes'." +
" If the property evaluated to null or empty string, then the mime type will be set to 'application/octet-stream' in the Slack message.")
.defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
.identifiesControllerService(SSLContextService.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to success after being successfully sent to Slack")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to failure if unable to be sent to Slack")
.build();
public static final List<PropertyDescriptor> properties = List.of(
POST_MESSAGE_URL,
FILE_UPLOAD_URL,
ACCESS_TOKEN,
CHANNEL,
TEXT,
UPLOAD_FLOWFILE,
FILE_TITLE,
FILE_NAME,
FILE_MIME_TYPE,
SSL_CONTEXT_SERVICE);
public static final Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE);
private final SortedSet<PropertyDescriptor> attachmentProperties = Collections.synchronizedSortedSet(new TreeSet<>());
private volatile PoolingHttpClientConnectionManager connManager;
private volatile CloseableHttpClient client;
private static final ContentType MIME_TYPE_PLAINTEXT_UTF8 = ContentType.create("text/plain", StandardCharsets.UTF_8);
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("Slack Attachment JSON snippet that will be added to the message. The property value will only be used if 'Upload FlowFile' has been set to 'No'." +
" If the property evaluated to null or empty string, or contains invalid JSON, then it will not be added to the Slack message.")
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dynamic(true)
.build();
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
attachmentProperties.clear();
attachmentProperties.addAll(
context.getProperties().keySet()
.stream()
.filter(PropertyDescriptor::isDynamic)
.toList());
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslService != null) {
final SSLContext sslContext = sslService.createContext();
final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", new SSLConnectionSocketFactory(sslContext))
.build();
connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
} else {
connManager = new PoolingHttpClientConnectionManager();
}
client = HttpClientBuilder.create()
.setConnectionManager(connManager)
.build();
}
@OnStopped
public void closeHttpResources() {
try {
if (client != null) {
client.close();
client = null;
}
if (connManager != null) {
connManager.close();
connManager = null;
}
} catch (IOException e) {
getLogger().error("Could not properly close HTTP connections.", e);
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> validationResults = new ArrayList<>();
boolean textSpecified = validationContext.getProperty(TEXT).isSet();
boolean attachmentSpecified = validationContext.getProperties().keySet()
.stream()
.anyMatch(PropertyDescriptor::isDynamic);
boolean uploadFileYes = validationContext.getProperty(UPLOAD_FLOWFILE).asBoolean();
if (!textSpecified && !attachmentSpecified && !uploadFileYes) {
validationResults.add(new ValidationResult.Builder()
.subject(TEXT.getDisplayName())
.valid(false)
.explanation("it is required if no attachment has been specified, nor 'Upload FlowFile' has been set to 'Yes'.")
.build());
}
return validationResults;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
CloseableHttpResponse response = null;
try {
String url;
String contentType;
HttpEntity requestBody;
if (!context.getProperty(UPLOAD_FLOWFILE).asBoolean()) {
url = context.getProperty(POST_MESSAGE_URL).getValue();
contentType = ContentType.APPLICATION_JSON.toString();
requestBody = createTextMessageRequestBody(context, flowFile);
} else {
url = context.getProperty(FILE_UPLOAD_URL).getValue();
contentType = null; // it will be set implicitly by HttpClient in case of multipart post request
requestBody = createFileMessageRequestBody(context, session, flowFile);
}
HttpPost request = new HttpPost(url);
request.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + context.getProperty(ACCESS_TOKEN).getValue());
if (contentType != null) {
request.setHeader(HttpHeaders.CONTENT_TYPE, contentType);
}
request.setEntity(requestBody);
response = client.execute(request);
int statusCode = response.getStatusLine().getStatusCode();
getLogger().debug("Status code: " + statusCode);
if (!(statusCode >= 200 && statusCode < 300)) {
throw new PostSlackException("HTTP error code: " + statusCode);
}
JsonObject responseJson;
try {
responseJson = Json.createReader(response.getEntity().getContent()).readObject();
} catch (JsonParsingException e) {
throw new PostSlackException("Slack response JSON cannot be parsed.", e);
}
getLogger().debug("Slack response: " + responseJson.toString());
try {
if (!responseJson.getBoolean("ok")) {
throw new PostSlackException("Slack error response: " + responseJson.getString("error"));
}
} catch (NullPointerException | ClassCastException e) {
throw new PostSlackException("Slack response JSON does not contain 'ok' key or it has invalid value.", e);
}
JsonString warning = responseJson.getJsonString("warning");
if (warning != null) {
getLogger().warn("Slack warning message: " + warning.getString());
}
if (context.getProperty(UPLOAD_FLOWFILE).asBoolean()) {
JsonObject file = responseJson.getJsonObject("file");
if (file != null) {
JsonString fileUrl = file.getJsonString("url_private");
if (fileUrl != null) {
session.putAttribute(flowFile, "slack.file.url", fileUrl.getString());
}
}
}
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, url);
} catch (IOException | PostSlackException e) {
getLogger().error("Failed to send message to Slack.", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
context.yield();
} finally {
if (response != null) {
try {
// consume the entire content of the response (entity)
// so that the manager can release the connection back to the pool
EntityUtils.consume(response.getEntity());
response.close();
} catch (IOException e) {
getLogger().error("Could not properly close HTTP response.", e);
}
}
}
}
private HttpEntity createTextMessageRequestBody(ProcessContext context, FlowFile flowFile) throws PostSlackException, UnsupportedEncodingException {
JsonObjectBuilder jsonBuilder = Json.createObjectBuilder();
String channel = context.getProperty(CHANNEL).evaluateAttributeExpressions(flowFile).getValue();
if (channel == null || channel.isEmpty()) {
throw new PostSlackException("The channel must be specified.");
}
jsonBuilder.add("channel", channel);
String text = context.getProperty(TEXT).evaluateAttributeExpressions(flowFile).getValue();
if (text != null && !text.isEmpty()){
jsonBuilder.add("text", text);
} else {
if (attachmentProperties.isEmpty()) {
throw new PostSlackException("The text of the message must be specified if no attachment has been specified and 'Upload File' has been set to 'No'.");
}
}
if (!attachmentProperties.isEmpty()) {
JsonArrayBuilder jsonArrayBuilder = Json.createArrayBuilder();
for (PropertyDescriptor attachmentProperty : attachmentProperties) {
String propertyValue = context.getProperty(attachmentProperty).evaluateAttributeExpressions(flowFile).getValue();
if (propertyValue != null && !propertyValue.isEmpty()) {
try {
jsonArrayBuilder.add(Json.createReader(new StringReader(propertyValue)).readObject());
} catch (JsonParsingException e) {
getLogger().warn(attachmentProperty.getName() + " property contains no valid JSON, has been skipped.");
}
} else {
getLogger().warn(attachmentProperty.getName() + " property has no value, has been skipped.");
}
}
jsonBuilder.add("attachments", jsonArrayBuilder);
}
return new StringEntity(jsonBuilder.build().toString(), StandardCharsets.UTF_8);
}
private HttpEntity createFileMessageRequestBody(ProcessContext context, ProcessSession session, FlowFile flowFile) throws PostSlackException {
MultipartEntityBuilder multipartBuilder = MultipartEntityBuilder.create();
String channel = context.getProperty(CHANNEL).evaluateAttributeExpressions(flowFile).getValue();
if (channel == null || channel.isEmpty()) {
throw new PostSlackException("The channel must be specified.");
}
multipartBuilder.addTextBody("channels", channel, MIME_TYPE_PLAINTEXT_UTF8);
String text = context.getProperty(TEXT).evaluateAttributeExpressions(flowFile).getValue();
if (text != null && !text.isEmpty()) {
multipartBuilder.addTextBody("initial_comment", text, MIME_TYPE_PLAINTEXT_UTF8);
}
String title = context.getProperty(FILE_TITLE).evaluateAttributeExpressions(flowFile).getValue();
if (title != null && !title.isEmpty()) {
multipartBuilder.addTextBody("title", title, MIME_TYPE_PLAINTEXT_UTF8);
}
String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
if (fileName == null || fileName.isEmpty()) {
fileName = "file";
getLogger().warn("File name not specified, has been set to {}.", fileName);
}
multipartBuilder.addTextBody("filename", fileName, MIME_TYPE_PLAINTEXT_UTF8);
ContentType mimeType;
String mimeTypeStr = context.getProperty(FILE_MIME_TYPE).evaluateAttributeExpressions(flowFile).getValue();
if (mimeTypeStr == null || mimeTypeStr.isEmpty()) {
mimeType = ContentType.APPLICATION_OCTET_STREAM;
getLogger().warn("Mime type not specified, has been set to {}.", mimeType.getMimeType());
} else {
mimeType = ContentType.getByMimeType(mimeTypeStr);
if (mimeType == null) {
mimeType = ContentType.APPLICATION_OCTET_STREAM;
getLogger().warn("Unknown mime type specified ({}), has been set to {}.", mimeTypeStr, mimeType.getMimeType());
}
}
multipartBuilder.addBinaryBody("file", session.read(flowFile), mimeType, fileName);
return multipartBuilder.build();
}
private static class PostSlackException extends Exception {
PostSlackException(String message) {
super(message);
}
PostSlackException(String message, Throwable cause) {
super(message, cause);
}
}
}

View File

@ -0,0 +1,517 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import com.slack.api.bolt.App;
import com.slack.api.bolt.AppConfig;
import com.slack.api.methods.MethodsClient;
import com.slack.api.methods.request.chat.ChatPostMessageRequest;
import com.slack.api.methods.request.files.FilesUploadV2Request;
import com.slack.api.methods.response.chat.ChatPostMessageResponse;
import com.slack.api.methods.response.files.FilesUploadV2Response;
import com.slack.api.model.File;
import com.slack.api.model.File.ShareDetail;
import com.slack.api.model.File.Shares;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.slack.util.SlackResponseUtil;
import org.apache.nifi.processors.slack.util.ChannelMapper;
import org.apache.nifi.processors.slack.util.RateLimit;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("""
Posts a message to the specified Slack channel. The content of the message can be either a user-defined message that makes use of Expression Language or
the contents of the FlowFile can be sent as the message. If sending a user-defined message, the contents of the FlowFile may also be optionally uploaded as
a file attachment.
""")
@SeeAlso({ListenSlack.class, ConsumeSlack.class})
@Tags({"slack", "conversation", "chat.postMessage", "social media", "team", "text", "unstructured", "write", "upload", "send", "publish"})
@WritesAttributes({
@WritesAttribute(attribute = "slack.channel.id", description = "The ID of the Slack Channel from which the messages were retrieved"),
@WritesAttribute(attribute = "slack.ts", description = "The timestamp of the slack messages that was sent; this is used by Slack as a unique identifier")
})
@DefaultSettings(yieldDuration = "3 sec")
@UseCase(
description = "Send specific text as a message to Slack, optionally including the FlowFile's contents as an attached file.",
configuration = """
Set "Access Token" to the value of your Slack OAuth Access Token.
Set "Channel" to the ID of the channel or the name of the channel prefixed with the # symbol. For example, "C0123456789" or "#general".
Set "Publish Strategy" to "Use 'Message Text' Property".
Set "Message Text" to the text that you would like to send as the Slack message.
Set "Include FlowFile Content as Attachment" to "true" if the FlowFile's contents should be attached as a file, or "false" to send just the message text without an attachment.
"""
)
@UseCase(
description = "Send the contents of the FlowFile as a message to Slack.",
configuration = """
Set "Access Token" to the value of your Slack OAuth Access Token.
Set "Channel" to the ID of the channel or the name of the channel prefixed with the # symbol. For example, "C0123456789" or "#general".
Set "Publish Strategy" to "Send FlowFile Content as Message".
"""
)
@MultiProcessorUseCase(
description = "Respond to a Slack message in a thread.",
keywords = {"slack", "respond", "reply", "thread"},
configurations = {
@ProcessorConfiguration(
processorClassName = "org.apache.nifi.processors.standard.EvaluateJsonPath",
configuration = """
Set "Destination" to "flowfile-attribute"
Add a new property named "thread.ts" with a value of `$.threadTs`
Add a new property named "message.ts" with a value of `$.ts`
Add a new property named "channel.id" with a value of `$.channel`
Add a new property named "user.id" with a value of `$.user`
Connect the "matched" Relationship to PublishSlack.
"""
),
@ProcessorConfiguration(
processorClass = PublishSlack.class,
configuration = """
Set "Access Token" to the value of your Slack OAuth Access Token.
Set "Channel" to `${'channel.id'}`
Set "Publish Strategy" to "Use 'Message Text' Property".
Set "Message Text" to the text that you would like to send as the response. If desired, you can reference the user of the original message by including the text `<@${'user.id'}>`.
For example: `Hey, <@${'user.id'}>, thanks for asking...`
Set "Include FlowFile Content as Attachment" to "false".
Set "Thread Timestamp" to `${'thread.ts':replaceEmpty( ${'message.ts'} )}`
"""
)
}
)
public class PublishSlack extends AbstractProcessor {
static final AllowableValue PUBLISH_STRATEGY_CONTENT_AS_MESSAGE = new AllowableValue("Send FlowFile Content as Message", "Send FlowFile Content as Message",
"The contents of the FlowFile will be sent as the message text.");
static final AllowableValue PUBLISH_STRATEGY_USE_PROPERTY = new AllowableValue("Use 'Message Text' Property", "Use 'Message Text' Property",
"The value of the Message Text Property will be sent as the message text.");
static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
.name("Access Token")
.description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " +
"The token must be granted the chat:write scope. Additionally, in order to upload FlowFile contents as an attachment, it must be granted files:write.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.sensitive(true)
.build();
static PropertyDescriptor CHANNEL = new PropertyDescriptor.Builder()
.name("Channel")
.description("The name or identifier of the channel to send the message to. If using a channel name, it must be prefixed with the # character. " +
"For example, #general. This is valid only for public channels. Otherwise, the unique identifier of the channel to publish to must be " +
"provided.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
static PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()
.name("Publish Strategy")
.description("Specifies how the Processor will send the message or file to Slack.")
.required(true)
.allowableValues(PUBLISH_STRATEGY_CONTENT_AS_MESSAGE, PUBLISH_STRATEGY_USE_PROPERTY)
.defaultValue(PUBLISH_STRATEGY_CONTENT_AS_MESSAGE.getValue())
.build();
static PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the name of the Character Set used to encode the FlowFile contents.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(PUBLISH_STRATEGY, PUBLISH_STRATEGY_CONTENT_AS_MESSAGE)
.build();
static PropertyDescriptor MESSAGE_TEXT = new PropertyDescriptor.Builder()
.name("Message Text")
.description("The text of the message to send to Slack.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(Validator.VALID)
.dependsOn(PUBLISH_STRATEGY, PUBLISH_STRATEGY_USE_PROPERTY)
.build();
static PropertyDescriptor SEND_CONTENT_AS_ATTACHMENT = new PropertyDescriptor.Builder()
.name("Include FlowFile Content as Attachment")
.description("Specifies whether or not the contents of the FlowFile should be uploaded as an attachment to the Slack message.")
.allowableValues("true", "false")
.required(true)
.dependsOn(PUBLISH_STRATEGY, PUBLISH_STRATEGY_USE_PROPERTY)
.defaultValue("false")
.build();
static PropertyDescriptor MAX_FILE_SIZE = new PropertyDescriptor.Builder()
.name("Max FlowFile Size")
.description("The maximum size of a FlowFile that can be sent to Slack. If any FlowFile exceeds this size, it will be routed to failure. " +
"This plays an important role because the entire contents of the file must be loaded into NiFi's heap in order to send the data " +
"to Slack.")
.required(true)
.dependsOn(SEND_CONTENT_AS_ATTACHMENT, "true")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
static PropertyDescriptor THREAD_TS = new PropertyDescriptor.Builder()
.name("Thread Timestamp")
.description("The Timestamp identifier for the thread that this message is to be a part of. If not specified, the message will be a top-level message instead of " +
"being in a thread.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
private static final List<PropertyDescriptor> properties = List.of(ACCESS_TOKEN,
CHANNEL,
PUBLISH_STRATEGY,
MESSAGE_TEXT,
CHARACTER_SET,
SEND_CONTENT_AS_ATTACHMENT,
MAX_FILE_SIZE,
THREAD_TS);
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to success after being successfully sent to Slack")
.build();
public static final Relationship REL_RATE_LIMITED = new Relationship.Builder()
.name("rate limited")
.description("FlowFiles are routed to 'rate limited' if the Rate Limit has been exceeded")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to 'failure' if unable to be sent to Slack for any other reason")
.build();
private static final Set<Relationship> relationships = Set.of(
REL_SUCCESS,
REL_RATE_LIMITED,
REL_FAILURE);
private final RateLimit rateLimit = new RateLimit(getLogger());
private volatile ChannelMapper channelMapper;
private volatile App slackApp;
private volatile MethodsClient client;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void setup(final ProcessContext context) {
slackApp = createSlackApp(context);
client = slackApp.client();
channelMapper = new ChannelMapper(client);
}
@OnStopped
public void shutdown() {
if (slackApp != null) {
slackApp.stop();
}
}
private App createSlackApp(final ProcessContext context) {
final String botToken = context.getProperty(ACCESS_TOKEN).getValue();
final AppConfig appConfig = AppConfig.builder()
.singleTeamBotToken(botToken)
.build();
return new App(appConfig);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (rateLimit.isLimitReached()) {
getLogger().debug("Will not publish to Slack because rate limit has been reached");
context.yield();
return;
}
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String channelId = getChannelId(flowFile, session, context);
if (channelId == null) {
// error will have already been logged
return;
}
// Get the message text
final String publishStrategy = context.getProperty(PUBLISH_STRATEGY).getValue();
if (PUBLISH_STRATEGY_CONTENT_AS_MESSAGE.getValue().equalsIgnoreCase(publishStrategy)) {
publishContentAsMessage(flowFile, channelId, context, session);
} else if (context.getProperty(SEND_CONTENT_AS_ATTACHMENT).asBoolean()) {
publishAsFile(flowFile, channelId, context, session);
} else {
final String messageText = context.getProperty(MESSAGE_TEXT).evaluateAttributeExpressions(flowFile).getValue();
publishAsMessage(flowFile, channelId, messageText, context, session);
}
}
private String getChannelId(final FlowFile flowFile, final ProcessSession session, final ProcessContext context) {
final String channelNameOrId = context.getProperty(CHANNEL).evaluateAttributeExpressions(flowFile).getValue();
if (channelNameOrId.isEmpty()) {
getLogger().error("No Channel ID was given for {}; routing to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return null;
}
if (!channelNameOrId.startsWith("#")) {
return channelNameOrId;
}
// Resolve Channel name to an ID
try {
final String channelId = channelMapper.lookupChannelId(channelNameOrId);
if (channelId == null) {
getLogger().error("Could not find Channel with name {} for {}; routing to failure", channelNameOrId, flowFile);
session.transfer(flowFile, REL_FAILURE);
return null;
}
return channelId;
} catch (final Exception e) {
final Relationship relationship = handleClientException(channelNameOrId, flowFile, session, context, e);
getLogger().error("Failed to resolve Slack Channel ID for {}; transferring to {}", flowFile, relationship, e);
return null;
}
}
private void publishContentAsMessage(FlowFile flowFile, final String channelId, final ProcessContext context, final ProcessSession session) {
// Slack limits the message size to 100,000 characters. We don't have a way to know based on the size of the FlowFile how many characters it will contain,
// but we can be rather certain that if the size exceeds 500,000 bytes, it will also exceed 100,000 characters. As a result, we pre-emptively route to
// 'too large' in order to avoid buffering the contents into memory.
if (flowFile.getSize() > 500_000) {
getLogger().error("Cannot send contents of FlowFile {} to Slack because its length exceeds 500,000 bytes; routing to 'failure'", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final String charsetName = context.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue();
final byte[] buffer = new byte[(int) flowFile.getSize()];
final String messageText;
try (final InputStream in = session.read(flowFile)) {
StreamUtils.fillBuffer(in, buffer, true);
messageText = new String(buffer, charsetName);
} catch (final IOException ioe) {
getLogger().error("Failed to send contents of FlowFile {} to Slack; routing to failure", ioe);
session.transfer(flowFile, REL_FAILURE);
return;
}
if (messageText.length() > 100_000) {
getLogger().error("Cannot send contents of FlowFile {} to Slack because its length exceeds 100,000 characters; routing to 'failure'");
session.transfer(flowFile, REL_FAILURE);
return;
}
publishAsMessage(flowFile, channelId, messageText, context, session);
}
private void publishAsMessage(FlowFile flowFile, final String channelId, final String messageText, final ProcessContext context, final ProcessSession session) {
final String threadTs = context.getProperty(THREAD_TS).evaluateAttributeExpressions(flowFile).getValue();
final ChatPostMessageRequest request = ChatPostMessageRequest.builder()
.channel(channelId)
.text(messageText)
.threadTs(threadTs)
.build();
final ChatPostMessageResponse postMessageResponse;
try {
postMessageResponse = client.chatPostMessage(request);
} catch (final Exception e) {
final Relationship relationship = handleClientException(channelId, flowFile, session, context, e);
getLogger().error("Failed to send message to Slack for {}; transferring to {}", flowFile, relationship, e);
return;
}
if (!postMessageResponse.isOk()) {
final String errorMessage = SlackResponseUtil.getErrorMessage(postMessageResponse.getError(), postMessageResponse.getNeeded(),
postMessageResponse.getProvided(), postMessageResponse.getWarning());
getLogger().error("Could not send message to Slack for {} - received error: {}", flowFile, errorMessage);
session.transfer(flowFile, REL_FAILURE);
return;
}
final String ts = postMessageResponse.getTs();
final Map<String, String> attributes = Map.of("slack.ts", ts,
"slack.channel.id", channelId);
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().send(flowFile, "https://slack.com/api/chat.postMessage");
session.transfer(flowFile, REL_SUCCESS);
}
private void publishAsFile(FlowFile flowFile, final String channelId, final ProcessContext context, final ProcessSession session) {
final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final long maxSize = context.getProperty(MAX_FILE_SIZE).asDataSize(DataUnit.B).longValue();
if (flowFile.getSize() > maxSize) {
getLogger().warn("{} exceeds max allowable file size. Max File Size = {}; FlowFile size = {}; routing to 'failure'",
flowFile, FormatUtils.formatDataSize(maxSize), FormatUtils.formatDataSize(flowFile.getSize()));
session.transfer(flowFile, REL_FAILURE);
return;
}
final FilesUploadV2Response uploadResponse;
try {
final byte[] buffer = new byte[(int) flowFile.getSize()];
try (final InputStream in = session.read(flowFile)) {
StreamUtils.fillBuffer(in, buffer, true);
}
final String message = context.getProperty(MESSAGE_TEXT).evaluateAttributeExpressions(flowFile).getValue();
final String threadTs = context.getProperty(THREAD_TS).evaluateAttributeExpressions(flowFile).getValue();
final FilesUploadV2Request uploadRequest = FilesUploadV2Request.builder()
.filename(filename)
.title(filename)
.initialComment(message)
.channel(channelId)
.threadTs(threadTs)
.fileData(buffer)
.build();
uploadResponse = client.filesUploadV2(uploadRequest);
} catch (final Exception e) {
final Relationship relationship = handleClientException(channelId, flowFile, session, context, e);
getLogger().error("Could not upload contents of {} to Slack; routing to {}", flowFile, relationship, e);
return;
}
if (!uploadResponse.isOk()) {
final String errorMessage = SlackResponseUtil.getErrorMessage(uploadResponse.getError(), uploadResponse.getNeeded(),
uploadResponse.getProvided(), uploadResponse.getWarning());
getLogger().error("Could not upload contents of {} to Slack - received error: {}", flowFile, errorMessage);
session.transfer(flowFile, REL_FAILURE);
return;
}
// Get timestamp that the file was shared so that we can add as an attribute.
final File file = uploadResponse.getFile();
final Shares shares = file.getShares();
String ts = null;
if (shares != null) {
ts = getTs(shares.getPrivateChannels());
if (ts == null) {
ts = getTs(shares.getPublicChannels());
}
}
final Map<String, String> attributes = new HashMap<>();
attributes.put("slack.channel.id", channelId);
if (ts != null) {
attributes.put("slack.ts", ts);
}
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().send(flowFile, "https://slack.com/api/files.upload");
session.transfer(flowFile, REL_SUCCESS);
}
private Relationship handleClientException(final String channel, final FlowFile flowFile, final ProcessSession session, final ProcessContext context, final Exception cause) {
final boolean rateLimited = yieldOnRateLimit(cause, channel, context);
final Relationship relationship = rateLimited ? REL_RATE_LIMITED : REL_FAILURE;
session.transfer(flowFile, relationship);
return relationship;
}
private boolean yieldOnRateLimit(final Throwable t, final String channelId, final ProcessContext context) {
final boolean rateLimited = SlackResponseUtil.isRateLimited(t);
if (rateLimited) {
getLogger().warn("Slack indicated that the Rate Limit has been exceeded when attempting to publish messages to channel {}", channelId);
} else {
getLogger().error("Failed to retrieve messages for channel {}", channelId, t);
}
final int retryAfterSeconds = SlackResponseUtil.getRetryAfterSeconds(t);
rateLimit.retryAfter(Duration.ofSeconds(retryAfterSeconds));
context.yield();
return rateLimited;
}
private String getTs(final Map<String, List<ShareDetail>> shareDetails) {
if (shareDetails == null) {
return null;
}
for (final List<ShareDetail> detailList : shareDetails.values()) {
for (final ShareDetail detail : detailList) {
final String ts = detail.getTs();
if (ts != null) {
return ts;
}
}
}
return null;
}
}

View File

@ -1,326 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonReader;
import javax.json.JsonWriter;
import javax.json.stream.JsonParsingException;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@Tags({"put", "slack", "notify"})
@CapabilityDescription("Publishes a message to Slack")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "A JSON object to add to Slack's \"attachments\" JSON payload.", value = "JSON-formatted string to add to Slack's payload JSON appended to the \"attachments\" JSON array.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Converts the contents of each value specified by the Dynamic Property's value to JSON and appends it to the payload being sent to Slack.")
public class PutSlack extends AbstractProcessor {
public static final PropertyDescriptor WEBHOOK_URL = new PropertyDescriptor
.Builder()
.name("webhook-url")
.displayName("Webhook URL")
.description("The POST URL provided by Slack to send messages into a channel.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.sensitive(true)
.build();
public static final PropertyDescriptor WEBHOOK_TEXT = new PropertyDescriptor
.Builder()
.name("webhook-text")
.displayName("Webhook Text")
.description("The text sent in the webhook message")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CHANNEL = new PropertyDescriptor
.Builder()
.name("channel")
.displayName("Channel")
.description("A public channel using #channel or direct message using @username. If not specified, " +
"the default webhook channel as specified in Slack's Incoming Webhooks web interface is used.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor THREAD_TS = new PropertyDescriptor.Builder()
.name("Thread Timestamp")
.description("The timestamp of the parent message, also known as a thread_ts, or Thread Timestamp. If not specified, the message will be send to the channel " +
"as an independent message. If this value is populated, the message will be sent as a reply to the message whose timestamp is equal to the given value.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor
.Builder()
.name("username")
.displayName("Username")
.description("The displayed Slack username")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ICON_URL = new PropertyDescriptor
.Builder()
.name("icon-url")
.displayName("Icon URL")
.description("Icon URL to be used for the message")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
public static final PropertyDescriptor ICON_EMOJI = new PropertyDescriptor
.Builder()
.name("icon-emoji")
.displayName("Icon Emoji")
.description("Icon Emoji to be used for the message. Must begin and end with a colon, e.g. :ghost:")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(new EmojiValidator())
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
.identifiesControllerService(SSLContextService.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to success after being successfully sent to Slack")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to failure if unable to be sent to Slack")
.build();
private final SortedSet<PropertyDescriptor> attachmentDescriptors = Collections.synchronizedSortedSet(new TreeSet<>());
private static final List<PropertyDescriptor> descriptors = List.of(
WEBHOOK_URL,
WEBHOOK_TEXT,
CHANNEL,
USERNAME,
THREAD_TS,
ICON_URL,
ICON_EMOJI,
SSL_CONTEXT_SERVICE);
private static final Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE);
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dynamic(true)
.build();
}
@OnScheduled
public void initialize(final ProcessContext context) {
attachmentDescriptors.clear();
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
PropertyDescriptor descriptor = property.getKey();
if (descriptor.isDynamic()) {
attachmentDescriptors.add(descriptor);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final JsonObjectBuilder builder = Json.createObjectBuilder();
final String text = context.getProperty(WEBHOOK_TEXT).evaluateAttributeExpressions(flowFile).getValue();
if (isEmpty(text)) {
getLogger().error("FlowFile should have non-empty " + WEBHOOK_TEXT.getDisplayName());
session.transfer(flowFile, REL_FAILURE);
return;
}
builder.add("text", text);
final String channel = context.getProperty(CHANNEL).evaluateAttributeExpressions(flowFile).getValue();
if (!isEmpty(channel)) {
builder.add("channel", channel);
}
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
if (!isEmpty(username)) {
builder.add("username", username);
}
final String iconUrl = context.getProperty(ICON_URL).evaluateAttributeExpressions(flowFile).getValue();
if (!isEmpty(iconUrl)) {
builder.add("icon_url", iconUrl);
}
final String threadTs = context.getProperty(THREAD_TS).evaluateAttributeExpressions(flowFile).getValue();
if (!isEmpty(threadTs)) {
builder.add("thread_ts", threadTs);
}
final String iconEmoji = context.getProperty(ICON_EMOJI).evaluateAttributeExpressions(flowFile).getValue();
if (!isEmpty(iconEmoji)) {
builder.add("icon_emoji", iconEmoji);
}
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
try {
// Get Attachments Array
if (!attachmentDescriptors.isEmpty()) {
final JsonArrayBuilder jsonArrayBuilder = Json.createArrayBuilder();
for (final PropertyDescriptor attachmentDescriptor : attachmentDescriptors) {
final String attachmentString = context.getProperty(attachmentDescriptor).evaluateAttributeExpressions(flowFile).getValue();
final JsonReader reader = Json.createReader(new StringReader(attachmentString));
final JsonObject attachmentJson = reader.readObject();
jsonArrayBuilder.add(attachmentJson);
}
builder.add("attachments", jsonArrayBuilder);
}
final JsonObject jsonObject = builder.build();
final StringWriter stringWriter = new StringWriter();
final JsonWriter jsonWriter = Json.createWriter(stringWriter);
jsonWriter.writeObject(jsonObject);
jsonWriter.close();
final URL url = URI.create(context.getProperty(WEBHOOK_URL).evaluateAttributeExpressions(flowFile).getValue()).toURL();
final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);
if (sslService != null) {
final SSLContext sslContext = sslService.createContext();
((HttpsURLConnection) conn).setSSLSocketFactory(sslContext.getSocketFactory());
}
final DataOutputStream outputStream = new DataOutputStream(conn.getOutputStream());
final String payload = "payload=" + URLEncoder.encode(stringWriter.getBuffer().toString(), StandardCharsets.UTF_8);
outputStream.writeBytes(payload);
outputStream.close();
int responseCode = conn.getResponseCode();
if (responseCode >= 200 && responseCode < 300) {
getLogger().info("Successfully posted message to Slack");
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, url.toString());
} else {
getLogger().error("Failed to post message to Slack with response code {}", responseCode);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
} catch (final JsonParsingException e) {
getLogger().error("Failed to parse JSON", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} catch (final IOException e) {
getLogger().error("Failed to open connection", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
private boolean isEmpty(final String value) {
return value == null || value.isEmpty();
}
private static class EmojiValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (input.startsWith(":") && input.endsWith(":") && input.length() > 2) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
return new ValidationResult.Builder().input(input).subject(subject).valid(false)
.explanation("Must begin and end with a colon")
.build();
}
}
}

View File

@ -48,6 +48,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.slack.util.SlackResponseUtil;
public class ConsumeChannel {
private static final String CONVERSATION_HISTORY_URL = "https://slack.com/api/conversations.history";
@ -140,7 +141,7 @@ public class ConsumeChannel {
.build();
}
final String errorMessage = ConsumeSlackUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
return new ConfigVerificationResult.Builder()
.verificationStepName("Check authorization for Channel " + channelId)
.outcome(ConfigVerificationResult.Outcome.FAILED)
@ -422,7 +423,7 @@ public class ConsumeChannel {
// Gather slack conversation history
final ConversationsHistoryResponse response = client.fetchConversationsHistory(request);
if (!response.isOk()) {
final String error = ConsumeSlackUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
final String error = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
logger.error("Received unexpected response from Slack when attempting to retrieve messages for channel {}: {}", channelId, error);
context.yield();
return new StandardConsumptionResults(null, null, null, true, false, false);
@ -479,7 +480,7 @@ public class ConsumeChannel {
}
// Simple case is that we need to output only the message.
if (!ConsumeSlackUtil.hasReplies(message)) {
if (!SlackResponseUtil.hasReplies(message)) {
continue;
}
@ -611,8 +612,8 @@ public class ConsumeChannel {
private void yieldOnException(final PartialThreadException e, final String channelId, final Message message, final ProcessContext context) {
if (ConsumeSlackUtil.isRateLimited(e.getCause())) {
final int retryAfterSeconds = ConsumeSlackUtil.getRetryAfterSeconds(e);
if (SlackResponseUtil.isRateLimited(e.getCause())) {
final int retryAfterSeconds = SlackResponseUtil.getRetryAfterSeconds(e);
logger.warn("Slack indicated that the Rate Limit has been exceeded when attempting to retrieve messages for channel {}; will continue in {} seconds",
channelId, retryAfterSeconds);
} else {
@ -620,7 +621,7 @@ public class ConsumeChannel {
message.getThreadTs(), e.getMessage(), e);
}
final int retryAfterSeconds = ConsumeSlackUtil.getRetryAfterSeconds(e);
final int retryAfterSeconds = SlackResponseUtil.getRetryAfterSeconds(e);
final long timeOfNextRequest = System.currentTimeMillis() + (retryAfterSeconds * 1000L);
nextRequestTime.getAndUpdate(currentTime -> Math.max(currentTime, timeOfNextRequest));
context.yield();
@ -662,7 +663,7 @@ public class ConsumeChannel {
}
if (!response.isOk()) {
final String errorMessage = ConsumeSlackUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
throw new PartialThreadException(replies, cursor, errorMessage);
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.slack.consume;
import com.slack.api.methods.response.users.UsersInfoResponse;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.slack.util.SlackResponseUtil;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -46,11 +47,11 @@ public class UsernameLookup {
return username;
}
final String errorMessage = ConsumeSlackUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
logger.warn("Failed to retrieve Username for User ID {}: {}", userId, errorMessage);
return null;
} catch (final Exception e) {
if (ConsumeSlackUtil.isRateLimited(e)) {
if (SlackResponseUtil.isRateLimited(e)) {
logger.warn("Failed to retrieve Username for User ID {} because the Rate Limit has been exceeded", userId);
} else {
logger.warn("Failed to retrieve Username for User ID {}: {}", userId, e.getMessage(), e);

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack.util;
import com.slack.api.methods.MethodsClient;
import com.slack.api.methods.SlackApiException;
import com.slack.api.methods.request.conversations.ConversationsListRequest;
import com.slack.api.methods.response.conversations.ConversationsListResponse;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ChannelMapper {
private final Map<String, String> mappedChannels = new ConcurrentHashMap<>();
private final MethodsClient client;
public ChannelMapper(final MethodsClient client) {
this.client = client;
}
public String lookupChannelId(String channelName) throws SlackApiException, IOException {
if (channelName == null) {
return null;
}
channelName = channelName.trim();
if (channelName.startsWith("#")) {
if (channelName.length() == 1) {
return null;
}
channelName = channelName.substring(1);
}
final String channelId = mappedChannels.get(channelName);
if (channelId == null) {
lookupChannels(channelName);
}
return mappedChannels.get(channelName);
}
private void lookupChannels(final String desiredChannelName) throws SlackApiException, IOException {
String cursor = null;
while (true) {
final ConversationsListRequest request = ConversationsListRequest.builder()
.cursor(cursor)
.limit(1000)
.build();
final ConversationsListResponse response = client.conversationsList(request);
if (response.isOk()) {
response.getChannels().forEach(channel -> mappedChannels.put(channel.getName(), channel.getId()));
cursor = response.getResponseMetadata().getNextCursor();
if (StringUtils.isEmpty(cursor)) {
return;
}
if (mappedChannels.containsKey(desiredChannelName)) {
return;
}
continue;
}
final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
throw new RuntimeException("Failed to determine Channel IDs: " + errorMessage);
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack.util;
import org.apache.nifi.logging.ComponentLog;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
public class RateLimit {
private final AtomicLong nextRequestTime = new AtomicLong(0L);
private final ComponentLog logger;
public RateLimit(final ComponentLog logger) {
this.logger = logger;
}
public void retryAfter(final Duration duration) {
final long timeOfNextRequest = System.currentTimeMillis() + duration.toMillis();
nextRequestTime.getAndUpdate(currentTime -> Math.max(currentTime, timeOfNextRequest));
}
public boolean isLimitReached() {
final long nextTime = nextRequestTime.get();
if (nextTime > 0 && System.currentTimeMillis() < nextTime) {
logger.debug("Will not interact with Slack until {} due to Slack's Rate Limit", new Date(nextTime));
return true;
} else if (nextTime > 0) {
// Set nextRequestTime to 0 so that we no longer bother to make system calls to System.currentTimeMillis()
nextRequestTime.compareAndSet(nextTime, 0);
}
return false;
}
}

View File

@ -14,15 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack.consume;
package org.apache.nifi.processors.slack.util;
import com.slack.api.methods.SlackApiException;
import com.slack.api.model.Message;
import org.apache.nifi.processors.slack.consume.PartialThreadException;
import org.apache.nifi.util.StringUtils;
import java.util.Objects;
public class ConsumeSlackUtil {
public class SlackResponseUtil {
public static String getErrorMessage(final String error, final String needed, final String provided, final String warning) {
final String mainMessage = Objects.requireNonNullElse(error, warning);
@ -60,11 +61,4 @@ public class ConsumeSlackUtil {
final String threadTs = message.getThreadTs();
return !StringUtils.isEmpty(threadTs);
}
public static void main(final String[] args) {
final String ts = "1694011369.515799";
final double d = Double.parseDouble(ts);
final long l = (long) (d * 1000D);
System.out.println(new java.util.Date(l));
}
}

View File

@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.services.slack.SlackRecordSink

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.slack.PutSlack
org.apache.nifi.processors.slack.PostSlack
org.apache.nifi.processors.slack.ConsumeSlack
org.apache.nifi.processors.slack.ListenSlack
org.apache.nifi.processors.slack.ListenSlack
org.apache.nifi.processors.slack.PublishSlack

View File

@ -1,107 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PostSlack</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
<style>
dt {font-style: italic;}
</style>
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
The PostSlack processor sends messages on Slack, a team-oriented messaging service.
The message can be a simple text message, furthermore Slack attachments can also be specified
or the FlowFile content (e.g. an image) can be uploaded and attached to the message too.
</p>
<h3>Slack App setup</h3>
<p>
This processor uses Slack Web API methods to post messages to a specific channel.
Before using PostSlack, you need to create a Slack App, to add a Bot User to the app,
and to install the app in your Slack workspace. After the app installed, you can get
the Bot User's OAuth Access Token that will be needed to authenticate and authorize
your PostSlack processor to Slack.
</p>
<h3>Message types</h3>
<p>
You can send the following types of messages with PostSlack:
</p>
<dl>
<dt>Text-only message</dt>
<dd>
A simple text message. In this case you must specify the &lt;Text&gt; property,
while &lt;Upload FlowFile&gt; is 'No' and no attachments defined through dynamic properties.
File related properties will be ignored in this case.
</dd>
<dt>Text message with attachment</dt>
<dd>
Besides the &lt;Text&gt; property, one or more attachments are also defined through dynamic properties
(for more details see the <a href="#slack-attachments">Slack attachments</a> section below).
The &lt;Upload FlowFile&gt; property needs to be set to 'No'. File related properties will be ignored.
</dd>
<dt>Attachment-only message</dt>
<dd>
The same as the previous one, but the &lt;Text&gt; property is not specified
(so no text section will be displayed at the beginning of the message, only the attachment(s)).
</dd>
<dt>Text message with file upload</dt>
<dd>
You need to specify the &lt;Text&gt; property and set &lt;Upload FlowFile&gt; to 'Yes'.
The content of the FlowFile will be uploaded with the message and it will be displayed
below the text. You should specify &lt;File Name&gt; and &lt;File Mime Type&gt; properties too
(otherwise some fallback values will be set), and optionally &lt;File Title&gt;.
The dynamic properties will be ignored in this case.
</dd>
<dt>File upload message without text</dt>
<dd>
The same as the previous one, but the &lt;Text&gt; property is not specified
(so no text section will be displayed at the beginning of the message, only the uploaded file).
</dd>
</dl>
<h3 id="slack-attachments">Slack attachments</h3>
<p>
Slack content and link attachments can be added to the message through dynamic properties.
Please note that this kind of Slack message attachments does not involve the file
upload itself, but rather contain links to external resources (or internal resources already uploaded
to Slack). Please also note that this functionality does not work together with file upload
(so it can only be used when 'Upload FlowFile' has been set to 'No').
</p>
<p>
The Dynamic Properties can be used to specify these Slack message attachments as JSON snippets.
Each property value will be converted to JSON and will be added to the array of attachments in the JSON
payload being sent to Slack.
</p>
<p>
Example JSON snippets to define Slack attachments:
</p>
<pre>
{
"text": "Text that appears within the attachment",
"image_url": "http://some-website.com/path/to/image.jpg"
}
</pre>
<pre>
{
"title": "Title of the attachment",
"image_url": "http://some-website.com/path/to/image.jpg"
}
</pre>
</body>
</html>

View File

@ -0,0 +1,96 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>PublishSlack</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h2>Description:</h2>
<p>
PublishSlack allows for the ability to send messages to Slack using Slack's <code>chat.postMessage</code> API.
This Processor should be preferred over the deprecated PutSlack and PostSlack Processors, as it aims to incorporate
the capabilities of both of those Processors, improve the maintainability, and ease the configuration for the user.
</p>
<h2>Slack Setup</h2>
<p>
In order use this Processor, it requires that a Slack App be created and installed in your Slack workspace.
An OAuth User or Bot Token must be created for the App.
The token must have the <code>chat:write</code> Token Scope.
Please see <a href="https://api.slack.com/start/quickstart">Slack's documentation</a> for the
latest information on how to create an Application and install it into your workspace.
</p>
<p>
Depending on the Processor's configuration, you may also require additional Scopes.
For example, if configured to upload the contents of the FlowFile as a message attachment, the <code>files:write</code>
User Token Scope or Bot Token Scope must be granted.
Additionally, the Channels to consume from may be listed either as a Channel ID or (for public Channels) a Channel Name.
However, if a name, such as <code>#general</code> is used, the token must be provided the <code>channels:read</code> scope
in order to determine the Channel ID for you.
</p>
<p>
Rather than requiring the <code>channels:read</code> Scope, you may alternatively supply only Channel IDs for the
"Channel" property. To determine the ID of a Channel, navigate to the desired Channel in Slack. Click the name of
the Channel at the top of the screen. This provides a popup that provides information about the Channel. Scroll to
the bottom of the popup, and you will be shown the Channel ID with the ability to click a button to Copy the ID
to your clipboard.
</p>
<p>
At the time of this writing, the following steps may be used to create a Slack App with the necessary scope of
<code>chat:write</code> scope. However, these instructions are subject to change at any time, so it is
best to read through <a href="https://api.slack.com/start/quickstart">Slack's Quickstart Guide</a>.
</p>
<ul>
<li>
Create a Slack App. Click <a href="https://api.slack.com/apps">here</a> to get started. From here,
click the "Create New App" button and choose "From scratch." Give your App a name and choose the workspace
that you want to use for developing the app.
</li>
<li>
Creating your app will take you to the configuration page for your application.
For example, <code>https://api.slack.com/apps/&lt;APP_IDENTIFIER&gt;</code>. From here, click on
"OAuth & Permissions" in the left-hand menu. Scroll down to the "Scopes" section and click the
"Add an OAuth Scope" button under 'Bot Token Scopes'. Choose the <code>chat:write</code> scope.
</li>
<li>
Scroll back to the top, and under the "OAuth Tokens for Your Workspace" section, click the
"Install to Workspace" button. This will prompt you to allow the application to be added to your workspace,
if you have the appropriate permissions. Otherwise, it will generate a notification for a Workspace Owner
to approve the installation. Additionally, it will generate a "Bot User OAuth Token".
</li>
<li>
Copy the value of the "Bot User OAuth Token." This will be used as the value for the ConsumeSlack Processor's
<code>Access Token</code> property.
</li>
<li>
The Bot must then be enabled for each Channel that you would like to consume messages from. In order to do that,
in the Slack application, go to the Channel that you would like to consume from and press <code>/</code>.
Choose the <code>Add apps to this channel</code> option, and add the Application that you created as a Bot to
the channel.
</li>
</ul>
</body>
</html>

View File

@ -1,53 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutSlack</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
The PutSlack processor sends messages to <a href="https://www.slack.com">Slack</a>,
a team-oriented messaging service.
</p>
<p>
This processor uses Slack's <a href="https://slack.com/apps/A0F7XDUAZ-incoming-webhooks">incoming webhooks</a>
custom integration to post messages to a specific channel. Before using PutSlack, your Slack team should be
configured for the incoming webhooks custom integration, and you'll need to configure at least one incoming
webhook.
</p>
<p>
To configure PutSlack, set the following mandatory properties:
<ul>
<li>
<code>Webhook URL</code>: The URL received from Slack that allows the processor to send messages to your team.
</li>
<li>
<code>Webhook Text</code>: The text of the message to send to Slack.
</li>
</ul>
</p>
<p>
Dynamic properties can be used to append items to the "attachments" branch of the JSON payload. Each dynamic property will be processed and added
as an item within the array. The keys are not used by the processor. Instead, for each flowfile, the values of the dynamic properties is converted to JSON and added
to the the attachments key of the JSON payload sent to Slack. For information on the attachment data structure, see <a href="https://api.slack.com/docs/message-attachments">https://api.slack.com/docs/message-attachments</a>
</p>
</body>
</html>

View File

@ -1,152 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class PostSlackConfigValidationTest {
private TestRunner testRunner;
@BeforeEach
public void setup() {
testRunner = TestRunners.newTestRunner(PostSlack.class);
}
@Test
public void validationShouldPassIfTheConfigIsFine() {
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.assertValid();
}
@Test
public void validationShouldFailIfPostMessageUrlIsEmptyString() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, "");
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.assertNotValid();
}
@Test
public void validationShouldFailIfPostMessageUrlIsNotValid() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, "not-url");
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.assertNotValid();
}
@Test
public void validationShouldFailIfFileUploadUrlIsEmptyString() {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, "");
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.assertNotValid();
}
@Test
public void validationShouldFailIfFileUploadUrlIsNotValid() {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, "not-url");
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.assertNotValid();
}
@Test
public void validationShouldFailIfAccessTokenIsNotGiven() {
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.assertNotValid();
}
@Test
public void validationShouldFailIfAccessTokenIsEmptyString() {
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.assertNotValid();
}
@Test
public void validationShouldFailIfChannelIsNotGiven() {
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.assertNotValid();
}
@Test
public void validationShouldFailIfChannelIsEmptyString() {
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.assertNotValid();
}
@Test
public void validationShouldFailIfTextIsNotGivenAndNoAttachmentSpecifiedNorFileUploadChosen() {
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.assertNotValid();
}
@Test
public void validationShouldPassIfTextIsNotGivenButAttachmentSpecified() {
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty("attachment_01", "{\"my-attachment-key\": \"my-attachment-value\"}");
testRunner.assertValid();
}
@Test
public void validationShouldPassIfTextIsNotGivenButFileUploadChosen() {
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.assertValid();
}
@Test
public void validationShouldFailIfTextIsEmptyString() {
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "");
testRunner.setProperty("attachment_01", "{\"my-attachment-key\": \"my-attachment-value\"}");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.assertNotValid();
}
}

View File

@ -1,328 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import okhttp3.Headers;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PostSlackFileMessageTest {
private static final String RESPONSE_SUCCESS_FILE_MSG = "{\"ok\": true, \"file\": {\"url_private\": \"slack-file-url\"}}";
private TestRunner testRunner;
private MockWebServer mockWebServer;
private String url;
@BeforeEach
public void init() {
mockWebServer = new MockWebServer();
url = mockWebServer.url("/").toString();
testRunner = TestRunners.newTestRunner(PostSlack.class);
}
@Test
public void sendMessageWithBasicPropertiesSuccessfully() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), "my-file-name");
flowFileAttributes.put(CoreAttributes.MIME_TYPE.key(), "image/png");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
// in order not to make the assertion logic (even more) complicated, the file content is tested with character data instead of binary data
testRunner.enqueue("my-data", flowFileAttributes);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
assertRequest("my-file-name", "image/png", null, null);
FlowFile flowFileOut = testRunner.getFlowFilesForRelationship(PutSlack.REL_SUCCESS).get(0);
assertEquals("slack-file-url", flowFileOut.getAttribute("slack.file.url"));
}
@Test
public void sendMessageWithAllPropertiesSuccessfully() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.setProperty(PostSlack.FILE_TITLE, "my-file-title");
testRunner.setProperty(PostSlack.FILE_NAME, "my-file-name");
testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "image/png");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue("my-data");
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
assertRequest("my-file-name", "image/png", "my-text", "my-file-title");
FlowFile flowFileOut = testRunner.getFlowFilesForRelationship(PutSlack.REL_SUCCESS).get(0);
assertEquals("slack-file-url", flowFileOut.getAttribute("slack.file.url"));
}
@Test
public void processShouldFailWhenChannelIsEmpty() {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "${dummy}");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{}"));
testRunner.enqueue("my-data");
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test
public void fileNameShouldHaveFallbackValueWhenEmpty() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.setProperty(PostSlack.FILE_NAME, "${dummy}");
testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "image/png");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue("my-data");
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
// fallback value for file name is 'file'
assertRequest("file", "image/png", null, null);
FlowFile flowFileOut = testRunner.getFlowFilesForRelationship(PutSlack.REL_SUCCESS).get(0);
assertEquals("slack-file-url", flowFileOut.getAttribute("slack.file.url"));
}
@Test
public void mimeTypeShouldHaveFallbackValueWhenEmpty() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.setProperty(PostSlack.FILE_NAME, "my-file-name");
testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "${dummy}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue("my-data");
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
// fallback value for mime type is 'application/octet-stream'
assertRequest("my-file-name", "application/octet-stream", null, null);
FlowFile flowFileOut = testRunner.getFlowFilesForRelationship(PutSlack.REL_SUCCESS).get(0);
assertEquals("slack-file-url", flowFileOut.getAttribute("slack.file.url"));
}
@Test
public void mimeTypeShouldHaveFallbackValueWhenInvalid() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.setProperty(PostSlack.FILE_NAME, "my-file-name");
testRunner.setProperty(PostSlack.FILE_MIME_TYPE, "invalid");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue("my-data");
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
// fallback value for mime type is 'application/octet-stream'
assertRequest("my-file-name", "application/octet-stream", null, null);
FlowFile flowFileOut = testRunner.getFlowFilesForRelationship(PutSlack.REL_SUCCESS).get(0);
assertEquals("slack-file-url", flowFileOut.getAttribute("slack.file.url"));
}
@Test
public void sendInternationalMessageSuccessfully() throws InterruptedException {
testRunner.setProperty(PostSlack.FILE_UPLOAD_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "Iñtërnâtiônàližætiøn");
testRunner.setProperty(PostSlack.UPLOAD_FLOWFILE, PostSlack.UPLOAD_FLOWFILE_YES);
testRunner.setProperty(PostSlack.FILE_TITLE, "Iñtërnâtiônàližætiøn");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String body = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
final Headers headers = recordedRequest.getHeaders();
Map<String, String> parts = parsePostBodyParts(parseMultipartBoundary(headers.get("Content-Type")), body);
assertEquals("Iñtërnâtiônàližætiøn", parts.get("initial_comment"));
assertEquals("Iñtërnâtiônàližætiøn", parts.get("title"));
FlowFile flowFileOut = testRunner.getFlowFilesForRelationship(PutSlack.REL_SUCCESS).get(0);
assertEquals("slack-file-url", flowFileOut.getAttribute("slack.file.url"));
}
private void assertRequest(String fileName, String mimeType, String text, String title) throws InterruptedException {
final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String body = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
final Headers headers = recordedRequest.getHeaders();
assertEquals("Bearer my-access-token", headers.get("Authorization"));
String contentType = headers.get("Content-Type");
assertTrue(contentType.startsWith("multipart/form-data"));
String boundary = parseMultipartBoundary(contentType);
assertNotNull(boundary, "Multipart boundary not found in Content-Type header: " + contentType);
Map<String, String> parts = parsePostBodyParts(boundary, body);
assertNotNull(parts.get("channels"), "'channels' parameter not found in the POST request body");
assertEquals("my-channel", parts.get("channels"), "'channels' parameter has wrong value");
if (text != null) {
assertNotNull(parts.get("initial_comment"), "'initial_comment' parameter not found in the POST request body");
assertEquals(text, parts.get("initial_comment"), "'initial_comment' parameter has wrong value");
}
assertNotNull(parts.get("filename"), "'filename' parameter not found in the POST request body");
assertEquals(fileName, parts.get("filename"), "'fileName' parameter has wrong value");
if (title != null) {
assertNotNull(parts.get("title"), "'title' parameter not found in the POST request body");
assertEquals(title, parts.get("title"), "'title' parameter has wrong value");
}
assertNotNull(parts.get("file"), "The file part not found in the POST request body");
Map<String, String> fileParameters = parseFilePart(boundary, body);
assertEquals("my-data", fileParameters.get("data"), "File data is wrong in the POST request body");
assertEquals(fileName, fileParameters.get("filename"), "'filename' attribute of the file part has wrong value");
assertEquals(mimeType, fileParameters.get("contentType"), "Content-Type of the file part is wrong");
}
private String parseMultipartBoundary(String contentType) {
String boundary = null;
Pattern boundaryPattern = Pattern.compile("boundary=(.*?)$");
Matcher boundaryMatcher = boundaryPattern.matcher(contentType);
if (boundaryMatcher.find()) {
boundary = "--" + boundaryMatcher.group(1);
}
return boundary;
}
private Map<String, String> parsePostBodyParts(String boundary, String body) {
Pattern partNamePattern = Pattern.compile("name=\"(.*?)\"");
Pattern partDataPattern = Pattern.compile("\r\n\r\n(.*?)\r\n$");
String[] postBodyParts = body.split(boundary);
Map<String, String> parts = new HashMap<>();
for (String part: postBodyParts) {
Matcher partNameMatcher = partNamePattern.matcher(part);
Matcher partDataMatcher = partDataPattern.matcher(part);
if (partNameMatcher.find() && partDataMatcher.find()) {
String partName = partNameMatcher.group(1);
String partData = partDataMatcher.group(1);
parts.put(partName, partData);
}
}
return parts;
}
private Map<String, String> parseFilePart(String boundary, String body) {
Pattern partNamePattern = Pattern.compile("name=\"file\"");
Pattern partDataPattern = Pattern.compile("\r\n\r\n(.*?)\r\n$");
Pattern partFilenamePattern = Pattern.compile("filename=\"(.*?)\"");
Pattern partContentTypePattern = Pattern.compile("Content-Type: (.*?)\r\n");
String[] postBodyParts = body.split(boundary);
Map<String, String> fileParameters = new HashMap<>();
for (String part: postBodyParts) {
Matcher partNameMatcher = partNamePattern.matcher(part);
if (partNameMatcher.find()) {
Matcher partDataMatcher = partDataPattern.matcher(part);
if (partDataMatcher.find()) {
fileParameters.put("data", partDataMatcher.group(1));
}
Matcher partFilenameMatcher = partFilenamePattern.matcher(part);
if (partFilenameMatcher.find()) {
fileParameters.put("filename", partFilenameMatcher.group(1));
}
Matcher partContentTypeMatcher = partContentTypePattern.matcher(part);
if (partContentTypeMatcher.find()) {
fileParameters.put("contentType", partContentTypeMatcher.group(1));
}
}
}
return fileParameters;
}
}

View File

@ -1,294 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class PostSlackTextMessageTest {
private static final String RESPONSE_SUCCESS_TEXT_MSG = "{\"ok\": true}";
private static final String RESPONSE_SUCCESS_FILE_MSG = "{\"ok\": true, \"file\": {\"url_private\": \"slack-file-url\"}}";
private static final String RESPONSE_WARNING = "{\"ok\": true, \"warning\": \"slack-warning\"}";
private static final String RESPONSE_ERROR = "{\"ok\": false, \"error\": \"slack-error\"}";
private static final String RESPONSE_EMPTY_JSON = "{}";
private static final String RESPONSE_INVALID_JSON = "{invalid-json}";
private TestRunner testRunner;
private MockWebServer mockWebServer;
private String url;
@BeforeEach
public void init() {
mockWebServer = new MockWebServer();
url = mockWebServer.url("/").toString();
testRunner = TestRunners.newTestRunner(PostSlack.class);
}
@Test
public void sendTextOnlyMessageSuccessfully() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_TEXT_MSG));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
JsonObject requestBodyJson = getRequestBodyJson();
assertBasicRequest(requestBodyJson);
assertEquals("my-text", requestBodyJson.getString("text"));
}
@Test
public void sendTextWithAttachmentMessageSuccessfully() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.setProperty("attachment_01", "{\"my-attachment-key\": \"my-attachment-value\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
JsonObject requestBodyJson = getRequestBodyJson();
assertBasicRequest(requestBodyJson);
assertEquals("my-text", requestBodyJson.getString("text"));
assertEquals("[{\"my-attachment-key\":\"my-attachment-value\"}]", requestBodyJson.getJsonArray("attachments").toString());
}
@Test
public void sendAttachmentOnlyMessageSuccessfully() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty("attachment_01", "{\"my-attachment-key\": \"my-attachment-value\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
JsonObject requestBodyJson = getRequestBodyJson();
assertBasicRequest(requestBodyJson);
assertEquals("[{\"my-attachment-key\":\"my-attachment-value\"}]", requestBodyJson.getJsonArray("attachments").toString());
}
@Test
public void processShouldFailWhenChannelIsEmpty() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "${dummy}");
testRunner.setProperty(PostSlack.TEXT, "my-text");
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test
public void processShouldFailWhenTextIsEmptyAndNoAttachmentSpecified() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "${dummy}");
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test
public void emptyAttachmentShouldBeSkipped() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty("attachment_01", "${dummy}");
testRunner.setProperty("attachment_02", "{\"my-attachment-key\": \"my-attachment-value\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
JsonObject requestBodyJson = getRequestBodyJson();
assertBasicRequest(requestBodyJson);
assertEquals(1, requestBodyJson.getJsonArray("attachments").size());
assertEquals("[{\"my-attachment-key\":\"my-attachment-value\"}]", requestBodyJson.getJsonArray("attachments").toString());
}
@Test
public void invalidAttachmentShouldBeSkipped() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty("attachment_01", "{invalid-json}");
testRunner.setProperty("attachment_02", "{\"my-attachment-key\": \"my-attachment-value\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_FILE_MSG));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
JsonObject requestBodyJson = getRequestBodyJson();
assertBasicRequest(requestBodyJson);
assertEquals(1, requestBodyJson.getJsonArray("attachments").size());
assertEquals("[{\"my-attachment-key\":\"my-attachment-value\"}]", requestBodyJson.getJsonArray("attachments").toString());
}
@Test
public void processShouldFailWhenHttpErrorCodeReturned() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(500));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test
public void processShouldFailWhenSlackReturnsError() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_ERROR));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test
public void processShouldNotFailWhenSlackReturnsWarning() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_WARNING));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
assertBasicRequest(getRequestBodyJson());
}
@Test
public void processShouldFailWhenSlackReturnsEmptyJson() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_EMPTY_JSON));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test
public void processShouldFailWhenSlackReturnsInvalidJson() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "my-text");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_INVALID_JSON));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test
public void sendInternationalMessageSuccessfully() {
testRunner.setProperty(PostSlack.POST_MESSAGE_URL, url);
testRunner.setProperty(PostSlack.ACCESS_TOKEN, "my-access-token");
testRunner.setProperty(PostSlack.CHANNEL, "my-channel");
testRunner.setProperty(PostSlack.TEXT, "Iñtërnâtiônàližætiøn");
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_TEXT_MSG));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS);
JsonObject requestBodyJson = getRequestBodyJson();
assertBasicRequest(requestBodyJson);
assertEquals("Iñtërnâtiônàližætiøn", requestBodyJson.getString("text"));
}
private void assertBasicRequest(JsonObject requestBodyJson) {
assertEquals("my-channel", requestBodyJson.getString("channel"));
}
private JsonObject getRequestBodyJson() {
try {
final RecordedRequest recordedRequest = mockWebServer.takeRequest();
try (final JsonReader reader = Json.createReader(new InputStreamReader(
recordedRequest.getBody().inputStream(), StandardCharsets.UTF_8))) {
return reader.readObject();
}
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,249 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class PutSlackTest {
private TestRunner testRunner;
public static final String WEBHOOK_TEST_TEXT = "Hello From Apache NiFi";
private MockWebServer mockWebServer;
private String url;
@BeforeEach
public void init() {
mockWebServer = new MockWebServer();
url = mockWebServer.url("/").toString();
testRunner = TestRunners.newTestRunner(PutSlack.class);
}
@Test
public void testBlankText() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "");
testRunner.enqueue(new byte[0]);
assertThrows(AssertionError.class, () -> testRunner.run(1));
}
@Test
public void testBlankTextViaExpression() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, "${invalid-attr}"); // Create a blank webhook text
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_FAILURE);
}
@Test
public void testInvalidIconUrl() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.ICON_URL, "invalid");
testRunner.enqueue(new byte[0]);
assertThrows(AssertionError.class, () -> testRunner.run(1));
}
@Test
public void testInvalidIconEmoji() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.ICON_EMOJI, "invalid");
testRunner.enqueue(new byte[0]);
assertThrows(AssertionError.class, () -> testRunner.run(1));
}
@Test
public void testInvalidDynamicProperties() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true)
.name("foo")
.build();
testRunner.setProperty(dynamicProp, "{\"a\": a}");
testRunner.enqueue("{}".getBytes());
testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_FAILURE, 1);
}
@Test
public void testValidDynamicProperties() {
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true)
.name("foo")
.build();
testRunner.setProperty(dynamicProp, "{\"a\": \"a\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue("{}".getBytes());
testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_FAILURE, 0);
}
@Test
public void testValidDynamicPropertiesWithExpressionLanguage() {
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
Map<String, String> props = new HashMap<>();
props.put("foo", "\"bar\"");
props.put("ping", "pong");
ff = session.putAllAttributes(ff, props);
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true)
.name("foo")
.build();
testRunner.setProperty(dynamicProp, "{\"foo\": ${foo}, \"ping\":\"${ping}\"}");
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(ff);
testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_SUCCESS, 1);
}
@Test
public void testInvalidDynamicPropertiesWithExpressionLanguage() {
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
Map<String, String> props = new HashMap<>();
props.put("foo", "\"\"bar\"");
props.put("ping", "\"pong");
ff = session.putAllAttributes(ff, props);
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, WEBHOOK_TEST_TEXT);
PropertyDescriptor dynamicProp = new PropertyDescriptor.Builder()
.dynamic(true)
.name("foo")
.build();
testRunner.setProperty(dynamicProp, "{\"foo\": ${foo}, \"ping\":\"${ping}\"}");
testRunner.enqueue(ff);
testRunner.run(1);
testRunner.assertTransferCount(PutSlack.REL_SUCCESS, 0);
testRunner.assertTransferCount(PutSlack.REL_FAILURE, 1);
}
@Test
public void testSimplePut() throws InterruptedException {
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%7D";
final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String requestBody = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
assertEquals(expected, requestBody);
}
@Test
public void testSimplePutWithAttributes() throws InterruptedException {
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes");
testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook");
testRunner.setProperty(PutSlack.ICON_EMOJI, ":smile:");
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes%22%2C%22username%22%3A%22" +
"integration-test-webhook%22%2C%22icon_emoji%22%3A%22%3Asmile%3A%22%7D";
final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String requestBody = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
assertEquals(expected, requestBody);
}
@Test
public void testSimplePutWithAttributesIconURL() throws InterruptedException {
testRunner.setProperty(PutSlack.WEBHOOK_URL, url);
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
testRunner.setProperty(PutSlack.CHANNEL, "#test-attributes-url");
testRunner.setProperty(PutSlack.USERNAME, "integration-test-webhook");
testRunner.setProperty(PutSlack.ICON_URL, "http://lorempixel.com/48/48/");
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(new byte[0]);
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
final String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%2C%22channel%22%3A%22%23test-attributes-url%22%2C%22username%22%3A%22"
+ "integration-test-webhook%22%2C%22icon_url%22%3A%22http%3A%2F%2Florempixel.com%2F48%2F48%2F%22%7D";
final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String requestBody = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
assertEquals(expected, requestBody);
}
@Test
public void testSimplePutWithEL() throws InterruptedException {
testRunner.setProperty(PutSlack.WEBHOOK_URL, "${slack.url}");
testRunner.setProperty(PutSlack.WEBHOOK_TEXT, PutSlackTest.WEBHOOK_TEST_TEXT);
mockWebServer.enqueue(new MockResponse().setResponseCode(200));
testRunner.enqueue(new byte[0], new HashMap<String,String>(){{
put("slack.url", url);
}});
testRunner.run(1);
testRunner.assertAllFlowFilesTransferred(PutSlack.REL_SUCCESS, 1);
String expected = "payload=%7B%22text%22%3A%22Hello+From+Apache+NiFi%22%7D";
final RecordedRequest recordedRequest = mockWebServer.takeRequest();
final String requestBody = recordedRequest.getBody().readString(StandardCharsets.UTF_8);
assertEquals(expected, requestBody);
}
}