NIFI-12427 Add channel name attribute to ConsumeSlack

This closes #8078

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2023-11-29 16:35:32 +01:00 committed by exceptionfactory
parent eeb2b1a644
commit d0dd4e03e0
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 44 additions and 23 deletions

View File

@ -68,7 +68,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
@ -239,37 +238,45 @@ public class ConsumeSlack extends AbstractProcessor implements VerifiableProcess
final ConsumeSlackClient client = initializeClient(slackApp);
// Split channel ID's by commas and trim any white space
final List<String> channelIds = new ArrayList<>();
final String channelIdsValue = context.getProperty(CHANNEL_IDS).getValue();
Arrays.stream(channelIdsValue.split(","))
final List<String> channels = new ArrayList<>();
final String channelsValue = context.getProperty(CHANNEL_IDS).getValue();
Arrays.stream(channelsValue.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.forEach(channelIds::add);
.forEach(channels::add);
// If any of the Channel ID's is a channel name instead, fetch all channel ID's and replace names with ID's
final boolean lookupChannels = channelIds.stream().anyMatch(id -> id.startsWith("#"));
if (lookupChannels) {
final Map<String, String> channelMapping = client.fetchChannelIds();
final ListIterator<String> channelItr = channelIds.listIterator();
while (channelItr.hasNext()) {
final String channelIdOrName = channelItr.next().replace("#", "");
final String resolved = channelMapping.get(channelIdOrName);
if (resolved != null) {
channelItr.remove();
channelItr.add(resolved);
getLogger().info("Resolved Channel {} to ID {}", channelIdOrName, resolved);
}
}
}
// Fetch all channel ID's to have a name/ID channel mapping
Map<String, String> channelMapping = client.fetchChannelIds();
// Create ConsumeChannel objects for each Channel ID
final UsernameLookup usernameLookup = new UsernameLookup(client, getLogger());
final List<ConsumeChannel> consumeChannels = new ArrayList<>();
for (final String channelId : channelIds) {
for (final String channel : channels) {
String channelName;
String channelId;
final String channelIdOrName = channel.replace("#", "");
channelId = channelMapping.get(channelIdOrName);
if(channelId != null) {
channelName = channelIdOrName;
getLogger().info("Resolved Channel {} to ID {}", channelName, channelId);
} else {
channelId = channelIdOrName;
channelName = channelMapping
.keySet()
.stream()
.filter(entry -> channelIdOrName.equals(channelMapping.get(entry)))
.findFirst()
.orElse("");
getLogger().info("Resolved Channel ID {} to name {}", channelId, channelName);
}
final ConsumeChannel consumeChannel = new ConsumeChannel.Builder()
.channelId(channelId)
.channelName(channelName)
.batchSize(context.getProperty(BATCH_SIZE).asInteger())
.client(client)
.includeMessageBlocks(context.getProperty(INCLUDE_MESSAGE_BLOCKS).asBoolean())

View File

@ -61,6 +61,7 @@ public class ConsumeChannel {
private final ConsumeSlackClient client;
private final String channelId;
private final String channelName;
private final int batchSize;
private final long replyMonitorFrequencyMillis;
private final long replyMonitorWindowMillis;
@ -80,6 +81,7 @@ public class ConsumeChannel {
private ConsumeChannel(final Builder builder) {
this.client = builder.client;
this.channelId = builder.channelId;
this.channelName = builder.channelName;
this.batchSize = builder.batchSize;
this.replyMonitorFrequencyMillis = builder.replyMonitorFrequencyMillis;
this.replyMonitorWindowMillis = builder.replyMonitorWindowMillis;
@ -545,6 +547,7 @@ public class ConsumeChannel {
// Determine attributes for outbound FlowFile
final Map<String, String> attributes = new HashMap<>();
attributes.put("slack.channel.id", channelId);
attributes.put("slack.channel.name", channelName);
attributes.put("slack.message.count", Integer.toString(messageCount));
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
@ -698,6 +701,7 @@ public class ConsumeChannel {
public static class Builder {
private ConsumeSlackClient client;
private String channelId;
private String channelName;
private boolean includeMessageBlocks;
private boolean resolveUsernames;
private int batchSize = 50;
@ -713,6 +717,11 @@ public class ConsumeChannel {
return this;
}
public Builder channelName(final String channelName) {
this.channelName = channelName;
return this;
}
public Builder client(final ConsumeSlackClient client) {
this.client = client;
return this;
@ -827,6 +836,7 @@ public class ConsumeChannel {
return continuePolling;
}
@Override
public boolean isMore() {
return isMore;
}

View File

@ -92,6 +92,7 @@ public class TestConsumeSlack {
assertEquals(message, outputMessages[0]);
outFlowFile1.assertAttributeEquals("slack.channel.id", "cid1");
outFlowFile1.assertAttributeEquals("slack.channel.name", "cname1");
}
@ -115,6 +116,7 @@ public class TestConsumeSlack {
assertArrayEquals(expectedMessages, outputMessages);
outFlowFile1.assertAttributeEquals("slack.channel.id", "cid1");
outFlowFile1.assertAttributeEquals("slack.channel.name", "cname1");
}
@ -562,7 +564,9 @@ public class TestConsumeSlack {
@Override
public Map<String, String> fetchChannelIds() {
return Collections.emptyMap();
final Map<String, String> nameIdMapping = new HashMap<String, String>();
nameIdMapping.put("cname1", "cid1");
return nameIdMapping;
}
private void checkRateLimit() {