diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java index fb224bbbea..df1c6913ed 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.websocket.jetty; -import org.apache.nifi.websocket.jetty.dto.SessionInfo; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; @@ -36,6 +35,8 @@ import org.apache.nifi.util.StringUtils; import org.apache.nifi.websocket.WebSocketClientService; import org.apache.nifi.websocket.WebSocketConfigurationException; import org.apache.nifi.websocket.WebSocketMessageRouter; +import org.apache.nifi.websocket.jetty.dto.SessionInfo; +import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpProxy; import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; @@ -46,7 +47,6 @@ import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; -import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor; import javax.net.ssl.SSLContext; import java.io.IOException; @@ -65,6 +65,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; @Tags({"WebSocket", "Jetty", "client"}) @@ -165,7 +166,7 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen .displayName("Custom Authorization") .description( "Configures a custom HTTP Authorization Header as described in RFC 7235 Section 4.2." + - " Setting a custom Authorization Header excludes configuring the User Name and User Password properties for Basic Authentication.") + " Setting a custom Authorization Header excludes configuring the User Name and User Password properties for Basic Authentication.") .required(false) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -191,6 +192,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen .addValidator(StandardValidators.PORT_VALIDATOR) .build(); + private static final int INITIAL_BACKOFF_MILLIS = 100; + private static final int MAXIMUM_BACKOFF_MILLIS = 3200; private static final List properties; static { @@ -215,6 +218,7 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen private WebSocketClient client; private URI webSocketUri; private long connectionTimeoutMillis; + private int connectCount; private volatile ScheduledExecutorService sessionMaintenanceScheduler; private ConfigurationContext configurationContext; protected String authorizationHeader; @@ -229,6 +233,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen public void startClient(final ConfigurationContext context) throws Exception { configurationContext = context; + connectCount = configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger(); + final HttpClient httpClient; final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); if (sslContextService == null) { @@ -370,30 +376,49 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen request.setHeader(HttpHeader.AUTHORIZATION.asString(), authorizationHeader); } - final int connectCount = configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger(); + final Session session = attemptConnection(listener, request, connectCount); - Session session = null; - for (int i = 0; i < connectCount; i++) { - final Future connect = createWebsocketSession(listener, request); - getLogger().info("Connecting to : {}", webSocketUri); - try { - session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS); - break; - } catch (Exception e) { - if (i == connectCount - 1) { - throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e); - } else { - getLogger().warn("Failed to connect to {}, reconnection attempt {}", webSocketUri, i + 1); - } - } - } getLogger().info("Connected, session={}", session); activeSessions.put(clientId, new SessionInfo(listener.getSessionId(), flowFileAttributes)); } finally { connectionLock.unlock(); } + } + private Session attemptConnection(RoutingWebSocketListener listener, ClientUpgradeRequest request, int connectCount) throws IOException { + int backoffMillis = INITIAL_BACKOFF_MILLIS; + int backoffJitterMillis; + for (int i = 0; i < connectCount; i++) { + backoffJitterMillis = (int) (INITIAL_BACKOFF_MILLIS * getBackoffJitter(-0.2, 0.2)); + final Future connect = createWebsocketSession(listener, request); + getLogger().info("Connecting to : {}", webSocketUri); + try { + return connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + getLogger().warn("Connection attempt to {} timed out", webSocketUri); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + final String errorMessage = String.format("Thread was interrupted while attempting to connect to %s", webSocketUri); + throw new ProcessException(errorMessage, e); + } catch (Exception e) { + getLogger().warn("Failed to connect to {}, reconnection attempt {}", webSocketUri, i + 1, e); + } + + if (i < connectCount - 1) { + final int sleepTime = backoffMillis + backoffJitterMillis; + try { + getLogger().info("Sleeping {} ms before new connection attempt.", sleepTime); + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + final String errorMessage = String.format("Thread was interrupted while reconnecting to %s with %s backoffMillis", webSocketUri, sleepTime); + throw new ProcessException(errorMessage, e); + } + backoffMillis = Math.min(backoffMillis * 2, MAXIMUM_BACKOFF_MILLIS); + } + } + throw new IOException("Failed to connect " + webSocketUri + " after " + connectCount + " attempts"); } Future createWebsocketSession(RoutingWebSocketListener listener, ClientUpgradeRequest request) throws IOException { @@ -453,4 +478,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen policy.setMaxTextMessageSize(maxTextMessageSize); policy.setMaxBinaryMessageSize(maxBinaryMessageSize); } + + public double getBackoffJitter(final double min, final double max) { + return Math.random() * (max - min) + min; + } }