From e00bb69982677d244bd7c317e11f2c5c8a56aed2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 26 Mar 2015 23:03:31 +0100 Subject: [PATCH] Added http input. The http input allows to let any http interface be the input for a watch. The http input can be configured with the following options: * `method` - Optional http method. (default to GET) * `host` - The host of the http service. * `port` - The port of the http service. * `path` - The url path. * `params` - Optional url query string options. * `header` - Optional http header. * `auth` - Optional authentication http heads. * `body` - Optional body The response of the http request is expected to be valid json. Closes elastic/elasticsearch#157 Original commit: elastic/x-pack-elasticsearch@0b1f1226159b0ae9fb0331f69887cddafdbdffbc --- pom.xml | 7 + .../elasticsearch/watcher/WatcherModule.java | 2 + .../watcher/actions/ActionModule.java | 6 +- .../watcher/actions/webhook/HttpClient.java | 43 --- .../actions/webhook/WebhookAction.java | 52 ++-- .../watcher/input/InputBuilders.java | 5 + .../watcher/input/InputModule.java | 8 +- .../watcher/input/http/HttpInput.java | 251 ++++++++++++++++++ .../watcher/support/http/HttpClient.java | 92 +++++++ .../support/http/HttpClientModule.java | 31 +++ .../watcher/support/http/HttpMethod.java | 49 ++++ .../watcher/support/http/HttpRequest.java | 220 +++++++++++++++ .../watcher/support/http/HttpResponse.java | 49 ++++ .../support/http/TemplatedHttpRequest.java | 243 +++++++++++++++++ .../watcher/support/http/auth/AuthModule.java | 22 ++ .../watcher/support/http/auth/BasicAuth.java | 113 ++++++++ .../watcher/support/http/auth/HttpAuth.java | 39 +++ .../support/http/auth/HttpAuthException.java | 21 ++ .../support/http/auth/HttpAuthRegistry.java | 45 ++++ .../watcher/input/http/HttpInputTests.java | 243 +++++++++++++++++ .../watcher/support/http/HttpClientTest.java | 95 +++++++ .../test/AbstractWatcherIntegrationTests.java | 2 +- .../watcher/test/WatcherTestUtils.java | 5 +- .../integration/HttpInputIntegrationTest.java | 72 +++++ .../watcher/watch/WatchTests.java | 4 +- 25 files changed, 1638 insertions(+), 81 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/watcher/actions/webhook/HttpClient.java create mode 100644 src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/HttpClientModule.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/HttpMethod.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/TemplatedHttpRequest.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/auth/AuthModule.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/auth/BasicAuth.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuth.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthException.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthRegistry.java create mode 100644 src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java create mode 100644 src/test/java/org/elasticsearch/watcher/support/http/HttpClientTest.java create mode 100644 src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java diff --git a/pom.xml b/pom.xml index 2dea9dbbedd..f1009b72c48 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,13 @@ test + + com.squareup.okhttp + mockwebserver + 2.3.0 + test + + diff --git a/src/main/java/org/elasticsearch/watcher/WatcherModule.java b/src/main/java/org/elasticsearch/watcher/WatcherModule.java index c7bd278e300..3b43644ffe3 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherModule.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherModule.java @@ -20,6 +20,7 @@ import org.elasticsearch.watcher.rest.WatcherRestModule; import org.elasticsearch.watcher.shield.WatcherShieldModule; import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.clock.ClockModule; +import org.elasticsearch.watcher.support.http.HttpClientModule; import org.elasticsearch.watcher.support.init.InitializingModule; import org.elasticsearch.watcher.support.template.TemplateModule; import org.elasticsearch.watcher.transform.TransformModule; @@ -42,6 +43,7 @@ public class WatcherModule extends AbstractModule implements SpawnModules { new InitializingModule(), new WatchModule(), new TemplateModule(), + new HttpClientModule(), new ClockModule(), new WatcherClientModule(), new TransformModule(), diff --git a/src/main/java/org/elasticsearch/watcher/actions/ActionModule.java b/src/main/java/org/elasticsearch/watcher/actions/ActionModule.java index a9f77910c14..2ccd7b37da7 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/ActionModule.java +++ b/src/main/java/org/elasticsearch/watcher/actions/ActionModule.java @@ -5,14 +5,13 @@ */ package org.elasticsearch.watcher.actions; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.multibindings.MapBinder; import org.elasticsearch.watcher.actions.email.EmailAction; import org.elasticsearch.watcher.actions.email.service.EmailService; import org.elasticsearch.watcher.actions.email.service.InternalEmailService; import org.elasticsearch.watcher.actions.index.IndexAction; -import org.elasticsearch.watcher.actions.webhook.HttpClient; import org.elasticsearch.watcher.actions.webhook.WebhookAction; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.multibindings.MapBinder; import java.util.HashMap; import java.util.Map; @@ -46,7 +45,6 @@ public class ActionModule extends AbstractModule { } bind(ActionRegistry.class).asEagerSingleton(); - bind(HttpClient.class).asEagerSingleton(); bind(EmailService.class).to(InternalEmailService.class).asEagerSingleton(); } diff --git a/src/main/java/org/elasticsearch/watcher/actions/webhook/HttpClient.java b/src/main/java/org/elasticsearch/watcher/actions/webhook/HttpClient.java deleted file mode 100644 index 49fb375ed72..00000000000 --- a/src/main/java/org/elasticsearch/watcher/actions/webhook/HttpClient.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.actions.webhook; - -import org.elasticsearch.common.base.Charsets; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.netty.handler.codec.http.HttpMethod; -import org.elasticsearch.common.settings.Settings; - -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; -import java.net.URLEncoder; - -/** - * Client class to wrap http connections - */ -public class HttpClient extends AbstractComponent { - - @Inject - public HttpClient(Settings settings) { - super(settings); - } - - public int execute(HttpMethod method, String url, String body) throws IOException { - logger.debug("making [{}] request to [{}]", method.getName(), url); - if (logger.isTraceEnabled()) { - logger.trace("sending [{}] as body of request", body); - } - URL encodedUrl = new URL(URLEncoder.encode(url, Charsets.UTF_8.name())); - HttpURLConnection httpConnection = (HttpURLConnection) encodedUrl.openConnection(); - httpConnection.setRequestMethod(method.getName()); - httpConnection.setRequestProperty("Accept-Charset", Charsets.UTF_8.name()); - httpConnection.setDoOutput(true); - httpConnection.setRequestProperty("Content-Length", Integer.toString(body.length())); - httpConnection.getOutputStream().write(body.getBytes(Charsets.UTF_8.name())); - return httpConnection.getResponseCode(); - } -} diff --git a/src/main/java/org/elasticsearch/watcher/actions/webhook/WebhookAction.java b/src/main/java/org/elasticsearch/watcher/actions/webhook/WebhookAction.java index 86b07e02444..7d0e32df67c 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/webhook/WebhookAction.java +++ b/src/main/java/org/elasticsearch/watcher/actions/webhook/WebhookAction.java @@ -5,27 +5,29 @@ */ package org.elasticsearch.watcher.actions.webhook; -import org.elasticsearch.watcher.WatcherSettingsException; -import org.elasticsearch.watcher.watch.WatchExecutionContext; -import org.elasticsearch.watcher.watch.Payload; -import org.elasticsearch.watcher.actions.Action; -import org.elasticsearch.watcher.actions.ActionException; -import org.elasticsearch.watcher.actions.ActionSettingsException; -import org.elasticsearch.watcher.support.Script; -import org.elasticsearch.watcher.support.Variables; -import org.elasticsearch.watcher.support.template.Template; -import org.elasticsearch.watcher.support.template.XContentTemplate; -import org.elasticsearch.watcher.transform.Transform; -import org.elasticsearch.watcher.transform.TransformRegistry; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.netty.handler.codec.http.HttpMethod; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.WatcherSettingsException; +import org.elasticsearch.watcher.actions.Action; +import org.elasticsearch.watcher.actions.ActionException; +import org.elasticsearch.watcher.actions.ActionSettingsException; +import org.elasticsearch.watcher.support.Script; +import org.elasticsearch.watcher.support.Variables; +import org.elasticsearch.watcher.support.http.HttpClient; +import org.elasticsearch.watcher.support.http.HttpMethod; +import org.elasticsearch.watcher.support.http.HttpResponse; +import org.elasticsearch.watcher.support.template.Template; +import org.elasticsearch.watcher.support.template.XContentTemplate; +import org.elasticsearch.watcher.transform.Transform; +import org.elasticsearch.watcher.transform.TransformRegistry; +import org.elasticsearch.watcher.watch.Payload; +import org.elasticsearch.watcher.watch.WatchExecutionContext; import java.io.IOException; import java.util.Locale; @@ -62,17 +64,17 @@ public class WebhookAction extends Action { String urlText = url.render(model); String bodyText = body != null ? body.render(model) : XContentTemplate.YAML.render(model); try { - - int status = httpClient.execute(method, urlText, bodyText); - if (status >= 400) { - logger.warn("got status [" + status + "] when connecting to [" + urlText + "]"); - } else { - if (status >= 300) { - logger.warn("a 200 range return code was expected, but got [" + status + "]"); + try (HttpResponse response = httpClient.execute(method, urlText, bodyText)) { + int status = response.status(); + if (status >= 400) { + logger.warn("got status [" + status + "] when connecting to [" + urlText + "]"); + } else { + if (status >= 300) { + logger.warn("a 200 range return code was expected, but got [" + status + "]"); + } } + return new Result.Executed(status, urlText, bodyText); } - return new Result.Executed(status, urlText, bodyText); - } catch (IOException ioe) { logger.error("failed to connect to [{}] for watch [{}]", ioe, urlText, ctx.watch().name()); return new Result.Failure("failed to send http request. " + ioe.getMessage()); @@ -87,7 +89,7 @@ public class WebhookAction extends Action { .field(transform.type(), transform) .endObject(); } - builder.field(Parser.METHOD_FIELD.getPreferredName(), method.getName().toLowerCase(Locale.ROOT)); + builder.field(Parser.METHOD_FIELD.getPreferredName(), method.method()); builder.field(Parser.URL_FIELD.getPreferredName(), url); if (body != null) { builder.field(Parser.BODY_FIELD.getPreferredName(), body); @@ -222,7 +224,7 @@ public class WebhookAction extends Action { if (METHOD_FIELD.match(currentFieldName)) { method = HttpMethod.valueOf(parser.text().toUpperCase(Locale.ROOT)); if (method != HttpMethod.POST && method != HttpMethod.GET && method != HttpMethod.PUT) { - throw new ActionSettingsException("could not parse webhook action. unsupported http method [" + method.getName() + "]"); + throw new ActionSettingsException("could not parse webhook action. unsupported http method [" + method.method() + "]"); } } else if (URL_FIELD.match(currentFieldName)) { try { @@ -339,7 +341,7 @@ public class WebhookAction extends Action { builder.startObject(); builder.field(Parser.URL_FIELD.getPreferredName(), url); if (method != null) { - builder.field(Parser.METHOD_FIELD.getPreferredName(), method.getName().toLowerCase(Locale.ROOT)); + builder.field(Parser.METHOD_FIELD.getPreferredName(), method.method()); } if (body != null) { builder.field(Parser.BODY_FIELD.getPreferredName(), body); diff --git a/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java b/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java index fcb24bd7ba8..6dda2287f25 100644 --- a/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java +++ b/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java @@ -7,6 +7,7 @@ package org.elasticsearch.watcher.input; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.watcher.input.http.HttpInput; import org.elasticsearch.watcher.input.search.SearchInput; import org.elasticsearch.watcher.input.simple.SimpleInput; @@ -36,4 +37,8 @@ public final class InputBuilders { public static SimpleInput.SourceBuilder simpleInput(Map data) { return new SimpleInput.SourceBuilder(data); } + + public static HttpInput.SourceBuilder httpInput() { + return new HttpInput.SourceBuilder(); + } } diff --git a/src/main/java/org/elasticsearch/watcher/input/InputModule.java b/src/main/java/org/elasticsearch/watcher/input/InputModule.java index 6594dad5ae1..884ec84490e 100644 --- a/src/main/java/org/elasticsearch/watcher/input/InputModule.java +++ b/src/main/java/org/elasticsearch/watcher/input/InputModule.java @@ -5,10 +5,11 @@ */ package org.elasticsearch.watcher.input; -import org.elasticsearch.watcher.input.search.SearchInput; -import org.elasticsearch.watcher.input.simple.SimpleInput; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; +import org.elasticsearch.watcher.input.http.HttpInput; +import org.elasticsearch.watcher.input.search.SearchInput; +import org.elasticsearch.watcher.input.simple.SimpleInput; import java.util.HashMap; import java.util.Map; @@ -26,12 +27,13 @@ public class InputModule extends AbstractModule { @Override protected void configure() { - MapBinder parsersBinder = MapBinder.newMapBinder(binder(), String.class, Input.Parser.class); bind(SearchInput.Parser.class).asEagerSingleton(); parsersBinder.addBinding(SearchInput.TYPE).to(SearchInput.Parser.class); bind(SimpleInput.Parser.class).asEagerSingleton(); parsersBinder.addBinding(SimpleInput.TYPE).to(SimpleInput.Parser.class); + bind(HttpInput.Parser.class).asEagerSingleton(); + parsersBinder.addBinding(HttpInput.TYPE).to(HttpInput.Parser.class); for (Map.Entry> entry : parsers.entrySet()) { bind(entry.getValue()).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java b/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java new file mode 100644 index 00000000000..ffbbb7afe28 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/input/http/HttpInput.java @@ -0,0 +1,251 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.input.http; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.watcher.input.Input; +import org.elasticsearch.watcher.support.Variables; +import org.elasticsearch.watcher.support.http.HttpClient; +import org.elasticsearch.watcher.support.http.HttpRequest; +import org.elasticsearch.watcher.support.http.HttpResponse; +import org.elasticsearch.watcher.support.http.TemplatedHttpRequest; +import org.elasticsearch.watcher.support.http.auth.HttpAuth; +import org.elasticsearch.watcher.support.template.Template; +import org.elasticsearch.watcher.watch.Payload; +import org.elasticsearch.watcher.watch.WatchExecutionContext; + +import java.io.IOException; +import java.util.Map; + +/** + */ +public class HttpInput extends Input { + + public static final String TYPE = "http"; + + private final HttpClient client; + private final TemplatedHttpRequest request; + + public HttpInput(ESLogger logger, HttpClient client, TemplatedHttpRequest request) { + super(logger); + this.request = request; + this.client = client; + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Result execute(WatchExecutionContext ctx) throws IOException { + Map model = Variables.createCtxModel(ctx, null); + HttpRequest httpRequest = request.render(model); + try (HttpResponse response = client.execute(httpRequest)) { + Tuple> result = XContentHelper.convertToMap(response.body(), true); + return new Result(TYPE, new Payload.Simple(result.v2()), httpRequest, response.status()); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return request.toXContent(builder, params); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + HttpInput httpInput = (HttpInput) o; + + if (!request.equals(httpInput.request)) return false; + + return true; + } + + @Override + public int hashCode() { + return request.hashCode(); + } + + TemplatedHttpRequest getRequest() { + return request; + } + + public final static class Result extends Input.Result { + + private final HttpRequest request; + private final int statusCode; + + public Result(String type, Payload payload, HttpRequest request, int statusCode) { + super(type, payload); + this.request = request; + this.statusCode = statusCode; + } + @Override + protected XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(Parser.HTTP_STATUS_FIELD.getPreferredName(), statusCode); + builder.field(Parser.REQUEST_FIELD.getPreferredName(), request); + return builder; + } + + HttpRequest request() { + return request; + } + + int statusCode() { + return statusCode; + } + } + + public final static class Parser extends AbstractComponent implements Input.Parser { + + public static final ParseField REQUEST_FIELD = new ParseField("request"); + public static final ParseField HTTP_STATUS_FIELD = new ParseField("http_status"); + + private final HttpClient client; + private final HttpRequest.Parser requestParser; + private final TemplatedHttpRequest.Parser templatedRequestParser; + + @Inject + public Parser(Settings settings, HttpClient client, HttpRequest.Parser requestParser, TemplatedHttpRequest.Parser templatedRequestParser) { + super(settings); + this.client = client; + this.requestParser = requestParser; + this.templatedRequestParser = templatedRequestParser; + } + + @Override + public String type() { + return TYPE; + } + + @Override + public HttpInput parse(XContentParser parser) throws IOException { + TemplatedHttpRequest request = templatedRequestParser.parse(parser); + return new HttpInput(logger, client, request); + } + + @Override + public Result parseResult(XContentParser parser) throws IOException { + Payload payload = null; + HttpRequest request = null; + int statusCode = -1; + + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if (Input.Result.PAYLOAD_FIELD.match(currentFieldName)) { + payload = new Payload.XContent(parser); + } else if (REQUEST_FIELD.match(currentFieldName)) { + request = requestParser.parse(parser); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + statusCode = parser.intValue(); + } + } + return new Result(TYPE, payload, request, statusCode); + } + + } + + public final static class SourceBuilder implements Input.SourceBuilder { + + private String host; + private int port; + private String method; + private Template path; + private Map params; + private Map headers; + private HttpAuth auth; + private Template body; + + public SourceBuilder setHost(String host) { + this.host = host; + return this; + } + + public SourceBuilder setPort(int port) { + this.port = port; + return this; + } + + public SourceBuilder setMethod(String method) { + this.method = method; + return this; + } + + public SourceBuilder setPath(Template path) { + this.path = path; + return this; + } + + public SourceBuilder setParams(Map params) { + this.params = params; + return this; + } + + public SourceBuilder setHeaders(Map headers) { + this.headers = headers; + return this; + } + + public SourceBuilder setAuth(HttpAuth auth) { + this.auth = auth; + return this; + } + + public SourceBuilder setBody(Template body) { + this.body = body; + return this; + } + + @Override + public String type() { + return TYPE; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params p) throws IOException { + builder.startObject(); + builder.field(HttpRequest.Parser.HOST_FIELD.getPreferredName(), host); + builder.field(HttpRequest.Parser.PORT_FIELD.getPreferredName(), port); + if (method != null) { + builder.field(HttpRequest.Parser.METHOD_FIELD.getPreferredName(), method); + } + if (path != null) { + builder.field(HttpRequest.Parser.PATH_FIELD.getPreferredName(), path); + } + if (params != null) { + builder.field(HttpRequest.Parser.PARAMS_FIELD.getPreferredName(), params); + } + if (headers != null) { + builder.field(HttpRequest.Parser.HEADERS_FIELD.getPreferredName(), headers); + } + if (auth != null) { + builder.field(HttpRequest.Parser.AUTH_FIELD.getPreferredName(), auth); + } + if (body != null) { + builder.field(HttpRequest.Parser.BODY_FIELD.getPreferredName(), body); + } + return builder.endObject(); + } + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java new file mode 100644 index 00000000000..c511050b771 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.base.Charsets; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +import java.io.IOException; +import java.net.*; +import java.util.Map; + +/** + * Client class to wrap http connections + */ +public class HttpClient extends AbstractComponent { + + @Inject + public HttpClient(Settings settings) { + super(settings); + } + + // TODO: Remove this when webhook action has been refactored to use this client properly + public HttpResponse execute(HttpMethod method, String urlString, String body) throws IOException { + URL url = new URL(urlString); + HttpRequest request = new HttpRequest(); + request.method(method); + return doExecute(url, request); + } + + public HttpResponse execute(HttpRequest request) throws IOException { + String queryString = null; + if (request.params() != null) { + StringBuilder builder = new StringBuilder(); + for (Map.Entry entry : request.params().entrySet()) { + if (builder.length() != 0) { + builder.append('&'); + } + builder.append(URLEncoder.encode(entry.getKey(), "utf-8")) + .append('=') + .append(URLEncoder.encode(entry.getValue(), "utf-8")); + } + queryString = builder.toString(); + } + + URI uri; + try { + uri = new URI("http", null, request.host(), request.port(), request.path(), queryString, null); + } catch (URISyntaxException e) { + throw ExceptionsHelper.convertToElastic(e); + } + URL url = uri.toURL(); + return doExecute(url, request); + } + + // TODO: Embed this method in execute() when webhook action has been refactored to use this client properly + private HttpResponse doExecute(URL url, HttpRequest request) throws IOException { + logger.debug("making [{}] request to [{}]", request.method().method(), url); + logger.trace("sending [{}] as body of request", request.body()); + HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection(); + urlConnection.setRequestMethod(request.method().method()); + if (request.headers() != null) { + for (Map.Entry entry : request.headers().entrySet()) { + urlConnection.setRequestProperty(entry.getKey(), entry.getValue()); + } + } + if (request.auth() != null) { + request.auth().update(urlConnection); + } + urlConnection.setUseCaches(false); + urlConnection.setRequestProperty("Accept-Charset", Charsets.UTF_8.name()); + if (request.body() != null) { + urlConnection.setDoOutput(true); + byte[] bytes = request.body().getBytes(Charsets.UTF_8.name()); + urlConnection.setRequestProperty("Content-Length", String.valueOf(bytes.length)); + urlConnection.getOutputStream().write(bytes); + urlConnection.getOutputStream().close(); + } + + HttpResponse response = new HttpResponse(); + response.inputStream(urlConnection.getInputStream()); + response.status(urlConnection.getResponseCode()); + logger.debug("http status code: {}", response.status()); + return response; + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpClientModule.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpClientModule.java new file mode 100644 index 00000000000..20cc417e02d --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpClientModule.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http; + +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.inject.SpawnModules; +import org.elasticsearch.watcher.support.http.auth.AuthModule; + + +/** + */ +public class HttpClientModule extends AbstractModule implements SpawnModules { + + @Override + public Iterable spawnModules() { + return ImmutableList.of(new AuthModule()); + } + + @Override + protected void configure() { + bind(TemplatedHttpRequest.Parser.class).asEagerSingleton(); + bind(HttpRequest.Parser.class).asEagerSingleton(); + bind(HttpClient.class).asEagerSingleton(); + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpMethod.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpMethod.java new file mode 100644 index 00000000000..96e437caa32 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpMethod.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; + +import java.util.Locale; + +/** + */ +public enum HttpMethod { + + HEAD("HEAD"), + GET("GET"), + POST("POST"), + PUT("PUT"), + DELETE("DELETE"); + + private final String method; + + HttpMethod(String method) { + this.method = method; + } + + public String method() { + return method; + } + + public static HttpMethod parse(String value) { + value = value.toUpperCase(Locale.ROOT); + switch (value) { + case "HEAD": + return HEAD; + case "GET": + return GET; + case "POST": + return POST; + case "PUT": + return PUT; + case "DELETE": + return DELETE; + default: + throw new ElasticsearchIllegalArgumentException("unsupported http method [" + value + "]"); + } + } +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java new file mode 100644 index 00000000000..a4a0f013e68 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpRequest.java @@ -0,0 +1,220 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.support.WatcherUtils; +import org.elasticsearch.watcher.support.http.auth.HttpAuth; +import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry; + +import java.io.IOException; +import java.util.Map; + +public class HttpRequest implements ToXContent { + + private String host; + private int port; + private HttpMethod method; + private String path; + private Map params; + private Map headers; + private HttpAuth auth; + private String body; + + public HttpRequest() { + method = HttpMethod.GET; + } + + public String host() { + return host; + } + + public void host(String host) { + this.host = host; + } + + public int port() { + return port; + } + + public void port(int port) { + this.port = port; + } + + public HttpMethod method() { + return method; + } + + public void method(HttpMethod method) { + this.method = method; + } + + public String path() { + return path; + } + + public void path(String path) { + this.path = path; + } + + public Map params() { + return params; + } + + public void params(Map params) { + this.params = params; + } + + public Map headers() { + return headers; + } + + public void headers(Map headers) { + this.headers = headers; + } + + public HttpAuth auth() { + return auth; + } + + public void auth(HttpAuth auth) { + this.auth = auth; + } + + public String body() { + return body; + } + + public void body(String body) { + this.body = body; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(Parser.HOST_FIELD.getPreferredName(), host); + builder.field(Parser.PORT_FIELD.getPreferredName(), port); + builder.field(Parser.METHOD_FIELD.getPreferredName(), method); + if (path != null) { + builder.field(Parser.PATH_FIELD.getPreferredName(), path); + } + if (this.params != null) { + builder.startObject(Parser.PARAMS_FIELD.getPreferredName()).value(this.params).endObject(); + } + if (headers != null) { + builder.startObject(Parser.HEADERS_FIELD.getPreferredName()).value(headers).endObject(); + } + if (auth != null) { + builder.field(Parser.AUTH_FIELD.getPreferredName(), auth); + } + if (body != null) { + builder.field(Parser.BODY_FIELD.getPreferredName(), body); + } + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + HttpRequest request = (HttpRequest) o; + + if (port != request.port) return false; + if (auth != null ? !auth.equals(request.auth) : request.auth != null) return false; + if (body != null ? !body.equals(request.body) : request.body != null) return false; + if (headers != null ? !headers.equals(request.headers) : request.headers != null) return false; + if (!host.equals(request.host)) return false; + if (method != request.method) return false; + if (params != null ? !params.equals(request.params) : request.params != null) return false; + if (path != null ? !path.equals(request.path) : request.path != null) return false; + + return true; + } + + @Override + public int hashCode() { + int result = host.hashCode(); + result = 31 * result + port; + result = 31 * result + method.hashCode(); + result = 31 * result + (path != null ? path.hashCode() : 0); + result = 31 * result + (params != null ? params.hashCode() : 0); + result = 31 * result + (headers != null ? headers.hashCode() : 0); + result = 31 * result + (auth != null ? auth.hashCode() : 0); + result = 31 * result + (body != null ? body.hashCode() : 0); + return result; + } + + public static class Parser { + + public static final ParseField HOST_FIELD = new ParseField("host"); + public static final ParseField PORT_FIELD = new ParseField("port"); + public static final ParseField METHOD_FIELD = new ParseField("method"); + public static final ParseField PATH_FIELD = new ParseField("path"); + public static final ParseField PARAMS_FIELD = new ParseField("params"); + public static final ParseField HEADERS_FIELD = new ParseField("headers"); + public static final ParseField AUTH_FIELD = new ParseField("auth"); + public static final ParseField BODY_FIELD = new ParseField("body"); + + private final HttpAuthRegistry httpAuthRegistry; + + @Inject + public Parser(HttpAuthRegistry httpAuthRegistry) { + this.httpAuthRegistry = httpAuthRegistry; + } + + public HttpRequest parse(XContentParser parser) throws IOException { + HttpRequest request = new HttpRequest(); + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if (HEADERS_FIELD.match(currentFieldName)) { + request.headers((Map) WatcherUtils.flattenModel(parser.map())); + } else if (PARAMS_FIELD.match(currentFieldName)) { + request.params((Map) WatcherUtils.flattenModel(parser.map())); + } else if (AUTH_FIELD.match(currentFieldName)) { + request.auth(httpAuthRegistry.parse(parser)); + } else if (BODY_FIELD.match(currentFieldName)) { + request.body(parser.text()); + } else { + throw new ElasticsearchParseException("could not parse http request. unexpected field [" + currentFieldName + "]"); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (METHOD_FIELD.match(currentFieldName)) { + request.method(HttpMethod.parse(parser.text())); + } else if (HOST_FIELD.match(currentFieldName)) { + request.host(parser.text()); + } else if (PATH_FIELD.match(currentFieldName)) { + request.path(parser.text()); + } else if (BODY_FIELD.match(currentFieldName)) { + request.body(parser.text()); + } else { + throw new ElasticsearchParseException("could not parse http request. unexpected field [" + currentFieldName + "]"); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (PORT_FIELD.match(currentFieldName)) { + request.port(parser.intValue()); + } else { + throw new ElasticsearchParseException("could not parse http request. unexpected field [" + currentFieldName + "]"); + } + } else { + throw new ElasticsearchParseException("could not parse http request. unexpected token [" + token + "]"); + } + } + return request; + } + + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java new file mode 100644 index 00000000000..baa7d326556 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.io.ByteStreams; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +public class HttpResponse implements Closeable { + + private int status; + private InputStream inputStream; + private byte[] body; + + public int status() { + return status; + } + + public void status(int status) { + this.status = status; + } + + public byte[] body() { + if (body == null) { + try { + body = ByteStreams.toByteArray(inputStream); + inputStream.close(); + } catch (IOException e) { + throw ExceptionsHelper.convertToElastic(e); + } + } + return body; + } + + public void inputStream(InputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public void close() throws IOException { + inputStream.close(); + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/watcher/support/http/TemplatedHttpRequest.java b/src/main/java/org/elasticsearch/watcher/support/http/TemplatedHttpRequest.java new file mode 100644 index 00000000000..7cec7bdb843 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/TemplatedHttpRequest.java @@ -0,0 +1,243 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.support.http.auth.HttpAuth; +import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry; +import org.elasticsearch.watcher.support.template.Template; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + */ +public class TemplatedHttpRequest implements ToXContent { + + private String host; + private int port; + private HttpMethod method; + private Template path; + private Map params; + private Map headers; + private HttpAuth auth; + private Template body; + + public TemplatedHttpRequest() { + method = HttpMethod.GET; + } + + public String host() { + return host; + } + + public void host(String host) { + this.host = host; + } + + public int port() { + return port; + } + + public void port(int port) { + this.port = port; + } + + public HttpMethod method() { + return method; + } + + public void method(HttpMethod method) { + this.method = method; + } + + public Template path() { + return path; + } + + public void path(Template path) { + this.path = path; + } + + public Map params() { + return params; + } + + public void params(Map params) { + this.params = params; + } + + public Map headers() { + return headers; + } + + public void headers(Map headers) { + this.headers = headers; + } + + public HttpAuth auth() { + return auth; + } + + public void auth(HttpAuth auth) { + this.auth = auth; + } + + public Template body() { + return body; + } + + public void body(Template body) { + this.body = body; + } + + public HttpRequest render(Map model) { + HttpRequest copy = new HttpRequest(); + copy.host(host); + copy.port(port); + copy.method(method); + if (path != null) { + copy.path(path.render(model)); + } + if (params != null) { + MapBuilder mapBuilder = MapBuilder.newMapBuilder(); + for (Map.Entry entry : params.entrySet()) { + mapBuilder.put(entry.getKey(), entry.getValue().render(model)); + } + copy.params(mapBuilder.map()); + } + if (headers != null) { + MapBuilder mapBuilder = MapBuilder.newMapBuilder(); + for (Map.Entry entry : headers.entrySet()) { + mapBuilder.put(entry.getKey(), entry.getValue().render(model)); + } + copy.headers(mapBuilder.map()); + } + copy.auth(auth); + if (body != null) { + copy.body(body.render(model)); + } + return copy; + } + + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(Parser.HOST_FIELD.getPreferredName(), host); + builder.field(Parser.PORT_FIELD.getPreferredName(), port); + builder.field(Parser.METHOD_FIELD.getPreferredName(), method); + if (path != null) { + builder.field(Parser.PATH_FIELD.getPreferredName(), path); + } + if (this.params != null) { + builder.startObject(Parser.PARAMS_FIELD.getPreferredName()).value(this.params).endObject(); + } + if (headers != null) { + builder.startObject(Parser.HEADERS_FIELD.getPreferredName()).value(headers).endObject(); + } + if (auth != null) { + builder.field(Parser.AUTH_FIELD.getPreferredName(), auth); + } + if (body != null) { + builder.field(Parser.BODY_FIELD.getPreferredName(), body); + } + return builder.endObject(); + } + + public static class Parser { + + public static final ParseField HOST_FIELD = new ParseField("host"); + public static final ParseField PORT_FIELD = new ParseField("port"); + public static final ParseField METHOD_FIELD = new ParseField("method"); + public static final ParseField PATH_FIELD = new ParseField("path"); + public static final ParseField PARAMS_FIELD = new ParseField("params"); + public static final ParseField HEADERS_FIELD = new ParseField("headers"); + public static final ParseField AUTH_FIELD = new ParseField("auth"); + public static final ParseField BODY_FIELD = new ParseField("body"); + + private final Template.Parser templateParser; + private final HttpAuthRegistry httpAuthRegistry; + + @Inject + public Parser(Template.Parser templateParser, HttpAuthRegistry httpAuthRegistry) { + this.templateParser = templateParser; + this.httpAuthRegistry = httpAuthRegistry; + } + + public TemplatedHttpRequest parse(XContentParser parser) throws IOException { + TemplatedHttpRequest request = new TemplatedHttpRequest(); + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if (PATH_FIELD.match(currentFieldName)) { + request.path(templateParser.parse(parser)); + } else if (HEADERS_FIELD.match(currentFieldName)) { + request.headers(parseTemplates(parser)); + } else if (PARAMS_FIELD.match(currentFieldName)) { + request.params(parseTemplates(parser)); + } else if (AUTH_FIELD.match(currentFieldName)) { + request.auth(httpAuthRegistry.parse(parser)); + } else if (BODY_FIELD.match(currentFieldName)) { + request.body(templateParser.parse(parser)); + } else { + throw new ElasticsearchParseException("could not parse templated http request. unexpected field [" + currentFieldName + "]"); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (METHOD_FIELD.match(currentFieldName)) { + request.method(HttpMethod.parse(parser.text())); + } else if (HOST_FIELD.match(currentFieldName)) { + request.host(parser.text()); + } else if (PATH_FIELD.match(currentFieldName)) { + request.path(templateParser.parse(parser)); + } else if (BODY_FIELD.match(currentFieldName)) { + request.body(templateParser.parse(parser)); + } else { + throw new ElasticsearchParseException("could not parse templated http request. unexpected field [" + currentFieldName + "]"); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (PORT_FIELD.match(currentFieldName)) { + request.port(parser.intValue()); + } else { + throw new ElasticsearchParseException("could not parse templated http request. unexpected field [" + currentFieldName + "]"); + } + } else { + throw new ElasticsearchParseException("could not parse templated http request. unexpected token [" + token + "]"); + } + } + return request; + } + + private Map parseTemplates(XContentParser parser) throws IOException { + Map templates = new HashMap<>(); + String currentFieldName = null; + for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) { + switch (token) { + case FIELD_NAME: + currentFieldName = parser.currentName(); + break; + case VALUE_STRING: + case START_OBJECT: + templates.put(currentFieldName, templateParser.parse(parser)); + break; + default: + throw new ElasticsearchParseException("could not parse templated http request. unexpected token [" + token + "]"); + } + } + return templates; + } + + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/AuthModule.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/AuthModule.java new file mode 100644 index 00000000000..1120e0ce56d --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/AuthModule.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http.auth; + +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.multibindings.MapBinder; + +/** + */ +public class AuthModule extends AbstractModule { + + @Override + protected void configure() { + MapBinder parsersBinder = MapBinder.newMapBinder(binder(), String.class, HttpAuth.Parser.class); + bind(BasicAuth.Parser.class).asEagerSingleton(); + parsersBinder.addBinding(BasicAuth.TYPE).to(BasicAuth.Parser.class); + bind(HttpAuthRegistry.class).asEagerSingleton(); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/BasicAuth.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/BasicAuth.java new file mode 100644 index 00000000000..76952283742 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/BasicAuth.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http.auth; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.Base64; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.util.Locale; + +/** + */ +public class BasicAuth extends HttpAuth { + + public static final String TYPE = "basic"; + + private final String username; + private final String password; + + private final String basicAuth; + + public BasicAuth(String username, String password) throws UnsupportedEncodingException { + this.username = username; + this.password = password; + basicAuth = "Basic " + Base64.encodeBytes(String.format(Locale.ROOT, "%s:%s", username, password).getBytes("utf-8")); + } + + public String type() { + return TYPE; + } + + @Override + public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Parser.USERNAME_FIELD.getPreferredName(), username); + builder.field(Parser.PASSWORD_FIELD.getPreferredName(), password); + return builder.endObject(); + } + + public void update(HttpURLConnection connection) { + connection.setRequestProperty("Authorization", basicAuth); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BasicAuth basicAuth = (BasicAuth) o; + + if (!password.equals(basicAuth.password)) return false; + if (!username.equals(basicAuth.username)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = username.hashCode(); + result = 31 * result + password.hashCode(); + return result; + } + + public static class Parser implements HttpAuth.Parser { + + static final ParseField USERNAME_FIELD = new ParseField("username"); + static final ParseField PASSWORD_FIELD = new ParseField("password"); + + public String type() { + return TYPE; + } + + public BasicAuth parse(XContentParser parser) throws IOException { + String username = null; + String password = null; + + String fieldName = null; + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if (USERNAME_FIELD.getPreferredName().equals(fieldName)) { + username = parser.text(); + } else if (PASSWORD_FIELD.getPreferredName().equals(fieldName)) { + password = parser.text(); + } else { + throw new ElasticsearchParseException("unsupported field [" + fieldName + "]"); + } + } else { + throw new ElasticsearchParseException("unsupported token [" + token + "]"); + } + } + + if (username == null) { + throw new HttpAuthException("username is a required option"); + } + if (password == null) { + throw new HttpAuthException("password is a required option"); + } + + return new BasicAuth(username, password); + } + } +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuth.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuth.java new file mode 100644 index 00000000000..61fb1f95093 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuth.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http.auth; + +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.net.HttpURLConnection; + +public abstract class HttpAuth implements ToXContent { + + public abstract String type(); + + public abstract void update(HttpURLConnection connection); + + @Override + public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(type()); + builder = innerToXContent(builder, params); + return builder.endObject(); + } + + public abstract XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException; + + public static interface Parser { + + String type(); + + Auth parse(XContentParser parser) throws IOException; + + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthException.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthException.java new file mode 100644 index 00000000000..a157e50790b --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthException.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http.auth; + +import org.elasticsearch.watcher.WatcherException; + +/** + */ +public class HttpAuthException extends WatcherException { + + public HttpAuthException(String msg) { + super(msg); + } + + public HttpAuthException(String msg, Throwable cause) { + super(msg, cause); + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthRegistry.java b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthRegistry.java new file mode 100644 index 00000000000..80f852b669b --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/http/auth/HttpAuthRegistry.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http.auth; + +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Map; + +/** + * + */ +public class HttpAuthRegistry { + + private final ImmutableMap parsers; + + @Inject + public HttpAuthRegistry(Map parsers) { + this.parsers = ImmutableMap.copyOf(parsers); + } + + public HttpAuth parse(XContentParser parser) throws IOException { + String type = null; + XContentParser.Token token; + HttpAuth auth = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + type = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT && type != null) { + HttpAuth.Parser inputParser = parsers.get(type); + if (inputParser == null) { + throw new HttpAuthException("unknown http auth type [" + type + "]"); + } + auth = inputParser.parse(parser); + } + } + return auth; + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java new file mode 100644 index 00000000000..e21e9503d03 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java @@ -0,0 +1,243 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.input.http; + +import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableMap; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.joda.time.DateTimeZone; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.watcher.actions.Action; +import org.elasticsearch.watcher.actions.Actions; +import org.elasticsearch.watcher.condition.simple.AlwaysTrueCondition; +import org.elasticsearch.watcher.input.Input; +import org.elasticsearch.watcher.input.simple.SimpleInput; +import org.elasticsearch.watcher.support.clock.ClockMock; +import org.elasticsearch.watcher.support.http.*; +import org.elasticsearch.watcher.support.http.auth.BasicAuth; +import org.elasticsearch.watcher.support.http.auth.HttpAuth; +import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry; +import org.elasticsearch.watcher.support.template.Template; +import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule; +import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; +import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.elasticsearch.watcher.watch.Payload; +import org.elasticsearch.watcher.watch.Watch; +import org.elasticsearch.watcher.watch.WatchExecutionContext; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + */ +public class HttpInputTests extends ElasticsearchTestCase { + + private HttpClient httpClient; + private HttpInput.Parser httpParser; + + @Before + public void init() throws Exception { + httpClient = mock(HttpClient.class); + Template.Parser templateParser = new MockTemplate.Parser(); + HttpAuthRegistry registry = new HttpAuthRegistry(ImmutableMap.of("basic", new BasicAuth.Parser())); + httpParser = new HttpInput.Parser( + ImmutableSettings.EMPTY, httpClient, new HttpRequest.Parser(registry), new TemplatedHttpRequest.Parser(templateParser, registry) + ); + } + + @Test + public void testExecute() throws Exception { + String host = "_host"; + int port = 123; + String body = "_body"; + TemplatedHttpRequest request = new TemplatedHttpRequest(); + request.method(HttpMethod.POST); + request.host(host); + request.port(port); + Template mockBody = mock(Template.class); + when(mockBody.render(anyMap())).thenReturn(body); + request.body(mockBody); + HttpInput input = new HttpInput(logger, httpClient, request); + + HttpResponse response = new HttpResponse(); + response.status(123); + response.inputStream(new ByteArrayInputStream("{\"key\" : \"value\"}".getBytes(UTF8))); + when(httpClient.execute(any(HttpRequest.class))).thenReturn(response); + + Watch watch = new Watch("test-watch", + new ClockMock(), + new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), + new SimpleInput(logger, new Payload.Simple()), + new AlwaysTrueCondition(logger), + null, + new Actions(new ArrayList()), + null, + null, + new Watch.Status()); + WatchExecutionContext ctx = new WatchExecutionContext("test-watch1", + watch, + new DateTime(0, DateTimeZone.UTC), + new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC))); + HttpInput.Result result = input.execute(ctx); + assertThat(result.type(), equalTo(HttpInput.TYPE)); + assertThat(result.payload().data(), equalTo(MapBuilder.newMapBuilder().put("key", "value").map())); + } + + @Test + @Repeat(iterations = 12) + public void testParser() throws Exception { + final String httpMethod = randomFrom("PUT", "POST", "GET", "DELETE", "HEAD", null); + String host = randomAsciiOfLength(3); + int port = randomInt(); + Template path = new MockTemplate(randomAsciiOfLength(3)); + String body = randomBoolean() ? randomAsciiOfLength(3) : null; + Map params = randomBoolean() ? new MapBuilder().put("a", new MockTemplate("b")).map() : null; + Map headers = randomBoolean() ? new MapBuilder().put("c", new MockTemplate("d")).map() : null; + HttpAuth auth = randomBoolean() ? new BasicAuth("username", "password") : null; + HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder() + .setMethod(httpMethod) + .setHost(host) + .setPort(port) + .setPath(path) + .setBody(body != null ? new MockTemplate(body) : null) + .setParams(params) + .setHeaders(headers) + .setAuth(auth); + XContentParser parser = XContentHelper.createParser(jsonBuilder().value(sourceBuilder).bytes()); + parser.nextToken(); + HttpInput result = httpParser.parse(parser); + + assertThat(result.type(), equalTo(HttpInput.TYPE)); + assertThat(result.getRequest().method().method(), equalTo(httpMethod != null ? httpMethod : "GET")); // get is the default + assertThat(result.getRequest().host(), equalTo(host)); + assertThat(result.getRequest().port(), equalTo(port)); + assertThat(result.getRequest().path(), equalTo(path)); + assertThat(result.getRequest().params(), equalTo(params)); + assertThat(result.getRequest().headers(), equalTo(headers)); + assertThat(result.getRequest().auth(), equalTo(auth)); + if (body != null) { + assertThat(result.getRequest().body().render(Collections.emptyMap()), equalTo(body)); + } else { + assertThat(result.getRequest().body(), nullValue()); + } + } + + @Test(expected = ElasticsearchIllegalArgumentException.class) + public void testParser_invalidHttpMethod() throws Exception { + Map headers = new MapBuilder().put("a", new MockTemplate("b")).map(); + HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder() + .setMethod("_method") + .setHost("_host") + .setPort(123) + .setBody(new MockTemplate("_body")) + .setHeaders(headers); + XContentParser parser = XContentHelper.createParser(jsonBuilder().value(sourceBuilder).bytes()); + parser.nextToken(); + httpParser.parse(parser); + } + + @Test + public void testParseResult() throws Exception { + String httpMethod = "get"; + String body = "_body"; + Map headers = new MapBuilder().put("a", new MockTemplate("b")).map(); + HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder() + .setMethod(httpMethod) + .setHost("_host") + .setPort(123) + .setBody(new MockTemplate(body)) + .setHeaders(headers); + + Map payload = MapBuilder.newMapBuilder().put("x", "y").map(); + + XContentBuilder builder = jsonBuilder().startObject(); + builder.field(HttpInput.Parser.HTTP_STATUS_FIELD.getPreferredName(), 123); + builder.field(HttpInput.Parser.REQUEST_FIELD.getPreferredName(), sourceBuilder); + builder.field(Input.Result.PAYLOAD_FIELD.getPreferredName(), payload); + builder.endObject(); + + XContentParser parser = XContentHelper.createParser(builder.bytes()); + parser.nextToken(); + HttpInput.Result result = httpParser.parseResult(parser); + assertThat(result.type(), equalTo(HttpInput.TYPE)); + assertThat(result.payload().data(), equalTo(payload)); + assertThat(result.statusCode(), equalTo(123)); + assertThat(result.request().method().method(), equalTo("GET")); + assertThat(result.request().headers().size(), equalTo(headers.size())); + for (Map.Entry entry : headers.entrySet()) { + assertThat(entry.getValue().render(Collections.emptyMap()), equalTo(result.request().headers().get(entry.getKey()))); + } + assertThat(result.request().host(), equalTo("_host")); + assertThat(result.request().port(), equalTo(123)); + assertThat(result.request().body(), equalTo("_body")); + } + + private static class MockTemplate implements Template { + + private final String value; + + private MockTemplate(String value) { + this.value = value; + } + + @Override + public String render(Map model) { + return value; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.value(value); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MockTemplate that = (MockTemplate) o; + + if (!value.equals(that.value)) return false; + + return true; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + + static class Parser implements Template.Parser { + + @Override + public Template parse(XContentParser parser) throws IOException, ParseException { + String value = parser.text(); + return new MockTemplate(value); + } + } + + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/support/http/HttpClientTest.java b/src/test/java/org/elasticsearch/watcher/support/http/HttpClientTest.java new file mode 100644 index 00000000000..5ef7a5b1c4e --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/support/http/HttpClientTest.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http; + +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import com.squareup.okhttp.mockwebserver.MockResponse; +import com.squareup.okhttp.mockwebserver.MockWebServer; +import com.squareup.okhttp.mockwebserver.RecordedRequest; +import org.elasticsearch.common.base.Charsets; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.watcher.support.http.auth.BasicAuth; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; + +/** + */ +public class HttpClientTest extends ElasticsearchTestCase { + + private MockWebServer webServer; + private HttpClient httpClient; + + @Before + public void init() throws Exception { + webServer = new MockWebServer(); + webServer.start(9200); + httpClient = new HttpClient(ImmutableSettings.EMPTY); + } + + @After + public void after() throws Exception { + webServer.shutdown(); + } + + @Test + @Repeat(iterations = 10) + public void testBasics() throws Exception { + int responseCode = randomIntBetween(200, 203); + String body = randomAsciiOfLengthBetween(2, 8096); + webServer.enqueue(new MockResponse().setResponseCode(responseCode).setBody(body)); + + + HttpRequest request = new HttpRequest(); + request.method(HttpMethod.POST); + request.host("localhost"); + request.port(9200); + request.path("/" + randomAsciiOfLength(5)); + String paramKey; + String paramValue; + request.params(MapBuilder.newMapBuilder() + .put(paramKey = randomAsciiOfLength(3), paramValue = randomAsciiOfLength(3)) + .map()); + String headerKey; + String headerValue; + request.headers(MapBuilder.newMapBuilder() + .put(headerKey = randomAsciiOfLength(3), headerValue = randomAsciiOfLength(3)) + .map()); + request.body(randomAsciiOfLength(5)); + HttpResponse response = httpClient.execute(request); + RecordedRequest recordedRequest = webServer.takeRequest(); + + assertThat(response.status(), equalTo(responseCode)); + assertThat(new String(response.body(), Charsets.UTF_8), equalTo(body)); + assertThat(webServer.getRequestCount(), equalTo(1)); + assertThat(recordedRequest.getBody().readString(Charsets.UTF_8), equalTo(request.body())); + assertThat(recordedRequest.getPath().split("\\?")[0], equalTo(request.path())); + assertThat(recordedRequest.getPath().split("\\?")[1], equalTo(paramKey + "=" + paramValue)); + assertThat(recordedRequest.getHeader(headerKey), equalTo(headerValue)); + } + + @Test + public void testBasicAuth() throws Exception { + webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body")); + HttpRequest request = new HttpRequest(); + request.method(HttpMethod.POST); + request.host("localhost"); + request.port(9200); + request.path("/test"); + request.auth(new BasicAuth("user", "pass")); + request.body("body"); + HttpResponse response = httpClient.execute(request); + assertThat(response.status(), equalTo(200)); + assertThat(new String(response.body(), Charsets.UTF_8), equalTo("body")); + RecordedRequest recordedRequest = webServer.takeRequest(); + assertThat(recordedRequest.getHeader("Authorization"), equalTo("Basic dXNlcjpwYXNz")); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index 03313184fd0..abd71140955 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -40,12 +40,12 @@ import org.elasticsearch.watcher.actions.email.service.Authentication; import org.elasticsearch.watcher.actions.email.service.Email; import org.elasticsearch.watcher.actions.email.service.EmailService; import org.elasticsearch.watcher.actions.email.service.Profile; -import org.elasticsearch.watcher.actions.webhook.HttpClient; import org.elasticsearch.watcher.client.WatcherClient; import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.support.WatcherUtils; import org.elasticsearch.watcher.support.clock.ClockMock; +import org.elasticsearch.watcher.support.http.HttpClient; import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.support.template.Template; import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java index 40fde1abff7..8abd0e11840 100644 --- a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java +++ b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java @@ -10,9 +10,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.joda.time.DateTime; -import org.elasticsearch.common.joda.time.DateTimeZone; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.netty.handler.codec.http.HttpMethod; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -24,13 +22,14 @@ import org.elasticsearch.watcher.actions.email.service.Authentication; import org.elasticsearch.watcher.actions.email.service.Email; import org.elasticsearch.watcher.actions.email.service.EmailService; import org.elasticsearch.watcher.actions.email.service.Profile; -import org.elasticsearch.watcher.actions.webhook.HttpClient; import org.elasticsearch.watcher.actions.webhook.WebhookAction; import org.elasticsearch.watcher.condition.script.ScriptCondition; import org.elasticsearch.watcher.input.search.SearchInput; import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.support.WatcherUtils; import org.elasticsearch.watcher.support.clock.SystemClock; +import org.elasticsearch.watcher.support.http.HttpClient; +import org.elasticsearch.watcher.support.http.HttpMethod; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.support.template.ScriptTemplate; diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java b/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java new file mode 100644 index 00000000000..5d5f6926ecb --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/test/integration/HttpInputIntegrationTest.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.test.integration; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.watcher.client.WatchSourceBuilder; +import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; +import org.elasticsearch.watcher.support.template.ScriptTemplate; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.trigger.TriggerBuilders; +import org.junit.Test; + +import java.net.InetSocketAddress; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction; +import static org.elasticsearch.watcher.client.WatchSourceBuilder.watchSourceBuilder; +import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition; +import static org.elasticsearch.watcher.input.InputBuilders.httpInput; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; + +/** + */ +public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder() + .put(InternalNode.HTTP_ENABLED, true) + .put(super.nodeSettings(nodeOrdinal)) + .build(); + } + + @Override + protected boolean shieldEnabled() { + return false; + } + + @Test + public void testHttpInput() throws Exception { + ScriptServiceProxy sc = scriptService(); + client().prepareIndex("index", "type", "id").setSource("{}").setRefresh(true).get(); + + InetSocketAddress address = internalTestCluster().httpAddresses()[0]; + String body = jsonBuilder().startObject().field("size", 1).endObject().string(); + WatchSourceBuilder source = watchSourceBuilder() + .trigger(TriggerBuilders.schedule(interval("5s"))) + .input(httpInput() + .setHost(address.getHostName()) + .setPort(address.getPort()) + .setPath(new ScriptTemplate(sc, "/index/_search")) + .setBody(new ScriptTemplate(sc, body)) + ) + .condition(scriptCondition("ctx.payload.hits.total == 1")) + .addAction(indexAction("idx", "action")); + watcherClient().preparePutWatch("_name") + .source(source) + .get(); + + if (timeWarped()) { + timeWarp().scheduler().trigger("_name"); + refresh(); + } + assertWatchWithMinimumPerformedActionsCount("_name", 1, false); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java index 6b374138ba2..8f6061935f6 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.netty.handler.codec.http.HttpMethod; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -27,7 +26,6 @@ import org.elasticsearch.watcher.actions.email.service.Email; import org.elasticsearch.watcher.actions.email.service.EmailService; import org.elasticsearch.watcher.actions.email.service.Profile; import org.elasticsearch.watcher.actions.index.IndexAction; -import org.elasticsearch.watcher.actions.webhook.HttpClient; import org.elasticsearch.watcher.actions.webhook.WebhookAction; import org.elasticsearch.watcher.condition.Condition; import org.elasticsearch.watcher.condition.ConditionRegistry; @@ -40,6 +38,8 @@ import org.elasticsearch.watcher.input.simple.SimpleInput; import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.support.WatcherUtils; import org.elasticsearch.watcher.support.clock.SystemClock; +import org.elasticsearch.watcher.support.http.HttpClient; +import org.elasticsearch.watcher.support.http.HttpMethod; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.support.template.ScriptTemplate;