Added http input.

The http input allows to let any http interface be the input for a watch.
The http input can be configured with the following options:
* `method` - Optional http method. (default to GET)
* `host` - The host of the http service.
* `port` - The port of the http service.
* `path` - The url path.
* `params` - Optional url query string options.
* `header` - Optional http header.
* `auth` - Optional authentication http heads.
* `body` - Optional body

The response of the http request is expected to be valid json.

Closes elastic/elasticsearch#157

Original commit: elastic/x-pack-elasticsearch@0b1f122615
This commit is contained in:
Martijn van Groningen 2015-03-26 23:03:31 +01:00
parent a632d57803
commit e00bb69982
25 changed files with 1638 additions and 81 deletions

View File

@ -104,6 +104,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp</groupId>
<artifactId>mockwebserver</artifactId>
<version>2.3.0</version>
<scope>test</scope>
</dependency>
<!-- Regular dependencies -->
<dependency>

View File

@ -20,6 +20,7 @@ import org.elasticsearch.watcher.rest.WatcherRestModule;
import org.elasticsearch.watcher.shield.WatcherShieldModule;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.clock.ClockModule;
import org.elasticsearch.watcher.support.http.HttpClientModule;
import org.elasticsearch.watcher.support.init.InitializingModule;
import org.elasticsearch.watcher.support.template.TemplateModule;
import org.elasticsearch.watcher.transform.TransformModule;
@ -42,6 +43,7 @@ public class WatcherModule extends AbstractModule implements SpawnModules {
new InitializingModule(),
new WatchModule(),
new TemplateModule(),
new HttpClientModule(),
new ClockModule(),
new WatcherClientModule(),
new TransformModule(),

View File

@ -5,14 +5,13 @@
*/
package org.elasticsearch.watcher.actions;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.watcher.actions.email.EmailAction;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.InternalEmailService;
import org.elasticsearch.watcher.actions.index.IndexAction;
import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import java.util.HashMap;
import java.util.Map;
@ -46,7 +45,6 @@ public class ActionModule extends AbstractModule {
}
bind(ActionRegistry.class).asEagerSingleton();
bind(HttpClient.class).asEagerSingleton();
bind(EmailService.class).to(InternalEmailService.class).asEagerSingleton();
}

View File

@ -1,43 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.actions.webhook;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
/**
* Client class to wrap http connections
*/
public class HttpClient extends AbstractComponent {
@Inject
public HttpClient(Settings settings) {
super(settings);
}
public int execute(HttpMethod method, String url, String body) throws IOException {
logger.debug("making [{}] request to [{}]", method.getName(), url);
if (logger.isTraceEnabled()) {
logger.trace("sending [{}] as body of request", body);
}
URL encodedUrl = new URL(URLEncoder.encode(url, Charsets.UTF_8.name()));
HttpURLConnection httpConnection = (HttpURLConnection) encodedUrl.openConnection();
httpConnection.setRequestMethod(method.getName());
httpConnection.setRequestProperty("Accept-Charset", Charsets.UTF_8.name());
httpConnection.setDoOutput(true);
httpConnection.setRequestProperty("Content-Length", Integer.toString(body.length()));
httpConnection.getOutputStream().write(body.getBytes(Charsets.UTF_8.name()));
return httpConnection.getResponseCode();
}
}

View File

