mirror of https://github.com/apache/nifi.git
NIFI-8497 Added SlackRecordSink
This closes #6593 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
c1890a5bb8
commit
1208e03b4c
|
@ -35,5 +35,11 @@
|
|||
<artifactId>nifi-slack-processors</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -85,5 +85,55 @@
|
|||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-sink-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-serialization-service-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock-record-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-web-client-api</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-web-client-provider-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-jsr310</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-web-client-provider-service</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-proxy-configuration-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.slack;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
public class PostMessageResponse {
|
||||
private boolean ok;
|
||||
private String channel;
|
||||
private Instant ts;
|
||||
private String error;
|
||||
private String warning;
|
||||
|
||||
public boolean isOk() {
|
||||
return ok;
|
||||
}
|
||||
|
||||
public void setOk(boolean ok) {
|
||||
this.ok = ok;
|
||||
}
|
||||
|
||||
public String getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
public void setChannel(String channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
public Instant getTs() {
|
||||
return ts;
|
||||
}
|
||||
|
||||
public void setTs(Instant ts) {
|
||||
this.ts = ts;
|
||||
}
|
||||
|
||||
public String getError() {
|
||||
return error;
|
||||
}
|
||||
|
||||
public void setError(String error) {
|
||||
this.error = error;
|
||||
}
|
||||
|
||||
public String getWarning() {
|
||||
return warning;
|
||||
}
|
||||
|
||||
public void setWarning(String warning) {
|
||||
this.warning = warning;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.slack;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.record.sink.RecordSinkService;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Tags({"slack", "record", "sink"})
|
||||
@CapabilityDescription("Format and send Records to a configured Channel using the Slack Post Message API. " +
|
||||
"The service requires a Slack App with a Bot User configured for access to a Slack workspace. " +
|
||||
"The Bot User OAuth Bearer Token is required for posting messages to Slack.")
|
||||
public class SlackRecordSink extends AbstractControllerService implements RecordSinkService {
|
||||
|
||||
private static final String SLACK_API_URL = "https://slack.com/api";
|
||||
|
||||
public static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
|
||||
.name("api-url")
|
||||
.displayName("API URL")
|
||||
.description("Slack Web API URL for posting text messages to channels." +
|
||||
" It only needs to be changed if Slack changes its API URL.")
|
||||
.required(true)
|
||||
.defaultValue(SLACK_API_URL)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.URL_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
|
||||
.name("access-token")
|
||||
.displayName("Access Token")
|
||||
.description("Bot OAuth Token used for authenticating and authorizing the Slack request sent by NiFi.")
|
||||
.required(true)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder()
|
||||
.name("channel-id")
|
||||
.displayName("Channel ID")
|
||||
.description("Slack channel, private group, or IM channel to send the message to. Use Channel ID instead of the name.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor INPUT_CHARACTER_SET = new PropertyDescriptor.Builder()
|
||||
.name("input-character-set")
|
||||
.displayName("Input Character Set")
|
||||
.description("Specifies the character set of the records used to generate the Slack message.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.defaultValue(StandardCharsets.UTF_8.name())
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new PropertyDescriptor.Builder()
|
||||
.name("web-service-client-provider")
|
||||
.displayName("Web Service Client Provider")
|
||||
.description("Controller service to provide HTTP client for communicating with Slack API")
|
||||
.required(true)
|
||||
.identifiesControllerService(WebClientServiceProvider.class)
|
||||
.build();
|
||||
|
||||
private volatile RecordSetWriterFactory writerFactory;
|
||||
private SlackRestService service;
|
||||
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return Collections.unmodifiableList(Arrays.asList(
|
||||
API_URL,
|
||||
ACCESS_TOKEN,
|
||||
CHANNEL_ID,
|
||||
INPUT_CHARACTER_SET,
|
||||
RECORD_WRITER_FACTORY,
|
||||
WEB_SERVICE_CLIENT_PROVIDER
|
||||
));
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
|
||||
final WebClientServiceProvider webClientServiceProvider = context
|
||||
.getProperty(WEB_SERVICE_CLIENT_PROVIDER)
|
||||
.asControllerService(WebClientServiceProvider.class);
|
||||
final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
|
||||
final String apiUrl = context.getProperty(API_URL).getValue();
|
||||
final String charset = context.getProperty(INPUT_CHARACTER_SET).getValue();
|
||||
service = new SlackRestService(webClientServiceProvider, accessToken, apiUrl, charset, getLogger());
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
|
||||
WriteResult writeResult;
|
||||
final String channel = getConfigurationContext().getProperty(CHANNEL_ID).getValue();
|
||||
int recordCount = 0;
|
||||
try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
|
||||
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), out, attributes)) {
|
||||
writer.beginRecordSet();
|
||||
Record record = recordSet.next();
|
||||
while (record != null) {
|
||||
writer.write(record);
|
||||
writer.flush();
|
||||
record = recordSet.next();
|
||||
recordCount++;
|
||||
}
|
||||
writeResult = writer.finishRecordSet();
|
||||
writer.flush();
|
||||
} catch (final SchemaNotFoundException e) {
|
||||
final String errorMessage = String.format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s",
|
||||
recordSet.getSchema().getSchemaName());
|
||||
throw new ProcessException(errorMessage, e);
|
||||
}
|
||||
if (recordCount > 0 || sendZeroResults) {
|
||||
try {
|
||||
final String message = out.toString();
|
||||
service.sendMessageToChannel(message, channel);
|
||||
} catch (final SlackRestServiceException e) {
|
||||
throw new IOException("Failed to send messages to Slack", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return writeResult;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.slack;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.web.client.api.HttpResponseEntity;
|
||||
import org.apache.nifi.web.client.api.HttpUriBuilder;
|
||||
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.OptionalLong;
|
||||
|
||||
public class SlackRestService {
|
||||
private static final String POST_MESSAGE_PATH = "chat.postMessage";
|
||||
private final WebClientServiceProvider webClientServiceProvider;
|
||||
private final String accessToken;
|
||||
private final String apiUrl;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ComponentLog logger;
|
||||
private final String charset;
|
||||
|
||||
public SlackRestService(final WebClientServiceProvider webClientServiceProvider,
|
||||
final String accessToken,
|
||||
final String apiUrl,
|
||||
final String charset,
|
||||
final ComponentLog logger) {
|
||||
this.webClientServiceProvider = webClientServiceProvider;
|
||||
this.accessToken = accessToken;
|
||||
this.apiUrl = apiUrl;
|
||||
this.objectMapper = new ObjectMapper();
|
||||
objectMapper.registerModule(new JavaTimeModule());
|
||||
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
this.charset = charset;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public void sendMessageToChannel(final String message, final String channel) throws SlackRestServiceException {
|
||||
final URI apiUri = URI.create(apiUrl);
|
||||
final HttpUriBuilder uriBuilder = webClientServiceProvider.getHttpUriBuilder()
|
||||
.scheme(apiUri.getScheme())
|
||||
.host(apiUri.getHost())
|
||||
.encodedPath(apiUri.getPath())
|
||||
.addPathSegment(POST_MESSAGE_PATH);
|
||||
if (apiUri.getPort() != -1) {
|
||||
uriBuilder.port(apiUri.getPort());
|
||||
}
|
||||
final URI uri = uriBuilder.build();
|
||||
|
||||
final ObjectNode requestBodyJson = createRequestBody(channel, message);
|
||||
|
||||
final InputStream requestBodyInputStream;
|
||||
try {
|
||||
requestBodyInputStream = new ByteArrayInputStream(objectMapper.writeValueAsBytes(requestBodyJson));
|
||||
} catch (final JsonProcessingException e) {
|
||||
throw new SlackRestServiceException("JSON message serialization failed", e);
|
||||
}
|
||||
|
||||
try (final HttpResponseEntity response = webClientServiceProvider.getWebClientService()
|
||||
.post()
|
||||
.uri(uri)
|
||||
.header("Authorization", String.format("Bearer %s", accessToken))
|
||||
.header("Content-Type", String.format("application/json; charset=\"%s\"", charset))
|
||||
.body(requestBodyInputStream, OptionalLong.of(requestBodyInputStream.available()))
|
||||
.retrieve()) {
|
||||
final int statusCode = response.statusCode();
|
||||
if (!(statusCode >= 200 && statusCode < 300)) {
|
||||
throw new SlackRestServiceException("HTTP error code: " + statusCode);
|
||||
}
|
||||
|
||||
try {
|
||||
final PostMessageResponse slackResponse = objectMapper.readValue(response.body(), PostMessageResponse.class);
|
||||
checkResponse(slackResponse, channel);
|
||||
} catch (final IOException e) {
|
||||
throw new SlackRestServiceException("JSON response parsing failed", e);
|
||||
}
|
||||
|
||||
} catch (final IOException e) {
|
||||
throw new ProcessException("Slack HTTP request failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
private ObjectNode createRequestBody(final String channel, final String message) {
|
||||
final ObjectNode requestBodyJson = objectMapper.createObjectNode();
|
||||
requestBodyJson.put("channel", channel);
|
||||
requestBodyJson.put("text", message);
|
||||
return requestBodyJson;
|
||||
}
|
||||
|
||||
private void checkResponse(final PostMessageResponse response, final String channel) throws SlackRestServiceException {
|
||||
if (!response.isOk()) {
|
||||
throw new SlackRestServiceException("Slack error response: " + response.getError());
|
||||
}
|
||||
|
||||
if (response.getWarning() != null) {
|
||||
logger.warn("Post message to channel [{}] warning: {}", channel, response.getWarning());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.slack;
|
||||
|
||||
class SlackRestServiceException extends Exception {
|
||||
SlackRestServiceException(final String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
SlackRestServiceException(final String message, final Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.services.slack.SlackRecordSink
|
|
@ -0,0 +1,228 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.slack;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import okhttp3.mockwebserver.MockResponse;
|
||||
import okhttp3.mockwebserver.MockWebServer;
|
||||
import okhttp3.mockwebserver.RecordedRequest;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.util.NoOpProcessor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
|
||||
import org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestSlackRecordSink {
|
||||
private static final String RESPONSE_SUCCESS_TEXT_MSG_WITH_TIMESTAMP = "{\"ok\": true, \"ts\": \"1503435956.000247\"}";
|
||||
private static final String RESPONSE_WARNING = "{\"ok\": true, \"warning\": \"slack-warning\"}";
|
||||
private static final String RESPONSE_ERROR = "{\"ok\": false, \"error\": \"slack-error\"}";
|
||||
private static final String RESPONSE_EMPTY_JSON = "{}";
|
||||
private static final String RESPONSE_INVALID_JSON = "{invalid-json}";
|
||||
|
||||
private static final String CHANNEL_NAME = "my-channel";
|
||||
private static final String BEARER_TOKEN = "bearer-token";
|
||||
|
||||
private TestRunner testRunner;
|
||||
private MockWebServer mockWebServer;
|
||||
private SlackRecordSink slackRecordSink;
|
||||
private MockRecordWriter writerFactory;
|
||||
private RecordSet recordSet;
|
||||
private String recordContentsAsString;
|
||||
private ObjectMapper mapper;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws InitializationException, IOException {
|
||||
mapper = new ObjectMapper();
|
||||
|
||||
mockWebServer = new MockWebServer();
|
||||
mockWebServer.start();
|
||||
String url = mockWebServer.url("/api/").toString();
|
||||
|
||||
testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
|
||||
|
||||
final WebClientServiceProvider webClientServiceProvider = new StandardWebClientServiceProvider();
|
||||
testRunner.addControllerService("webClientServiceProvider", webClientServiceProvider);
|
||||
testRunner.enableControllerService(webClientServiceProvider);
|
||||
|
||||
slackRecordSink = new SlackRecordSink();
|
||||
|
||||
testRunner.addControllerService("slackRecordSink", slackRecordSink);
|
||||
testRunner.setProperty(slackRecordSink, SlackRecordSink.API_URL, url);
|
||||
testRunner.setProperty(slackRecordSink, SlackRecordSink.ACCESS_TOKEN, BEARER_TOKEN);
|
||||
testRunner.setProperty(slackRecordSink, SlackRecordSink.CHANNEL_ID, CHANNEL_NAME);
|
||||
testRunner.setProperty(slackRecordSink, SlackRecordSink.WEB_SERVICE_CLIENT_PROVIDER, "webClientServiceProvider");
|
||||
|
||||
writerFactory = new MockRecordWriter();
|
||||
testRunner.addControllerService("writer", writerFactory);
|
||||
testRunner.setProperty(slackRecordSink, SlackRecordSink.RECORD_WRITER_FACTORY, "writer");
|
||||
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("a", RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField("b", RecordFieldType.BOOLEAN.getDataType()));
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
final Map<String, Object> valueMap1 = new HashMap<>();
|
||||
valueMap1.put("a", "Hello");
|
||||
valueMap1.put("b", true);
|
||||
final Record record1 = new MapRecord(schema, valueMap1);
|
||||
|
||||
final Map<String, Object> valueMap2 = new HashMap<>();
|
||||
valueMap2.put("a", "World");
|
||||
valueMap2.put("b", false);
|
||||
final Record record2 = new MapRecord(schema, valueMap2);
|
||||
|
||||
recordContentsAsString = "\"Hello\",\"true\"\n\"World\",\"false\"\n";
|
||||
recordSet = RecordSet.of(schema, record1, record2);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void cleanUp() throws IOException {
|
||||
mockWebServer.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendMessage() throws IOException {
|
||||
testRunner.enableControllerService(writerFactory);
|
||||
testRunner.assertValid(slackRecordSink);
|
||||
testRunner.enableControllerService(slackRecordSink);
|
||||
|
||||
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_TEXT_MSG_WITH_TIMESTAMP));
|
||||
|
||||
final WriteResult writeResult = slackRecordSink.sendData(recordSet, Collections.emptyMap(), false);
|
||||
|
||||
assertNotNull(writeResult);
|
||||
assertEquals(2, writeResult.getRecordCount());
|
||||
assertEquals(Collections.EMPTY_MAP, writeResult.getAttributes());
|
||||
final JsonNode requestBodyJson = getRequestBodyJson();
|
||||
assertEquals(CHANNEL_NAME, requestBodyJson.get("channel").asText());
|
||||
assertEquals(recordContentsAsString, requestBodyJson.get("text").asText());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotValidIfChannelEmpty() {
|
||||
testRunner.setProperty(slackRecordSink, SlackRecordSink.CHANNEL_ID, (String) null);
|
||||
|
||||
testRunner.enableControllerService(writerFactory);
|
||||
testRunner.assertNotValid(slackRecordSink);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotValidIfBearerTokenEmpty() {
|
||||
testRunner.setProperty(slackRecordSink, SlackRecordSink.ACCESS_TOKEN, (String) null);
|
||||
|
||||
testRunner.enableControllerService(writerFactory);
|
||||
testRunner.assertNotValid(slackRecordSink);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureWhenHttpErrorCodeReturned() {
|
||||
testRunner.enableControllerService(writerFactory);
|
||||
testRunner.assertValid(slackRecordSink);
|
||||
testRunner.enableControllerService(slackRecordSink);
|
||||
|
||||
mockWebServer.enqueue(new MockResponse().setResponseCode(500));
|
||||
|
||||
final IOException e = assertThrows(IOException.class, () -> slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
|
||||
assertTrue(e.getCause().getMessage().contains("500"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureWhenSlackReturnsError() {
|
||||
testRunner.enableControllerService(writerFactory);
|
||||
testRunner.assertValid(slackRecordSink);
|
||||
testRunner.enableControllerService(slackRecordSink);
|
||||
|
||||
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_ERROR));
|
||||
|
||||
final IOException e = assertThrows(IOException.class, () -> slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
|
||||
assertTrue(e.getCause().getMessage().contains("slack-error"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoFailureWhenSlackReturnsWarning() {
|
||||
testRunner.enableControllerService(writerFactory);
|
||||
testRunner.assertValid(slackRecordSink);
|
||||
testRunner.enableControllerService(slackRecordSink);
|
||||
|
||||
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_WARNING));
|
||||
|
||||
assertDoesNotThrow(() -> {
|
||||
slackRecordSink.sendData(recordSet, Collections.emptyMap(), false);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureWhenSlackReturnsEmptyJson() {
|
||||
testRunner.enableControllerService(writerFactory);
|
||||
testRunner.assertValid(slackRecordSink);
|
||||
testRunner.enableControllerService(slackRecordSink);
|
||||
|
||||
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_EMPTY_JSON));
|
||||
|
||||
final IOException e = assertThrows(IOException.class, () -> slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
|
||||
assertTrue(e.getCause().getMessage().contains("null"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureWhenSlackReturnsInvalidJson() {
|
||||
testRunner.enableControllerService(writerFactory);
|
||||
testRunner.assertValid(slackRecordSink);
|
||||
testRunner.enableControllerService(slackRecordSink);
|
||||
|
||||
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody(RESPONSE_INVALID_JSON));
|
||||
|
||||
final IOException e = assertThrows(IOException.class, () -> slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
|
||||
assertTrue(e.getCause().getMessage().contains("parsing"));
|
||||
}
|
||||
|
||||
private JsonNode getRequestBodyJson() {
|
||||
try {
|
||||
final RecordedRequest recordedRequest = mockWebServer.takeRequest();
|
||||
return mapper.readTree(recordedRequest.getBody().inputStream());
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue