diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/ConsumeSlack.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/ConsumeSlack.java index 43c58245fd..8b8c453978 100644 --- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/ConsumeSlack.java +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/ConsumeSlack.java @@ -182,7 +182,7 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess .build(); - private final RateLimit rateLimit = new RateLimit(getLogger()); + private RateLimit rateLimit; private final Queue channels = new LinkedBlockingQueue<>(); private volatile App slackApp; @@ -205,6 +205,7 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess @OnScheduled public void setup(final ProcessContext context) throws IOException, SlackApiException { + rateLimit = new RateLimit(getLogger()); slackApp = createSlackApp(context); final List consumeChannels = createChannels(context, slackApp); @@ -212,9 +213,18 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess } @OnStopped - public void onStopped() { + public void shutdown() { channels.clear(); - slackApp.stop(); + if (slackApp != null) { + slackApp.stop(); + slackApp = null; + } + rateLimit = null; + } + + + public RateLimit getRateLimit() { + return rateLimit; } diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PublishSlack.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PublishSlack.java index 012c12d1de..4390d2a476 100644 --- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PublishSlack.java +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/PublishSlack.java @@ -241,7 +241,7 @@ public class PublishSlack extends AbstractProcessor { REL_RATE_LIMITED, REL_FAILURE); - private final RateLimit rateLimit = new RateLimit(getLogger()); + private RateLimit rateLimit; private volatile ChannelMapper channelMapper; private volatile App slackApp; @@ -259,6 +259,7 @@ public class PublishSlack extends AbstractProcessor { @OnScheduled public void setup(final ProcessContext context) { + rateLimit = new RateLimit(getLogger()); slackApp = createSlackApp(context); client = slackApp.client(); @@ -267,9 +268,13 @@ public class PublishSlack extends AbstractProcessor { @OnStopped public void shutdown() { + channelMapper = null; + client = null; if (slackApp != null) { slackApp.stop(); + slackApp = null; } + rateLimit = null; } private App createSlackApp(final ProcessContext context) { diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestConsumeSlack.java b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestConsumeSlack.java index a3c73ff81d..f791f4bd66 100644 --- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestConsumeSlack.java +++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestConsumeSlack.java @@ -35,6 +35,7 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -55,14 +56,14 @@ public class TestConsumeSlack { private static final ObjectMapper objectMapper = new ObjectMapper(); private TestRunner testRunner; private MockConsumeSlackClient client; - + ConsumeSlack processor; @BeforeEach public void setup() { client = new MockConsumeSlackClient(); // Create an instance of the processor that mocks out the initialize() method to return a client we can use for testing - final ConsumeSlack processor = new ConsumeSlack() { + processor = new ConsumeSlack() { @Override protected ConsumeSlackClient initializeClient(final App slackApp) { return client; @@ -75,6 +76,23 @@ public class TestConsumeSlack { testRunner.setProperty(ConsumeSlack.BATCH_SIZE, "5"); } + @Test + public void testRequestRateLimited() { + testRunner.setProperty(ConsumeSlack.CHANNEL_IDS, "cid1,cid2"); + final Message message = createMessage("U12345", "Hello world", "1683903832.350"); + client.addHistoryResponse(noMore(createSuccessfulHistoryResponse(message))); + + testRunner.run(1, false, true); + testRunner.assertAllFlowFilesTransferred(ConsumeSlack.REL_SUCCESS, 1); + testRunner.clearTransferState(); + + // Create another HttpResponse because each response can only be read once. + client.addHistoryResponse(noMore(createSuccessfulHistoryResponse(message))); + // Set processor to be in rate limited state, therefore it will process 0 flowfiles + processor.getRateLimit().retryAfter(Duration.ofSeconds(30)); + testRunner.run(1, true, false); + testRunner.assertAllFlowFilesTransferred(ConsumeSlack.REL_SUCCESS, 0); + } @Test public void testSuccessfullyReceivedSingleMessage() throws JsonProcessingException {