diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java index 0262d989e4..c749456004 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java @@ -147,6 +147,10 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF @OnStopped public void onStopped(final ProcessContext context) throws IOException { + deregister(); + } + + private void deregister() { if (webSocketService == null) { return; } @@ -170,11 +174,14 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF try { registerProcessorToService(context, webSocketService -> onWebSocketServiceReady(webSocketService)); } catch (IOException|WebSocketConfigurationException e) { + // Deregister processor if it failed so that it can retry next onTrigger. + deregister(); + context.yield(); throw new ProcessException("Failed to register processor to WebSocket service due to: " + e, e); } } - context.yield();//nothing really to do here since threading managed by smtp server sessions + context.yield();//nothing really to do here since handling WebSocket messages is done at ControllerService. } diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java index 3d19eac5ec..281b0160ee 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java @@ -90,7 +90,12 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder() .name("session-maintenance-interval") .displayName("Session Maintenance Interval") - .description("The interval between session maintenance activities.") + .description("The interval between session maintenance activities." + + " A WebSocket session established with a WebSocket server can be terminated due to different reasons" + + " including restarting the WebSocket server or timing out inactive sessions." + + " This session maintenance activity is periodically executed in order to reconnect those lost sessions," + + " so that a WebSocket client can reuse the same session id transparently after it reconnects successfully. " + + " The maintenance activity is executed until corresponding processors or this controller service is stopped.") .required(true) .expressionLanguageSupported(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) @@ -238,10 +243,11 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen } final String sessionId = activeSessions.get(clientId); - // If this session is stil alive, do nothing. + // If this session is still alive, do nothing. if (!router.containsSession(sessionId)) { // This session is no longer active, reconnect it. // If it fails, the sessionId will remain in activeSessions, and retries later. + // This reconnect attempt is continued until user explicitly stops a processor or this controller service. connect(clientId, sessionId); } }