From a36a41da733ebb805b75797f6d2d21cd61c6be87 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 9 Nov 2021 17:24:06 -0800 Subject: [PATCH] Support routing data through an HTTP proxy (#11891) * Support routing data through an HTTP proxy * Support routing data through an HTTP proxy This adds the ability for the HttpClient to connect through an HTTP proxy. We augment the channel factory to check if it is supposed to be proxied and, if so, we connect to the proxy host first, issue a CONNECT command through to the final recipient host and *then* give the channel to the normal http client for usage. * add docs * address comments Co-authored-by: imply-cheddar <86940447+imply-cheddar@users.noreply.github.com> --- .../util/http/client/HttpClientConfig.java | 16 ++ .../java/util/http/client/HttpClientInit.java | 1 + .../http/client/HttpClientProxyConfig.java | 86 ++++++++++ .../druid/java/util/http/client/Request.java | 10 +- .../client/pool/ChannelResourceFactory.java | 150 +++++++++++++++--- .../util/http/client/FriendlyServersTest.java | 76 +++++++++ docs/development/modules.md | 16 ++ 7 files changed, 330 insertions(+), 25 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/java/util/http/client/HttpClientProxyConfig.java diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java index 90170e81a0b..08e5dacc157 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java @@ -82,6 +82,7 @@ public class HttpClientConfig private final int numConnections; private final SSLContext sslContext; + private final HttpClientProxyConfig proxyConfig; private final Duration readTimeout; private final Duration sslHandshakeTimeout; private final int bossPoolSize; @@ -92,6 +93,7 @@ public class HttpClientConfig private HttpClientConfig( int numConnections, SSLContext sslContext, + HttpClientProxyConfig proxyConfig, Duration readTimeout, Duration sslHandshakeTimeout, int bossPoolSize, @@ -102,6 +104,7 @@ public class HttpClientConfig { this.numConnections = numConnections; this.sslContext = sslContext; + this.proxyConfig = proxyConfig; this.readTimeout = readTimeout; this.sslHandshakeTimeout = sslHandshakeTimeout; this.bossPoolSize = bossPoolSize; @@ -120,6 +123,11 @@ public class HttpClientConfig return sslContext; } + public HttpClientProxyConfig getProxyConfig() + { + return proxyConfig; + } + public Duration getReadTimeout() { return readTimeout; @@ -154,6 +162,7 @@ public class HttpClientConfig { private int numConnections = 1; private SSLContext sslContext = null; + private HttpClientProxyConfig proxyConfig = null; private Duration readTimeout = null; private Duration sslHandshakeTimeout = null; private int bossCount = DEFAULT_BOSS_COUNT; @@ -177,6 +186,12 @@ public class HttpClientConfig return this; } + public Builder withHttpProxyConfig(HttpClientProxyConfig config) + { + this.proxyConfig = config; + return this; + } + public Builder withReadTimeout(Duration readTimeout) { this.readTimeout = readTimeout; @@ -212,6 +227,7 @@ public class HttpClientConfig return new HttpClientConfig( numConnections, sslContext, + proxyConfig, readTimeout, sslHandshakeTimeout, bossCount, diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java index 279a6a8b51d..fd3ee805e5f 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java @@ -84,6 +84,7 @@ public class HttpClientInit new ChannelResourceFactory( createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()), config.getSslContext(), + config.getProxyConfig(), timer, config.getSslHandshakeTimeout() == null ? -1 : config.getSslHandshakeTimeout().getMillis() ), diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientProxyConfig.java b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientProxyConfig.java new file mode 100644 index 00000000000..fec5c9e365f --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientProxyConfig.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.http.client; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +public class HttpClientProxyConfig +{ + @JsonProperty("host") + private String host; + + @JsonProperty("port") + @Min(0) + @Max(65_535) + private int port; + + @JsonProperty("user") + private String user; + + @JsonProperty("password") + private String password; + + public HttpClientProxyConfig() + { + } + + public HttpClientProxyConfig(String host, int port, String user, String password) + { + this.host = host; + this.port = port; + this.user = user; + this.password = password; + } + + public String getHost() + { + return host; + } + + public int getPort() + { + return port; + } + + public String getUser() + { + return user; + } + + public String getPassword() + { + return password; + } + + @SuppressWarnings("VariableNotUsedInsideIf") + @Override + public String toString() + { + return "HttpClientProxyConfig{" + + "proxyHost='" + host + '\'' + + ", proxyPort=" + port + + ", user='" + user + '\'' + + ", password='" + ((password == null) ? "__is_null__" : "***") + '\'' + + '}'; + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/Request.java b/core/src/main/java/org/apache/druid/java/util/http/client/Request.java index b012be5c2ce..479468090e6 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/Request.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/Request.java @@ -164,12 +164,16 @@ public class Request public Request setBasicAuthentication(String username, String password) { - final String base64Value = base64Encode(username + ":" + password); - setHeader(HttpHeaders.Names.AUTHORIZATION, "Basic " + base64Value); + setHeader(HttpHeaders.Names.AUTHORIZATION, makeBasicAuthenticationString(username, password)); return this; } - private String base64Encode(final String value) + public static String makeBasicAuthenticationString(String username, String password) + { + return "Basic " + base64Encode(username + ":" + password); + } + + private static String base64Encode(final String value) { final ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance(); diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java index 9c7f4c7d0ae..d6465be305e 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java @@ -22,6 +22,8 @@ package org.apache.druid.java.util.http.client.pool; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.HttpClientProxyConfig; +import org.apache.druid.java.util.http.client.Request; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; @@ -31,7 +33,14 @@ import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.handler.codec.http.DefaultHttpRequest; +import org.jboss.netty.handler.codec.http.HttpClientCodec; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.util.Timer; @@ -50,21 +59,25 @@ public class ChannelResourceFactory implements ResourceFactory= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS; @@ -88,7 +101,99 @@ public class ChannelResourceFactory implements ResourceFactory requestContent = new AtomicReference<>(); + + final ExecutorService exec = Executors.newSingleThreadExecutor(); + final ServerSocket serverSocket = new ServerSocket(0); + exec.submit( + new Runnable() + { + @Override + public void run() + { + while (!Thread.currentThread().isInterrupted()) { + try ( + Socket clientSocket = serverSocket.accept(); + BufferedReader in = new BufferedReader( + new InputStreamReader(clientSocket.getInputStream(), StandardCharsets.UTF_8) + ); + OutputStream out = clientSocket.getOutputStream() + ) { + StringBuilder request = new StringBuilder(); + String line; + while (!"".equals((line = in.readLine()))) { + request.append(line).append("\r\n"); + } + requestContent.set(request.toString()); + out.write("HTTP/1.1 200 OK\r\n\r\n".getBytes(StandardCharsets.UTF_8)); + + while (!in.readLine().equals("")) { + // skip lines + } + out.write("HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nhello!".getBytes(StandardCharsets.UTF_8)); + } + catch (Exception e) { + Assert.fail(e.toString()); + } + } + } + } + ); + + final Lifecycle lifecycle = new Lifecycle(); + try { + final HttpClientConfig config = HttpClientConfig + .builder() + .withHttpProxyConfig( + new HttpClientProxyConfig("localhost", serverSocket.getLocalPort(), "bob", "sally") + ) + .build(); + final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final StatusResponseHolder response = client + .go( + new Request( + HttpMethod.GET, + new URL("http://anotherHost:8080/") + ), + StatusResponseHandler.getInstance() + ).get(); + + Assert.assertEquals(200, response.getStatus().getCode()); + Assert.assertEquals("hello!", response.getContent()); + + Assert.assertEquals( + "CONNECT anotherHost:8080 HTTP/1.1\r\nProxy-Authorization: Basic Ym9iOnNhbGx5\r\n", + requestContent.get() + ); + } + finally { + exec.shutdownNow(); + serverSocket.close(); + lifecycle.stop(); + } + } + @Test public void testCompressionCodecConfig() throws Exception { diff --git a/docs/development/modules.md b/docs/development/modules.md index e17985efa57..50e2a97f77b 100644 --- a/docs/development/modules.md +++ b/docs/development/modules.md @@ -356,6 +356,22 @@ druid.coordinator.cleanupMetadata.duty.killSupervisors.retainDuration=PT0M druid.coordinator.cleanupMetadata.period=PT10S ``` +### Routing data through a HTTP proxy for your extension + +You can add the ability for the `HttpClient` of your extension to connect through an HTTP proxy. + +To support proxy connection for your extension's HTTP client: +1. Add `HttpClientProxyConfig` as a `@JsonProperty` to the HTTP config class of your extension. +2. In the extension's module class, add `HttpProxyConfig` config to `HttpClientConfig`. +For example, where `config` variable is the extension's HTTP config from step 1: +``` +final HttpClientConfig.Builder builder = HttpClientConfig + .builder() + .withNumConnections(1) + .withReadTimeout(config.getReadTimeout().toStandardDuration()) + .withHttpProxyConfig(config.getProxyConfig()); +``` + ### Bundle your extension with all the other Druid extensions When you do `mvn install`, Druid extensions will be packaged within the Druid tarball and `extensions` directory, which are both underneath `distribution/target/`.