NIFI-9444: Added reconnect property to JettyWebsocketClient.

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5583.
This commit is contained in:
Lehel 2021-12-08 14:09:19 +01:00 committed by Nathan Gough
parent 97198e35a0
commit f5dccb5522
2 changed files with 136 additions and 7 deletions

View File

@ -101,6 +101,16 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
.defaultValue("3 sec") .defaultValue("3 sec")
.build(); .build();
public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new PropertyDescriptor.Builder()
.name("connection-attempt-timeout")
.displayName("Connection Attempt Count")
.description("The number of times to try and establish a connection.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("3")
.build();
public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder() public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
.name("session-maintenance-interval") .name("session-maintenance-interval")
.displayName("Session Maintenance Interval") .displayName("Session Maintenance Interval")
@ -183,6 +193,7 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
props.add(WS_URI); props.add(WS_URI);
props.add(SSL_CONTEXT); props.add(SSL_CONTEXT);
props.add(CONNECTION_TIMEOUT); props.add(CONNECTION_TIMEOUT);
props.add(CONNECTION_ATTEMPT_COUNT);
props.add(SESSION_MAINTENANCE_INTERVAL); props.add(SESSION_MAINTENANCE_INTERVAL);
props.add(USER_NAME); props.add(USER_NAME);
props.add(USER_PASSWORD); props.add(USER_PASSWORD);
@ -347,14 +358,23 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
if (!StringUtils.isEmpty(authorizationHeader)) { if (!StringUtils.isEmpty(authorizationHeader)) {
request.setHeader(HttpHeader.AUTHORIZATION.asString(), authorizationHeader); request.setHeader(HttpHeader.AUTHORIZATION.asString(), authorizationHeader);
} }
final Future<Session> connect = client.connect(listener, webSocketUri, request);
getLogger().info("Connecting to : {}", webSocketUri);
final Session session; final int connectCount = configurationContext.getProperty(CONNECTION_ATTEMPT_COUNT).evaluateAttributeExpressions().asInteger();
try {
session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS); Session session = null;
} catch (Exception e) { for (int i = 0; i < connectCount; i++) {
throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e); final Future<Session> connect = createWebsocketSession(listener, request);
getLogger().info("Connecting to : {}", webSocketUri);
try {
session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
break;
} catch (Exception e) {
if (i == connectCount - 1) {
throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
} else {
getLogger().warn("Failed to connect to {}, reconnection attempt {}", webSocketUri, i + 1);
}
}
} }
getLogger().info("Connected, session={}", session); getLogger().info("Connected, session={}", session);
activeSessions.put(clientId, new SessionInfo(listener.getSessionId(), flowFileAttributes)); activeSessions.put(clientId, new SessionInfo(listener.getSessionId(), flowFileAttributes));
@ -365,6 +385,10 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
} }
Future<Session> createWebsocketSession(RoutingWebSocketListener listener, ClientUpgradeRequest request) throws IOException {
return client.connect(listener, webSocketUri, request);
}
void maintainSessions() throws Exception { void maintainSessions() throws Exception {
if (client == null) { if (client == null) {
return; return;

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket.jetty;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.websocket.WebSocketClientService;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class ITJettyWebsocketReconnect {
private ControllerServiceTestContext clientServiceContext;
private WebSocketClientService clientService;
@BeforeEach
public void setup() throws Exception {
setupClient();
}
@AfterEach
public void teardown() throws Exception {
clientService.stopClient();
}
private void setupClient() throws Exception {
clientService = new JettyWebSocketTestClient();
clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, "ws://localhost:" + NetworkUtils.getAvailableTcpPort() + "/test");
clientServiceContext.setCustomValue(JettyWebSocketClient.USER_NAME, "user2");
clientServiceContext.setCustomValue(JettyWebSocketClient.USER_PASSWORD, "password2");
clientService.initialize(clientServiceContext.getInitializationContext());
clientService.startClient(clientServiceContext.getConfigurationContext());
}
@Test
void testClientAttemptsToReconnect() throws Exception {
final ITJettyWebSocketCommunication.MockWebSocketProcessor clientProcessor = mock(ITJettyWebSocketCommunication.MockWebSocketProcessor.class);
doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
final String clientId = "client1";
clientService.registerProcessor(clientId, clientProcessor);
assertThrows(IOException.class,
() -> clientService.connect(clientId)
);
JettyWebSocketTestClient testClientService = (JettyWebSocketTestClient) clientService;
verify(testClientService.getMockSession(), times(3)).get(anyLong(), any(TimeUnit.class));
}
private static class JettyWebSocketTestClient extends JettyWebSocketClient {
private CompletableFuture<Session> mockSession;
public JettyWebSocketTestClient() throws ExecutionException, InterruptedException, TimeoutException {
mockSession = mock(CompletableFuture.class);
when(mockSession.get(anyLong(), any(TimeUnit.class))).thenThrow(new RuntimeException("Test: Connecting timed out."));
}
Future<Session> createWebsocketSession(RoutingWebSocketListener listener, ClientUpgradeRequest request) {
return mockSession;
}
public CompletableFuture<Session> getMockSession() {
return mockSession;
}
}
}