NIFI-7754 - Support HTTP Proxy for JettyWebSocketClient

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4502.
This commit is contained in:
Andreas Schöneck 2020-08-20 15:51:07 +02:00 committed by Pierre Villard
parent 85501e08e6
commit 44738b72cd
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 87 additions and 1 deletions

View File

@ -23,6 +23,7 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -33,6 +34,8 @@ import org.apache.nifi.util.StringUtils;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
@ -43,6 +46,7 @@ import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -136,6 +140,24 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
.defaultValue("US-ASCII")
.build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
.name("proxy-host")
.displayName("HTTP Proxy Host")
.description("The host name of the HTTP Proxy.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
.name("proxy-port")
.displayName("HTTP Proxy Port")
.description("The port number of the HTTP Proxy.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
private static final List<PropertyDescriptor> properties;
static {
@ -148,6 +170,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
props.add(USER_NAME);
props.add(USER_PASSWORD);
props.add(AUTH_CHARSET);
props.add(PROXY_HOST);
props.add(PROXY_PORT);
properties = Collections.unmodifiableList(props);
}
@ -173,7 +197,18 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
if (sslService != null) {
sslContextFactory = createSslFactory(sslService, false, false, null);
}
client = new WebSocketClient(sslContextFactory);
HttpClient httpClient = new HttpClient(sslContextFactory);
final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
if (proxyHost != null && proxyPort != null) {
HttpProxy httpProxy = new HttpProxy(proxyHost, proxyPort);
httpClient.getProxyConfiguration().getProxies().add(httpProxy);
}
client = new WebSocketClient(httpClient);
configurePolicy(context, client.getPolicy());
final String userName = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue();
@ -208,6 +243,19 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen
}, sessionMaintenanceInterval, sessionMaintenanceInterval, TimeUnit.MILLISECONDS);
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(1);
final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
final boolean proxyPortSet = validationContext.getProperty(PROXY_PORT).isSet();
if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
results.add(new ValidationResult.Builder().subject("HTTP Proxy Host and Port").valid(false).explanation(
"If HTTP Proxy Host or HTTP Proxy Port is set, both must be set").build());
}
return results;
}
@OnDisabled
@OnShutdown
@Override

View File

@ -22,6 +22,7 @@ import org.junit.Test;
import java.util.Collection;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestJettyWebSocketClient {
@ -59,4 +60,41 @@ public class TestJettyWebSocketClient {
assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject());
}
@Test
public void testValidationProxyHostOnly() throws Exception {
final JettyWebSocketClient service = new JettyWebSocketClient();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
context.setCustomValue(JettyWebSocketClient.WS_URI, "wss://localhost:9001/test");
context.setCustomValue(JettyWebSocketClient.PROXY_HOST, "localhost");
service.initialize(context.getInitializationContext());
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(1, results.size());
final ValidationResult result = results.iterator().next();
assertTrue(result.getSubject().contains("Proxy"));
}
@Test
public void testValidationProxyPortOnly() throws Exception {
final JettyWebSocketClient service = new JettyWebSocketClient();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
context.setCustomValue(JettyWebSocketClient.WS_URI, "wss://localhost:9001/test");
context.setCustomValue(JettyWebSocketClient.PROXY_PORT, "3128");
service.initialize(context.getInitializationContext());
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(1, results.size());
final ValidationResult result = results.iterator().next();
assertTrue(result.getSubject().contains("Proxy"));
}
@Test
public void testValidationSuccessWithProxy() throws Exception {
final JettyWebSocketClient service = new JettyWebSocketClient();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
context.setCustomValue(JettyWebSocketClient.WS_URI, "wss://localhost:9001/test");
context.setCustomValue(JettyWebSocketClient.PROXY_HOST, "localhost");
context.setCustomValue(JettyWebSocketClient.PROXY_PORT, "3128");
service.initialize(context.getInitializationContext());
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(0, results.size());
}
}