NIFI-11535: Transfer ConnectWebsocket connection configuration FlowFile to relationships

Also moved dto and util packages under org.apache.nifi.websocket.jetty

This closes #7246.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Lehel 2023-05-15 15:52:12 +02:00 committed by Peter Turcsanyi
parent fcf2446bb5
commit 335365874a
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
7 changed files with 67 additions and 16 deletions

View File

@ -77,6 +77,18 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
.description("The WebSocket binary message output")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFile holding connection configuration attributes (like URL or HTTP headers) in case of successful connection")
.autoTerminateDefault(true)
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFile holding connection configuration attributes (like URL or HTTP headers) in case of connection failure")
.autoTerminateDefault(true)
.build();
static Set<Relationship> getAbstractRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_CONNECTED);
@ -130,8 +142,11 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
final FlowFile flowFile = session.get();
try {
webSocketClientService.connect(endpointId, flowFile.getAttributes());
} finally {
session.remove(flowFile);
session.transfer(flowFile, REL_SUCCESS);
session.commitAsync();
} catch (Exception e) {
getLogger().error("Websocket connection failure", e);
session.transfer(flowFile, REL_FAILURE);
session.commitAsync();
}
} else {

View File

@ -89,6 +89,8 @@ public class ConnectWebSocket extends AbstractWebSocketGatewayProcessor {
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = getAbstractRelationships();
innerRelationshipsSet.add(REL_SUCCESS);
innerRelationshipsSet.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
}

View File

@ -136,17 +136,9 @@ class TestConnectWebSocket extends TestListenWebSocket {
final String serviceId = "ws-service";
final String endpointId = "client-1";
Map<String, String> attributes = new HashMap<>();
attributes.put("dynamicUrlPart", "test");
MockFlowFile flowFile = new MockFlowFile(1L);
flowFile.putAttributes(attributes);
MockFlowFile flowFile = getFlowFile();
runner.enqueue(flowFile);
attributes.put("dynamicUrlPart", "test2");
MockFlowFile flowFileWithWrongUrl = new MockFlowFile(2L);
flowFileWithWrongUrl.putAttributes(attributes);
runner.enqueue(flowFileWithWrongUrl);
JettyWebSocketClient service = new JettyWebSocketClient();
@ -162,10 +154,44 @@ class TestConnectWebSocket extends TestListenWebSocket {
final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED);
assertEquals(1, flowFilesForRelationship.size());
final List<MockFlowFile> flowFilesForSuccess = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_SUCCESS);
assertEquals(1, flowFilesForSuccess.size());
runner.stop();
webSocketListener.stop();
}
@Test
void testDynamicUrlsParsedFromFlowFileButNotAbleToConnect() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(ConnectWebSocket.class);
final String serviceId = "ws-service";
final String endpointId = "client-1";
MockFlowFile flowFile = getFlowFile();
runner.enqueue(flowFile);
JettyWebSocketClient service = new JettyWebSocketClient();
runner.addControllerService(serviceId, service);
runner.setProperty(service, JettyWebSocketClient.WS_URI, "ws://localhost/${dynamicUrlPart}");
runner.enableControllerService(service);
runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_SERVICE, serviceId);
runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_ID, endpointId);
runner.run(1, false);
final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED);
assertEquals(0, flowFilesForRelationship.size());
final List<MockFlowFile> flowFilesForSuccess = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_FAILURE);
assertEquals(1, flowFilesForSuccess.size());
runner.stop();
}
private TestRunner getListenWebSocket(final int port) throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(ListenWebSocket.class);
@ -180,4 +206,12 @@ class TestConnectWebSocket extends TestListenWebSocket {
return runner;
}
private MockFlowFile getFlowFile() {
Map<String, String> attributes = new HashMap<>();
attributes.put("dynamicUrlPart", "test");
MockFlowFile flowFile = new MockFlowFile(1L);
flowFile.putAttributes(attributes);
return flowFile;
}
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.websocket.jetty;
import dto.SessionInfo;
import org.apache.nifi.websocket.jetty.dto.SessionInfo;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@ -42,7 +42,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import util.HeaderMapExtractor;
import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import java.io.IOException;
import java.net.URI;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dto;
package org.apache.nifi.websocket.jetty.dto;
import java.util.Map;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package util;
package org.apache.nifi.websocket.jetty.util;
import org.apache.nifi.util.StringUtils;

View File

@ -17,7 +17,7 @@
package org.apache.nifi.websocket.util;
import org.junit.jupiter.api.Test;
import util.HeaderMapExtractor;
import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import java.util.Arrays;
import java.util.Collections;