NIFI-12092 Add backoff parameters to JettyWebsocketClient reconnect

This closes #7761.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Lehel Boer 2023-09-20 01:55:24 +03:00 committed by Tamas Palfy
parent 711f2f1188
commit 7e4ca9365f
1 changed files with 48 additions and 19 deletions

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.websocket.jetty;
import org.apache.nifi.websocket.jetty.dto.SessionInfo;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@ -36,6 +35,8 @@ import org.apache.nifi.util.StringUtils;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
import org.apache.nifi.websocket.jetty.dto.SessionInfo;
import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
@ -46,7 +47,6 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
import javax.net.ssl.SSLContext;
import java.io.IOException;
@ -65,6 +65,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
@Tags({"WebSocket", "Jetty", "client"})
@ -165,7 +166,7 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
.displayName("Custom Authorization")
.description(
"Configures a custom HTTP Authorization Header as described in RFC 7235 Section 4.2." +
" Setting a custom Authorization Header excludes configuring the User Name and User Password properties for Basic Authentication.")
" Setting a custom Authorization Header excludes configuring the User Name and User Password properties for Basic Authentication.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -191,6 +192,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
private static final int INITIAL_BACKOFF_MILLIS = 100;
private static final int MAXIMUM_BACKOFF_MILLIS = 3200;
private static final List<PropertyDescriptor> properties;
static {
@ -215,6 +218,7 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
private WebSocketClient client;
private URI webSocketUri;
private long connectionTimeoutMillis;
private int connectCount;
private volatile ScheduledExecutorService sessionMaintenanceScheduler;
private ConfigurationContext configurationContext;
protected String authorizationHeader;
@ -229,6 +233,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
public void startClient(final ConfigurationContext context) throws Exception {
configurationContext = context;
connectCount = configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();
final HttpClient httpClient;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
if (sslContextService == null) {
@ -370,30 +376,49 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
request.setHeader(HttpHeader.AUTHORIZATION.asString(), authorizationHeader);
}
final int connectCount = configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();
final Session session = attemptConnection(listener, request, connectCount);
Session session = null;
for (int i = 0; i < connectCount; i++) {
final Future<Session> connect = createWebsocketSession(listener, request);
getLogger().info("Connecting to : {}", webSocketUri);
try {
session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
break;
} catch (Exception e) {
if (i == connectCount - 1) {
throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
} else {
getLogger().warn("Failed to connect to {}, reconnection attempt {}", webSocketUri, i + 1);
}
}
}
getLogger().info("Connected, session={}", session);
activeSessions.put(clientId, new SessionInfo(listener.getSessionId(), flowFileAttributes));
} finally {
connectionLock.unlock();
}
}
private Session attemptConnection(RoutingWebSocketListener listener, ClientUpgradeRequest request, int connectCount) throws IOException {
int backoffMillis = INITIAL_BACKOFF_MILLIS;
int backoffJitterMillis;
for (int i = 0; i < connectCount; i++) {
backoffJitterMillis = (int) (INITIAL_BACKOFF_MILLIS * getBackoffJitter(-0.2, 0.2));
final Future<Session> connect = createWebsocketSession(listener, request);
getLogger().info("Connecting to : {}", webSocketUri);
try {
return connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
getLogger().warn("Connection attempt to {} timed out", webSocketUri);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
final String errorMessage = String.format("Thread was interrupted while attempting to connect to %s", webSocketUri);
throw new ProcessException(errorMessage, e);
} catch (Exception e) {
getLogger().warn("Failed to connect to {}, reconnection attempt {}", webSocketUri, i + 1, e);
}
if (i < connectCount - 1) {
final int sleepTime = backoffMillis + backoffJitterMillis;
try {
getLogger().info("Sleeping {} ms before new connection attempt.", sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
final String errorMessage = String.format("Thread was interrupted while reconnecting to %s with %s backoffMillis", webSocketUri, sleepTime);
throw new ProcessException(errorMessage, e);
}
backoffMillis = Math.min(backoffMillis * 2, MAXIMUM_BACKOFF_MILLIS);
}
}
throw new IOException("Failed to connect " + webSocketUri + " after " + connectCount + " attempts");
}
Future<Session> createWebsocketSession(RoutingWebSocketListener listener, ClientUpgradeRequest request) throws IOException {
@ -453,4 +478,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
policy.setMaxTextMessageSize(maxTextMessageSize);
policy.setMaxBinaryMessageSize(maxBinaryMessageSize);
}
public double getBackoffJitter(final double min, final double max) {
return Math.random() * (max - min) + min;
}
}