diff --git a/pom.xml b/pom.xml
index 2dea9dbbedd..f1009b72c48 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,13 @@
test
+
+ com.squareup.okhttp
+ mockwebserver
+ 2.3.0
+ test
+
+
diff --git a/src/main/java/org/elasticsearch/watcher/WatcherModule.java b/src/main/java/org/elasticsearch/watcher/WatcherModule.java
index c7bd278e300..3b43644ffe3 100644
--- a/src/main/java/org/elasticsearch/watcher/WatcherModule.java
+++ b/src/main/java/org/elasticsearch/watcher/WatcherModule.java
@@ -20,6 +20,7 @@ import org.elasticsearch.watcher.rest.WatcherRestModule;
import org.elasticsearch.watcher.shield.WatcherShieldModule;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.clock.ClockModule;
+import org.elasticsearch.watcher.support.http.HttpClientModule;
import org.elasticsearch.watcher.support.init.InitializingModule;
import org.elasticsearch.watcher.support.template.TemplateModule;
import org.elasticsearch.watcher.transform.TransformModule;
@@ -42,6 +43,7 @@ public class WatcherModule extends AbstractModule implements SpawnModules {
new InitializingModule(),
new WatchModule(),
new TemplateModule(),
+ new HttpClientModule(),
new ClockModule(),
new WatcherClientModule(),
new TransformModule(),
diff --git a/src/main/java/org/elasticsearch/watcher/actions/ActionModule.java b/src/main/java/org/elasticsearch/watcher/actions/ActionModule.java
index a9f77910c14..2ccd7b37da7 100644
--- a/src/main/java/org/elasticsearch/watcher/actions/ActionModule.java
+++ b/src/main/java/org/elasticsearch/watcher/actions/ActionModule.java
@@ -5,14 +5,13 @@
*/
package org.elasticsearch.watcher.actions;
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.watcher.actions.email.EmailAction;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.InternalEmailService;
import org.elasticsearch.watcher.actions.index.IndexAction;
-import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.actions.webhook.WebhookAction;
-import org.elasticsearch.common.inject.AbstractModule;
-import org.elasticsearch.common.inject.multibindings.MapBinder;
import java.util.HashMap;
import java.util.Map;
@@ -46,7 +45,6 @@ public class ActionModule extends AbstractModule {
}
bind(ActionRegistry.class).asEagerSingleton();
- bind(HttpClient.class).asEagerSingleton();
bind(EmailService.class).to(InternalEmailService.class).asEagerSingleton();
}
diff --git a/src/main/java/org/elasticsearch/watcher/actions/webhook/HttpClient.java b/src/main/java/org/elasticsearch/watcher/actions/webhook/HttpClient.java
deleted file mode 100644
index 49fb375ed72..00000000000
--- a/src/main/java/org/elasticsearch/watcher/actions/webhook/HttpClient.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.watcher.actions.webhook;
-
-import org.elasticsearch.common.base.Charsets;
-import org.elasticsearch.common.component.AbstractComponent;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
-import org.elasticsearch.common.settings.Settings;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLEncoder;
-
-/**
- * Client class to wrap http connections
- */
-public class HttpClient extends AbstractComponent {
-
- @Inject
- public HttpClient(Settings settings) {
- super(settings);
- }
-
- public int execute(HttpMethod method, String url, String body) throws IOException {
- logger.debug("making [{}] request to [{}]", method.getName(), url);
- if (logger.isTraceEnabled()) {
- logger.trace("sending [{}] as body of request", body);
- }
- URL encodedUrl = new URL(URLEncoder.encode(url, Charsets.UTF_8.name()));
- HttpURLConnection httpConnection = (HttpURLConnection) encodedUrl.openConnection();
- httpConnection.setRequestMethod(method.getName());
- httpConnection.setRequestProperty("Accept-Charset", Charsets.UTF_8.name());
- httpConnection.setDoOutput(true);
- httpConnection.setRequestProperty("Content-Length", Integer.toString(body.length()));
- httpConnection.getOutputStream().write(body.getBytes(Charsets.UTF_8.name()));
- return httpConnection.getResponseCode();
- }
-}
diff --git a/src/main/java/org/elasticsearch/watcher/actions/webhook/WebhookAction.java b/src/main/java/org/elasticsearch/watcher/actions/webhook/WebhookAction.java
index 86b07e02444..7d0e32df67c 100644
--- a/src/main/java/org/elasticsearch/watcher/actions/webhook/WebhookAction.java
+++ b/src/main/java/org/elasticsearch/watcher/actions/webhook/WebhookAction.java
@@ -5,27 +5,29 @@
*/
package org.elasticsearch.watcher.actions.webhook;
-import org.elasticsearch.watcher.WatcherSettingsException;
-import org.elasticsearch.watcher.watch.WatchExecutionContext;
-import org.elasticsearch.watcher.watch.Payload;
-import org.elasticsearch.watcher.actions.Action;
-import org.elasticsearch.watcher.actions.ActionException;
-import org.elasticsearch.watcher.actions.ActionSettingsException;
-import org.elasticsearch.watcher.support.Script;
-import org.elasticsearch.watcher.support.Variables;
-import org.elasticsearch.watcher.support.template.Template;
-import org.elasticsearch.watcher.support.template.XContentTemplate;
-import org.elasticsearch.watcher.transform.Transform;
-import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.watcher.WatcherSettingsException;
+import org.elasticsearch.watcher.actions.Action;
+import org.elasticsearch.watcher.actions.ActionException;
+import org.elasticsearch.watcher.actions.ActionSettingsException;
+import org.elasticsearch.watcher.support.Script;
+import org.elasticsearch.watcher.support.Variables;
+import org.elasticsearch.watcher.support.http.HttpClient;
+import org.elasticsearch.watcher.support.http.HttpMethod;
+import org.elasticsearch.watcher.support.http.HttpResponse;
+import org.elasticsearch.watcher.support.template.Template;
+import org.elasticsearch.watcher.support.template.XContentTemplate;
+import org.elasticsearch.watcher.transform.Transform;
+import org.elasticsearch.watcher.transform.TransformRegistry;
+import org.elasticsearch.watcher.watch.Payload;
+import org.elasticsearch.watcher.watch.WatchExecutionContext;
import java.io.IOException;
import java.util.Locale;
@@ -62,17 +64,17 @@ public class WebhookAction extends Action {
String urlText = url.render(model);
String bodyText = body != null ? body.render(model) : XContentTemplate.YAML.render(model);
try {
-
- int status = httpClient.execute(method, urlText, bodyText);
- if (status >= 400) {
- logger.warn("got status [" + status + "] when connecting to [" + urlText + "]");
- } else {
- if (status >= 300) {
- logger.warn("a 200 range return code was expected, but got [" + status + "]");
+ try (HttpResponse response = httpClient.execute(method, urlText, bodyText)) {
+ int status = response.status();
+ if (status >= 400) {
+ logger.warn("got status [" + status + "] when connecting to [" + urlText + "]");
+ } else {
+ if (status >= 300) {
+ logger.warn("a 200 range return code was expected, but got [" + status + "]");
+ }
}
+ return new Result.Executed(status, urlText, bodyText);
}
- return new Result.Executed(status, urlText, bodyText);
-
} catch (IOException ioe) {
logger.error("failed to connect to [{}] for watch [{}]", ioe, urlText, ctx.watch().name());
return new Result.Failure("failed to send http request. " + ioe.getMessage());
@@ -87,7 +89,7 @@ public class WebhookAction extends Action {
.field(transform.type(), transform)
.endObject();
}
- builder.field(Parser.METHOD_FIELD.getPreferredName(), method.getName().toLowerCase(Locale.ROOT));
+ builder.field(Parser.METHOD_FIELD.getPreferredName(), method.method());
builder.field(Parser.URL_FIELD.getPreferredName(), url);
if (body != null) {
builder.field(Parser.BODY_FIELD.getPreferredName(), body);
@@ -222,7 +224,7 @@ public class WebhookAction extends Action {
if (METHOD_FIELD.match(currentFieldName)) {
method = HttpMethod.valueOf(parser.text().toUpperCase(Locale.ROOT));
if (method != HttpMethod.POST && method != HttpMethod.GET && method != HttpMethod.PUT) {
- throw new ActionSettingsException("could not parse webhook action. unsupported http method [" + method.getName() + "]");
+ throw new ActionSettingsException("could not parse webhook action. unsupported http method [" + method.method() + "]");
}
} else if (URL_FIELD.match(currentFieldName)) {
try {
@@ -339,7 +341,7 @@ public class WebhookAction extends Action {
builder.startObject();
builder.field(Parser.URL_FIELD.getPreferredName(), url);
if (method != null) {
- builder.field(Parser.METHOD_FIELD.getPreferredName(), method.getName().toLowerCase(Locale.ROOT));
+ builder.field(Parser.METHOD_FIELD.getPreferredName(), method.method());
}
if (body != null) {
builder.field(Parser.BODY_FIELD.getPreferredName(), body);
diff --git a/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java b/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java
index fcb24bd7ba8..6dda2287f25 100644
--- a/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java
+++ b/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java
@@ -7,6 +7,7 @@ package org.elasticsearch.watcher.input;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
@@ -36,4 +37,8 @@ public final class InputBuilders {
public static SimpleInput.SourceBuilder simpleInput(Map data) {
return new SimpleInput.SourceBuilder(data);
}
+
+ public static HttpInput.SourceBuilder httpInput() {
+ return new HttpInput.SourceBuilder();
+ }
}
diff --git a/src/main/java/org/elasticsearch/watcher/input/InputModule.java b/src/main/java/org/elasticsearch/watcher/input/InputModule.java
index 6594dad5ae1..884ec84490e 100644
--- a/src/main/java/org/elasticsearch/watcher/input/InputModule.java
+++ b/src/main/java/org/elasticsearch/watcher/input/InputModule.java
@@ -5,10 +5,11 @@
*/
package org.elasticsearch.watcher.input;
-import org.elasticsearch.watcher.input.search.SearchInput;
-import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
+import org.elasticsearch.watcher.input.http.HttpInput;
+import org.elasticsearch.watcher.input.search.SearchInput;
+import org.elasticsearch.watcher.input.simple.SimpleInput;
import java.util.HashMap;
import java.util.Map;
@@ -26,12 +27,13 @@ public class InputModule extends AbstractModule {
@Override
protected void configure() {
-
MapBinder parsersBinder = MapBinder.newMapBinder(binder(), String.class, Input.Parser.class);
bind(SearchInput.Parser.class).asEagerSingleton();
parsersBinder.addBinding(SearchInput.TYPE).to(SearchInput.Parser.class);
bind(SimpleInput.Parser.class).asEagerSingleton();
parsersBinder.addBinding(SimpleInput.TYPE).to(SimpleInput.Parser.class);
+ bind(HttpInput.Parser.class).asEagerSingleton();
+ parsersBinder.addBinding(HttpInput.TYPE).to(HttpInput.Parser.class);
for (Map.Entry> entry : parsers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
diff --git a/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java b/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java
new file mode 100644
index 00000000000..ffbbb7afe28
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.input.http;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.watcher.input.Input;
+import org.elasticsearch.watcher.support.Variables;
+import org.elasticsearch.watcher.support.http.HttpClient;
+import org.elasticsearch.watcher.support.http.HttpRequest;
+import org.elasticsearch.watcher.support.http.HttpResponse;
+import org.elasticsearch.watcher.support.http.TemplatedHttpRequest;
+import org.elasticsearch.watcher.support.http.auth.HttpAuth;
+import org.elasticsearch.watcher.support.template.Template;
+import org.elasticsearch.watcher.watch.Payload;
+import org.elasticsearch.watcher.watch.WatchExecutionContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ */
+public class HttpInput extends Input {
+
+ public static final String TYPE = "http";
+
+ private final HttpClient client;
+ private final TemplatedHttpRequest request;
+
+ public HttpInput(ESLogger logger, HttpClient client, TemplatedHttpRequest request) {
+ super(logger);
+ this.request = request;
+ this.client = client;
+ }
+
+ @Override
+ public String type() {
+ return TYPE;
+ }
+
+ @Override
+ public Result execute(WatchExecutionContext ctx) throws IOException {
+ Map model = Variables.createCtxModel(ctx, null);
+ HttpRequest httpRequest = request.render(model);
+ try (HttpResponse response = client.execute(httpRequest)) {
+ Tuple> result = XContentHelper.convertToMap(response.body(), true);
+ return new Result(TYPE, new Payload.Simple(result.v2()), httpRequest, response.status());
+ }
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ return request.toXContent(builder, params);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ HttpInput httpInput = (HttpInput) o;
+
+ if (!request.equals(httpInput.request)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return request.hashCode();
+ }
+
+ TemplatedHttpRequest getRequest() {
+ return request;
+ }
+
+ public final static class Result extends Input.Result {
+
+ private final HttpRequest request;
+ private final int statusCode;
+
+ public Result(String type, Payload payload, HttpRequest request, int statusCode) {
+ super(type, payload);
+ this.request = request;
+ this.statusCode = statusCode;
+ }
+ @Override
+ protected XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException {
+ builder.field(Parser.HTTP_STATUS_FIELD.getPreferredName(), statusCode);
+ builder.field(Parser.REQUEST_FIELD.getPreferredName(), request);
+ return builder;
+ }
+
+ HttpRequest request() {
+ return request;
+ }
+
+ int statusCode() {
+ return statusCode;
+ }
+ }
+
+ public final static class Parser extends AbstractComponent implements Input.Parser {
+
+ public static final ParseField REQUEST_FIELD = new ParseField("request");
+ public static final ParseField HTTP_STATUS_FIELD = new ParseField("http_status");
+
+ private final HttpClient client;
+ private final HttpRequest.Parser requestParser;
+ private final TemplatedHttpRequest.Parser templatedRequestParser;
+
+ @Inject
+ public Parser(Settings settings, HttpClient client, HttpRequest.Parser requestParser, TemplatedHttpRequest.Parser templatedRequestParser) {
+ super(settings);
+ this.client = client;
+ this.requestParser = requestParser;
+ this.templatedRequestParser = templatedRequestParser;
+ }
+
+ @Override
+ public String type() {
+ return TYPE;
+ }
+
+ @Override
+ public HttpInput parse(XContentParser parser) throws IOException {
+ TemplatedHttpRequest request = templatedRequestParser.parse(parser);
+ return new HttpInput(logger, client, request);
+ }
+
+ @Override
+ public Result parseResult(XContentParser parser) throws IOException {
+ Payload payload = null;
+ HttpRequest request = null;
+ int statusCode = -1;
+
+ XContentParser.Token token;
+ String currentFieldName = null;
+ while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+ if (token == XContentParser.Token.FIELD_NAME) {
+ currentFieldName = parser.currentName();
+ } else if (token == XContentParser.Token.START_OBJECT) {
+ if (Input.Result.PAYLOAD_FIELD.match(currentFieldName)) {
+ payload = new Payload.XContent(parser);
+ } else if (REQUEST_FIELD.match(currentFieldName)) {
+ request = requestParser.parse(parser);
+ }
+ } else if (token == XContentParser.Token.VALUE_NUMBER) {
+ statusCode = parser.intValue();
+ }
+ }
+ return new Result(TYPE, payload, request, statusCode);
+ }
+
+ }
+
+ public final static class SourceBuilder implements Input.SourceBuilder {
+
+ private String host;
+ private int port;
+ private String method;
+ private Template path;
+ private Map params;
+ private Map headers;
+ private HttpAuth auth;
+ private Template body;
+
+ public SourceBuilder setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public SourceBuilder setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public SourceBuilder setMethod(String method) {
+ this.method = method;
+ return this;
+ }
+
+ public SourceBuilder setPath(Template path) {
+ this.path = path;
+ return this;
+ }
+
+ public SourceBuilder setParams(Map params) {
+ this.params = params;
+ return this;
+ }
+
+ public SourceBuilder setHeaders(Map headers) {
+ this.headers = headers;
+ return this;
+ }
+
+ public SourceBuilder setAuth(HttpAuth auth) {
+ this.auth = auth;
+ return this;
+ }
+
+ public SourceBuilder setBody(Template body) {
+ this.body = body;
+ return this;
+ }
+
+ @Override
+ public String type() {
+ return TYPE;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params p) throws IOException {
+ builder.startObject();
+ builder.field(HttpRequest.Parser.HOST_FIELD.getPreferredName(), host);
+ builder.field(HttpRequest.Parser.PORT_FIELD.getPreferredName(), port);
+ if (method != null) {
+ builder.field(HttpRequest.Parser.METHOD_FIELD.getPreferredName(), method);
+ }
+ if (path != null) {
+ builder.field(HttpRequest.Parser.PATH_FIELD.getPreferredName(), path);
+ }
+ if (params != null) {
+ builder.field(HttpRequest.Parser.PARAMS_FIELD.getPreferredName(), params);
+ }
+ if (headers != null) {
+ builder.field(HttpRequest.Parser.HEADERS_FIELD.getPreferredName(), headers);
+ }
+ if (auth != null) {
+ builder.field(HttpRequest.Parser.AUTH_FIELD.getPreferredName(), auth);
+ }
+ if (body != null) {
+ builder.field(HttpRequest.Parser.BODY_FIELD.getPreferredName(), body);
+ }
+ return builder.endObject();
+ }
+ }
+
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java
new file mode 100644
index 00000000000..c511050b771
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.base.Charsets;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.Map;
+
+/**
+ * Client class to wrap http connections
+ */
+public class HttpClient extends AbstractComponent {
+
+ @Inject
+ public HttpClient(Settings settings) {
+ super(settings);
+ }
+
+ // TODO: Remove this when webhook action has been refactored to use this client properly
+ public HttpResponse execute(HttpMethod method, String urlString, String body) throws IOException {
+ URL url = new URL(urlString);
+ HttpRequest request = new HttpRequest();
+ request.method(method);
+ return doExecute(url, request);
+ }
+
+ public HttpResponse execute(HttpRequest request) throws IOException {
+ String queryString = null;
+ if (request.params() != null) {
+ StringBuilder builder = new StringBuilder();
+ for (Map.Entry entry : request.params().entrySet()) {
+ if (builder.length() != 0) {
+ builder.append('&');
+ }
+ builder.append(URLEncoder.encode(entry.getKey(), "utf-8"))
+ .append('=')
+ .append(URLEncoder.encode(entry.getValue(), "utf-8"));
+ }
+ queryString = builder.toString();
+ }
+
+ URI uri;
+ try {
+ uri = new URI("http", null, request.host(), request.port(), request.path(), queryString, null);
+ } catch (URISyntaxException e) {
+ throw ExceptionsHelper.convertToElastic(e);
+ }
+ URL url = uri.toURL();
+ return doExecute(url, request);
+ }
+
+ // TODO: Embed this method in execute() when webhook action has been refactored to use this client properly
+ private HttpResponse doExecute(URL url, HttpRequest request) throws IOException {
+ logger.debug("making [{}] request to [{}]", request.method().method(), url);
+ logger.trace("sending [{}] as body of request", request.body());
+ HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
+ urlConnection.setRequestMethod(request.method().method());
+ if (request.headers() != null) {
+ for (Map.Entry entry : request.headers().entrySet()) {
+ urlConnection.setRequestProperty(entry.getKey(), entry.getValue());
+ }
+ }
+ if (request.auth() != null) {
+ request.auth().update(urlConnection);
+ }
+ urlConnection.setUseCaches(false);
+ urlConnection.setRequestProperty("Accept-Charset", Charsets.UTF_8.name());
+ if (request.body() != null) {
+ urlConnection.setDoOutput(true);
+ byte[] bytes = request.body().getBytes(Charsets.UTF_8.name());
+ urlConnection.setRequestProperty("Content-Length", String.valueOf(bytes.length));
+ urlConnection.getOutputStream().write(bytes);
+ urlConnection.getOutputStream().close();
+ }
+
+ HttpResponse response = new HttpResponse();
+ response.inputStream(urlConnection.getInputStream());
+ response.status(urlConnection.getResponseCode());
+ logger.debug("http status code: {}", response.status());
+ return response;
+ }
+
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpClientModule.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpClientModule.java
new file mode 100644
index 00000000000..20cc417e02d
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpClientModule.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http;
+
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.inject.SpawnModules;
+import org.elasticsearch.watcher.support.http.auth.AuthModule;
+
+
+/**
+ */
+public class HttpClientModule extends AbstractModule implements SpawnModules {
+
+ @Override
+ public Iterable extends Module> spawnModules() {
+ return ImmutableList.of(new AuthModule());
+ }
+
+ @Override
+ protected void configure() {
+ bind(TemplatedHttpRequest.Parser.class).asEagerSingleton();
+ bind(HttpRequest.Parser.class).asEagerSingleton();
+ bind(HttpClient.class).asEagerSingleton();
+ }
+
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpMethod.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpMethod.java
new file mode 100644
index 00000000000..96e437caa32
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpMethod.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http;
+
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+
+import java.util.Locale;
+
+/**
+ */
+public enum HttpMethod {
+
+ HEAD("HEAD"),
+ GET("GET"),
+ POST("POST"),
+ PUT("PUT"),
+ DELETE("DELETE");
+
+ private final String method;
+
+ HttpMethod(String method) {
+ this.method = method;
+ }
+
+ public String method() {
+ return method;
+ }
+
+ public static HttpMethod parse(String value) {
+ value = value.toUpperCase(Locale.ROOT);
+ switch (value) {
+ case "HEAD":
+ return HEAD;
+ case "GET":
+ return GET;
+ case "POST":
+ return POST;
+ case "PUT":
+ return PUT;
+ case "DELETE":
+ return DELETE;
+ default:
+ throw new ElasticsearchIllegalArgumentException("unsupported http method [" + value + "]");
+ }
+ }
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java
new file mode 100644
index 00000000000..a4a0f013e68
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.watcher.support.WatcherUtils;
+import org.elasticsearch.watcher.support.http.auth.HttpAuth;
+import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class HttpRequest implements ToXContent {
+
+ private String host;
+ private int port;
+ private HttpMethod method;
+ private String path;
+ private Map params;
+ private Map headers;
+ private HttpAuth auth;
+ private String body;
+
+ public HttpRequest() {
+ method = HttpMethod.GET;
+ }
+
+ public String host() {
+ return host;
+ }
+
+ public void host(String host) {
+ this.host = host;
+ }
+
+ public int port() {
+ return port;
+ }
+
+ public void port(int port) {
+ this.port = port;
+ }
+
+ public HttpMethod method() {
+ return method;
+ }
+
+ public void method(HttpMethod method) {
+ this.method = method;
+ }
+
+ public String path() {
+ return path;
+ }
+
+ public void path(String path) {
+ this.path = path;
+ }
+
+ public Map params() {
+ return params;
+ }
+
+ public void params(Map params) {
+ this.params = params;
+ }
+
+ public Map headers() {
+ return headers;
+ }
+
+ public void headers(Map headers) {
+ this.headers = headers;
+ }
+
+ public HttpAuth auth() {
+ return auth;
+ }
+
+ public void auth(HttpAuth auth) {
+ this.auth = auth;
+ }
+
+ public String body() {
+ return body;
+ }
+
+ public void body(String body) {
+ this.body = body;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+ builder.startObject();
+ builder.field(Parser.HOST_FIELD.getPreferredName(), host);
+ builder.field(Parser.PORT_FIELD.getPreferredName(), port);
+ builder.field(Parser.METHOD_FIELD.getPreferredName(), method);
+ if (path != null) {
+ builder.field(Parser.PATH_FIELD.getPreferredName(), path);
+ }
+ if (this.params != null) {
+ builder.startObject(Parser.PARAMS_FIELD.getPreferredName()).value(this.params).endObject();
+ }
+ if (headers != null) {
+ builder.startObject(Parser.HEADERS_FIELD.getPreferredName()).value(headers).endObject();
+ }
+ if (auth != null) {
+ builder.field(Parser.AUTH_FIELD.getPreferredName(), auth);
+ }
+ if (body != null) {
+ builder.field(Parser.BODY_FIELD.getPreferredName(), body);
+ }
+ return builder.endObject();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ HttpRequest request = (HttpRequest) o;
+
+ if (port != request.port) return false;
+ if (auth != null ? !auth.equals(request.auth) : request.auth != null) return false;
+ if (body != null ? !body.equals(request.body) : request.body != null) return false;
+ if (headers != null ? !headers.equals(request.headers) : request.headers != null) return false;
+ if (!host.equals(request.host)) return false;
+ if (method != request.method) return false;
+ if (params != null ? !params.equals(request.params) : request.params != null) return false;
+ if (path != null ? !path.equals(request.path) : request.path != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = host.hashCode();
+ result = 31 * result + port;
+ result = 31 * result + method.hashCode();
+ result = 31 * result + (path != null ? path.hashCode() : 0);
+ result = 31 * result + (params != null ? params.hashCode() : 0);
+ result = 31 * result + (headers != null ? headers.hashCode() : 0);
+ result = 31 * result + (auth != null ? auth.hashCode() : 0);
+ result = 31 * result + (body != null ? body.hashCode() : 0);
+ return result;
+ }
+
+ public static class Parser {
+
+ public static final ParseField HOST_FIELD = new ParseField("host");
+ public static final ParseField PORT_FIELD = new ParseField("port");
+ public static final ParseField METHOD_FIELD = new ParseField("method");
+ public static final ParseField PATH_FIELD = new ParseField("path");
+ public static final ParseField PARAMS_FIELD = new ParseField("params");
+ public static final ParseField HEADERS_FIELD = new ParseField("headers");
+ public static final ParseField AUTH_FIELD = new ParseField("auth");
+ public static final ParseField BODY_FIELD = new ParseField("body");
+
+ private final HttpAuthRegistry httpAuthRegistry;
+
+ @Inject
+ public Parser(HttpAuthRegistry httpAuthRegistry) {
+ this.httpAuthRegistry = httpAuthRegistry;
+ }
+
+ public HttpRequest parse(XContentParser parser) throws IOException {
+ HttpRequest request = new HttpRequest();
+ XContentParser.Token token;
+ String currentFieldName = null;
+ while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+ if (token == XContentParser.Token.FIELD_NAME) {
+ currentFieldName = parser.currentName();
+ } else if (token == XContentParser.Token.START_OBJECT) {
+ if (HEADERS_FIELD.match(currentFieldName)) {
+ request.headers((Map) WatcherUtils.flattenModel(parser.map()));
+ } else if (PARAMS_FIELD.match(currentFieldName)) {
+ request.params((Map) WatcherUtils.flattenModel(parser.map()));
+ } else if (AUTH_FIELD.match(currentFieldName)) {
+ request.auth(httpAuthRegistry.parse(parser));
+ } else if (BODY_FIELD.match(currentFieldName)) {
+ request.body(parser.text());
+ } else {
+ throw new ElasticsearchParseException("could not parse http request. unexpected field [" + currentFieldName + "]");
+ }
+ } else if (token == XContentParser.Token.VALUE_STRING) {
+ if (METHOD_FIELD.match(currentFieldName)) {
+ request.method(HttpMethod.parse(parser.text()));
+ } else if (HOST_FIELD.match(currentFieldName)) {
+ request.host(parser.text());
+ } else if (PATH_FIELD.match(currentFieldName)) {
+ request.path(parser.text());
+ } else if (BODY_FIELD.match(currentFieldName)) {
+ request.body(parser.text());
+ } else {
+ throw new ElasticsearchParseException("could not parse http request. unexpected field [" + currentFieldName + "]");
+ }
+ } else if (token == XContentParser.Token.VALUE_NUMBER) {
+ if (PORT_FIELD.match(currentFieldName)) {
+ request.port(parser.intValue());
+ } else {
+ throw new ElasticsearchParseException("could not parse http request. unexpected field [" + currentFieldName + "]");
+ }
+ } else {
+ throw new ElasticsearchParseException("could not parse http request. unexpected token [" + token + "]");
+ }
+ }
+ return request;
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java
new file mode 100644
index 00000000000..baa7d326556
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.io.ByteStreams;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class HttpResponse implements Closeable {
+
+ private int status;
+ private InputStream inputStream;
+ private byte[] body;
+
+ public int status() {
+ return status;
+ }
+
+ public void status(int status) {
+ this.status = status;
+ }
+
+ public byte[] body() {
+ if (body == null) {
+ try {
+ body = ByteStreams.toByteArray(inputStream);
+ inputStream.close();
+ } catch (IOException e) {
+ throw ExceptionsHelper.convertToElastic(e);
+ }
+ }
+ return body;
+ }
+
+ public void inputStream(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputStream.close();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/TemplatedHttpRequest.java b/src/main/java/org/elasticsearch/watcher/support/http/TemplatedHttpRequest.java
new file mode 100644
index 00000000000..7cec7bdb843
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/TemplatedHttpRequest.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.watcher.support.http.auth.HttpAuth;
+import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
+import org.elasticsearch.watcher.support.template.Template;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ */
+public class TemplatedHttpRequest implements ToXContent {
+
+ private String host;
+ private int port;
+ private HttpMethod method;
+ private Template path;
+ private Map params;
+ private Map headers;
+ private HttpAuth auth;
+ private Template body;
+
+ public TemplatedHttpRequest() {
+ method = HttpMethod.GET;
+ }
+
+ public String host() {
+ return host;
+ }
+
+ public void host(String host) {
+ this.host = host;
+ }
+
+ public int port() {
+ return port;
+ }
+
+ public void port(int port) {
+ this.port = port;
+ }
+
+ public HttpMethod method() {
+ return method;
+ }
+
+ public void method(HttpMethod method) {
+ this.method = method;
+ }
+
+ public Template path() {
+ return path;
+ }
+
+ public void path(Template path) {
+ this.path = path;
+ }
+
+ public Map params() {
+ return params;
+ }
+
+ public void params(Map params) {
+ this.params = params;
+ }
+
+ public Map headers() {
+ return headers;
+ }
+
+ public void headers(Map headers) {
+ this.headers = headers;
+ }
+
+ public HttpAuth auth() {
+ return auth;
+ }
+
+ public void auth(HttpAuth auth) {
+ this.auth = auth;
+ }
+
+ public Template body() {
+ return body;
+ }
+
+ public void body(Template body) {
+ this.body = body;
+ }
+
+ public HttpRequest render(Map model) {
+ HttpRequest copy = new HttpRequest();
+ copy.host(host);
+ copy.port(port);
+ copy.method(method);
+ if (path != null) {
+ copy.path(path.render(model));
+ }
+ if (params != null) {
+ MapBuilder mapBuilder = MapBuilder.newMapBuilder();
+ for (Map.Entry entry : params.entrySet()) {
+ mapBuilder.put(entry.getKey(), entry.getValue().render(model));
+ }
+ copy.params(mapBuilder.map());
+ }
+ if (headers != null) {
+ MapBuilder mapBuilder = MapBuilder.newMapBuilder();
+ for (Map.Entry entry : headers.entrySet()) {
+ mapBuilder.put(entry.getKey(), entry.getValue().render(model));
+ }
+ copy.headers(mapBuilder.map());
+ }
+ copy.auth(auth);
+ if (body != null) {
+ copy.body(body.render(model));
+ }
+ return copy;
+ }
+
+ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+ builder.startObject();
+ builder.field(Parser.HOST_FIELD.getPreferredName(), host);
+ builder.field(Parser.PORT_FIELD.getPreferredName(), port);
+ builder.field(Parser.METHOD_FIELD.getPreferredName(), method);
+ if (path != null) {
+ builder.field(Parser.PATH_FIELD.getPreferredName(), path);
+ }
+ if (this.params != null) {
+ builder.startObject(Parser.PARAMS_FIELD.getPreferredName()).value(this.params).endObject();
+ }
+ if (headers != null) {
+ builder.startObject(Parser.HEADERS_FIELD.getPreferredName()).value(headers).endObject();
+ }
+ if (auth != null) {
+ builder.field(Parser.AUTH_FIELD.getPreferredName(), auth);
+ }
+ if (body != null) {
+ builder.field(Parser.BODY_FIELD.getPreferredName(), body);
+ }
+ return builder.endObject();
+ }
+
+ public static class Parser {
+
+ public static final ParseField HOST_FIELD = new ParseField("host");
+ public static final ParseField PORT_FIELD = new ParseField("port");
+ public static final ParseField METHOD_FIELD = new ParseField("method");
+ public static final ParseField PATH_FIELD = new ParseField("path");
+ public static final ParseField PARAMS_FIELD = new ParseField("params");
+ public static final ParseField HEADERS_FIELD = new ParseField("headers");
+ public static final ParseField AUTH_FIELD = new ParseField("auth");
+ public static final ParseField BODY_FIELD = new ParseField("body");
+
+ private final Template.Parser templateParser;
+ private final HttpAuthRegistry httpAuthRegistry;
+
+ @Inject
+ public Parser(Template.Parser templateParser, HttpAuthRegistry httpAuthRegistry) {
+ this.templateParser = templateParser;
+ this.httpAuthRegistry = httpAuthRegistry;
+ }
+
+ public TemplatedHttpRequest parse(XContentParser parser) throws IOException {
+ TemplatedHttpRequest request = new TemplatedHttpRequest();
+ XContentParser.Token token;
+ String currentFieldName = null;
+ while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+ if (token == XContentParser.Token.FIELD_NAME) {
+ currentFieldName = parser.currentName();
+ } else if (token == XContentParser.Token.START_OBJECT) {
+ if (PATH_FIELD.match(currentFieldName)) {
+ request.path(templateParser.parse(parser));
+ } else if (HEADERS_FIELD.match(currentFieldName)) {
+ request.headers(parseTemplates(parser));
+ } else if (PARAMS_FIELD.match(currentFieldName)) {
+ request.params(parseTemplates(parser));
+ } else if (AUTH_FIELD.match(currentFieldName)) {
+ request.auth(httpAuthRegistry.parse(parser));
+ } else if (BODY_FIELD.match(currentFieldName)) {
+ request.body(templateParser.parse(parser));
+ } else {
+ throw new ElasticsearchParseException("could not parse templated http request. unexpected field [" + currentFieldName + "]");
+ }
+ } else if (token == XContentParser.Token.VALUE_STRING) {
+ if (METHOD_FIELD.match(currentFieldName)) {
+ request.method(HttpMethod.parse(parser.text()));
+ } else if (HOST_FIELD.match(currentFieldName)) {
+ request.host(parser.text());
+ } else if (PATH_FIELD.match(currentFieldName)) {
+ request.path(templateParser.parse(parser));
+ } else if (BODY_FIELD.match(currentFieldName)) {
+ request.body(templateParser.parse(parser));
+ } else {
+ throw new ElasticsearchParseException("could not parse templated http request. unexpected field [" + currentFieldName + "]");
+ }
+ } else if (token == XContentParser.Token.VALUE_NUMBER) {
+ if (PORT_FIELD.match(currentFieldName)) {
+ request.port(parser.intValue());
+ } else {
+ throw new ElasticsearchParseException("could not parse templated http request. unexpected field [" + currentFieldName + "]");
+ }
+ } else {
+ throw new ElasticsearchParseException("could not parse templated http request. unexpected token [" + token + "]");
+ }
+ }
+ return request;
+ }
+
+ private Map parseTemplates(XContentParser parser) throws IOException {
+ Map templates = new HashMap<>();
+ String currentFieldName = null;
+ for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
+ switch (token) {
+ case FIELD_NAME:
+ currentFieldName = parser.currentName();
+ break;
+ case VALUE_STRING:
+ case START_OBJECT:
+ templates.put(currentFieldName, templateParser.parse(parser));
+ break;
+ default:
+ throw new ElasticsearchParseException("could not parse templated http request. unexpected token [" + token + "]");
+ }
+ }
+ return templates;
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/AuthModule.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/AuthModule.java
new file mode 100644
index 00000000000..1120e0ce56d
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/AuthModule.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http.auth;
+
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.multibindings.MapBinder;
+
+/**
+ */
+public class AuthModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ MapBinder parsersBinder = MapBinder.newMapBinder(binder(), String.class, HttpAuth.Parser.class);
+ bind(BasicAuth.Parser.class).asEagerSingleton();
+ parsersBinder.addBinding(BasicAuth.TYPE).to(BasicAuth.Parser.class);
+ bind(HttpAuthRegistry.class).asEagerSingleton();
+ }
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/BasicAuth.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/BasicAuth.java
new file mode 100644
index 00000000000..76952283742
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/BasicAuth.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http.auth;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.Base64;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.util.Locale;
+
+/**
+ */
+public class BasicAuth extends HttpAuth {
+
+ public static final String TYPE = "basic";
+
+ private final String username;
+ private final String password;
+
+ private final String basicAuth;
+
+ public BasicAuth(String username, String password) throws UnsupportedEncodingException {
+ this.username = username;
+ this.password = password;
+ basicAuth = "Basic " + Base64.encodeBytes(String.format(Locale.ROOT, "%s:%s", username, password).getBytes("utf-8"));
+ }
+
+ public String type() {
+ return TYPE;
+ }
+
+ @Override
+ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.field(Parser.USERNAME_FIELD.getPreferredName(), username);
+ builder.field(Parser.PASSWORD_FIELD.getPreferredName(), password);
+ return builder.endObject();
+ }
+
+ public void update(HttpURLConnection connection) {
+ connection.setRequestProperty("Authorization", basicAuth);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ BasicAuth basicAuth = (BasicAuth) o;
+
+ if (!password.equals(basicAuth.password)) return false;
+ if (!username.equals(basicAuth.username)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = username.hashCode();
+ result = 31 * result + password.hashCode();
+ return result;
+ }
+
+ public static class Parser implements HttpAuth.Parser {
+
+ static final ParseField USERNAME_FIELD = new ParseField("username");
+ static final ParseField PASSWORD_FIELD = new ParseField("password");
+
+ public String type() {
+ return TYPE;
+ }
+
+ public BasicAuth parse(XContentParser parser) throws IOException {
+ String username = null;
+ String password = null;
+
+ String fieldName = null;
+ XContentParser.Token token;
+ while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+ if (token == XContentParser.Token.FIELD_NAME) {
+ fieldName = parser.currentName();
+ } else if (token == XContentParser.Token.VALUE_STRING) {
+ if (USERNAME_FIELD.getPreferredName().equals(fieldName)) {
+ username = parser.text();
+ } else if (PASSWORD_FIELD.getPreferredName().equals(fieldName)) {
+ password = parser.text();
+ } else {
+ throw new ElasticsearchParseException("unsupported field [" + fieldName + "]");
+ }
+ } else {
+ throw new ElasticsearchParseException("unsupported token [" + token + "]");
+ }
+ }
+
+ if (username == null) {
+ throw new HttpAuthException("username is a required option");
+ }
+ if (password == null) {
+ throw new HttpAuthException("password is a required option");
+ }
+
+ return new BasicAuth(username, password);
+ }
+ }
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuth.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuth.java
new file mode 100644
index 00000000000..61fb1f95093
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuth.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http.auth;
+
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+
+public abstract class HttpAuth implements ToXContent {
+
+ public abstract String type();
+
+ public abstract void update(HttpURLConnection connection);
+
+ @Override
+ public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.field(type());
+ builder = innerToXContent(builder, params);
+ return builder.endObject();
+ }
+
+ public abstract XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException;
+
+ public static interface Parser {
+
+ String type();
+
+ Auth parse(XContentParser parser) throws IOException;
+
+ }
+
+}
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthException.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthException.java
new file mode 100644
index 00000000000..a157e50790b
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthException.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http.auth;
+
+import org.elasticsearch.watcher.WatcherException;
+
+/**
+ */
+public class HttpAuthException extends WatcherException {
+
+ public HttpAuthException(String msg) {
+ super(msg);
+ }
+
+ public HttpAuthException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthRegistry.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthRegistry.java
new file mode 100644
index 00000000000..80f852b669b
--- /dev/null
+++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthRegistry.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http.auth;
+
+import org.elasticsearch.common.collect.ImmutableMap;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ *
+ */
+public class HttpAuthRegistry {
+
+ private final ImmutableMap parsers;
+
+ @Inject
+ public HttpAuthRegistry(Map parsers) {
+ this.parsers = ImmutableMap.copyOf(parsers);
+ }
+
+ public HttpAuth parse(XContentParser parser) throws IOException {
+ String type = null;
+ XContentParser.Token token;
+ HttpAuth auth = null;
+ while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+ if (token == XContentParser.Token.FIELD_NAME) {
+ type = parser.currentName();
+ } else if (token == XContentParser.Token.START_OBJECT && type != null) {
+ HttpAuth.Parser inputParser = parsers.get(type);
+ if (inputParser == null) {
+ throw new HttpAuthException("unknown http auth type [" + type + "]");
+ }
+ auth = inputParser.parse(parser);
+ }
+ }
+ return auth;
+ }
+
+}
diff --git a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java
new file mode 100644
index 00000000000..e21e9503d03
--- /dev/null
+++ b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.input.http;
+
+import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableMap;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.joda.time.DateTime;
+import org.elasticsearch.common.joda.time.DateTimeZone;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.elasticsearch.watcher.actions.Action;
+import org.elasticsearch.watcher.actions.Actions;
+import org.elasticsearch.watcher.condition.simple.AlwaysTrueCondition;
+import org.elasticsearch.watcher.input.Input;
+import org.elasticsearch.watcher.input.simple.SimpleInput;
+import org.elasticsearch.watcher.support.clock.ClockMock;
+import org.elasticsearch.watcher.support.http.*;
+import org.elasticsearch.watcher.support.http.auth.BasicAuth;
+import org.elasticsearch.watcher.support.http.auth.HttpAuth;
+import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
+import org.elasticsearch.watcher.support.template.Template;
+import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
+import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
+import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
+import org.elasticsearch.watcher.watch.Payload;
+import org.elasticsearch.watcher.watch.Watch;
+import org.elasticsearch.watcher.watch.WatchExecutionContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ */
+public class HttpInputTests extends ElasticsearchTestCase {
+
+ private HttpClient httpClient;
+ private HttpInput.Parser httpParser;
+
+ @Before
+ public void init() throws Exception {
+ httpClient = mock(HttpClient.class);
+ Template.Parser templateParser = new MockTemplate.Parser();
+ HttpAuthRegistry registry = new HttpAuthRegistry(ImmutableMap.of("basic", new BasicAuth.Parser()));
+ httpParser = new HttpInput.Parser(
+ ImmutableSettings.EMPTY, httpClient, new HttpRequest.Parser(registry), new TemplatedHttpRequest.Parser(templateParser, registry)
+ );
+ }
+
+ @Test
+ public void testExecute() throws Exception {
+ String host = "_host";
+ int port = 123;
+ String body = "_body";
+ TemplatedHttpRequest request = new TemplatedHttpRequest();
+ request.method(HttpMethod.POST);
+ request.host(host);
+ request.port(port);
+ Template mockBody = mock(Template.class);
+ when(mockBody.render(anyMap())).thenReturn(body);
+ request.body(mockBody);
+ HttpInput input = new HttpInput(logger, httpClient, request);
+
+ HttpResponse response = new HttpResponse();
+ response.status(123);
+ response.inputStream(new ByteArrayInputStream("{\"key\" : \"value\"}".getBytes(UTF8)));
+ when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
+
+ Watch watch = new Watch("test-watch",
+ new ClockMock(),
+ new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
+ new SimpleInput(logger, new Payload.Simple()),
+ new AlwaysTrueCondition(logger),
+ null,
+ new Actions(new ArrayList()),
+ null,
+ null,
+ new Watch.Status());
+ WatchExecutionContext ctx = new WatchExecutionContext("test-watch1",
+ watch,
+ new DateTime(0, DateTimeZone.UTC),
+ new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
+ HttpInput.Result result = input.execute(ctx);
+ assertThat(result.type(), equalTo(HttpInput.TYPE));
+ assertThat(result.payload().data(), equalTo(MapBuilder.newMapBuilder().put("key", "value").map()));
+ }
+
+ @Test
+ @Repeat(iterations = 12)
+ public void testParser() throws Exception {
+ final String httpMethod = randomFrom("PUT", "POST", "GET", "DELETE", "HEAD", null);
+ String host = randomAsciiOfLength(3);
+ int port = randomInt();
+ Template path = new MockTemplate(randomAsciiOfLength(3));
+ String body = randomBoolean() ? randomAsciiOfLength(3) : null;
+ Map params = randomBoolean() ? new MapBuilder().put("a", new MockTemplate("b")).map() : null;
+ Map headers = randomBoolean() ? new MapBuilder().put("c", new MockTemplate("d")).map() : null;
+ HttpAuth auth = randomBoolean() ? new BasicAuth("username", "password") : null;
+ HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder()
+ .setMethod(httpMethod)
+ .setHost(host)
+ .setPort(port)
+ .setPath(path)
+ .setBody(body != null ? new MockTemplate(body) : null)
+ .setParams(params)
+ .setHeaders(headers)
+ .setAuth(auth);
+ XContentParser parser = XContentHelper.createParser(jsonBuilder().value(sourceBuilder).bytes());
+ parser.nextToken();
+ HttpInput result = httpParser.parse(parser);
+
+ assertThat(result.type(), equalTo(HttpInput.TYPE));
+ assertThat(result.getRequest().method().method(), equalTo(httpMethod != null ? httpMethod : "GET")); // get is the default
+ assertThat(result.getRequest().host(), equalTo(host));
+ assertThat(result.getRequest().port(), equalTo(port));
+ assertThat(result.getRequest().path(), equalTo(path));
+ assertThat(result.getRequest().params(), equalTo(params));
+ assertThat(result.getRequest().headers(), equalTo(headers));
+ assertThat(result.getRequest().auth(), equalTo(auth));
+ if (body != null) {
+ assertThat(result.getRequest().body().render(Collections.emptyMap()), equalTo(body));
+ } else {
+ assertThat(result.getRequest().body(), nullValue());
+ }
+ }
+
+ @Test(expected = ElasticsearchIllegalArgumentException.class)
+ public void testParser_invalidHttpMethod() throws Exception {
+ Map headers = new MapBuilder().put("a", new MockTemplate("b")).map();
+ HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder()
+ .setMethod("_method")
+ .setHost("_host")
+ .setPort(123)
+ .setBody(new MockTemplate("_body"))
+ .setHeaders(headers);
+ XContentParser parser = XContentHelper.createParser(jsonBuilder().value(sourceBuilder).bytes());
+ parser.nextToken();
+ httpParser.parse(parser);
+ }
+
+ @Test
+ public void testParseResult() throws Exception {
+ String httpMethod = "get";
+ String body = "_body";
+ Map headers = new MapBuilder().put("a", new MockTemplate("b")).map();
+ HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder()
+ .setMethod(httpMethod)
+ .setHost("_host")
+ .setPort(123)
+ .setBody(new MockTemplate(body))
+ .setHeaders(headers);
+
+ Map payload = MapBuilder.newMapBuilder().put("x", "y").map();
+
+ XContentBuilder builder = jsonBuilder().startObject();
+ builder.field(HttpInput.Parser.HTTP_STATUS_FIELD.getPreferredName(), 123);
+ builder.field(HttpInput.Parser.REQUEST_FIELD.getPreferredName(), sourceBuilder);
+ builder.field(Input.Result.PAYLOAD_FIELD.getPreferredName(), payload);
+ builder.endObject();
+
+ XContentParser parser = XContentHelper.createParser(builder.bytes());
+ parser.nextToken();
+ HttpInput.Result result = httpParser.parseResult(parser);
+ assertThat(result.type(), equalTo(HttpInput.TYPE));
+ assertThat(result.payload().data(), equalTo(payload));
+ assertThat(result.statusCode(), equalTo(123));
+ assertThat(result.request().method().method(), equalTo("GET"));
+ assertThat(result.request().headers().size(), equalTo(headers.size()));
+ for (Map.Entry entry : headers.entrySet()) {
+ assertThat(entry.getValue().render(Collections.emptyMap()), equalTo(result.request().headers().get(entry.getKey())));
+ }
+ assertThat(result.request().host(), equalTo("_host"));
+ assertThat(result.request().port(), equalTo(123));
+ assertThat(result.request().body(), equalTo("_body"));
+ }
+
+ private static class MockTemplate implements Template {
+
+ private final String value;
+
+ private MockTemplate(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String render(Map model) {
+ return value;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ return builder.value(value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ MockTemplate that = (MockTemplate) o;
+
+ if (!value.equals(that.value)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+
+ static class Parser implements Template.Parser {
+
+ @Override
+ public Template parse(XContentParser parser) throws IOException, ParseException {
+ String value = parser.text();
+ return new MockTemplate(value);
+ }
+ }
+
+ }
+
+}
diff --git a/src/test/java/org/elasticsearch/watcher/support/http/HttpClientTest.java b/src/test/java/org/elasticsearch/watcher/support/http/HttpClientTest.java
new file mode 100644
index 00000000000..5ef7a5b1c4e
--- /dev/null
+++ b/src/test/java/org/elasticsearch/watcher/support/http/HttpClientTest.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.support.http;
+
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.squareup.okhttp.mockwebserver.MockResponse;
+import com.squareup.okhttp.mockwebserver.MockWebServer;
+import com.squareup.okhttp.mockwebserver.RecordedRequest;
+import org.elasticsearch.common.base.Charsets;
+import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.elasticsearch.watcher.support.http.auth.BasicAuth;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ */
+public class HttpClientTest extends ElasticsearchTestCase {
+
+ private MockWebServer webServer;
+ private HttpClient httpClient;
+
+ @Before
+ public void init() throws Exception {
+ webServer = new MockWebServer();
+ webServer.start(9200);
+ httpClient = new HttpClient(ImmutableSettings.EMPTY);
+ }
+
+ @After
+ public void after() throws Exception {
+ webServer.shutdown();
+ }
+
+ @Test
+ @Repeat(iterations = 10)
+ public void testBasics() throws Exception {
+ int responseCode = randomIntBetween(200, 203);
+ String body = randomAsciiOfLengthBetween(2, 8096);
+ webServer.enqueue(new MockResponse().setResponseCode(responseCode).setBody(body));
+
+
+ HttpRequest request = new HttpRequest();
+ request.method(HttpMethod.POST);
+ request.host("localhost");
+ request.port(9200);
+ request.path("/" + randomAsciiOfLength(5));
+ String paramKey;
+ String paramValue;
+ request.params(MapBuilder.newMapBuilder()
+ .put(paramKey = randomAsciiOfLength(3), paramValue = randomAsciiOfLength(3))
+ .map());
+ String headerKey;
+ String headerValue;
+ request.headers(MapBuilder.newMapBuilder()
+ .put(headerKey = randomAsciiOfLength(3), headerValue = randomAsciiOfLength(3))
+ .map());
+ request.body(randomAsciiOfLength(5));
+ HttpResponse response = httpClient.execute(request);
+ RecordedRequest recordedRequest = webServer.takeRequest();
+
+ assertThat(response.status(), equalTo(responseCode));
+ assertThat(new String(response.body(), Charsets.UTF_8), equalTo(body));
+ assertThat(webServer.getRequestCount(), equalTo(1));
+ assertThat(recordedRequest.getBody().readString(Charsets.UTF_8), equalTo(request.body()));
+ assertThat(recordedRequest.getPath().split("\\?")[0], equalTo(request.path()));
+ assertThat(recordedRequest.getPath().split("\\?")[1], equalTo(paramKey + "=" + paramValue));
+ assertThat(recordedRequest.getHeader(headerKey), equalTo(headerValue));
+ }
+
+ @Test
+ public void testBasicAuth() throws Exception {
+ webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
+ HttpRequest request = new HttpRequest();
+ request.method(HttpMethod.POST);
+ request.host("localhost");
+ request.port(9200);
+ request.path("/test");
+ request.auth(new BasicAuth("user", "pass"));
+ request.body("body");
+ HttpResponse response = httpClient.execute(request);
+ assertThat(response.status(), equalTo(200));
+ assertThat(new String(response.body(), Charsets.UTF_8), equalTo("body"));
+ RecordedRequest recordedRequest = webServer.takeRequest();
+ assertThat(recordedRequest.getHeader("Authorization"), equalTo("Basic dXNlcjpwYXNz"));
+ }
+
+}
diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java
index 03313184fd0..abd71140955 100644
--- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java
+++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java
@@ -40,12 +40,12 @@ import org.elasticsearch.watcher.actions.email.service.Authentication;
import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
-import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.ClockMock;
+import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java
index 40fde1abff7..8abd0e11840 100644
--- a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java
+++ b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java
@@ -10,9 +10,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.joda.time.DateTime;
-import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -24,13 +22,14 @@ import org.elasticsearch.watcher.actions.email.service.Authentication;
import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
-import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.SystemClock;
+import org.elasticsearch.watcher.support.http.HttpClient;
+import org.elasticsearch.watcher.support.http.HttpMethod;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.ScriptTemplate;
diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java b/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java
new file mode 100644
index 00000000000..5d5f6926ecb
--- /dev/null
+++ b/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.watcher.test.integration;
+
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.internal.InternalNode;
+import org.elasticsearch.watcher.client.WatchSourceBuilder;
+import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
+import org.elasticsearch.watcher.support.template.ScriptTemplate;
+import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
+import org.elasticsearch.watcher.trigger.TriggerBuilders;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
+import static org.elasticsearch.watcher.client.WatchSourceBuilder.watchSourceBuilder;
+import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
+import static org.elasticsearch.watcher.input.InputBuilders.httpInput;
+import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
+
+/**
+ */
+public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests {
+
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal) {
+ return ImmutableSettings.builder()
+ .put(InternalNode.HTTP_ENABLED, true)
+ .put(super.nodeSettings(nodeOrdinal))
+ .build();
+ }
+
+ @Override
+ protected boolean shieldEnabled() {
+ return false;
+ }
+
+ @Test
+ public void testHttpInput() throws Exception {
+ ScriptServiceProxy sc = scriptService();
+ client().prepareIndex("index", "type", "id").setSource("{}").setRefresh(true).get();
+
+ InetSocketAddress address = internalTestCluster().httpAddresses()[0];
+ String body = jsonBuilder().startObject().field("size", 1).endObject().string();
+ WatchSourceBuilder source = watchSourceBuilder()
+ .trigger(TriggerBuilders.schedule(interval("5s")))
+ .input(httpInput()
+ .setHost(address.getHostName())
+ .setPort(address.getPort())
+ .setPath(new ScriptTemplate(sc, "/index/_search"))
+ .setBody(new ScriptTemplate(sc, body))
+ )
+ .condition(scriptCondition("ctx.payload.hits.total == 1"))
+ .addAction(indexAction("idx", "action"));
+ watcherClient().preparePutWatch("_name")
+ .source(source)
+ .get();
+
+ if (timeWarped()) {
+ timeWarp().scheduler().trigger("_name");
+ refresh();
+ }
+ assertWatchWithMinimumPerformedActionsCount("_name", 1, false);
+ }
+
+}
diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java
index 6b374138ba2..8f6061935f6 100644
--- a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java
+++ b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java
@@ -12,7 +12,6 @@ import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -27,7 +26,6 @@ import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.actions.index.IndexAction;
-import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.condition.ConditionRegistry;
@@ -40,6 +38,8 @@ import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.SystemClock;
+import org.elasticsearch.watcher.support.http.HttpClient;
+import org.elasticsearch.watcher.support.http.HttpMethod;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.ScriptTemplate;