diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java index e412006322..cc46e9f061 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java @@ -23,6 +23,7 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -33,6 +34,8 @@ import org.apache.nifi.util.StringUtils; import org.apache.nifi.websocket.WebSocketClientService; import org.apache.nifi.websocket.WebSocketConfigurationException; import org.apache.nifi.websocket.WebSocketMessageRouter; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpProxy; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.api.Session; @@ -43,6 +46,7 @@ import java.io.IOException; import java.net.URI; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -136,6 +140,24 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen .defaultValue("US-ASCII") .build(); + public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder() + .name("proxy-host") + .displayName("HTTP Proxy Host") + .description("The host name of the HTTP Proxy.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder() + .name("proxy-port") + .displayName("HTTP Proxy Port") + .description("The port number of the HTTP Proxy.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + private static final List properties; static { @@ -148,6 +170,8 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen props.add(USER_NAME); props.add(USER_PASSWORD); props.add(AUTH_CHARSET); + props.add(PROXY_HOST); + props.add(PROXY_PORT); properties = Collections.unmodifiableList(props); } @@ -173,7 +197,18 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen if (sslService != null) { sslContextFactory = createSslFactory(sslService, false, false, null); } - client = new WebSocketClient(sslContextFactory); + + HttpClient httpClient = new HttpClient(sslContextFactory); + + final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue(); + final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger(); + + if (proxyHost != null && proxyPort != null) { + HttpProxy httpProxy = new HttpProxy(proxyHost, proxyPort); + httpClient.getProxyConfiguration().getProxies().add(httpProxy); + } + + client = new WebSocketClient(httpClient); configurePolicy(context, client.getPolicy()); final String userName = context.getProperty(USER_NAME).evaluateAttributeExpressions().getValue(); @@ -208,6 +243,19 @@ public class JettyWebSocketClient extends AbstractJettyWebSocketService implemen }, sessionMaintenanceInterval, sessionMaintenanceInterval, TimeUnit.MILLISECONDS); } + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(1); + final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet(); + final boolean proxyPortSet = validationContext.getProperty(PROXY_PORT).isSet(); + + if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) { + results.add(new ValidationResult.Builder().subject("HTTP Proxy Host and Port").valid(false).explanation( + "If HTTP Proxy Host or HTTP Proxy Port is set, both must be set").build()); + } + return results; + } + @OnDisabled @OnShutdown @Override diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java index a20b54ec1b..7ffae72f52 100644 --- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java +++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketClient.java @@ -22,6 +22,7 @@ import org.junit.Test; import java.util.Collection; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestJettyWebSocketClient { @@ -59,4 +60,41 @@ public class TestJettyWebSocketClient { assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject()); } + @Test + public void testValidationProxyHostOnly() throws Exception { + final JettyWebSocketClient service = new JettyWebSocketClient(); + final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); + context.setCustomValue(JettyWebSocketClient.WS_URI, "wss://localhost:9001/test"); + context.setCustomValue(JettyWebSocketClient.PROXY_HOST, "localhost"); + service.initialize(context.getInitializationContext()); + final Collection results = service.validate(context.getValidationContext()); + assertEquals(1, results.size()); + final ValidationResult result = results.iterator().next(); + assertTrue(result.getSubject().contains("Proxy")); + } + + @Test + public void testValidationProxyPortOnly() throws Exception { + final JettyWebSocketClient service = new JettyWebSocketClient(); + final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); + context.setCustomValue(JettyWebSocketClient.WS_URI, "wss://localhost:9001/test"); + context.setCustomValue(JettyWebSocketClient.PROXY_PORT, "3128"); + service.initialize(context.getInitializationContext()); + final Collection results = service.validate(context.getValidationContext()); + assertEquals(1, results.size()); + final ValidationResult result = results.iterator().next(); + assertTrue(result.getSubject().contains("Proxy")); + } + + @Test + public void testValidationSuccessWithProxy() throws Exception { + final JettyWebSocketClient service = new JettyWebSocketClient(); + final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); + context.setCustomValue(JettyWebSocketClient.WS_URI, "wss://localhost:9001/test"); + context.setCustomValue(JettyWebSocketClient.PROXY_HOST, "localhost"); + context.setCustomValue(JettyWebSocketClient.PROXY_PORT, "3128"); + service.initialize(context.getInitializationContext()); + final Collection results = service.validate(context.getValidationContext()); + assertEquals(0, results.size()); + } }