NIFI-12623: Expose ability to fetch User Details in ListenSlack and receive App Mention events

This closes #8258

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-01-16 17:14:14 -05:00 committed by exceptionfactory
parent a613c52437
commit aa837853d1
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 148 additions and 7 deletions

View File

@ -17,7 +17,9 @@
package org.apache.nifi.processors.slack;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.slack.api.app_backend.events.payload.EventsApiPayload;
import com.slack.api.bolt.App;
@ -27,6 +29,8 @@ import com.slack.api.bolt.context.builtin.SlashCommandContext;
import com.slack.api.bolt.request.builtin.SlashCommandRequest;
import com.slack.api.bolt.response.Response;
import com.slack.api.bolt.socket_mode.SocketModeApp;
import com.slack.api.model.User;
import com.slack.api.model.event.AppMentionEvent;
import com.slack.api.model.event.FileSharedEvent;
import com.slack.api.model.event.MessageEvent;
import com.slack.api.model.event.MessageFileShareEvent;
@ -50,6 +54,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.slack.consume.UserDetailsLookup;
import java.io.IOException;
import java.io.OutputStream;
@ -87,6 +92,8 @@ public class ListenSlack extends AbstractProcessor {
static final AllowableValue RECEIVE_MESSAGE_EVENTS = new AllowableValue("Receive Message Events", "Receive Message Events",
"The Processor is to receive Slack Message Events");
static final AllowableValue RECEIVE_MENTION_EVENTS = new AllowableValue("Receive App Mention Events", "Receive App Mention Events",
"The Processor is to receive only slack messages that mention the bot user (App Mention Events)");
static final AllowableValue RECEIVE_COMMANDS = new AllowableValue("Receive Commands", "Receive Commands",
"The Processor is to receive Commands from Slack that are specific to your application. The Processor will not receive Message Events.");
@ -109,12 +116,24 @@ public class ListenSlack extends AbstractProcessor {
static final PropertyDescriptor EVENT_TYPE = new PropertyDescriptor.Builder()
.name("Event Type to Receive")
.description("Specifies whether the Processor should receive Slack Message Events or commands issued by users (e.g., /nifi do something)")
.description("Specifies the type of Event that the Processor should respond to")
.required(true)
.defaultValue(RECEIVE_MESSAGE_EVENTS.getValue())
.allowableValues(RECEIVE_MESSAGE_EVENTS, RECEIVE_COMMANDS)
.defaultValue(RECEIVE_MENTION_EVENTS.getValue())
.allowableValues(RECEIVE_MENTION_EVENTS, RECEIVE_MESSAGE_EVENTS, RECEIVE_COMMANDS)
.build();
final PropertyDescriptor RESOLVE_USER_DETAILS = new PropertyDescriptor.Builder()
.name("Resolve User Details")
.description("Specifies whether the Processor should lookup details about the Slack User who sent the received message. " +
"If true, the output JSON will contain an additional field named 'userDetails'. " +
"The 'user' field will still contain the ID of the user. In order to enable this capability, the Bot Token must be granted the 'users:read' " +
"and optionally the 'users.profile:read' Bot Token Scope. " +
"If the rate limit is exceeded when retrieving this information, the received message will be rejected and must be re-delivered.")
.required(true)
.defaultValue("false")
.allowableValues("true", "false")
.dependsOn(EVENT_TYPE, RECEIVE_MESSAGE_EVENTS, RECEIVE_MENTION_EVENTS)
.build();
static Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -123,13 +142,16 @@ public class ListenSlack extends AbstractProcessor {
private final TransferQueue<EventWrapper> eventTransferQueue = new LinkedTransferQueue<>();
private volatile SocketModeApp socketModeApp;
private volatile UserDetailsLookup userDetailsLookup;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(
APP_TOKEN,
BOT_TOKEN,
EVENT_TYPE);
EVENT_TYPE,
RESOLVE_USER_DETAILS);
}
@Override
@ -154,10 +176,17 @@ public class ListenSlack extends AbstractProcessor {
slackApp.event(MessageEvent.class, this::handleEvent);
slackApp.event(MessageFileShareEvent.class, this::handleEvent);
slackApp.event(FileSharedEvent.class, this::handleEvent);
} else if (context.getProperty(EVENT_TYPE).getValue().equals(RECEIVE_MENTION_EVENTS.getValue())) {
slackApp.event(AppMentionEvent.class, this::handleEvent);
// When there's an AppMention, we'll also get a MessageEvent. We need to handle this event, or we'll get warnings in the logs
// that no Event Handler is registered, and it will respond back to Slack with a 404. To avoid this, we just acknowledge the event.
slackApp.event(MessageEvent.class, (payload, ctx) -> ctx.ack());
} else {
slackApp.command(Pattern.compile(".*"), this::handleCommand);
}
userDetailsLookup = new UserDetailsLookup(userId -> slackApp.client().usersInfo(r -> r.user(userId)), getLogger());
socketModeApp = new SocketModeApp(appToken, slackApp);
socketModeApp.startAsync();
}
@ -210,7 +239,26 @@ public class ListenSlack extends AbstractProcessor {
try (final OutputStream out = session.write(flowFile);
final JsonGenerator generator = objectMapper.createGenerator(out)) {
generator.writeObject(messageEvent);
// If we need to resolve user details, we need a way to inject it into the JSON. Since we have an object model at this point,
// we serialize it to a string, then deserialize it back into a JsonNode, and then inject it into the JSON.
if (context.getProperty(RESOLVE_USER_DETAILS).asBoolean()) {
final String stringRepresentation = objectMapper.writeValueAsString(messageEvent);
final JsonNode jsonNode = objectMapper.readTree(stringRepresentation);
if (jsonNode.hasNonNull("user")) {
final String userId = jsonNode.get("user").asText();
final User userDetails = userDetailsLookup.getUserDetails(userId);
if (userDetails != null) {
final ObjectNode objectNode = (ObjectNode) jsonNode;
final String userDetailsJson = objectMapper.writeValueAsString(userDetails);
final JsonNode userDetailsNode = objectMapper.readTree(userDetailsJson);
objectNode.set("userDetails", userDetailsNode);
}
}
generator.writeTree(jsonNode);
} else {
generator.writeObject(messageEvent);
}
} catch (final IOException e) {
getLogger().error("Failed to write out Slack message", e);
session.remove(flowFile);

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack.consume;
import com.slack.api.methods.response.users.UsersInfoResponse;
import com.slack.api.model.User;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.slack.util.SlackResponseUtil;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class UserDetailsLookup {
private final ConcurrentMap<String, User> userIdToInfoMapping = new ConcurrentHashMap<>();
private final UserInfoClient client;
private final ComponentLog logger;
public UserDetailsLookup(final UserInfoClient client, final ComponentLog logger) {
this.client = client;
this.logger = logger;
}
public User getUserDetails(final String userId) {
final User cachedUserInfo = userIdToInfoMapping.get(userId);
if (cachedUserInfo != null) {
return cachedUserInfo;
}
try {
final UsersInfoResponse response = client.fetchUserInfo(userId);
if (response.isOk()) {
final User user = response.getUser();
userIdToInfoMapping.put(userId, user);
return user;
}
final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
logger.warn("Failed to retrieve user details for User ID {}: {}", userId, errorMessage);
return null;
} catch (final Exception e) {
if (SlackResponseUtil.isRateLimited(e)) {
logger.warn("Failed to retrieve user details for User ID {} because the Rate Limit has been exceeded", userId);
} else {
logger.warn("Failed to retrieve user details for User ID {}: {}", userId, e.getMessage(), e);
}
return null;
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.slack.consume;
import com.slack.api.methods.SlackApiException;
import com.slack.api.methods.response.users.UsersInfoResponse;
import java.io.IOException;
public interface UserInfoClient {
UsersInfoResponse fetchUserInfo(String userId) throws IOException, SlackApiException;
}

View File

@ -32,8 +32,8 @@
</p>
<p>
This Processor may be used to listen for either Message Events, or Slack Commands. For example, you may wish to create
a Slack App that receives the <code>/nifi</code> command and when received, performs some task. The Processor does not
This Processor may be used to listen for Message Events, App Mention Events (when the bot user is mentioned in a message) or Slack Commands.
For example, you may wish to create a Slack App that receives the <code>/nifi</code> command and when received, performs some task. The Processor does not
allow listening for both Message Events and Commands, as the output format is very different for the two, and this
would lead to significant confusion. Instead, if there is a desire to consume both Message Events and Commands,
two ListenSlack Processors should be used - one for Messages and another the Commands.