From 335365874a249dfb3c1044cf39bfbc584085d83a Mon Sep 17 00:00:00 2001 From: Lehel Date: Mon, 15 May 2023 15:52:12 +0200 Subject: [PATCH] NIFI-11535: Transfer ConnectWebsocket connection configuration FlowFile to relationships Also moved dto and util packages under org.apache.nifi.websocket.jetty This closes #7246. Signed-off-by: Peter Turcsanyi --- .../AbstractWebSocketGatewayProcessor.java | 19 ++++++- .../websocket/ConnectWebSocket.java | 2 + .../websocket/TestConnectWebSocket.java | 52 +++++++++++++++---- .../websocket/jetty/JettyWebSocketClient.java | 4 +- .../websocket/jetty}/dto/SessionInfo.java | 2 +- .../jetty}/util/HeaderMapExtractor.java | 2 +- .../util/HeaderMapExtractorTest.java | 2 +- 7 files changed, 67 insertions(+), 16 deletions(-) rename nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/{ => org/apache/nifi/websocket/jetty}/dto/SessionInfo.java (94%) rename nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/{ => org/apache/nifi/websocket/jetty}/util/HeaderMapExtractor.java (95%) diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java index 9824c4a6d2..25ea1da832 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java @@ -77,6 +77,18 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF .description("The WebSocket binary message output") .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFile holding connection configuration attributes (like URL or HTTP headers) in case of successful connection") + .autoTerminateDefault(true) + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFile holding connection configuration attributes (like URL or HTTP headers) in case of connection failure") + .autoTerminateDefault(true) + .build(); + static Set getAbstractRelationships() { final Set relationships = new HashSet<>(); relationships.add(REL_CONNECTED); @@ -130,8 +142,11 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF final FlowFile flowFile = session.get(); try { webSocketClientService.connect(endpointId, flowFile.getAttributes()); - } finally { - session.remove(flowFile); + session.transfer(flowFile, REL_SUCCESS); + session.commitAsync(); + } catch (Exception e) { + getLogger().error("Websocket connection failure", e); + session.transfer(flowFile, REL_FAILURE); session.commitAsync(); } } else { diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java index be53854d7b..7e2e142014 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java @@ -89,6 +89,8 @@ public class ConnectWebSocket extends AbstractWebSocketGatewayProcessor { descriptors = Collections.unmodifiableList(innerDescriptorsList); final Set innerRelationshipsSet = getAbstractRelationships(); + innerRelationshipsSet.add(REL_SUCCESS); + innerRelationshipsSet.add(REL_FAILURE); relationships = Collections.unmodifiableSet(innerRelationshipsSet); } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java index 01c7b8079b..5b6d644749 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java @@ -136,17 +136,9 @@ class TestConnectWebSocket extends TestListenWebSocket { final String serviceId = "ws-service"; final String endpointId = "client-1"; - Map attributes = new HashMap<>(); - attributes.put("dynamicUrlPart", "test"); - MockFlowFile flowFile = new MockFlowFile(1L); - flowFile.putAttributes(attributes); + MockFlowFile flowFile = getFlowFile(); runner.enqueue(flowFile); - attributes.put("dynamicUrlPart", "test2"); - MockFlowFile flowFileWithWrongUrl = new MockFlowFile(2L); - flowFileWithWrongUrl.putAttributes(attributes); - runner.enqueue(flowFileWithWrongUrl); - JettyWebSocketClient service = new JettyWebSocketClient(); @@ -162,10 +154,44 @@ class TestConnectWebSocket extends TestListenWebSocket { final List flowFilesForRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED); assertEquals(1, flowFilesForRelationship.size()); + final List flowFilesForSuccess = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_SUCCESS); + assertEquals(1, flowFilesForSuccess.size()); + runner.stop(); webSocketListener.stop(); } + @Test + void testDynamicUrlsParsedFromFlowFileButNotAbleToConnect() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(ConnectWebSocket.class); + + final String serviceId = "ws-service"; + final String endpointId = "client-1"; + + MockFlowFile flowFile = getFlowFile(); + runner.enqueue(flowFile); + + JettyWebSocketClient service = new JettyWebSocketClient(); + + + runner.addControllerService(serviceId, service); + runner.setProperty(service, JettyWebSocketClient.WS_URI, "ws://localhost/${dynamicUrlPart}"); + runner.enableControllerService(service); + + runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_SERVICE, serviceId); + runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_ID, endpointId); + + runner.run(1, false); + + final List flowFilesForRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED); + assertEquals(0, flowFilesForRelationship.size()); + + final List flowFilesForSuccess = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_FAILURE); + assertEquals(1, flowFilesForSuccess.size()); + + runner.stop(); + } + private TestRunner getListenWebSocket(final int port) throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(ListenWebSocket.class); @@ -180,4 +206,12 @@ class TestConnectWebSocket extends TestListenWebSocket { return runner; } + + private MockFlowFile getFlowFile() { + Map attributes = new HashMap<>(); + attributes.put("dynamicUrlPart", "test"); + MockFlowFile flowFile = new MockFlowFile(1L); + flowFile.putAttributes(attributes); + return flowFile; + } } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java index 83fa91090b..712b2c1ce6 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.websocket.jetty; -import dto.SessionInfo; +import org.apache.nifi.websocket.jetty.dto.SessionInfo; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; @@ -42,7 +42,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; -import util.HeaderMapExtractor; +import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor; import java.io.IOException; import java.net.URI; diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/dto/SessionInfo.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/dto/SessionInfo.java similarity index 94% rename from nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/dto/SessionInfo.java rename to nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/dto/SessionInfo.java index 7caa2f5ff9..11f379c9e0 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/dto/SessionInfo.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/dto/SessionInfo.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dto; +package org.apache.nifi.websocket.jetty.dto; import java.util.Map; diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/util/HeaderMapExtractor.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/util/HeaderMapExtractor.java similarity index 95% rename from nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/util/HeaderMapExtractor.java rename to nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/util/HeaderMapExtractor.java index f29db9f416..45c09e57ce 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/util/HeaderMapExtractor.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/util/HeaderMapExtractor.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package util; +package org.apache.nifi.websocket.jetty.util; import org.apache.nifi.util.StringUtils; diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/util/HeaderMapExtractorTest.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/util/HeaderMapExtractorTest.java index 28d57e5021..211d22fdb1 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/util/HeaderMapExtractorTest.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/util/HeaderMapExtractorTest.java @@ -17,7 +17,7 @@ package org.apache.nifi.websocket.util; import org.junit.jupiter.api.Test; -import util.HeaderMapExtractor; +import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor; import java.util.Arrays; import java.util.Collections;