NIFI-12630 Fix NPE getLogger in ConsumeSlack and PublishSlack

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #8474
This commit is contained in:
Jim Steinebrey 2024-03-05 10:52:26 -05:00 committed by Matt Burgess
parent 26f5fa2be0
commit bda9b6360d
3 changed files with 39 additions and 6 deletions

View File

@ -182,7 +182,7 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
.build(); .build();
private final RateLimit rateLimit = new RateLimit(getLogger()); private RateLimit rateLimit;
private final Queue<ConsumeChannel> channels = new LinkedBlockingQueue<>(); private final Queue<ConsumeChannel> channels = new LinkedBlockingQueue<>();
private volatile App slackApp; private volatile App slackApp;
@ -205,6 +205,7 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
@OnScheduled @OnScheduled
public void setup(final ProcessContext context) throws IOException, SlackApiException { public void setup(final ProcessContext context) throws IOException, SlackApiException {
rateLimit = new RateLimit(getLogger());
slackApp = createSlackApp(context); slackApp = createSlackApp(context);
final List<ConsumeChannel> consumeChannels = createChannels(context, slackApp); final List<ConsumeChannel> consumeChannels = createChannels(context, slackApp);
@ -212,9 +213,18 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
} }
@OnStopped @OnStopped
public void onStopped() { public void shutdown() {
channels.clear(); channels.clear();
slackApp.stop(); if (slackApp != null) {
slackApp.stop();
slackApp = null;
}
rateLimit = null;
}
public RateLimit getRateLimit() {
return rateLimit;
} }

View File

@ -241,7 +241,7 @@ public class PublishSlack extends AbstractProcessor {
REL_RATE_LIMITED, REL_RATE_LIMITED,
REL_FAILURE); REL_FAILURE);
private final RateLimit rateLimit = new RateLimit(getLogger()); private RateLimit rateLimit;
private volatile ChannelMapper channelMapper; private volatile ChannelMapper channelMapper;
private volatile App slackApp; private volatile App slackApp;
@ -259,6 +259,7 @@ public class PublishSlack extends AbstractProcessor {
@OnScheduled @OnScheduled
public void setup(final ProcessContext context) { public void setup(final ProcessContext context) {
rateLimit = new RateLimit(getLogger());
slackApp = createSlackApp(context); slackApp = createSlackApp(context);
client = slackApp.client(); client = slackApp.client();
@ -267,9 +268,13 @@ public class PublishSlack extends AbstractProcessor {
@OnStopped @OnStopped
public void shutdown() { public void shutdown() {
channelMapper = null;
client = null;
if (slackApp != null) { if (slackApp != null) {
slackApp.stop(); slackApp.stop();
slackApp = null;
} }
rateLimit = null;
} }
private App createSlackApp(final ProcessContext context) { private App createSlackApp(final ProcessContext context) {

View File

@ -35,6 +35,7 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -55,14 +56,14 @@ public class TestConsumeSlack {
private static final ObjectMapper objectMapper = new ObjectMapper(); private static final ObjectMapper objectMapper = new ObjectMapper();
private TestRunner testRunner; private TestRunner testRunner;
private MockConsumeSlackClient client; private MockConsumeSlackClient client;
ConsumeSlack processor;
@BeforeEach @BeforeEach
public void setup() { public void setup() {
client = new MockConsumeSlackClient(); client = new MockConsumeSlackClient();
// Create an instance of the processor that mocks out the initialize() method to return a client we can use for testing // Create an instance of the processor that mocks out the initialize() method to return a client we can use for testing
final ConsumeSlack processor = new ConsumeSlack() { processor = new ConsumeSlack() {
@Override @Override
protected ConsumeSlackClient initializeClient(final App slackApp) { protected ConsumeSlackClient initializeClient(final App slackApp) {
return client; return client;
@ -75,6 +76,23 @@ public class TestConsumeSlack {
testRunner.setProperty(ConsumeSlack.BATCH_SIZE, "5"); testRunner.setProperty(ConsumeSlack.BATCH_SIZE, "5");
} }
@Test
public void testRequestRateLimited() {
testRunner.setProperty(ConsumeSlack.CHANNEL_IDS, "cid1,cid2");
final Message message = createMessage("U12345", "Hello world", "1683903832.350");
client.addHistoryResponse(noMore(createSuccessfulHistoryResponse(message)));
testRunner.run(1, false, true);
testRunner.assertAllFlowFilesTransferred(ConsumeSlack.REL_SUCCESS, 1);
testRunner.clearTransferState();
// Create another HttpResponse because each response can only be read once.
client.addHistoryResponse(noMore(createSuccessfulHistoryResponse(message)));
// Set processor to be in rate limited state, therefore it will process 0 flowfiles
processor.getRateLimit().retryAfter(Duration.ofSeconds(30));
testRunner.run(1, true, false);
testRunner.assertAllFlowFilesTransferred(ConsumeSlack.REL_SUCCESS, 0);
}
@Test @Test
public void testSuccessfullyReceivedSingleMessage() throws JsonProcessingException { public void testSuccessfullyReceivedSingleMessage() throws JsonProcessingException {