HttpClient: Add proxy support
Allows to configure watcher.http.proxy.host and watcher.http.proxy.port properties to configure a HTTP proxy as well as specify a proxy whenever a HTTP request is executed. Closes elastic/elasticsearch#587 Original commit: elastic/x-pack-elasticsearch@75ef260fef
@ -134,6 +134,8 @@ holds an object where the keys serve as the header names and the values serve as
| `request.params` | no | - | The URL query string parameters. The parameter values can be static text or include Mustache <<templates, templates>>.
| `request.auth` | no | - | Authentication related HTTP headers. Currently, only basic authentication is supported.
| `request.body` | no | - | The HTTP request body. The body can be static text or include Mustache <<templates, templates>>. When not specified, an empty body is sent.
| `request.proxy.host` | no | - | The proxy host to use when connecting to the host.
| `request.proxy.port` | no | - | The proxy port to use when connecting to the host.
| `request.connection_timeout` | no | 10s | The timeout for setting up the http connection. If the connection could not be set up within this time, the action will timeout and fail. It is
also possible to <<configuring-default-http-timeouts, configure>> the default connection timeout for all http connection timeouts.
| `request.read_timeout` | no | 10s | The timeout for reading data from http connection. If no response was received within this time, the action will timeout and fail. It is
@ -35,6 +35,8 @@ NOTE: If the body of the response from the HTTP endpoint is in the JSON or YAM
| `request.headers` | no | - | The HTTP request headers. The header values can be static text or include `mustache` <<templates, templates>>.
| `request.params` | no | - | The URL query string parameters. The parameter values can be static text or contain `mustache` <<templates, templates>>.
| `request.auth` | no | - | Authentication related HTTP headers. Currently, only basic authentication is supported.
| `request.proxy.host` | no | - | The proxy host to use when connecting to the host.
| `request.proxy.port` | no | - | The proxy port to use when connecting to the host.
| `request.connection_timeout` | no | 10s | The timeout for setting up the http connection. If the connection could not be set up within this time, the input will timeout and fail. It is
also possible to <<configuring-default-http-timeouts, configure>> the default connection timeout for all http connection timeouts.
| `request.read_timeout` | no | 10s | The timeout for reading data from http connection. If no response was received within this time, the input will timeout and fail. It is
@ -37,6 +37,12 @@ bin/plugin remove watcher
=== Change List
==== 2.1.0
* Support for configuring a proxy in the webhook action, http input and configuring a default proxy (which is also used by the slack action), using the `watcher.http.proxy.host` and `watcher.http.proxy.port` settings.
==== 2.0.0
.Bug fixes
@ -8,6 +8,7 @@ package org.elasticsearch.watcher.support.http;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
@ -50,6 +51,7 @@ import static java.util.Collections.unmodifiableMap;
public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
static final String SETTINGS_SSL_PREFIX = "watcher.http.ssl.";
static final String SETTINGS_PROXY_PREFIX = "watcher.http.proxy.";
static final String SETTINGS_SSL_SHIELD_PREFIX = "shield.ssl.";
public static final String SETTINGS_SSL_PROTOCOL = SETTINGS_SSL_PREFIX + "protocol";
@ -68,6 +70,8 @@ public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
public static final String SETTINGS_SSL_TRUSTSTORE_ALGORITHM = SETTINGS_SSL_PREFIX + "truststore.algorithm";
public static final String SETTINGS_PROXY_HOST = SETTINGS_PROXY_PREFIX + "host";
public static final String SETTINGS_PROXY_PORT = SETTINGS_PROXY_PREFIX + "post";
private final HttpAuthRegistry httpAuthRegistry;
private final Environment env;
@ -75,6 +79,7 @@ public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
private final TimeValue defaultReadTimeout;
private SSLSocketFactory sslSocketFactory;
private HttpProxy proxy = HttpProxy.NO_PROXY;
public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, Environment env) {
@ -87,6 +92,16 @@ public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
protected void doStart() throws ElasticsearchException {
Integer proxyPort = settings.getAsInt(SETTINGS_PROXY_PORT, null);
String proxyHost = settings.get(SETTINGS_PROXY_HOST, null);
if (proxyPort != null && Strings.hasText(proxyHost)) {
proxy = new HttpProxy(proxyHost, proxyPort);
} else {
if (proxyPort == null && Strings.hasText(proxyHost) || proxyPort != null && !Strings.hasText(proxyHost)) {
logger.error("disabling proxy. Watcher HTTP HttpProxy requires both settings: [{}] and [{}]", SETTINGS_PROXY_HOST, SETTINGS_PROXY_PORT);
if (!settings.getByPrefix(SETTINGS_SSL_PREFIX).getAsMap().isEmpty() ||
!settings.getByPrefix(SETTINGS_SSL_SHIELD_PREFIX).getAsMap().isEmpty()) {
sslSocketFactory = createSSLSocketFactory(settings);
@ -137,7 +152,11 @@ public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
logger.debug("making [{}] request to [{}]", request.method().method(), url);
logger.trace("sending [{}] as body of request", request.body());
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
// proxy configured in the request always wins!
HttpProxy proxyToUse = request.proxy != null ? request.proxy : proxy;
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection(proxyToUse.proxy());
if (urlConnection instanceof HttpsURLConnection && sslSocketFactory != null) {
HttpsURLConnection httpsConn = (HttpsURLConnection) urlConnection;
@ -0,0 +1,116 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
package org.elasticsearch.watcher.support.http;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.*;
import java.io.IOException;
import java.net.*;
import java.util.Objects;
public class HttpProxy implements ToXContent, Streamable {
public static final HttpProxy NO_PROXY = new HttpProxy(null, null);
private String host;
private Integer port;
public HttpProxy(String host, Integer port) {
this.host = host;
this.port = port;
public void readFrom(StreamInput in) throws IOException {
host = in.readOptionalString();
port = in.readOptionalVInt();
public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (Strings.hasText(host) && port != null) {
builder.startObject("proxy").field("host", host).field("port", port).endObject();
return builder;
public String getHost() {
return host;
public Integer getPort() {
return port;
public Proxy proxy() throws UnknownHostException {
if (Strings.hasText(host) && port != null) {
return new Proxy(Proxy.Type.HTTP, new InetSocketAddress(InetAddress.getByName(host), port));
return Proxy.NO_PROXY;
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HttpProxy that = (HttpProxy) o;
return Objects.equals(port, that.port) && Objects.equals(host, that.host);
public int hashCode() {
return Objects.hash(host, port);
public static HttpProxy parse(XContentParser parser) throws IOException {
XContentParser.Token token;
String currentFieldName = null;
String host = null;
Integer port = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.HOST)) {
host = parser.text();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PORT)) {
port = parser.intValue();
if (port <= 0 || port >= 65535) {
throw new ElasticsearchParseException("Proxy port must be between 1 and 65534, but was " + port);
if (port == null || host == null) {
throw new ElasticsearchParseException("Proxy must contain 'port' and 'host' field");
return new HttpProxy(host, port);
public interface Field {
ParseField HOST = new ParseField("host");
ParseField PORT = new ParseField("port");
@ -11,11 +11,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.http.auth.HttpAuth;
@ -44,10 +40,12 @@ public class HttpRequest implements ToXContent {
final @Nullable String body;
final @Nullable TimeValue connectionTimeout;
final @Nullable TimeValue readTimeout;
final @Nullable HttpProxy proxy;
public HttpRequest(String host, int port, @Nullable Scheme scheme, @Nullable HttpMethod method, @Nullable String path,
@Nullable Map<String, String> params, @Nullable Map<String, String> headers,
@Nullable HttpAuth auth, @Nullable String body, @Nullable TimeValue connectionTimeout, @Nullable TimeValue readTimeout) {
@Nullable HttpAuth auth, @Nullable String body, @Nullable TimeValue connectionTimeout, @Nullable TimeValue readTimeout,
@Nullable HttpProxy proxy) {
this.host = host;
this.port = port;
this.scheme = scheme != null ? scheme : Scheme.HTTP;
@ -59,6 +57,7 @@ public class HttpRequest implements ToXContent {
this.body = body;
this.connectionTimeout = connectionTimeout;
this.readTimeout = readTimeout;
this.proxy = proxy;
public Scheme scheme() {
@ -109,6 +108,10 @@ public class HttpRequest implements ToXContent {
return readTimeout;
public HttpProxy proxy() {
return proxy;
public static String encodeUrl(String text) {
try {
return URLEncoder.encode(text, "UTF-8");
@ -153,6 +156,9 @@ public class HttpRequest implements ToXContent {
if (readTimeout != null) {
builder.field(Field.READ_TIMEOUT.getPreferredName(), readTimeout);
if (proxy != null) {
builder.field(Field.PROXY.getPreferredName(), proxy);
return builder.endObject();
@ -173,6 +179,7 @@ public class HttpRequest implements ToXContent {
if (auth != null ? !auth.equals(that.auth) : that.auth != null) return false;
if (connectionTimeout != null ? !connectionTimeout.equals(that.connectionTimeout) : that.connectionTimeout != null) return false;
if (readTimeout != null ? !readTimeout.equals(that.readTimeout) : that.readTimeout != null) return false;
if (proxy != null ? !proxy.equals(that.proxy) : that.proxy != null) return false;
return !(body != null ? !body.equals(that.body) : that.body != null);
@ -190,6 +197,7 @@ public class HttpRequest implements ToXContent {
result = 31 * result + (connectionTimeout != null ? connectionTimeout.hashCode() : 0);
result = 31 * result + (readTimeout != null ? readTimeout.hashCode() : 0);
result = 31 * result + (body != null ? body.hashCode() : 0);
result = 31 * result + (proxy != null ? proxy.hashCode() : 0);
return result;
@ -215,6 +223,9 @@ public class HttpRequest implements ToXContent {
sb.append("connection_timeout=[").append(connectionTimeout).append("], ");
sb.append("read_timeout=[").append(readTimeout).append("], ");
if (proxy != null) {
sb.append("proxy=[").append(proxy).append("], ");
sb.append("body=[").append(body).append("], ");
return sb.toString();
@ -239,6 +250,12 @@ public class HttpRequest implements ToXContent {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PROXY)) {
try {
} catch (Exception e) {
throw new ElasticsearchParseException("could not parse http request. could not parse [{}] field", currentFieldName);
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.AUTH)) {
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.CONNECTION_TIMEOUT)) {
@ -313,6 +330,7 @@ public class HttpRequest implements ToXContent {
private String body;
private TimeValue connectionTimeout;
private TimeValue readTimeout;
private HttpProxy proxy;
private Builder(String host, int port) {
this.host = host;
@ -394,8 +412,13 @@ public class HttpRequest implements ToXContent {
return this;
public Builder proxy(HttpProxy proxy) {
this.proxy = proxy;
return this;
public HttpRequest build() {
HttpRequest request = new HttpRequest(host, port, scheme, method, path, unmodifiableMap(params), unmodifiableMap(headers), auth, body, connectionTimeout, readTimeout);
HttpRequest request = new HttpRequest(host, port, scheme, method, path, unmodifiableMap(params), unmodifiableMap(headers), auth, body, connectionTimeout, readTimeout, proxy);
params = null;
headers = null;
return request;
@ -414,5 +437,6 @@ public class HttpRequest implements ToXContent {
ParseField BODY = new ParseField("body");
ParseField CONNECTION_TIMEOUT = new ParseField("connection_timeout");
ParseField READ_TIMEOUT = new ParseField("read_timeout");
ParseField PROXY = new ParseField("proxy");
@ -8,6 +8,7 @@ package org.elasticsearch.watcher.support.http;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
@ -45,10 +46,12 @@ public class HttpRequestTemplate implements ToXContent {
private final TextTemplate body;
private final @Nullable TimeValue connectionTimeout;
private final @Nullable TimeValue readTimeout;
private final @Nullable HttpProxy proxy;
public HttpRequestTemplate(String host, int port, @Nullable Scheme scheme, @Nullable HttpMethod method, @Nullable TextTemplate path,
Map<String, TextTemplate> params, Map<String, TextTemplate> headers, HttpAuth auth,
TextTemplate body, @Nullable TimeValue connectionTimeout, @Nullable TimeValue readTimeout) {
TextTemplate body, @Nullable TimeValue connectionTimeout, @Nullable TimeValue readTimeout,
@Nullable HttpProxy proxy) {
this.host = host;
this.port = port;
this.scheme = scheme != null ? scheme :Scheme.HTTP;
@ -60,6 +63,7 @@ public class HttpRequestTemplate implements ToXContent {
this.body = body;
this.connectionTimeout = connectionTimeout;
this.readTimeout = readTimeout;
this.proxy = proxy;
public Scheme scheme() {
@ -106,6 +110,10 @@ public class HttpRequestTemplate implements ToXContent {
return readTimeout;
public HttpProxy proxy() {
return proxy;
public HttpRequest render(TextTemplateEngine engine, Map<String, Object> model) {
HttpRequest.Builder request = HttpRequest.builder(host, port);
@ -145,6 +153,9 @@ public class HttpRequestTemplate implements ToXContent {
if (readTimeout != null) {
if (proxy != null) {
return request.build();
@ -186,6 +197,9 @@ public class HttpRequestTemplate implements ToXContent {
if (readTimeout != null) {
builder.field(Field.READ_TIMEOUT.getPreferredName(), readTimeout);
if (proxy != null) {
proxy.toXContent(builder, params);
return builder.endObject();
@ -206,6 +220,7 @@ public class HttpRequestTemplate implements ToXContent {
if (auth != null ? !auth.equals(that.auth) : that.auth != null) return false;
if (connectionTimeout != null ? !connectionTimeout.equals(that.connectionTimeout) : that.connectionTimeout != null) return false;
if (readTimeout != null ? !readTimeout.equals(that.readTimeout) : that.readTimeout != null) return false;
if (proxy != null ? !proxy.equals(that.proxy) : that.proxy != null) return false;
return body != null ? body.equals(that.body) : that.body == null;
@ -222,6 +237,7 @@ public class HttpRequestTemplate implements ToXContent {
result = 31 * result + (body != null ? body.hashCode() : 0);
result = 31 * result + (connectionTimeout != null ? connectionTimeout.hashCode() : 0);
result = 31 * result + (readTimeout != null ? readTimeout.hashCode() : 0);
result = 31 * result + (proxy != null ? proxy.hashCode() : 0);
return result;
@ -247,6 +263,8 @@ public class HttpRequestTemplate implements ToXContent {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PROXY)) {
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PATH)) {
builder.path(parseFieldTemplate(currentFieldName, parser));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.HEADERS)) {
@ -298,7 +316,7 @@ public class HttpRequestTemplate implements ToXContent {
throw new ElasticsearchParseException("could not parse http request template. missing required [{}] string field", Field.HOST.getPreferredName());
if (builder.port <= 0) {
throw new ElasticsearchParseException("could not parse http request template. missing required [{}] numeric field", Field.PORT.getPreferredName());
throw new ElasticsearchParseException("could not parse http request template. wrong port for [{}]", Field.PORT.getPreferredName());
return builder.build();
@ -341,6 +359,7 @@ public class HttpRequestTemplate implements ToXContent {
private TextTemplate body;
private TimeValue connectionTimeout;
private TimeValue readTimeout;
private HttpProxy proxy;
private Builder() {
@ -433,9 +452,14 @@ public class HttpRequestTemplate implements ToXContent {
return this;
public Builder proxy(HttpProxy proxy) {
this.proxy = proxy;
return this;
public HttpRequestTemplate build() {
return new HttpRequestTemplate(host, port, scheme, method, path, unmodifiableMap(new HashMap<>(params)),
unmodifiableMap(new HashMap<>(headers)), auth, body, connectionTimeout, readTimeout);
unmodifiableMap(new HashMap<>(headers)), auth, body, connectionTimeout, readTimeout, proxy);
@ -5,6 +5,8 @@
package org.elasticsearch.watcher.actions.webhook;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.settings.Settings;
@ -12,22 +14,15 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.Action.Result.Status;
import org.elasticsearch.watcher.actions.email.service.Attachment;
import org.elasticsearch.watcher.actions.email.service.Authentication;
import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.actions.email.service.*;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpMethod;
import org.elasticsearch.watcher.support.http.HttpRequest;
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
import org.elasticsearch.watcher.support.http.HttpResponse;
import org.elasticsearch.watcher.support.http.*;
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
import org.elasticsearch.watcher.support.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
@ -45,28 +40,21 @@ import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import javax.mail.internet.AddressException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.mail.internet.AddressException;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.Is.is;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
@ -284,6 +272,31 @@ public class WebhookActionTests extends ESTestCase {
assertThat(result, Matchers.instanceOf(WebhookAction.Result.Success.class));
public void testThatSelectingProxyWorks() throws Exception {
Environment environment = new Environment(Settings.builder().put("path.home", createTempDir()).build());
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry, environment).start();
MockWebServer proxyServer = new MockWebServer();
try {
proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent"));
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("localhost", 65535)
.path("/").proxy(new HttpProxy("localhost", proxyServer.getPort()));
WebhookAction action = new WebhookAction(builder.build());
ExecutableWebhookAction executable = new ExecutableWebhookAction(action, logger, httpClient, templateEngine);
String watchId = "test_url_encode" + randomAsciiOfLength(10);
Watch watch = createWatch(watchId, mock(ClientProxy.class), "account1");
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(UTC), new ScheduleTriggerEvent(watchId, new DateTime(UTC), new DateTime(UTC)), timeValueSeconds(5));
executable.execute("_id", ctx, new Payload.Simple());
assertThat(proxyServer.getRequestCount(), is(1));
} finally {
private Watch createWatch(String watchId, ClientProxy client, final String account) throws AddressException, IOException {
return WatcherTestUtils.createTestWatch(watchId,
@ -8,7 +8,6 @@ package org.elasticsearch.watcher.support.http;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.settings.Settings;
@ -21,29 +20,24 @@ import org.elasticsearch.watcher.support.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.watcher.support.secret.SecretService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.UnrecoverableKeyException;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.Is.is;
public class HttpClientTests extends ESTestCase {
private MockWebServer webServer;
private HttpClient httpClient;
private HttpAuthRegistry authRegistry;
@ -56,17 +50,9 @@ public class HttpClientTests extends ESTestCase {
public void init() throws Exception {
secretService = new SecretService.PlainText();
authRegistry = new HttpAuthRegistry(singletonMap(BasicAuth.TYPE, new BasicAuthFactory(secretService)));
for (webPort = 9200; webPort < 9300; webPort++) {
try {
webServer = new MockWebServer();
httpClient = new HttpClient(Settings.EMPTY, authRegistry, environment).start();
} catch (BindException be) {
logger.warn("port [{}] was already in use trying next port", webPort);
throw new ElasticsearchException("unable to find open port between 9200 and 9300");
webServer = startWebServer(9200, 9300);
webPort = webServer.getPort();
httpClient = new HttpClient(Settings.EMPTY, authRegistry, environment).start();
@ -310,6 +296,76 @@ public class HttpClientTests extends ESTestCase {
assertThat(response.body(), notNullValue());
public void testThatProxyCanBeConfigured() throws Exception {
// this test fakes a proxy server that sends a response instead of forwarding it to the mock web server
MockWebServer proxyServer = startWebServer(62000, 63000);
proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent"));
try {
Settings settings = Settings.builder()
.put(HttpClient.SETTINGS_PROXY_HOST, "localhost")
.put(HttpClient.SETTINGS_PROXY_PORT, proxyServer.getPort())
HttpClient httpClient = new HttpClient(settings, authRegistry, environment).start();
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webPort)
HttpResponse response = httpClient.execute(requestBuilder.build());
assertThat(response.status(), equalTo(200));
assertThat(response.body().toUtf8(), equalTo("fullProxiedContent"));
// ensure we hit the proxyServer and not the webserver
assertThat(webServer.getRequestCount(), equalTo(0));
assertThat(proxyServer.getRequestCount(), equalTo(1));
} finally {
public void testThatProxyCanBeOverriddenByRequest() throws Exception {
// this test fakes a proxy server that sends a response instead of forwarding it to the mock web server
MockWebServer proxyServer = startWebServer(62000, 63000);
proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent"));
try {
Settings settings = Settings.builder()
.put(HttpClient.SETTINGS_PROXY_HOST, "localhost")
.put(HttpClient.SETTINGS_PROXY_PORT, proxyServer.getPort() + 1)
HttpClient httpClient = new HttpClient(settings, authRegistry, environment).start();
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webPort)
.proxy(new HttpProxy("localhost", proxyServer.getPort()))
HttpResponse response = httpClient.execute(requestBuilder.build());
assertThat(response.status(), equalTo(200));
assertThat(response.body().toUtf8(), equalTo("fullProxiedContent"));
// ensure we hit the proxyServer and not the webserver
assertThat(webServer.getRequestCount(), equalTo(0));
assertThat(proxyServer.getRequestCount(), equalTo(1));
} finally {
private MockWebServer startWebServer(int startPort, int endPort) throws IOException {
for (int port = startPort; port < endPort; port++) {
try {
MockWebServer mockWebServer = new MockWebServer();
return mockWebServer;
} catch (BindException be) {
logger.warn("port [{}] was already in use trying next port", webPort);
throw new ElasticsearchException("unable to find open port between 9200 and 9300");
static class ClientAuthRequiringSSLSocketFactory extends SSLSocketFactory {
final SSLSocketFactory delegate;
@ -6,10 +6,7 @@
package org.elasticsearch.watcher.support.http;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
@ -20,19 +17,20 @@ import org.elasticsearch.watcher.support.text.TextTemplate;
import org.elasticsearch.watcher.support.text.TextTemplateEngine;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import java.util.Collections;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.mock;
public class HttpRequestTemplateTests extends ESTestCase {
public void testBodyWithXContent() throws Exception {
XContentType type = randomFrom(XContentType.JSON, XContentType.YAML);
HttpRequestTemplate template = HttpRequestTemplate.builder("_host", 1234)
@ -50,6 +48,34 @@ public class HttpRequestTemplateTests extends ESTestCase {
assertThat(request.headers.size(), is(0));
public void testProxy() throws Exception {
HttpRequestTemplate template = HttpRequestTemplate.builder("_host", 1234)
.proxy(new HttpProxy("localhost", 8080))
HttpRequest request = template.render(new MockTextTemplateEngine(), Collections.emptyMap());
assertThat(request.proxy().getHost(), is("localhost"));
assertThat(request.proxy().getPort(), is(8080));
public void testProxyParsing() throws Exception {
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("_host", 1234);
String proxyHost = randomAsciiOfLength(10);
int proxyPort = randomIntBetween(1, 65534);
builder.proxy(new HttpProxy(proxyHost, proxyPort));
HttpRequestTemplate template = builder.build();
XContentBuilder xContentBuilder = template.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS);
XContentParser xContentParser = JsonXContent.jsonXContent.createParser(xContentBuilder.bytes());
HttpRequestTemplate.Parser parser = new HttpRequestTemplate.Parser(mock(HttpAuthRegistry.class));
HttpRequestTemplate parsedTemplate = parser.parse(xContentParser);
assertThat(parsedTemplate.proxy().getPort(), is(proxyPort));
assertThat(parsedTemplate.proxy().getHost(), is(proxyHost));
public void testParseSelfGenerated() throws Exception {
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("_host", 1234);
@ -84,6 +110,10 @@ public class HttpRequestTemplateTests extends ESTestCase {
if (readTimeout > 0) {
boolean enableProxy = randomBoolean();
if (enableProxy) {
builder.proxy(new HttpProxy(randomAsciiOfLength(10), randomIntBetween(1, 65534)));
HttpRequestTemplate template = builder.build();
