diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java index beef839e7d..5a438b59a9 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/PutWebSocket.java @@ -17,6 +17,26 @@ package org.apache.nifi.processors.websocket; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS; +import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID; +import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -38,25 +58,6 @@ import org.apache.nifi.websocket.WebSocketConfigurationException; import org.apache.nifi.websocket.WebSocketMessage; import org.apache.nifi.websocket.WebSocketService; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS; -import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID; -import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME; - @Tags({"WebSocket", "publish", "send"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @TriggerSerially @@ -76,7 +77,8 @@ public class PutWebSocket extends AbstractProcessor { public static final PropertyDescriptor PROP_WS_SESSION_ID = new PropertyDescriptor.Builder() .name("websocket-session-id") .displayName("WebSocket Session Id") - .description("A NiFi Expression to retrieve the session id.") + .description("A NiFi Expression to retrieve the session id. If not specified, a message will be " + + "sent to all connected WebSocket peers for the WebSocket controller service endpoint.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(true) @@ -166,8 +168,11 @@ public class PutWebSocket extends AbstractProcessor { .evaluateAttributeExpressions(flowfile).getValue(); final WebSocketMessage.Type messageType = WebSocketMessage.Type.valueOf(messageTypeStr); - if (StringUtils.isEmpty(sessionId) - || StringUtils.isEmpty(webSocketServiceId) + if (StringUtils.isEmpty(sessionId)) { + getLogger().debug("Specific SessionID not specified. Message will be broadcast to all connected clients."); + } + + if (StringUtils.isEmpty(webSocketServiceId) || StringUtils.isEmpty(webSocketServiceEndpoint)) { transferToFailure(processSession, flowfile, "Required WebSocket attribute was not found."); return; @@ -187,9 +192,14 @@ public class PutWebSocket extends AbstractProcessor { final byte[] messageContent = new byte[(int) flowfile.getSize()]; final long startSending = System.currentTimeMillis(); + final AtomicReference transitUri = new AtomicReference<>(); final Map attrs = new HashMap<>(); attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier()); - attrs.put(ATTR_WS_SESSION_ID, sessionId); + + if (!StringUtils.isEmpty(sessionId)) { + attrs.put(ATTR_WS_SESSION_ID, sessionId); + } + attrs.put(ATTR_WS_ENDPOINT_ID, webSocketServiceEndpoint); attrs.put(ATTR_WS_MESSAGE_TYPE, messageTypeStr); @@ -211,14 +221,15 @@ public class PutWebSocket extends AbstractProcessor { attrs.put(ATTR_WS_LOCAL_ADDRESS, sender.getLocalAddress().toString()); attrs.put(ATTR_WS_REMOTE_ADDRESS, sender.getRemoteAddress().toString()); - - final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs); - final long transmissionMillis = System.currentTimeMillis() - startSending; - processSession.getProvenanceReporter().send(updatedFlowFile, sender.getTransitUri(), transmissionMillis); - - processSession.transfer(updatedFlowFile, REL_SUCCESS); + transitUri.set(sender.getTransitUri()); }); + final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs); + final long transmissionMillis = System.currentTimeMillis() - startSending; + processSession.getProvenanceReporter().send(updatedFlowFile, transitUri.get(), transmissionMillis); + + processSession.transfer(updatedFlowFile, REL_SUCCESS); + } catch (WebSocketConfigurationException|IllegalStateException|IOException e) { // WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped. // IllegalStateException: Session is already closed or not found. @@ -235,5 +246,4 @@ public class PutWebSocket extends AbstractProcessor { return flowfile; } - } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java index a987fe45cd..52f6f2a28b 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestPutWebSocket.java @@ -16,24 +16,6 @@ */ package org.apache.nifi.processors.websocket; -import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.apache.nifi.websocket.AbstractWebSocketSession; -import org.apache.nifi.websocket.SendMessage; -import org.apache.nifi.websocket.WebSocketMessage; -import org.apache.nifi.websocket.WebSocketService; -import org.apache.nifi.websocket.WebSocketSession; -import org.junit.Test; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID; import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL; @@ -50,6 +32,24 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.websocket.AbstractWebSocketSession; +import org.apache.nifi.websocket.SendMessage; +import org.apache.nifi.websocket.WebSocketMessage; +import org.apache.nifi.websocket.WebSocketService; +import org.apache.nifi.websocket.WebSocketSession; +import org.junit.Test; + public class TestPutWebSocket { @@ -92,12 +92,12 @@ public class TestPutWebSocket { runner.run(); final List succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS); - assertEquals(0, succeededFlowFiles.size()); + //assertEquals(0, succeededFlowFiles.size()); //No longer valid test after NIFI-3318 since not specifying sessionid will send to all clients + assertEquals(1, succeededFlowFiles.size()); final List failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE); - assertEquals(1, failedFlowFiles.size()); - final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next(); - assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL)); + //assertEquals(1, failedFlowFiles.size()); //No longer valid test after NIFI-3318 + assertEquals(0, failedFlowFiles.size()); final List provenanceEvents = runner.getProvenanceEvents(); assertEquals(0, provenanceEvents.size()); diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java index 057b33da43..e5034e1a12 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java @@ -16,14 +16,15 @@ */ package org.apache.nifi.websocket; -import org.apache.nifi.processor.Processor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.Processor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class WebSocketMessageRouter { private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageRouter.class); private final String endpointId; @@ -101,8 +102,20 @@ public class WebSocketMessageRouter { } public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException { - final WebSocketSession session = getSessionOrFail(sessionId); - sendMessage.send(session); + if (!StringUtils.isEmpty(sessionId)) { + final WebSocketSession session = getSessionOrFail(sessionId); + sendMessage.send(session); + } else { + //The sessionID is not specified so broadcast the message to all connected client sessions. + sessions.keySet().forEach(itrSessionId -> { + try { + final WebSocketSession session = getSessionOrFail(itrSessionId); + sendMessage.send(session); + } catch (IOException e) { + logger.warn("Failed to send message to session {} due to {}", itrSessionId, e, e); + } + }); + } } public void disconnect(final String sessionId, final String reason) throws IOException {