From 1208e03b4cee85e0e4c9fadf807549985eb3a2b3 Mon Sep 17 00:00:00 2001 From: Emilio Setiadarma Date: Mon, 24 Oct 2022 18:19:29 -0700 Subject: [PATCH] NIFI-8497 Added SlackRecordSink This closes #6593 Signed-off-by: David Handermann --- .../nifi-slack-bundle/nifi-slack-nar/pom.xml | 6 + .../nifi-slack-processors/pom.xml | 50 ++++ .../services/slack/PostMessageResponse.java | 67 +++++ .../nifi/services/slack/SlackRecordSink.java | 157 ++++++++++++ .../nifi/services/slack/SlackRestService.java | 121 ++++++++++ .../slack/SlackRestServiceException.java | 27 +++ ...g.apache.nifi.controller.ControllerService | 15 ++ .../services/slack/TestSlackRecordSink.java | 228 ++++++++++++++++++ 8 files changed, 671 insertions(+) create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml index 46f5d27717..9fea5bafd1 100644 --- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml @@ -35,5 +35,11 @@ nifi-slack-processors 2.0.0-SNAPSHOT + + org.apache.nifi + nifi-standard-services-api-nar + 2.0.0-SNAPSHOT + nar + diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml index e6b25f4a6b..6d957e1fbb 100644 --- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml @@ -85,5 +85,55 @@ nifi-ssl-context-service-api test + + org.apache.nifi + nifi-record-sink-api + provided + + + org.apache.nifi + nifi-record + compile + + + org.apache.nifi + nifi-record-serialization-service-api + provided + + + org.apache.nifi + nifi-mock-record-utils + + + org.apache.nifi + nifi-web-client-api + 2.0.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-web-client-provider-api + provided + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + org.apache.nifi + nifi-web-client-provider-service + 2.0.0-SNAPSHOT + test + + + org.apache.nifi + nifi-proxy-configuration-api + test + + diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java new file mode 100644 index 0000000000..6ebea1cffa --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.slack; + +import java.time.Instant; + +public class PostMessageResponse { + private boolean ok; + private String channel; + private Instant ts; + private String error; + private String warning; + + public boolean isOk() { + return ok; + } + + public void setOk(boolean ok) { + this.ok = ok; + } + + public String getChannel() { + return channel; + } + + public void setChannel(String channel) { + this.channel = channel; + } + + public Instant getTs() { + return ts; + } + + public void setTs(Instant ts) { + this.ts = ts; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + public String getWarning() { + return warning; + } + + public void setWarning(String warning) { + this.warning = warning; + } +} diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java new file mode 100644 index 0000000000..f833122cec --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.slack; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.record.sink.RecordSinkService; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@Tags({"slack", "record", "sink"}) +@CapabilityDescription("Format and send Records to a configured Channel using the Slack Post Message API. " + + "The service requires a Slack App with a Bot User configured for access to a Slack workspace. " + + "The Bot User OAuth Bearer Token is required for posting messages to Slack.") +public class SlackRecordSink extends AbstractControllerService implements RecordSinkService { + + private static final String SLACK_API_URL = "https://slack.com/api"; + + public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder() + .name("api-url") + .displayName("API URL") + .description("Slack Web API URL for posting text messages to channels." + + " It only needs to be changed if Slack changes its API URL.") + .required(true) + .defaultValue(SLACK_API_URL) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("access-token") + .displayName("Access Token") + .description("Bot OAuth Token used for authenticating and authorizing the Slack request sent by NiFi.") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("channel-id") + .displayName("Channel ID") + .description("Slack channel, private group, or IM channel to send the message to. Use Channel ID instead of the name.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor INPUT_CHARACTER_SET = new PropertyDescriptor.Builder() + .name("input-character-set") + .displayName("Input Character Set") + .description("Specifies the character set of the records used to generate the Slack message.") + .required(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue(StandardCharsets.UTF_8.name()) + .build(); + + public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new PropertyDescriptor.Builder() + .name("web-service-client-provider") + .displayName("Web Service Client Provider") + .description("Controller service to provide HTTP client for communicating with Slack API") + .required(true) + .identifiesControllerService(WebClientServiceProvider.class) + .build(); + + private volatile RecordSetWriterFactory writerFactory; + private SlackRestService service; + + @Override + public List getSupportedPropertyDescriptors() { + return Collections.unmodifiableList(Arrays.asList( + API_URL, + ACCESS_TOKEN, + CHANNEL_ID, + INPUT_CHARACTER_SET, + RECORD_WRITER_FACTORY, + WEB_SERVICE_CLIENT_PROVIDER + )); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + final WebClientServiceProvider webClientServiceProvider = context + .getProperty(WEB_SERVICE_CLIENT_PROVIDER) + .asControllerService(WebClientServiceProvider.class); + final String accessToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String apiUrl = context.getProperty(API_URL).getValue(); + final String charset = context.getProperty(INPUT_CHARACTER_SET).getValue(); + service = new SlackRestService(webClientServiceProvider, accessToken, apiUrl, charset, getLogger()); + } + + @Override + public WriteResult sendData(final RecordSet recordSet, final Map attributes, final boolean sendZeroResults) throws IOException { + WriteResult writeResult; + final String channel = getConfigurationContext().getProperty(CHANNEL_ID).getValue(); + int recordCount = 0; + try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), out, attributes)) { + writer.beginRecordSet(); + Record record = recordSet.next(); + while (record != null) { + writer.write(record); + writer.flush(); + record = recordSet.next(); + recordCount++; + } + writeResult = writer.finishRecordSet(); + writer.flush(); + } catch (final SchemaNotFoundException e) { + final String errorMessage = String.format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s", + recordSet.getSchema().getSchemaName()); + throw new ProcessException(errorMessage, e); + } + if (recordCount > 0 || sendZeroResults) { + try { + final String message = out.toString(); + service.sendMessageToChannel(message, channel); + } catch (final SlackRestServiceException e) { + throw new IOException("Failed to send messages to Slack", e); + } + } + } + return writeResult; + } +} diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java new file mode 100644 index 0000000000..3a23eb3e7b --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.slack; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.OptionalLong; + +public class SlackRestService { + private static final String POST_MESSAGE_PATH = "chat.postMessage"; + private final WebClientServiceProvider webClientServiceProvider; + private final String accessToken; + private final String apiUrl; + private final ObjectMapper objectMapper; + private final ComponentLog logger; + private final String charset; + + public SlackRestService(final WebClientServiceProvider webClientServiceProvider, + final String accessToken, + final String apiUrl, + final String charset, + final ComponentLog logger) { + this.webClientServiceProvider = webClientServiceProvider; + this.accessToken = accessToken; + this.apiUrl = apiUrl; + this.objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + this.charset = charset; + this.logger = logger; + } + + public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException { + final URI apiUri = URI.create(apiUrl); + final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder() + .scheme(apiUri.getScheme()) + .host(apiUri.getHost()) + .encodedPath(apiUri.getPath()) + .addPathSegment(POST_MESSAGE_PATH); + if (apiUri.getPort() != -1) { + uriBuilder.port(apiUri.getPort()); + } + final URI uri = uriBuilder.build(); + + final ObjectNode requestBodyJson = createRequestBody(channel, message); + + final InputStream requestBodyInputStream; + try { + requestBodyInputStream = new ByteArrayInputStream(objectMapper.writeValueAsBytes(requestBodyJson)); + } catch (final JsonProcessingException e) { + throw new SlackRestServiceException("JSON message serialization failed", e); + } + + try (final HttpResponseEntity response = webClientServiceProvider.getWebClientService() + .post() + .uri(uri) + .header("Authorization", String.format("Bearer %s", accessToken)) + .header("Content-Type", String.format("application/json; charset=\"%s\"", charset)) + .body(requestBodyInputStream, OptionalLong.of(requestBodyInputStream.available())) + .retrieve()) { + final int statusCode = response.statusCode(); + if (!(statusCode >= 200 && statusCode < 300)) { + throw new SlackRestServiceException("HTTP error code: " + statusCode); + } + + try { + final PostMessageResponse slackResponse = objectMapper.readValue(response.body(), PostMessageResponse.class); + checkResponse(slackResponse, channel); + } catch (final IOException e) { + throw new SlackRestServiceException("JSON response parsing failed", e); + } + + } catch (final IOException e) { + throw new ProcessException("Slack HTTP request failed", e); + } + } + + private ObjectNode createRequestBody(final String channel, final String message) { + final ObjectNode requestBodyJson = objectMapper.createObjectNode(); + requestBodyJson.put("channel", channel); + requestBodyJson.put("text", message); + return requestBodyJson; + } + + private void checkResponse(final PostMessageResponse response, final String channel) throws SlackRestServiceException { + if (!response.isOk()) { + throw new SlackRestServiceException("Slack error response: " + response.getError()); + } + + if (response.getWarning() != null) { + logger.warn("Post message to channel [{}] warning: {}", channel, response.getWarning()); + } + } +} diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java new file mode 100644 index 0000000000..b2f09baf62 --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.slack; + +class SlackRestServiceException extends Exception { + SlackRestServiceException(final String message) { + super(message); + } + + SlackRestServiceException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..5069eb222b --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.services.slack.SlackRecordSink diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java new file mode 100644 index 0000000000..8ab2896d7b --- /dev/null +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.slack; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; +import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestSlackRecordSink { + private static final String RESPONSE_SUCCESS_TEXT_MSG_WITH_TIMESTAMP = "{\"ok\": true, \"ts\": \"1503435956.000247\"}"; + private static final String RESPONSE_WARNING = "{\"ok\": true, \"warning\": \"slack-warning\"}"; + private static final String RESPONSE_ERROR = "{\"ok\": false, \"error\": \"slack-error\"}"; + private static final String RESPONSE_EMPTY_JSON = "{}"; + private static final String RESPONSE_INVALID_JSON = "{invalid-json}"; + + private static final String CHANNEL_NAME = "my-channel"; + private static final String BEARER_TOKEN = "bearer-token"; + + private TestRunner testRunner; + private MockWebServer mockWebServer; + private SlackRecordSink slackRecordSink; + private MockRecordWriter writerFactory; + private RecordSet recordSet; + private String recordContentsAsString; + private ObjectMapper mapper; + + @BeforeEach + public void setup() throws InitializationException, IOException { + mapper = new ObjectMapper(); + + mockWebServer = new MockWebServer(); + mockWebServer.start(); + String url = mockWebServer.url("/api/").toString(); + + testRunner = TestRunners.newTestRunner(NoOpProcessor.class); + + final WebClientServiceProvider webClientServiceProvider = new StandardWebClientServiceProvider(); + testRunner.addControllerService("webClientServiceProvider", webClientServiceProvider); + testRunner.enableControllerService(webClientServiceProvider); + + slackRecordSink = new SlackRecordSink(); + + testRunner.addControllerService("slackRecordSink", slackRecordSink); + testRunner.setProperty(slackRecordSink, SlackRecordSink.API_URL, url); + testRunner.setProperty(slackRecordSink, SlackRecordSink.ACCESS_TOKEN, BEARER_TOKEN); + testRunner.setProperty(slackRecordSink, SlackRecordSink.CHANNEL_ID, CHANNEL_NAME); + testRunner.setProperty(slackRecordSink, SlackRecordSink.WEB_SERVICE_CLIENT_PROVIDER, "webClientServiceProvider"); + + writerFactory = new MockRecordWriter(); + testRunner.addControllerService("writer", writerFactory); + testRunner.setProperty(slackRecordSink, SlackRecordSink.RECORD_WRITER_FACTORY, "writer"); + + final List fields = new ArrayList<>(); + fields.add(new RecordField("a", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("b", RecordFieldType.BOOLEAN.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map valueMap1 = new HashMap<>(); + valueMap1.put("a", "Hello"); + valueMap1.put("b", true); + final Record record1 = new MapRecord(schema, valueMap1); + + final Map valueMap2 = new HashMap<>(); + valueMap2.put("a", "World"); + valueMap2.put("b", false); + final Record record2 = new MapRecord(schema, valueMap2); + + recordContentsAsString = "\"Hello\",\"true\"\n\"World\",\"false\"\n"; + recordSet = RecordSet.of(schema, record1, record2); + } + + @AfterEach + public void cleanUp() throws IOException { + mockWebServer.shutdown(); + } + + @Test + public void testSendMessage() throws IOException { + testRunner.enableControllerService(writerFactory); + testRunner.assertValid(slackRecordSink); + testRunner.enableControllerService(slackRecordSink); + + mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_TEXT_MSG_WITH_TIMESTAMP)); + + final WriteResult writeResult = slackRecordSink.sendData(recordSet, Collections.emptyMap(), false); + + assertNotNull(writeResult); + assertEquals(2, writeResult.getRecordCount()); + assertEquals(Collections.EMPTY_MAP, writeResult.getAttributes()); + final JsonNode requestBodyJson = getRequestBodyJson(); + assertEquals(CHANNEL_NAME, requestBodyJson.get("channel").asText()); + assertEquals(recordContentsAsString, requestBodyJson.get("text").asText()); + } + + @Test + public void testNotValidIfChannelEmpty() { + testRunner.setProperty(slackRecordSink, SlackRecordSink.CHANNEL_ID, (String) null); + + testRunner.enableControllerService(writerFactory); + testRunner.assertNotValid(slackRecordSink); + } + + @Test + public void testNotValidIfBearerTokenEmpty() { + testRunner.setProperty(slackRecordSink, SlackRecordSink.ACCESS_TOKEN, (String) null); + + testRunner.enableControllerService(writerFactory); + testRunner.assertNotValid(slackRecordSink); + } + + @Test + public void testFailureWhenHttpErrorCodeReturned() { + testRunner.enableControllerService(writerFactory); + testRunner.assertValid(slackRecordSink); + testRunner.enableControllerService(slackRecordSink); + + mockWebServer.enqueue(new MockResponse().setResponseCode(500)); + + final IOException e = assertThrows(IOException.class, () -> slackRecordSink.sendData(recordSet, Collections.emptyMap(), false)); + assertTrue(e.getCause().getMessage().contains("500")); + } + + @Test + public void testFailureWhenSlackReturnsError() { + testRunner.enableControllerService(writerFactory); + testRunner.assertValid(slackRecordSink); + testRunner.enableControllerService(slackRecordSink); + + mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_ERROR)); + + final IOException e = assertThrows(IOException.class, () -> slackRecordSink.sendData(recordSet, Collections.emptyMap(), false)); + assertTrue(e.getCause().getMessage().contains("slack-error")); + } + + @Test + public void testNoFailureWhenSlackReturnsWarning() { + testRunner.enableControllerService(writerFactory); + testRunner.assertValid(slackRecordSink); + testRunner.enableControllerService(slackRecordSink); + + mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_WARNING)); + + assertDoesNotThrow(() -> { + slackRecordSink.sendData(recordSet, Collections.emptyMap(), false); + }); + } + + @Test + public void testFailureWhenSlackReturnsEmptyJson() { + testRunner.enableControllerService(writerFactory); + testRunner.assertValid(slackRecordSink); + testRunner.enableControllerService(slackRecordSink); + + mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_EMPTY_JSON)); + + final IOException e = assertThrows(IOException.class, () -> slackRecordSink.sendData(recordSet, Collections.emptyMap(), false)); + assertTrue(e.getCause().getMessage().contains("null")); + } + + @Test + public void testFailureWhenSlackReturnsInvalidJson() { + testRunner.enableControllerService(writerFactory); + testRunner.assertValid(slackRecordSink); + testRunner.enableControllerService(slackRecordSink); + + mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_INVALID_JSON)); + + final IOException e = assertThrows(IOException.class, () -> slackRecordSink.sendData(recordSet, Collections.emptyMap(), false)); + assertTrue(e.getCause().getMessage().contains("parsing")); + } + + private JsonNode getRequestBodyJson() { + try { + final RecordedRequest recordedRequest = mockWebServer.takeRequest(); + return mapper.readTree(recordedRequest.getBody().inputStream()); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } +}