NIFI-12277 Added SSLContextService to Slack Processors

This closes #7936

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Bathori 2023-10-26 16:51:57 +02:00 committed by exceptionfactory
parent 0e1ae2bd6f
commit b2c4baf429
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 50 additions and 9 deletions

View File

@ -96,7 +96,6 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -33,10 +33,17 @@ import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonString;
import javax.json.stream.JsonParsingException;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
@ -65,6 +72,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@Tags({"slack", "post", "notify", "upload", "message"})
@CapabilityDescription("Sends a message on Slack. The FlowFile content (e.g. an image) can be uploaded and attached to the message.")
@ -180,6 +188,12 @@ public class PostSlack extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
.identifiesControllerService(SSLContextService.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to success after being successfully sent to Slack")
@ -199,7 +213,8 @@ public class PostSlack extends AbstractProcessor {
UPLOAD_FLOWFILE,
FILE_TITLE,
FILE_NAME,
FILE_MIME_TYPE);
FILE_MIME_TYPE,
SSL_CONTEXT_SERVICE);
public static final Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE);
@ -235,18 +250,27 @@ public class PostSlack extends AbstractProcessor {
}
@OnScheduled
public void initDynamicProperties(ProcessContext context) {
public void onScheduled(ProcessContext context) {
attachmentProperties.clear();
attachmentProperties.addAll(
context.getProperties().keySet()
.stream()
.filter(PropertyDescriptor::isDynamic)
.toList());
}
@OnScheduled
public void initHttpResources() {
connManager = new PoolingHttpClientConnectionManager();
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslService != null) {
final SSLContext sslContext = sslService.createContext();
final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", new SSLConnectionSocketFactory(sslContext))
.build();
connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
} else {
connManager = new PoolingHttpClientConnectionManager();
}
client = HttpClientBuilder.create()
.setConnectionManager(connManager)

View File

@ -38,6 +38,9 @@ import javax.json.JsonObjectBuilder;
import javax.json.JsonReader;
import javax.json.JsonWriter;
import javax.json.stream.JsonParsingException;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -55,6 +58,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@Tags({"put", "slack", "notify"})
@CapabilityDescription("Publishes a message to Slack")
@ -137,6 +141,12 @@ public class PutSlack extends AbstractProcessor {
.addValidator(new EmojiValidator())
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
.identifiesControllerService(SSLContextService.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to success after being successfully sent to Slack")
@ -156,7 +166,8 @@ public class PutSlack extends AbstractProcessor {
USERNAME,
THREAD_TS,
ICON_URL,
ICON_EMOJI);
ICON_EMOJI,
SSL_CONTEXT_SERVICE);
private static final Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE);
@ -236,6 +247,8 @@ public class PutSlack extends AbstractProcessor {
builder.add("icon_emoji", iconEmoji);
}
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
try {
// Get Attachments Array
if (!attachmentDescriptors.isEmpty()) {
@ -261,12 +274,17 @@ public class PutSlack extends AbstractProcessor {
conn.setRequestMethod("POST");
conn.setDoOutput(true);
if (sslService != null) {
final SSLContext sslContext = sslService.createContext();
((HttpsURLConnection) conn).setSSLSocketFactory(sslContext.getSocketFactory());
}
final DataOutputStream outputStream = new DataOutputStream(conn.getOutputStream());
final String payload = "payload=" + URLEncoder.encode(stringWriter.getBuffer().toString(), StandardCharsets.UTF_8);
outputStream.writeBytes(payload);
outputStream.close();
final int responseCode = conn.getResponseCode();
int responseCode = conn.getResponseCode();
if (responseCode >= 200 && responseCode < 300) {
getLogger().info("Successfully posted message to Slack");
session.transfer(flowFile, REL_SUCCESS);