NIFI-9558: ConnectWebSocket leaks connections and duplicates FlowFiles in incoming connection mode (new PR)

This closes #6176.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Lehel 2022-01-13 00:15:00 +01:00 committed by Tamas Palfy
parent 328d38facb
commit 46bb7d153d
4 changed files with 23 additions and 25 deletions

View File

@ -185,6 +185,17 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
} }
if (!isProcessorRegisteredToService()) { if (!isProcessorRegisteredToService()) {
register(context);
} else if (webSocketService instanceof WebSocketClientService && context.hasIncomingConnection()) {
// Deregister processor to close previous sessions when flowfile is provided.
deregister();
register(context);
}
context.yield();//nothing really to do here since handling WebSocket messages is done at ControllerService.
}
private void register(ProcessContext context) {
try { try {
registerProcessorToService(context, webSocketService -> onWebSocketServiceReady(webSocketService, context)); registerProcessorToService(context, webSocketService -> onWebSocketServiceReady(webSocketService, context));
} catch (IOException | WebSocketConfigurationException e) { } catch (IOException | WebSocketConfigurationException e) {
@ -193,18 +204,6 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
context.yield(); context.yield();
throw new ProcessException("Failed to register processor to WebSocket service due to: " + e, e); throw new ProcessException("Failed to register processor to WebSocket service due to: " + e, e);
} }
} else if (context.hasIncomingConnection()) {
try {
onWebSocketServiceReady(webSocketService, context);
} catch (IOException e) {
deregister();
context.yield();
throw new ProcessException("Failed to renew session and connect to WebSocket service due to: " + e, e);
}
}
context.yield();//nothing really to do here since handling WebSocket messages is done at ControllerService.
} }

View File

@ -48,7 +48,9 @@ import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.
@TriggerSerially @TriggerSerially
@CapabilityDescription("Acts as a WebSocket client endpoint to interact with a remote WebSocket server." + @CapabilityDescription("Acts as a WebSocket client endpoint to interact with a remote WebSocket server." +
" FlowFiles are transferred to downstream relationships according to received message types" + " FlowFiles are transferred to downstream relationships according to received message types" +
" as WebSocket client configured with this processor receives messages from remote WebSocket server.") " as WebSocket client configured with this processor receives messages from remote WebSocket server." +
" If a new flowfile is passed to the processor, the previous sessions will be closed and any data being" +
" sent will be aborted.")
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = ATTR_WS_CS_ID, description = "WebSocket Controller Service id."), @WritesAttribute(attribute = ATTR_WS_CS_ID, description = "WebSocket Controller Service id."),
@WritesAttribute(attribute = ATTR_WS_SESSION_ID, description = "Established WebSocket session id."), @WritesAttribute(attribute = ATTR_WS_SESSION_ID, description = "Established WebSocket session id."),

View File

@ -35,7 +35,8 @@
<p> <p>
You can define custom websocket headers in the incoming flowfile as additional attributes. The attribute key You can define custom websocket headers in the incoming flowfile as additional attributes. The attribute key
shall start with "header." and continue with they header key. For example: "header.Authorization". The attribute shall start with "header." and continue with they header key. For example: "header.Authorization". The attribute
value will be the corresponding header value. value will be the corresponding header value. If a new flowfile is passed to the processor, the previous sessions will be closed,
and any data being sent will be aborted.
<ol> <ol>
<li>header.Autorization | Basic base64UserNamePassWord</li> <li>header.Autorization | Basic base64UserNamePassWord</li>
<li>header.Content-Type | application, audio, example</li> <li>header.Content-Type | application, audio, example</li>

View File

@ -44,7 +44,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -52,7 +51,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class TestConnectWebSocket extends TestListenWebSocket { class TestConnectWebSocket extends TestListenWebSocket {
@Test @Test
public void testSuccess() throws Exception { public void testSuccess() throws Exception {
@ -126,7 +125,7 @@ public class TestConnectWebSocket extends TestListenWebSocket {
} }
@Test @Test
public void testDynamicUrlsParsedFromFlowFileAndAbleToConnect() throws InitializationException { void testDynamicUrlsParsedFromFlowFileAndAbleToConnect() throws InitializationException {
// Start websocket server // Start websocket server
final int port = NetworkUtils.availablePort(); final int port = NetworkUtils.availablePort();
TestRunner webSocketListener = getListenWebSocket(port); TestRunner webSocketListener = getListenWebSocket(port);
@ -163,9 +162,6 @@ public class TestConnectWebSocket extends TestListenWebSocket {
final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED); final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED);
assertEquals(1, flowFilesForRelationship.size()); assertEquals(1, flowFilesForRelationship.size());
final AssertionError assertionError = assertThrows(AssertionError.class, () -> runner.run(1));
assertTrue(assertionError.getCause().getLocalizedMessage().contains("Failed to renew session and connect to WebSocket service"));
runner.stop(); runner.stop();
webSocketListener.stop(); webSocketListener.stop();
} }