@ -5,27 +5,29 @@
*/
package org.elasticsearch.watcher.actions.webhook;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.ActionException;
import org.elasticsearch.watcher.actions.ActionSettingsException;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.support.template.XContentTemplate;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.ActionException;
import org.elasticsearch.watcher.actions.ActionSettingsException;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpMethod;
import org.elasticsearch.watcher.support.http.HttpResponse;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.support.template.XContentTemplate;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import java.io.IOException;
import java.util.Locale;
@ -62,17 +64,17 @@ public class WebhookAction extends Action<WebhookAction.Result> {
String urlText = url.render(model);
String bodyText = body != null ? body.render(model) : XContentTemplate.YAML.render(model);
try {
int status = httpClient.execute(method, urlText, bodyText);
if (status >= 400) {
logger.warn("got status [" + status + "] when connecting to [" + urlText + "]");
} else {
if (status >= 300) {
logger.warn("a 200 range return code was expected, but got [" + status + "]");
try (HttpResponse response = httpClient.execute(method, urlText, bodyText)) {
int status = response.status();
if (status >= 400) {
logger.warn("got status [" + status + "] when connecting to [" + urlText + "]");
} else {
if (status >= 300) {
logger.warn("a 200 range return code was expected, but got [" + status + "]");
}
}
return new Result.Executed(status, urlText, bodyText);
}
return new Result.Executed(status, urlText, bodyText);
} catch (IOException ioe) {
logger.error("failed to connect to [{}] for watch [{}]", ioe, urlText, ctx.watch().name());
return new Result.Failure("failed to send http request. " + ioe.getMessage());
@ -87,7 +89,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
.field(transform.type(), transform)
.endObject();
}
builder.field(Parser.METHOD_FIELD.getPreferredName(), method.getName().toLowerCase(Locale.ROOT));
builder.field(Parser.METHOD_FIELD.getPreferredName(), method.method());
builder.field(Parser.URL_FIELD.getPreferredName(), url);
if (body != null) {
builder.field(Parser.BODY_FIELD.getPreferredName(), body);
@ -222,7 +224,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
if (METHOD_FIELD.match(currentFieldName)) {
method = HttpMethod.valueOf(parser.text().toUpperCase(Locale.ROOT));
if (method != HttpMethod.POST && method != HttpMethod.GET && method != HttpMethod.PUT) {
throw new ActionSettingsException("could not parse webhook action. unsupported http method [" + method.getName() + "]");
throw new ActionSettingsException("could not parse webhook action. unsupported http method [" + method.method() + "]");
}
} else if (URL_FIELD.match(currentFieldName)) {
try {
@ -339,7 +341,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
builder.startObject();
builder.field(Parser.URL_FIELD.getPreferredName(), url);
if (method != null) {
builder.field(Parser.METHOD_FIELD.getPreferredName(), method.getName().toLowerCase(Locale.ROOT));
builder.field(Parser.METHOD_FIELD.getPreferredName(), method.method());
}
if (body != null) {
builder.field(Parser.BODY_FIELD.getPreferredName(), body);

View File

@ -7,6 +7,7 @@ package org.elasticsearch.watcher.input;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
@ -36,4 +37,8 @@ public final class InputBuilders {
public static SimpleInput.SourceBuilder simpleInput(Map<String, Object> data) {
return new SimpleInput.SourceBuilder(data);
}
public static HttpInput.SourceBuilder httpInput() {
return new HttpInput.SourceBuilder();
}
}

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.watcher.input;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import java.util.HashMap;
import java.util.Map;
@ -26,12 +27,13 @@ public class InputModule extends AbstractModule {
@Override
protected void configure() {
MapBinder<String, Input.Parser> parsersBinder = MapBinder.newMapBinder(binder(), String.class, Input.Parser.class);
bind(SearchInput.Parser.class).asEagerSingleton();
parsersBinder.addBinding(SearchInput.TYPE).to(SearchInput.Parser.class);
bind(SimpleInput.Parser.class).asEagerSingleton();
parsersBinder.addBinding(SimpleInput.TYPE).to(SimpleInput.Parser.class);
bind(HttpInput.Parser.class).asEagerSingleton();
parsersBinder.addBinding(HttpInput.TYPE).to(HttpInput.Parser.class);
for (Map.Entry<String, Class<? extends Input.Parser>> entry : parsers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();

View File

@ -0,0 +1,251 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.http;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.Variables;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpRequest;
import org.elasticsearch.watcher.support.http.HttpResponse;
import org.elasticsearch.watcher.support.http.TemplatedHttpRequest;
import org.elasticsearch.watcher.support.http.auth.HttpAuth;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import java.io.IOException;
import java.util.Map;
/**
*/
public class HttpInput extends Input<HttpInput.Result> {
public static final String TYPE = "http";
private final HttpClient client;
private final TemplatedHttpRequest request;
public HttpInput(ESLogger logger, HttpClient client, TemplatedHttpRequest request) {
super(logger);
this.request = request;
this.client = client;
}
@Override
public String type() {
return TYPE;
}
@Override
public Result execute(WatchExecutionContext ctx) throws IOException {
Map<String, Object> model = Variables.createCtxModel(ctx, null);
HttpRequest httpRequest = request.render(model);
try (HttpResponse response = client.execute(httpRequest)) {
Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(response.body(), true);
return new Result(TYPE, new Payload.Simple(result.v2()), httpRequest, response.status());
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return request.toXContent(builder, params);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HttpInput httpInput = (HttpInput) o;
if (!request.equals(httpInput.request)) return false;
return true;
}
@Override
public int hashCode() {
return request.hashCode();
}
TemplatedHttpRequest getRequest() {
return request;
}
public final static class Result extends Input.Result {
private final HttpRequest request;
private final int statusCode;
public Result(String type, Payload payload, HttpRequest request, int statusCode) {
super(type, payload);
this.request = request;
this.statusCode = statusCode;
}
@Override
protected XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Parser.HTTP_STATUS_FIELD.getPreferredName(), statusCode);
builder.field(Parser.REQUEST_FIELD.getPreferredName(), request);
return builder;
}
HttpRequest request() {
return request;
}
int statusCode() {
return statusCode;
}
}
public final static class Parser extends AbstractComponent implements Input.Parser<Result, HttpInput> {
public static final ParseField REQUEST_FIELD = new ParseField("request");
public static final ParseField HTTP_STATUS_FIELD = new ParseField("http_status");
private final HttpClient client;
private final HttpRequest.Parser requestParser;
private final TemplatedHttpRequest.Parser templatedRequestParser;
@Inject
public Parser(Settings settings, HttpClient client, HttpRequest.Parser requestParser, TemplatedHttpRequest.Parser templatedRequestParser) {
super(settings);
this.client = client;
this.requestParser = requestParser;
this.templatedRequestParser = templatedRequestParser;
}
@Override
public String type() {
return TYPE;
}
@Override
public HttpInput parse(XContentParser parser) throws IOException {
TemplatedHttpRequest request = templatedRequestParser.parse(parser);
return new HttpInput(logger, client, request);
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
Payload payload = null;
HttpRequest request = null;
int statusCode = -1;
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (Input.Result.PAYLOAD_FIELD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else if (REQUEST_FIELD.match(currentFieldName)) {
request = requestParser.parse(parser);
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
statusCode = parser.intValue();
}
}
return new Result(TYPE, payload, request, statusCode);
}
}
public final static class SourceBuilder implements Input.SourceBuilder {
private String host;
private int port;
private String method;
private Template path;
private Map<String, Template> params;
private Map<String, Template> headers;
private HttpAuth auth;
private Template body;
public SourceBuilder setHost(String host) {
this.host = host;
return this;
}
public SourceBuilder setPort(int port) {
this.port = port;
return this;
}
public SourceBuilder setMethod(String method) {
this.method = method;
return this;
}
public SourceBuilder setPath(Template path) {
this.path = path;
return this;
}
public SourceBuilder setParams(Map<String, Template> params) {
this.params = params;
return this;
}
public SourceBuilder setHeaders(Map<String, Template> headers) {
this.headers = headers;
return this;
}
public SourceBuilder setAuth(HttpAuth auth) {
this.auth = auth;
return this;
}
public SourceBuilder setBody(Template body) {
this.body = body;
return this;
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params p) throws IOException {
builder.startObject();
builder.field(HttpRequest.Parser.HOST_FIELD.getPreferredName(), host);
builder.field(HttpRequest.Parser.PORT_FIELD.getPreferredName(), port);
if (method != null) {
builder.field(HttpRequest.Parser.METHOD_FIELD.getPreferredName(), method);
}
if (path != null) {
builder.field(HttpRequest.Parser.PATH_FIELD.getPreferredName(), path);
}
if (params != null) {
builder.field(HttpRequest.Parser.PARAMS_FIELD.getPreferredName(), params);
}
if (headers != null) {
builder.field(HttpRequest.Parser.HEADERS_FIELD.getPreferredName(), headers);
}
if (auth != null) {
builder.field(HttpRequest.Parser.AUTH_FIELD.getPreferredName(), auth);
}
if (body != null) {
builder.field(HttpRequest.Parser.BODY_FIELD.getPreferredName(), body);
}
return builder.endObject();
}
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.net.*;
import java.util.Map;
/**
* Client class to wrap http connections
*/
public class HttpClient extends AbstractComponent {
@Inject
public HttpClient(Settings settings) {
super(settings);
}
// TODO: Remove this when webhook action has been refactored to use this client properly
public HttpResponse execute(HttpMethod method, String urlString, String body) throws IOException {
URL url = new URL(urlString);
HttpRequest request = new HttpRequest();
request.method(method);
return doExecute(url, request);
}
public HttpResponse execute(HttpRequest request) throws IOException {
String queryString = null;
if (request.params() != null) {
StringBuilder builder = new StringBuilder();
for (Map.Entry<String, String> entry : request.params().entrySet()) {
if (builder.length() != 0) {
builder.append('&');
}
builder.append(URLEncoder.encode(entry.getKey(), "utf-8"))
.append('=')
.append(URLEncoder.encode(entry.getValue(), "utf-8"));
}
queryString = builder.toString();
}
URI uri;
try {
uri = new URI("http", null, request.host(), request.port(), request.path(), queryString, null);
} catch (URISyntaxException e) {
throw ExceptionsHelper.convertToElastic(e);
}
URL url = uri.toURL();
return doExecute(url, request);
}
// TODO: Embed this method in execute() when webhook action has been refactored to use this client properly
private HttpResponse doExecute(URL url, HttpRequest request) throws IOException {
logger.debug("making [{}] request to [{}]", request.method().method(), url);
logger.trace("sending [{}] as body of request", request.body());
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
urlConnection.setRequestMethod(request.method().method());
if (request.headers() != null) {
for (Map.Entry<String, String> entry : request.headers().entrySet()) {
urlConnection.setRequestProperty(entry.getKey(), entry.getValue());
}
}
if (request.auth() != null) {
request.auth().update(urlConnection);
}
urlConnection.setUseCaches(false);
urlConnection.setRequestProperty("Accept-Charset", Charsets.UTF_8.name());
if (request.body() != null) {
urlConnection.setDoOutput(true);
byte[] bytes = request.body().getBytes(Charsets.UTF_8.name());
urlConnection.setRequestProperty("Content-Length", String.valueOf(bytes.length));
urlConnection.getOutputStream().write(bytes);
urlConnection.getOutputStream().close();
}
HttpResponse response = new HttpResponse();
response.inputStream(urlConnection.getInputStream());
response.status(urlConnection.getResponseCode());
logger.debug("http status code: {}", response.status());
return response;
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.watcher.support.http.auth.AuthModule;
/**
*/
public class HttpClientModule extends AbstractModule implements SpawnModules {
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new AuthModule());
}
@Override
protected void configure() {
bind(TemplatedHttpRequest.Parser.class).asEagerSingleton();
bind(HttpRequest.Parser.class).asEagerSingleton();
bind(HttpClient.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import java.util.Locale;
/**
*/
public enum HttpMethod {
HEAD("HEAD"),
GET("GET"),
POST("POST"),
PUT("PUT"),
DELETE("DELETE");
private final String method;
HttpMethod(String method) {
this.method = method;
}
public String method() {
return method;
}
public static HttpMethod parse(String value) {
value = value.toUpperCase(Locale.ROOT);
switch (value) {
case "HEAD":
return HEAD;
case "GET":
return GET;
case "POST":
return POST;
case "PUT":
return PUT;
case "DELETE":
return DELETE;
default:
throw new ElasticsearchIllegalArgumentException("unsupported http method [" + value + "]");
}
}
}

View File

@ -0,0 +1,220 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.http.auth.HttpAuth;
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
import java.io.IOException;
import java.util.Map;
public class HttpRequest implements ToXContent {
private String host;
private int port;
private HttpMethod method;
private String path;
private Map<String, String> params;
private Map<String, String> headers;
private HttpAuth auth;
private String body;
public HttpRequest() {
method = HttpMethod.GET;
}
public String host() {
return host;
}
public void host(String host) {
this.host = host;
}
public int port() {
return port;
}
public void port(int port) {
this.port = port;
}
public HttpMethod method() {
return method;
}
public void method(HttpMethod method) {
this.method = method;
}
public String path() {
return path;
}
public void path(String path) {
this.path = path;
}
public Map<String, String> params() {
return params;
}
public void params(Map<String, String> params) {
this.params = params;
}
public Map<String, String> headers() {
return headers;
}
public void headers(Map<String, String> headers) {
this.headers = headers;
}
public HttpAuth auth() {
return auth;
}
public void auth(HttpAuth auth) {
this.auth = auth;
}
public String body() {
return body;
}
public void body(String body) {
this.body = body;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(Parser.HOST_FIELD.getPreferredName(), host);
builder.field(Parser.PORT_FIELD.getPreferredName(), port);
builder.field(Parser.METHOD_FIELD.getPreferredName(), method);
if (path != null) {
builder.field(Parser.PATH_FIELD.getPreferredName(), path);
}
if (this.params != null) {
builder.startObject(Parser.PARAMS_FIELD.getPreferredName()).value(this.params).endObject();
}
if (headers != null) {
builder.startObject(Parser.HEADERS_FIELD.getPreferredName()).value(headers).endObject();
}
if (auth != null) {
builder.field(Parser.AUTH_FIELD.getPreferredName(), auth);
}
if (body != null) {
builder.field(Parser.BODY_FIELD.getPreferredName(), body);
}
return builder.endObject();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HttpRequest request = (HttpRequest) o;
if (port != request.port) return false;
if (auth != null ? !auth.equals(request.auth) : request.auth != null) return false;
if (body != null ? !body.equals(request.body) : request.body != null) return false;
if (headers != null ? !headers.equals(request.headers) : request.headers != null) return false;
if (!host.equals(request.host)) return false;
if (method != request.method) return false;
if (params != null ? !params.equals(request.params) : request.params != null) return false;
if (path != null ? !path.equals(request.path) : request.path != null) return false;
return true;
}
@Override
public int hashCode() {
int result = host.hashCode();
result = 31 * result + port;
result = 31 * result + method.hashCode();
result = 31 * result + (path != null ? path.hashCode() : 0);
result = 31 * result + (params != null ? params.hashCode() : 0);
result = 31 * result + (headers != null ? headers.hashCode() : 0);
result = 31 * result + (auth != null ? auth.hashCode() : 0);
result = 31 * result + (body != null ? body.hashCode() : 0);
return result;
}
public static class Parser {
public static final ParseField HOST_FIELD = new ParseField("host");
public static final ParseField PORT_FIELD = new ParseField("port");
public static final ParseField METHOD_FIELD = new ParseField("method");
public static final ParseField PATH_FIELD = new ParseField("path");
public static final ParseField PARAMS_FIELD = new ParseField("params");
public static final ParseField HEADERS_FIELD = new ParseField("headers");
public static final ParseField AUTH_FIELD = new ParseField("auth");
public static final ParseField BODY_FIELD = new ParseField("body");
private final HttpAuthRegistry httpAuthRegistry;
@Inject
public Parser(HttpAuthRegistry httpAuthRegistry) {
this.httpAuthRegistry = httpAuthRegistry;
}
public HttpRequest parse(XContentParser parser) throws IOException {
HttpRequest request = new HttpRequest();
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (HEADERS_FIELD.match(currentFieldName)) {
request.headers((Map) WatcherUtils.flattenModel(parser.map()));
} else if (PARAMS_FIELD.match(currentFieldName)) {
request.params((Map) WatcherUtils.flattenModel(parser.map()));
} else if (AUTH_FIELD.match(currentFieldName)) {
request.auth(httpAuthRegistry.parse(parser));
} else if (BODY_FIELD.match(currentFieldName)) {
request.body(parser.text());
} else {
throw new ElasticsearchParseException("could not parse http request. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (METHOD_FIELD.match(currentFieldName)) {
request.method(HttpMethod.parse(parser.text()));
} else if (HOST_FIELD.match(currentFieldName)) {
request.host(parser.text());
} else if (PATH_FIELD.match(currentFieldName)) {
request.path(parser.text());
} else if (BODY_FIELD.match(currentFieldName)) {
request.body(parser.text());
} else {
throw new ElasticsearchParseException("could not parse http request. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (PORT_FIELD.match(currentFieldName)) {
request.port(parser.intValue());
} else {
throw new ElasticsearchParseException("could not parse http request. unexpected field [" + currentFieldName + "]");
}
} else {
throw new ElasticsearchParseException("could not parse http request. unexpected token [" + token + "]");
}
}
return request;
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.io.ByteStreams;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
public class HttpResponse implements Closeable {
private int status;
private InputStream inputStream;
private byte[] body;
public int status() {
return status;
}
public void status(int status) {
this.status = status;
}
public byte[] body() {
if (body == null) {
try {
body = ByteStreams.toByteArray(inputStream);
inputStream.close();
} catch (IOException e) {
throw ExceptionsHelper.convertToElastic(e);
}
}
return body;
}
public void inputStream(InputStream inputStream) {
this.inputStream = inputStream;
}
@Override
public void close() throws IOException {
inputStream.close();
}
}

View File

@ -0,0 +1,243 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.http.auth.HttpAuth;
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
import org.elasticsearch.watcher.support.template.Template;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class TemplatedHttpRequest implements ToXContent {
private String host;
private int port;
private HttpMethod method;
private Template path;
private Map<String, Template> params;
private Map<String, Template> headers;
private HttpAuth auth;
private Template body;
public TemplatedHttpRequest() {
method = HttpMethod.GET;
}
public String host() {
return host;
}
public void host(String host) {
this.host = host;
}
public int port() {
return port;
}
public void port(int port) {
this.port = port;
}
public HttpMethod method() {
return method;
}
public void method(HttpMethod method) {
this.method = method;
}
public Template path() {
return path;
}
public void path(Template path) {
this.path = path;
}
public Map<String, Template> params() {
return params;
}
public void params(Map<String, Template> params) {
this.params = params;
}
public Map<String, Template> headers() {
return headers;
}
public void headers(Map<String, Template> headers) {
this.headers = headers;
}
public HttpAuth auth() {
return auth;
}
public void auth(HttpAuth auth) {
this.auth = auth;
}
public Template body() {
return body;
}
public void body(Template body) {
this.body = body;
}
public HttpRequest render(Map<String, Object> model) {
HttpRequest copy = new HttpRequest();
copy.host(host);
copy.port(port);
copy.method(method);
if (path != null) {
copy.path(path.render(model));
}
if (params != null) {
MapBuilder<String, String> mapBuilder = MapBuilder.newMapBuilder();
for (Map.Entry<String, Template> entry : params.entrySet()) {
mapBuilder.put(entry.getKey(), entry.getValue().render(model));
}
copy.params(mapBuilder.map());
}
if (headers != null) {
MapBuilder<String, String> mapBuilder = MapBuilder.newMapBuilder();
for (Map.Entry<String, Template> entry : headers.entrySet()) {
mapBuilder.put(entry.getKey(), entry.getValue().render(model));
}
copy.headers(mapBuilder.map());
}
copy.auth(auth);
if (body != null) {
copy.body(body.render(model));
}
return copy;
}
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(Parser.HOST_FIELD.getPreferredName(), host);
builder.field(Parser.PORT_FIELD.getPreferredName(), port);
builder.field(Parser.METHOD_FIELD.getPreferredName(), method);
if (path != null) {
builder.field(Parser.PATH_FIELD.getPreferredName(), path);
}
if (this.params != null) {
builder.startObject(Parser.PARAMS_FIELD.getPreferredName()).value(this.params).endObject();
}
if (headers != null) {
builder.startObject(Parser.HEADERS_FIELD.getPreferredName()).value(headers).endObject();
}
if (auth != null) {
builder.field(Parser.AUTH_FIELD.getPreferredName(), auth);
}
if (body != null) {
builder.field(Parser.BODY_FIELD.getPreferredName(), body);
}
return builder.endObject();
}
public static class Parser {
public static final ParseField HOST_FIELD = new ParseField("host");
public static final ParseField PORT_FIELD = new ParseField("port");
public static final ParseField METHOD_FIELD = new ParseField("method");
public static final ParseField PATH_FIELD = new ParseField("path");
public static final ParseField PARAMS_FIELD = new ParseField("params");
public static final ParseField HEADERS_FIELD = new ParseField("headers");
public static final ParseField AUTH_FIELD = new ParseField("auth");
public static final ParseField BODY_FIELD = new ParseField("body");
private final Template.Parser templateParser;
private final HttpAuthRegistry httpAuthRegistry;
@Inject
public Parser(Template.Parser templateParser, HttpAuthRegistry httpAuthRegistry) {
this.templateParser = templateParser;
this.httpAuthRegistry = httpAuthRegistry;
}
public TemplatedHttpRequest parse(XContentParser parser) throws IOException {
TemplatedHttpRequest request = new TemplatedHttpRequest();
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (PATH_FIELD.match(currentFieldName)) {
request.path(templateParser.parse(parser));
} else if (HEADERS_FIELD.match(currentFieldName)) {
request.headers(parseTemplates(parser));
} else if (PARAMS_FIELD.match(currentFieldName)) {
request.params(parseTemplates(parser));
} else if (AUTH_FIELD.match(currentFieldName)) {
request.auth(httpAuthRegistry.parse(parser));
} else if (BODY_FIELD.match(currentFieldName)) {
request.body(templateParser.parse(parser));
} else {
throw new ElasticsearchParseException("could not parse templated http request. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (METHOD_FIELD.match(currentFieldName)) {
request.method(HttpMethod.parse(parser.text()));
} else if (HOST_FIELD.match(currentFieldName)) {
request.host(parser.text());
} else if (PATH_FIELD.match(currentFieldName)) {
request.path(templateParser.parse(parser));
} else if (BODY_FIELD.match(currentFieldName)) {
request.body(templateParser.parse(parser));
} else {
throw new ElasticsearchParseException("could not parse templated http request. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (PORT_FIELD.match(currentFieldName)) {
request.port(parser.intValue());
} else {
throw new ElasticsearchParseException("could not parse templated http request. unexpected field [" + currentFieldName + "]");
}
} else {
throw new ElasticsearchParseException("could not parse templated http request. unexpected token [" + token + "]");
}
}
return request;
}
private Map<String, Template> parseTemplates(XContentParser parser) throws IOException {
Map<String, Template> templates = new HashMap<>();
String currentFieldName = null;
for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case VALUE_STRING:
case START_OBJECT:
templates.put(currentFieldName, templateParser.parse(parser));
break;
default:
throw new ElasticsearchParseException("could not parse templated http request. unexpected token [" + token + "]");
}
}
return templates;
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http.auth;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
/**
*/
public class AuthModule extends AbstractModule {
@Override
protected void configure() {
MapBinder<String, HttpAuth.Parser> parsersBinder = MapBinder.newMapBinder(binder(), String.class, HttpAuth.Parser.class);
bind(BasicAuth.Parser.class).asEagerSingleton();
parsersBinder.addBinding(BasicAuth.TYPE).to(BasicAuth.Parser.class);
bind(HttpAuthRegistry.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http.auth;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.util.Locale;
/**
*/
public class BasicAuth extends HttpAuth {
public static final String TYPE = "basic";
private final String username;
private final String password;
private final String basicAuth;
public BasicAuth(String username, String password) throws UnsupportedEncodingException {
this.username = username;
this.password = password;
basicAuth = "Basic " + Base64.encodeBytes(String.format(Locale.ROOT, "%s:%s", username, password).getBytes("utf-8"));
}
public String type() {
return TYPE;
}
@Override
public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Parser.USERNAME_FIELD.getPreferredName(), username);
builder.field(Parser.PASSWORD_FIELD.getPreferredName(), password);
return builder.endObject();
}
public void update(HttpURLConnection connection) {
connection.setRequestProperty("Authorization", basicAuth);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BasicAuth basicAuth = (BasicAuth) o;
if (!password.equals(basicAuth.password)) return false;
if (!username.equals(basicAuth.username)) return false;
return true;
}
@Override
public int hashCode() {
int result = username.hashCode();
result = 31 * result + password.hashCode();
return result;
}
public static class Parser implements HttpAuth.Parser<BasicAuth> {
static final ParseField USERNAME_FIELD = new ParseField("username");
static final ParseField PASSWORD_FIELD = new ParseField("password");
public String type() {
return TYPE;
}
public BasicAuth parse(XContentParser parser) throws IOException {
String username = null;
String password = null;
String fieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (USERNAME_FIELD.getPreferredName().equals(fieldName)) {
username = parser.text();
} else if (PASSWORD_FIELD.getPreferredName().equals(fieldName)) {
password = parser.text();
} else {
throw new ElasticsearchParseException("unsupported field [" + fieldName + "]");
}
} else {
throw new ElasticsearchParseException("unsupported token [" + token + "]");
}
}
if (username == null) {
throw new HttpAuthException("username is a required option");
}
if (password == null) {
throw new HttpAuthException("password is a required option");
}
return new BasicAuth(username, password);
}
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http.auth;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.net.HttpURLConnection;
public abstract class HttpAuth implements ToXContent {
public abstract String type();
public abstract void update(HttpURLConnection connection);
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(type());
builder = innerToXContent(builder, params);
return builder.endObject();
}
public abstract XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException;
public static interface Parser<Auth extends HttpAuth> {
String type();
Auth parse(XContentParser parser) throws IOException;
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http.auth;
import org.elasticsearch.watcher.WatcherException;
/**
*/
public class HttpAuthException extends WatcherException {
public HttpAuthException(String msg) {
super(msg);
}
public HttpAuthException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http.auth;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Map;
/**
*
*/
public class HttpAuthRegistry {
private final ImmutableMap<String, HttpAuth.Parser> parsers;
@Inject
public HttpAuthRegistry(Map<String, HttpAuth.Parser> parsers) {
this.parsers = ImmutableMap.copyOf(parsers);
}
public HttpAuth parse(XContentParser parser) throws IOException {
String type = null;
XContentParser.Token token;
HttpAuth auth = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && type != null) {
HttpAuth.Parser inputParser = parsers.get(type);
if (inputParser == null) {
throw new HttpAuthException("unknown http auth type [" + type + "]");
}
auth = inputParser.parse(parser);
}
}
return auth;
}
}

View File

@ -0,0 +1,243 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.http;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableMap;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.actions.Action;
import org.elasticsearch.watcher.actions.Actions;
import org.elasticsearch.watcher.condition.simple.AlwaysTrueCondition;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.support.http.*;
import org.elasticsearch.watcher.support.http.auth.BasicAuth;
import org.elasticsearch.watcher.support.http.auth.HttpAuth;
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
*/
public class HttpInputTests extends ElasticsearchTestCase {
private HttpClient httpClient;
private HttpInput.Parser httpParser;
@Before
public void init() throws Exception {
httpClient = mock(HttpClient.class);
Template.Parser templateParser = new MockTemplate.Parser();
HttpAuthRegistry registry = new HttpAuthRegistry(ImmutableMap.of("basic", new BasicAuth.Parser()));
httpParser = new HttpInput.Parser(
ImmutableSettings.EMPTY, httpClient, new HttpRequest.Parser(registry), new TemplatedHttpRequest.Parser(templateParser, registry)
);
}
@Test
public void testExecute() throws Exception {
String host = "_host";
int port = 123;
String body = "_body";
TemplatedHttpRequest request = new TemplatedHttpRequest();
request.method(HttpMethod.POST);
request.host(host);
request.port(port);
Template mockBody = mock(Template.class);
when(mockBody.render(anyMap())).thenReturn(body);
request.body(mockBody);
HttpInput input = new HttpInput(logger, httpClient, request);
HttpResponse response = new HttpResponse();
response.status(123);
response.inputStream(new ByteArrayInputStream("{\"key\" : \"value\"}".getBytes(UTF8)));
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
Watch watch = new Watch("test-watch",
new ClockMock(),
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new SimpleInput(logger, new Payload.Simple()),
new AlwaysTrueCondition(logger),
null,
new Actions(new ArrayList<Action>()),
null,
null,
new Watch.Status());
WatchExecutionContext ctx = new WatchExecutionContext("test-watch1",
watch,
new DateTime(0, DateTimeZone.UTC),
new ScheduleTriggerEvent(new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
HttpInput.Result result = input.execute(ctx);
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.payload().data(), equalTo(MapBuilder.<String, Object>newMapBuilder().put("key", "value").map()));
}
@Test
@Repeat(iterations = 12)
public void testParser() throws Exception {
final String httpMethod = randomFrom("PUT", "POST", "GET", "DELETE", "HEAD", null);
String host = randomAsciiOfLength(3);
int port = randomInt();
Template path = new MockTemplate(randomAsciiOfLength(3));
String body = randomBoolean() ? randomAsciiOfLength(3) : null;
Map<String, Template> params = randomBoolean() ? new MapBuilder<String, Template>().put("a", new MockTemplate("b")).map() : null;
Map<String, Template> headers = randomBoolean() ? new MapBuilder<String, Template>().put("c", new MockTemplate("d")).map() : null;
HttpAuth auth = randomBoolean() ? new BasicAuth("username", "password") : null;
HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder()
.setMethod(httpMethod)
.setHost(host)
.setPort(port)
.setPath(path)
.setBody(body != null ? new MockTemplate(body) : null)
.setParams(params)
.setHeaders(headers)
.setAuth(auth);
XContentParser parser = XContentHelper.createParser(jsonBuilder().value(sourceBuilder).bytes());
parser.nextToken();
HttpInput result = httpParser.parse(parser);
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.getRequest().method().method(), equalTo(httpMethod != null ? httpMethod : "GET")); // get is the default
assertThat(result.getRequest().host(), equalTo(host));
assertThat(result.getRequest().port(), equalTo(port));
assertThat(result.getRequest().path(), equalTo(path));
assertThat(result.getRequest().params(), equalTo(params));
assertThat(result.getRequest().headers(), equalTo(headers));
assertThat(result.getRequest().auth(), equalTo(auth));
if (body != null) {
assertThat(result.getRequest().body().render(Collections.<String, Object>emptyMap()), equalTo(body));
} else {
assertThat(result.getRequest().body(), nullValue());
}
}
@Test(expected = ElasticsearchIllegalArgumentException.class)
public void testParser_invalidHttpMethod() throws Exception {
Map<String, Template> headers = new MapBuilder<String, Template>().put("a", new MockTemplate("b")).map();
HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder()
.setMethod("_method")
.setHost("_host")
.setPort(123)
.setBody(new MockTemplate("_body"))
.setHeaders(headers);
XContentParser parser = XContentHelper.createParser(jsonBuilder().value(sourceBuilder).bytes());
parser.nextToken();
httpParser.parse(parser);
}
@Test
public void testParseResult() throws Exception {
String httpMethod = "get";
String body = "_body";
Map<String, Template> headers = new MapBuilder<String, Template>().put("a", new MockTemplate("b")).map();
HttpInput.SourceBuilder sourceBuilder = new HttpInput.SourceBuilder()
.setMethod(httpMethod)
.setHost("_host")
.setPort(123)
.setBody(new MockTemplate(body))
.setHeaders(headers);
Map<String, Object> payload = MapBuilder.<String, Object>newMapBuilder().put("x", "y").map();
XContentBuilder builder = jsonBuilder().startObject();
builder.field(HttpInput.Parser.HTTP_STATUS_FIELD.getPreferredName(), 123);
builder.field(HttpInput.Parser.REQUEST_FIELD.getPreferredName(), sourceBuilder);
builder.field(Input.Result.PAYLOAD_FIELD.getPreferredName(), payload);
builder.endObject();
XContentParser parser = XContentHelper.createParser(builder.bytes());
parser.nextToken();
HttpInput.Result result = httpParser.parseResult(parser);
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.payload().data(), equalTo(payload));
assertThat(result.statusCode(), equalTo(123));
assertThat(result.request().method().method(), equalTo("GET"));
assertThat(result.request().headers().size(), equalTo(headers.size()));
for (Map.Entry<String, Template> entry : headers.entrySet()) {
assertThat(entry.getValue().render(Collections.<String, Object>emptyMap()), equalTo(result.request().headers().get(entry.getKey())));
}
assertThat(result.request().host(), equalTo("_host"));
assertThat(result.request().port(), equalTo(123));
assertThat(result.request().body(), equalTo("_body"));
}
private static class MockTemplate implements Template {
private final String value;
private MockTemplate(String value) {
this.value = value;
}
@Override
public String render(Map<String, Object> model) {
return value;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.value(value);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MockTemplate that = (MockTemplate) o;
if (!value.equals(that.value)) return false;
return true;
}
@Override
public int hashCode() {
return value.hashCode();
}
static class Parser implements Template.Parser {
@Override
public Template parse(XContentParser parser) throws IOException, ParseException {
String value = parser.text();
return new MockTemplate(value);
}
}
}
}

View File

@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.support.http.auth.BasicAuth;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class HttpClientTest extends ElasticsearchTestCase {
private MockWebServer webServer;
private HttpClient httpClient;
@Before
public void init() throws Exception {
webServer = new MockWebServer();
webServer.start(9200);
httpClient = new HttpClient(ImmutableSettings.EMPTY);
}
@After
public void after() throws Exception {
webServer.shutdown();
}
@Test
@Repeat(iterations = 10)
public void testBasics() throws Exception {
int responseCode = randomIntBetween(200, 203);
String body = randomAsciiOfLengthBetween(2, 8096);
webServer.enqueue(new MockResponse().setResponseCode(responseCode).setBody(body));
HttpRequest request = new HttpRequest();
request.method(HttpMethod.POST);
request.host("localhost");
request.port(9200);
request.path("/" + randomAsciiOfLength(5));
String paramKey;
String paramValue;
request.params(MapBuilder.<String, String>newMapBuilder()
.put(paramKey = randomAsciiOfLength(3), paramValue = randomAsciiOfLength(3))
.map());
String headerKey;
String headerValue;
request.headers(MapBuilder.<String, String>newMapBuilder()
.put(headerKey = randomAsciiOfLength(3), headerValue = randomAsciiOfLength(3))
.map());
request.body(randomAsciiOfLength(5));
HttpResponse response = httpClient.execute(request);
RecordedRequest recordedRequest = webServer.takeRequest();
assertThat(response.status(), equalTo(responseCode));
assertThat(new String(response.body(), Charsets.UTF_8), equalTo(body));
assertThat(webServer.getRequestCount(), equalTo(1));
assertThat(recordedRequest.getBody().readString(Charsets.UTF_8), equalTo(request.body()));
assertThat(recordedRequest.getPath().split("\\?")[0], equalTo(request.path()));
assertThat(recordedRequest.getPath().split("\\?")[1], equalTo(paramKey + "=" + paramValue));
assertThat(recordedRequest.getHeader(headerKey), equalTo(headerValue));
}
@Test
public void testBasicAuth() throws Exception {
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
HttpRequest request = new HttpRequest();
request.method(HttpMethod.POST);
request.host("localhost");
request.port(9200);
request.path("/test");
request.auth(new BasicAuth("user", "pass"));
request.body("body");
HttpResponse response = httpClient.execute(request);
assertThat(response.status(), equalTo(200));
assertThat(new String(response.body(), Charsets.UTF_8), equalTo("body"));
RecordedRequest recordedRequest = webServer.takeRequest();
assertThat(recordedRequest.getHeader("Authorization"), equalTo("Basic dXNlcjpwYXNz"));
}
}

View File

@ -40,12 +40,12 @@ import org.elasticsearch.watcher.actions.email.service.Authentication;
import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;

View File

@ -10,9 +10,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -24,13 +22,14 @@ import org.elasticsearch.watcher.actions.email.service.Authentication;
import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.input.search.SearchInput;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpMethod;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.ScriptTemplate;

View File

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.ScriptTemplate;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.trigger.TriggerBuilders;
import org.junit.Test;
import java.net.InetSocketAddress;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilder.watchSourceBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.watcher.input.InputBuilders.httpInput;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
/**
*/
public class HttpInputIntegrationTest extends AbstractWatcherIntegrationTests {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
.put(InternalNode.HTTP_ENABLED, true)
.put(super.nodeSettings(nodeOrdinal))
.build();
}
@Override
protected boolean shieldEnabled() {
return false;
}
@Test
public void testHttpInput() throws Exception {
ScriptServiceProxy sc = scriptService();
client().prepareIndex("index", "type", "id").setSource("{}").setRefresh(true).get();
InetSocketAddress address = internalTestCluster().httpAddresses()[0];
String body = jsonBuilder().startObject().field("size", 1).endObject().string();
WatchSourceBuilder source = watchSourceBuilder()
.trigger(TriggerBuilders.schedule(interval("5s")))
.input(httpInput()
.setHost(address.getHostName())
.setPort(address.getPort())
.setPath(new ScriptTemplate(sc, "/index/_search"))
.setBody(new ScriptTemplate(sc, body))
)
.condition(scriptCondition("ctx.payload.hits.total == 1"))
.addAction(indexAction("idx", "action"));
watcherClient().preparePutWatch("_name")
.source(source)
.get();
if (timeWarped()) {
timeWarp().scheduler().trigger("_name");
refresh();
}
assertWatchWithMinimumPerformedActionsCount("_name", 1, false);
}
}

View File

@ -12,7 +12,6 @@ import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -27,7 +26,6 @@ import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.actions.index.IndexAction;
import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.condition.ConditionRegistry;
@ -40,6 +38,8 @@ import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.http.HttpMethod;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.ScriptTemplate;