NIFI-13146 ConsumeSlack rate limit error mitigation (#8748)

Co-authored-by: Krisztina Zsihovszki <kzsihovszki@cloudera.com>
This commit is contained in:
krisztina-zsihovszki 2024-05-09 21:36:41 +02:00 committed by GitHub
parent 326df914bc
commit 5a3b47353e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 79 additions and 30 deletions

View File

@ -24,10 +24,12 @@ import com.slack.api.bolt.AppConfig;
import com.slack.api.methods.MethodsClient;
import com.slack.api.methods.SlackApiException;
import com.slack.api.methods.request.conversations.ConversationsHistoryRequest;
import com.slack.api.methods.request.conversations.ConversationsInfoRequest;
import com.slack.api.methods.request.conversations.ConversationsListRequest;
import com.slack.api.methods.request.conversations.ConversationsRepliesRequest;
import com.slack.api.methods.request.users.UsersInfoRequest;
import com.slack.api.methods.response.conversations.ConversationsHistoryResponse;
import com.slack.api.methods.response.conversations.ConversationsInfoResponse;
import com.slack.api.methods.response.conversations.ConversationsListResponse;
import com.slack.api.methods.response.conversations.ConversationsRepliesResponse;
import com.slack.api.methods.response.users.UsersInfoResponse;
@ -75,6 +77,8 @@ import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
@PrimaryNodeOnly
@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -255,38 +259,54 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
.filter(s -> !s.isEmpty())
.forEach(channels::add);
// Fetch all channel ID's to have a name/ID channel mapping
Map<String, String> channelMapping = client.fetchChannelIds();
Map<String, String> channelMapping = new HashMap<>();
if (channelIdsProvidedOnly(channels)) {
//resolve the channel names by the specified channel IDs
for (String channelId : channels) {
String channelName = client.fetchChannelName(channelId);
getLogger().info("Resolved Channel ID {} to name {}", channelId, channelName);
channelMapping.put(channelId, channelName);
}
} else {
// Fetch all channel ID's to have a name/ID channel mapping
Map<String, String> allChannelNameIdMapping = client.fetchChannelIds();
for (final String channel : channels) {
String channelName;
String channelId;
final String channelIdOrName = channel.replace("#", "");
channelId = allChannelNameIdMapping.get(channelIdOrName);
if (channelId != null) {
channelName = channelIdOrName;
getLogger().info("Resolved Channel {} to ID {}", channelName, channelId);
} else {
channelId = channelIdOrName;
channelName = allChannelNameIdMapping
.keySet()
.stream()
.filter(entry -> channelIdOrName.equals(allChannelNameIdMapping.get(entry)))
.findFirst()
.orElse("");
getLogger().info("Resolved Channel ID {} to name {}", channelId, channelName);
}
channelMapping.put(channelId, channelName);
}
}
// Create ConsumeChannel objects for each Channel ID
final UsernameLookup usernameLookup = new UsernameLookup(client, getLogger());
final List<ConsumeChannel> consumeChannels = new ArrayList<>();
for (final String channel : channels) {
String channelName;
String channelId;
final String channelIdOrName = channel.replace("#", "");
channelId = channelMapping.get(channelIdOrName);
if(channelId != null) {
channelName = channelIdOrName;
getLogger().info("Resolved Channel {} to ID {}", channelName, channelId);
} else {
channelId = channelIdOrName;
channelName = channelMapping
.keySet()
.stream()
.filter(entry -> channelIdOrName.equals(channelMapping.get(entry)))
.findFirst()
.orElse("");
getLogger().info("Resolved Channel ID {} to name {}", channelId, channelName);
}
for (final Map.Entry<String, String> channel : channelMapping.entrySet()) {
final ConsumeChannel consumeChannel = new ConsumeChannel.Builder()
.channelId(channelId)
.channelName(channelName)
.channelId(channel.getKey())
.channelName(channel.getValue())
.batchSize(context.getProperty(BATCH_SIZE).asInteger())
.client(client)
.includeMessageBlocks(context.getProperty(INCLUDE_MESSAGE_BLOCKS).asBoolean())
@ -305,7 +325,6 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
return consumeChannels;
}
protected ConsumeSlackClient initializeClient(final App slackApp) {
slackApp.start();
return new DelegatingSlackClient(slackApp.client());
@ -364,6 +383,9 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
}
}
private static boolean channelIdsProvidedOnly(List<String> channels) {
return channels.stream().noneMatch(channelValue -> channelValue.contains("#"));
}
private void yieldOnException(final Throwable t, final String channelId, final ProcessContext context) {
if (SlackResponseUtil.isRateLimited(t)) {
@ -467,5 +489,21 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
throw new RuntimeException("Failed to determine Channel IDs: " + errorMessage);
}
}
@Override
public String fetchChannelName(String channelId) throws SlackApiException, IOException {
final ConversationsInfoRequest request = ConversationsInfoRequest.builder()
.channel(channelId)
.build();
final ConversationsInfoResponse response = delegate.conversationsInfo(request);
if (response.isOk()) {
return response.getChannel().getName();
} else {
final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
throw new RuntimeException(format("Failed to determine Channel name from ID [%s]: %s", channelId, errorMessage));
}
}
}
}

View File

@ -39,4 +39,6 @@ public interface ConsumeSlackClient {
Map<String, String> fetchChannelIds() throws SlackApiException, IOException;
String fetchChannelName(String channelId) throws SlackApiException, IOException;
}

View File

@ -46,6 +46,7 @@ import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -78,7 +79,7 @@ public class TestConsumeSlack {
@Test
public void testRequestRateLimited() {
testRunner.setProperty(ConsumeSlack.CHANNEL_IDS, "cid1,cid2");
testRunner.setProperty(ConsumeSlack.CHANNEL_IDS, "cid1,#cname2");
final Message message = createMessage("U12345", "Hello world", "1683903832.350");
client.addHistoryResponse(noMore(createSuccessfulHistoryResponse(message)));
@ -110,7 +111,7 @@ public class TestConsumeSlack {
assertEquals(message, outputMessages[0]);
outFlowFile1.assertAttributeEquals("slack.channel.id", "cid1");
outFlowFile1.assertAttributeEquals("slack.channel.name", "cname1");
outFlowFile1.assertAttributeEquals("slack.channel.name", "#cname1");
}
@ -134,7 +135,7 @@ public class TestConsumeSlack {
assertArrayEquals(expectedMessages, outputMessages);
outFlowFile1.assertAttributeEquals("slack.channel.id", "cid1");
outFlowFile1.assertAttributeEquals("slack.channel.name", "cname1");
outFlowFile1.assertAttributeEquals("slack.channel.name", "#cname1");
}
@ -583,10 +584,18 @@ public class TestConsumeSlack {
@Override
public Map<String, String> fetchChannelIds() {
final Map<String, String> nameIdMapping = new HashMap<String, String>();
nameIdMapping.put("cname1", "cid1");
nameIdMapping.put("#cname1", "cid1");
nameIdMapping.put("#cname2", "cid2");
return nameIdMapping;
}
@Override
public String fetchChannelName(String channelId) {
Map<String, String> invertedMap = fetchChannelIds().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
return invertedMap.get(channelId);
}
private void checkRateLimit() {
final int seconds = retryAfterSeconds.getAndSet(0);
if (seconds <= 0) {