Add support for http input endpoints that do not return JSON formatted bodys.
This change allows the httpinput to receive non json formatted data from a http endpoint (such as the elasticsearch _cat apis). If non json is read it will be stored in the `payload._value` in the same way that the `ScriptTransform` handles non map/json data returned by transforming scripts. Added response_content_type to http input so that the expected content type in the response can be configured. This accepts `yaml`, `json` and `text` but will be overridden by the http headers. Original commit: elastic/x-pack-elasticsearch@753d37f14e
This commit is contained in:
parent
613ce8762c
commit
14ffe290f7
|
@ -5,8 +5,9 @@
|
|||
*/
|
||||
package org.elasticsearch.watcher.input.http;
|
||||
|
||||
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.watcher.execution.WatchExecutionContext;
|
||||
|
@ -47,20 +48,42 @@ public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Re
|
|||
}
|
||||
|
||||
XContentType contentType = response.xContentType();
|
||||
XContentParser parser = contentType != null ?
|
||||
contentType.xContent().createParser(response.body()) :
|
||||
XContentHelper.createParser(response.body());
|
||||
if (input.getExpectedResponseXContentType() != null) {
|
||||
if (contentType != input.getExpectedResponseXContentType().contentType()) {
|
||||
logger.warn("[{}] [{}] input expected content type [{}] but read [{}] from headers", type(), ctx.id(), input.getExpectedResponseXContentType(), contentType);
|
||||
}
|
||||
|
||||
if (contentType == null) {
|
||||
contentType = input.getExpectedResponseXContentType().contentType();
|
||||
}
|
||||
} else {
|
||||
//Attempt to auto detect content type
|
||||
if (contentType == null) {
|
||||
contentType = XContentFactory.xContentType(response.body());
|
||||
}
|
||||
}
|
||||
|
||||
XContentParser parser = null;
|
||||
if (contentType != null) {
|
||||
try {
|
||||
parser = contentType.xContent().createParser(response.body());
|
||||
} catch (Exception e) {
|
||||
throw new HttpInputException("[{}] [{}] input could not parse response body [{}] it does not appear to be [{}]", type(), ctx.id(), response.body().toUtf8(), contentType.shortName());
|
||||
}
|
||||
}
|
||||
|
||||
final Payload payload;
|
||||
if (input.getExtractKeys() != null) {
|
||||
Map<String, Object> filteredKeys = XContentFilterKeysUtils.filterMapOrdered(input.getExtractKeys(), parser);
|
||||
payload = new Payload.Simple(filteredKeys);
|
||||
} else {
|
||||
Map<String, Object> map = parser.mapOrderedAndClose();
|
||||
payload = new Payload.Simple(map);
|
||||
if (parser != null) {
|
||||
Map<String, Object> map = parser.mapOrderedAndClose();
|
||||
payload = new Payload.Simple(map);
|
||||
} else {
|
||||
payload = new Payload.Simple("_value", response.body().toUtf8());
|
||||
}
|
||||
}
|
||||
|
||||
return new HttpInput.Result(payload, request, response.status());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.common.collect.ImmutableSet;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.watcher.input.Input;
|
||||
import org.elasticsearch.watcher.support.http.HttpContentType;
|
||||
import org.elasticsearch.watcher.support.http.HttpRequest;
|
||||
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
|
||||
import org.elasticsearch.watcher.watch.Payload;
|
||||
|
@ -28,10 +29,12 @@ public class HttpInput implements Input {
|
|||
public static final String TYPE = "http";
|
||||
|
||||
private final HttpRequestTemplate request;
|
||||
private final @Nullable HttpContentType expectedResponseXContentType;
|
||||
private final @Nullable Set<String> extractKeys;
|
||||
|
||||
public HttpInput(HttpRequestTemplate request, @Nullable Set<String> extractKeys) {
|
||||
public HttpInput(HttpRequestTemplate request, @Nullable HttpContentType expectedResponseXContentType, @Nullable Set<String> extractKeys) {
|
||||
this.request = request;
|
||||
this.expectedResponseXContentType = expectedResponseXContentType;
|
||||
this.extractKeys = extractKeys;
|
||||
}
|
||||
|
||||
|
@ -48,6 +51,10 @@ public class HttpInput implements Input {
|
|||
return extractKeys;
|
||||
}
|
||||
|
||||
public HttpContentType getExpectedResponseXContentType() {
|
||||
return expectedResponseXContentType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
|
@ -55,6 +62,9 @@ public class HttpInput implements Input {
|
|||
if (extractKeys != null) {
|
||||
builder.field(Field.EXTRACT.getPreferredName(), extractKeys);
|
||||
}
|
||||
if (expectedResponseXContentType != null) {
|
||||
builder.field(Field.RESPONSE_CONTENT_TYPE.getPreferredName(), expectedResponseXContentType, params);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -62,6 +72,7 @@ public class HttpInput implements Input {
|
|||
public static HttpInput parse(String watchId, XContentParser parser, HttpRequestTemplate.Parser requestParser) throws IOException {
|
||||
Set<String> extract = null;
|
||||
HttpRequestTemplate request = null;
|
||||
HttpContentType expectedResponseBodyType = null;
|
||||
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token;
|
||||
|
@ -87,6 +98,15 @@ public class HttpInput implements Input {
|
|||
} else {
|
||||
throw new HttpInputException("could not parse [{}] input for watch [{}]. unexpected array field [{}]", TYPE, watchId, currentFieldName);
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
if (Field.RESPONSE_CONTENT_TYPE.match(currentFieldName)) {
|
||||
expectedResponseBodyType = HttpContentType.resolve(parser.text());
|
||||
if (expectedResponseBodyType == null) {
|
||||
throw new HttpInputException("could not parse [{}] input for watch [{}]. unknown content type [{}]", TYPE, watchId, parser.text());
|
||||
}
|
||||
} else {
|
||||
throw new HttpInputException("could not parse [{}] input for watch [{}]. unexpected string field [{}]", TYPE, watchId, currentFieldName);
|
||||
}
|
||||
} else {
|
||||
throw new HttpInputException("could not parse [{}] input for watch [{}]. unexpected token [{}]", TYPE, watchId, token);
|
||||
}
|
||||
|
@ -96,7 +116,11 @@ public class HttpInput implements Input {
|
|||
throw new HttpInputException("could not parse [{}] input for watch [{}]. missing require [{}] field", TYPE, watchId, Field.REQUEST.getPreferredName());
|
||||
}
|
||||
|
||||
return new HttpInput(request, extract);
|
||||
if (expectedResponseBodyType == HttpContentType.TEXT && extract != null ) {
|
||||
throw new HttpInputException("could not parse [{}] input for watch [{}]. key extraction is not supported for content type [{}]", TYPE, watchId, expectedResponseBodyType);
|
||||
}
|
||||
|
||||
return new HttpInput(request, expectedResponseBodyType, extract);
|
||||
}
|
||||
|
||||
public static Builder builder(HttpRequestTemplate httpRequest) {
|
||||
|
@ -135,6 +159,7 @@ public class HttpInput implements Input {
|
|||
|
||||
private final HttpRequestTemplate request;
|
||||
private final ImmutableSet.Builder<String> extractKeys = ImmutableSet.builder();
|
||||
private HttpContentType expectedResponseXContentType = null;
|
||||
|
||||
private Builder(HttpRequestTemplate request) {
|
||||
this.request = request;
|
||||
|
@ -150,10 +175,15 @@ public class HttpInput implements Input {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder expectedResponseXContentType(HttpContentType expectedResponseXContentType) {
|
||||
this.expectedResponseXContentType = expectedResponseXContentType;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpInput build() {
|
||||
ImmutableSet<String> keys = extractKeys.build();
|
||||
return new HttpInput(request, keys.isEmpty() ? null : keys);
|
||||
return new HttpInput(request, expectedResponseXContentType, keys.isEmpty() ? null : keys);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -161,5 +191,6 @@ public class HttpInput implements Input {
|
|||
ParseField REQUEST = new ParseField("request");
|
||||
ParseField EXTRACT = new ParseField("extract");
|
||||
ParseField STATUS = new ParseField("status");
|
||||
ParseField RESPONSE_CONTENT_TYPE = new ParseField("response_content_type");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher.support.http;
|
||||
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.watcher.WatcherException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
|
||||
/**
|
||||
*/
|
||||
public enum HttpContentType implements ToXContent {
|
||||
|
||||
JSON() {
|
||||
@Override
|
||||
public XContentType contentType() {
|
||||
return XContentType.JSON;
|
||||
}
|
||||
},
|
||||
|
||||
YAML() {
|
||||
@Override
|
||||
public XContentType contentType() {
|
||||
return XContentType.YAML;
|
||||
}
|
||||
},
|
||||
|
||||
TEXT() {
|
||||
@Override
|
||||
public XContentType contentType() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
public abstract XContentType contentType();
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
return builder.value(id());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return id();
|
||||
}
|
||||
|
||||
public String id() {
|
||||
return name().toLowerCase(Locale.ROOT);
|
||||
}
|
||||
|
||||
public static HttpContentType resolve(String id) {
|
||||
switch (id.toLowerCase(Locale.ROOT)) {
|
||||
case "json" : return JSON;
|
||||
case "yaml": return YAML;
|
||||
case "text": return TEXT;
|
||||
default:
|
||||
throw new WatcherException("unknown content type [{}]", id);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -82,31 +82,38 @@ public class HttpInputTests extends ElasticsearchTestCase {
|
|||
HttpRequestTemplate.Builder request = HttpRequestTemplate.builder(host, port)
|
||||
.method(HttpMethod.POST)
|
||||
.body("_body");
|
||||
HttpInput httpInput = InputBuilders.httpInput(request.build()).build();
|
||||
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
|
||||
HttpInput httpInput;
|
||||
|
||||
HttpResponse response;
|
||||
switch (randomIntBetween(1, 6)) {
|
||||
case 1:
|
||||
response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8));
|
||||
httpInput = InputBuilders.httpInput(request.build()).build();
|
||||
break;
|
||||
case 2:
|
||||
response = new HttpResponse(123, "---\nkey : value".getBytes(UTF8));
|
||||
httpInput = InputBuilders.httpInput(request.build()).expectedResponseXContentType(HttpContentType.YAML).build();
|
||||
break;
|
||||
case 3:
|
||||
response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { XContentType.JSON.restContentType() }));
|
||||
httpInput = InputBuilders.httpInput(request.build()).build();
|
||||
break;
|
||||
case 4:
|
||||
response = new HttpResponse(123, "key: value".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { XContentType.YAML.restContentType() }));
|
||||
httpInput = InputBuilders.httpInput(request.build()).build();
|
||||
break;
|
||||
case 5:
|
||||
response = new HttpResponse(123, "---\nkey: value".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { "unrecognized_content_type" }));
|
||||
httpInput = InputBuilders.httpInput(request.build()).expectedResponseXContentType(HttpContentType.YAML).build();
|
||||
break;
|
||||
default:
|
||||
response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { "unrecognized_content_type" }));
|
||||
httpInput = InputBuilders.httpInput(request.build()).build();
|
||||
break;
|
||||
}
|
||||
|
||||
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
|
||||
|
||||
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
|
||||
|
||||
when(templateEngine.render(eq(Template.inline("_body").build()), any(Map.class))).thenReturn("_body");
|
||||
|
@ -129,6 +136,38 @@ public class HttpInputTests extends ElasticsearchTestCase {
|
|||
assertThat(result.payload().data(), equalTo(MapBuilder.<String, Object>newMapBuilder().put("key", "value").map()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecute_nonJson() throws Exception {
|
||||
String host = "_host";
|
||||
int port = 123;
|
||||
HttpRequestTemplate.Builder request = HttpRequestTemplate.builder(host, port)
|
||||
.method(HttpMethod.POST)
|
||||
.body("_body");
|
||||
HttpInput httpInput = InputBuilders.httpInput(request.build()).expectedResponseXContentType(HttpContentType.TEXT).build();
|
||||
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
|
||||
String notJson = "This is not json";
|
||||
HttpResponse response = new HttpResponse(123, notJson.getBytes(UTF8));
|
||||
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
|
||||
when(templateEngine.render(eq(Template.inline("_body").build()), any(Map.class))).thenReturn("_body");
|
||||
Watch watch = new Watch("test-watch",
|
||||
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
|
||||
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
|
||||
new ExecutableAlwaysCondition(logger),
|
||||
null,
|
||||
null,
|
||||
new ExecutableActions(new ArrayList<ActionWrapper>()),
|
||||
null,
|
||||
new WatchStatus(ImmutableMap.<String, ActionStatus>of()));
|
||||
WatchExecutionContext ctx = new TriggeredExecutionContext(watch,
|
||||
new DateTime(0, UTC),
|
||||
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
|
||||
TimeValue.timeValueSeconds(5));
|
||||
HttpInput.Result result = input.execute(ctx);
|
||||
assertThat(result.type(), equalTo(HttpInput.TYPE));
|
||||
assertThat(result.payload().data().get("_value").toString(), equalTo(notJson));
|
||||
}
|
||||
|
||||
|
||||
@Test @Repeat(iterations = 20)
|
||||
public void testParser() throws Exception {
|
||||
final HttpMethod httpMethod = rarely() ? null : randomFrom(HttpMethod.values());
|
||||
|
@ -154,8 +193,18 @@ public class HttpInputTests extends ElasticsearchTestCase {
|
|||
if (headers != null) {
|
||||
requestBuilder.putHeaders(headers);
|
||||
}
|
||||
HttpInput.Builder inputBuilder = InputBuilders.httpInput(requestBuilder);
|
||||
HttpContentType expectedResponseXContentType = randomFrom(HttpContentType.values());
|
||||
|
||||
BytesReference source = jsonBuilder().value(InputBuilders.httpInput(requestBuilder).build()).bytes();
|
||||
String[] extractKeys = randomFrom(new String[]{"foo", "bar"}, new String[]{"baz"}, null);
|
||||
if (expectedResponseXContentType != HttpContentType.TEXT) {
|
||||
if (extractKeys != null) {
|
||||
inputBuilder.extractKeys(extractKeys);
|
||||
}
|
||||
}
|
||||
|
||||
inputBuilder.expectedResponseXContentType(expectedResponseXContentType);
|
||||
BytesReference source = jsonBuilder().value(inputBuilder.build()).bytes();
|
||||
XContentParser parser = XContentHelper.createParser(source);
|
||||
parser.nextToken();
|
||||
HttpInput result = httpParser.parseInput("_id", parser);
|
||||
|
@ -166,6 +215,12 @@ public class HttpInputTests extends ElasticsearchTestCase {
|
|||
assertThat(result.getRequest().host(), equalTo(host));
|
||||
assertThat(result.getRequest().port(), equalTo(port));
|
||||
assertThat(result.getRequest().path(), is(Template.inline(path).build()));
|
||||
assertThat(result.getExpectedResponseXContentType(), equalTo(expectedResponseXContentType));
|
||||
if (expectedResponseXContentType != HttpContentType.TEXT && extractKeys != null) {
|
||||
for (String key : extractKeys) {
|
||||
assertThat(result.getExtractKeys().contains(key), is(true));
|
||||
}
|
||||
}
|
||||
if (params != null) {
|
||||
assertThat(result.getRequest().params(), hasEntry(is("a"), is(Template.inline("b").build())));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue