diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java index 90170e81a0b..08e5dacc157 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java @@ -82,6 +82,7 @@ public class HttpClientConfig private final int numConnections; private final SSLContext sslContext; + private final HttpClientProxyConfig proxyConfig; private final Duration readTimeout; private final Duration sslHandshakeTimeout; private final int bossPoolSize; @@ -92,6 +93,7 @@ public class HttpClientConfig private HttpClientConfig( int numConnections, SSLContext sslContext, + HttpClientProxyConfig proxyConfig, Duration readTimeout, Duration sslHandshakeTimeout, int bossPoolSize, @@ -102,6 +104,7 @@ public class HttpClientConfig { this.numConnections = numConnections; this.sslContext = sslContext; + this.proxyConfig = proxyConfig; this.readTimeout = readTimeout; this.sslHandshakeTimeout = sslHandshakeTimeout; this.bossPoolSize = bossPoolSize; @@ -120,6 +123,11 @@ public class HttpClientConfig return sslContext; } + public HttpClientProxyConfig getProxyConfig() + { + return proxyConfig; + } + public Duration getReadTimeout() { return readTimeout; @@ -154,6 +162,7 @@ public class HttpClientConfig { private int numConnections = 1; private SSLContext sslContext = null; + private HttpClientProxyConfig proxyConfig = null; private Duration readTimeout = null; private Duration sslHandshakeTimeout = null; private int bossCount = DEFAULT_BOSS_COUNT; @@ -177,6 +186,12 @@ public class HttpClientConfig return this; } + public Builder withHttpProxyConfig(HttpClientProxyConfig config) + { + this.proxyConfig = config; + return this; + } + public Builder withReadTimeout(Duration readTimeout) { this.readTimeout = readTimeout; @@ -212,6 +227,7 @@ public class HttpClientConfig return new HttpClientConfig( numConnections, sslContext, + proxyConfig, readTimeout, sslHandshakeTimeout, bossCount, diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java index 279a6a8b51d..fd3ee805e5f 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java @@ -84,6 +84,7 @@ public class HttpClientInit new ChannelResourceFactory( createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()), config.getSslContext(), + config.getProxyConfig(), timer, config.getSslHandshakeTimeout() == null ? -1 : config.getSslHandshakeTimeout().getMillis() ), diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientProxyConfig.java b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientProxyConfig.java new file mode 100644 index 00000000000..fec5c9e365f --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientProxyConfig.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.http.client; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +public class HttpClientProxyConfig +{ + @JsonProperty("host") + private String host; + + @JsonProperty("port") + @Min(0) + @Max(65_535) + private int port; + + @JsonProperty("user") + private String user; + + @JsonProperty("password") + private String password; + + public HttpClientProxyConfig() + { + } + + public HttpClientProxyConfig(String host, int port, String user, String password) + { + this.host = host; + this.port = port; + this.user = user; + this.password = password; + } + + public String getHost() + { + return host; + } + + public int getPort() + { + return port; + } + + public String getUser() + { + return user; + } + + public String getPassword() + { + return password; + } + + @SuppressWarnings("VariableNotUsedInsideIf") + @Override + public String toString() + { + return "HttpClientProxyConfig{" + + "proxyHost='" + host + '\'' + + ", proxyPort=" + port + + ", user='" + user + '\'' + + ", password='" + ((password == null) ? "__is_null__" : "***") + '\'' + + '}'; + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/Request.java b/core/src/main/java/org/apache/druid/java/util/http/client/Request.java index b012be5c2ce..479468090e6 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/Request.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/Request.java @@ -164,12 +164,16 @@ public class Request public Request setBasicAuthentication(String username, String password) { - final String base64Value = base64Encode(username + ":" + password); - setHeader(HttpHeaders.Names.AUTHORIZATION, "Basic " + base64Value); + setHeader(HttpHeaders.Names.AUTHORIZATION, makeBasicAuthenticationString(username, password)); return this; } - private String base64Encode(final String value) + public static String makeBasicAuthenticationString(String username, String password) + { + return "Basic " + base64Encode(username + ":" + password); + } + + private static String base64Encode(final String value) { final ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance(); diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java index 9c7f4c7d0ae..d6465be305e 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ChannelResourceFactory.java @@ -22,6 +22,8 @@ package org.apache.druid.java.util.http.client.pool; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.HttpClientProxyConfig; +import org.apache.druid.java.util.http.client.Request; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; @@ -31,7 +33,14 @@ import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.handler.codec.http.DefaultHttpRequest; +import org.jboss.netty.handler.codec.http.HttpClientCodec; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.util.Timer; @@ -50,21 +59,25 @@ public class ChannelResourceFactory implements ResourceFactory= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS; @@ -88,7 +101,99 @@ public class ChannelResourceFactory implements ResourceFactory requestContent = new AtomicReference<>(); + + final ExecutorService exec = Executors.newSingleThreadExecutor(); + final ServerSocket serverSocket = new ServerSocket(0); + exec.submit( + new Runnable() + { + @Override + public void run() + { + while (!Thread.currentThread().isInterrupted()) { + try ( + Socket clientSocket = serverSocket.accept(); + BufferedReader in = new BufferedReader( + new InputStreamReader(clientSocket.getInputStream(), StandardCharsets.UTF_8) + ); + OutputStream out = clientSocket.getOutputStream() + ) { + StringBuilder request = new StringBuilder(); + String line; + while (!"".equals((line = in.readLine()))) { + request.append(line).append("\r\n"); + } + requestContent.set(request.toString()); + out.write("HTTP/1.1 200 OK\r\n\r\n".getBytes(StandardCharsets.UTF_8)); + + while (!in.readLine().equals("")) { + // skip lines + } + out.write("HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nhello!".getBytes(StandardCharsets.UTF_8)); + } + catch (Exception e) { + Assert.fail(e.toString()); + } + } + } + } + ); + + final Lifecycle lifecycle = new Lifecycle(); + try { + final HttpClientConfig config = HttpClientConfig + .builder() + .withHttpProxyConfig( + new HttpClientProxyConfig("localhost", serverSocket.getLocalPort(), "bob", "sally") + ) + .build(); + final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final StatusResponseHolder response = client + .go( + new Request( + HttpMethod.GET, + new URL("http://anotherHost:8080/") + ), + StatusResponseHandler.getInstance() + ).get(); + + Assert.assertEquals(200, response.getStatus().getCode()); + Assert.assertEquals("hello!", response.getContent()); + + Assert.assertEquals( + "CONNECT anotherHost:8080 HTTP/1.1\r\nProxy-Authorization: Basic Ym9iOnNhbGx5\r\n", + requestContent.get() + ); + } + finally { + exec.shutdownNow(); + serverSocket.close(); + lifecycle.stop(); + } + } + @Test public void testCompressionCodecConfig() throws Exception { diff --git a/docs/development/modules.md b/docs/development/modules.md index e17985efa57..50e2a97f77b 100644 --- a/docs/development/modules.md +++ b/docs/development/modules.md @@ -356,6 +356,22 @@ druid.coordinator.cleanupMetadata.duty.killSupervisors.retainDuration=PT0M druid.coordinator.cleanupMetadata.period=PT10S ``` +### Routing data through a HTTP proxy for your extension + +You can add the ability for the `HttpClient` of your extension to connect through an HTTP proxy. + +To support proxy connection for your extension's HTTP client: +1. Add `HttpClientProxyConfig` as a `@JsonProperty` to the HTTP config class of your extension. +2. In the extension's module class, add `HttpProxyConfig` config to `HttpClientConfig`. +For example, where `config` variable is the extension's HTTP config from step 1: +``` +final HttpClientConfig.Builder builder = HttpClientConfig + .builder() + .withNumConnections(1) + .withReadTimeout(config.getReadTimeout().toStandardDuration()) + .withHttpProxyConfig(config.getProxyConfig()); +``` + ### Bundle your extension with all the other Druid extensions When you do `mvn install`, Druid extensions will be packaged within the Druid tarball and `extensions` directory, which are both underneath `distribution/target/`.