Enable inlined xcontent structure in http input body

The body of the http input may hold xcontent data (json/yaml). The problem is that today, the body only accepts text and writing
 escapted json in text makes it very hard to maintain.

 This commit introduces another field settings - `xbody`. Now, use `xbody` for inlined xcontent and `body` for any text content. When `xbody` is used, we keep around the xcontent type and the http request that is then executed will include a content-type header that will match the xcontent type of input.

Original commit: elastic/x-pack-elasticsearch@7210908064
This commit is contained in:
uboness 2015-05-02 04:12:32 +03:00
parent 1dcb61654c
commit 5e9f747005
6 changed files with 204 additions and 44 deletions

View File

@ -239,8 +239,8 @@ public class ExecutionService extends AbstractComponent {
try {
executor.execute(new WatchExecutionTask(ctx, watchRecord));
} catch (EsRejectedExecutionException e) {
logger.debug("failed to execute triggered watch [{}]", watchRecord.name());
watchRecord.update(WatchRecord.State.FAILED, "failed to run triggered watch [" + watchRecord.name() + "] due to thread pool capacity");
logger.debug("failed to execute triggered watch [{}]", watchRecord.watchId());
watchRecord.update(WatchRecord.State.FAILED, "failed to run triggered watch [" + watchRecord.watchId() + "] due to thread pool capacity");
historyStore.update(watchRecord);
}
}
@ -285,9 +285,9 @@ public class ExecutionService extends AbstractComponent {
assert records != null;
int counter = 0;
for (WatchRecord record : records) {
Watch watch = watchStore.get(record.name());
Watch watch = watchStore.get(record.watchId());
if (watch == null) {
String message = "unable to find watch for record [" + record.name() + "]/[" + record.id() + "], perhaps it has been deleted, ignoring...";
String message = "unable to find watch for record [" + record.watchId() + "]/[" + record.id() + "], perhaps it has been deleted, ignoring...";
record.update(WatchRecord.State.DELETED_WHILE_QUEUED, message);
historyStore.update(record);
} else {
@ -320,7 +320,7 @@ public class ExecutionService extends AbstractComponent {
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id());
try {
watchRecord.update(WatchRecord.State.CHECKING, null);
logger.debug("checking watch [{}]", watchRecord.name());
logger.debug("checking watch [{}]", watchRecord.watchId());
WatchExecutionResult result = executeInner(ctx);
watchRecord.seal(result);
if (ctx.recordExecution()) {
@ -330,14 +330,14 @@ public class ExecutionService extends AbstractComponent {
} catch (Exception e) {
if (started()) {
String detailedMessage = ExceptionsHelper.detailedMessage(e);
logger.warn("failed to execute watch [{}]/[{}], failure [{}]", watchRecord.name(), ctx.id(), detailedMessage);
logger.warn("failed to execute watch [{}]/[{}], failure [{}]", watchRecord.watchId(), ctx.id(), detailedMessage);
try {
watchRecord.update(WatchRecord.State.FAILED, detailedMessage);
if (ctx.recordExecution()) {
historyStore.update(watchRecord);
}
} catch (Exception e2) {
logger.error("failed to update watch record [{}]/[{}], failure [{}], original failure [{}]", watchRecord.name(), ctx.id(), ExceptionsHelper.detailedMessage(e2), detailedMessage);
logger.error("failed to update watch record [{}]/[{}], failure [{}], original failure [{}]", watchRecord.watchId(), ctx.id(), ExceptionsHelper.detailedMessage(e2), detailedMessage);
}
} else {
logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord);

View File

@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class WatchRecord implements ToXContent {
private Wid id;
private String name;
private String watchId;
private TriggerEvent triggerEvent;
private ExecutableInput input;
private Condition condition;
@ -58,7 +58,7 @@ public class WatchRecord implements ToXContent {
public WatchRecord(Wid id, Watch watch, TriggerEvent triggerEvent) {
this.id = id;
this.name = watch.id();
this.watchId = watch.id();
this.triggerEvent = triggerEvent;
this.condition = watch.condition().condition();
this.input = watch.input();
@ -75,8 +75,8 @@ public class WatchRecord implements ToXContent {
return triggerEvent;
}
public String name() {
return name;
public String watchId() {
return watchId;
}
public ExecutableInput input() { return input; }
@ -131,7 +131,7 @@ public class WatchRecord implements ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Parser.WATCH_ID_FIELD.getPreferredName(), name);
builder.field(Parser.WATCH_ID_FIELD.getPreferredName(), watchId);
builder.startObject(Parser.TRIGGER_EVENT_FIELD.getPreferredName())
.field(triggerEvent.type(), triggerEvent, params)
.endObject();
@ -268,7 +268,7 @@ public class WatchRecord implements ToXContent {
}
} else if (token.isValue()) {
if (WATCH_ID_FIELD.match(currentFieldName)) {
record.name = parser.text();
record.watchId = parser.text();
} else if (MESSAGE_FIELD.match(currentFieldName)) {
record.message = parser.textOrNull();
} else if (STATE_FIELD.match(currentFieldName)) {
@ -281,7 +281,7 @@ public class WatchRecord implements ToXContent {
}
}
assert record.name() != null : "watch record [" + id +"] is missing watch_id";
assert record.id() != null : "watch record [" + id +"] is missing watch_id";
assert record.triggerEvent() != null : "watch record [" + id +"] is missing trigger";
assert record.input() != null : "watch record [" + id +"] is missing input";
assert record.condition() != null : "watch record [" + id +"] is condition input";

View File

@ -7,12 +7,12 @@ package org.elasticsearch.watcher.support.http;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.http.auth.HttpAuth;
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
@ -38,10 +38,11 @@ public class HttpRequestTemplate implements ToXContent {
private final ImmutableMap<String, Template> headers;
private final HttpAuth auth;
private final Template body;
private final XContentType xContentType;
public HttpRequestTemplate(String host, int port, @Nullable Scheme scheme, @Nullable HttpMethod method, @Nullable Template path,
Map<String, Template> params, Map<String, Template> headers, HttpAuth auth,
Template body) {
Template body, XContentType xContentType) {
this.host = host;
this.port = port;
this.scheme = scheme != null ? scheme :Scheme.HTTP;
@ -51,6 +52,7 @@ public class HttpRequestTemplate implements ToXContent {
this.headers = headers != null ? ImmutableMap.copyOf(headers) : ImmutableMap.<String, Template>of();
this.auth = auth;
this.body = body;
this.xContentType = xContentType;
}
public Scheme scheme() {
@ -103,8 +105,14 @@ public class HttpRequestTemplate implements ToXContent {
}
request.setParams(mapBuilder.map());
}
if (headers != null && !headers.isEmpty()) {
if ((headers == null || headers.isEmpty()) && xContentType != null) {
request.setHeaders(ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, xContentType.restContentType()));
} else if (headers != null && !headers.isEmpty()) {
MapBuilder<String, String> mapBuilder = MapBuilder.newMapBuilder();
if (xContentType != null) {
// putting the content type first, so it can be overridden by custom headers
mapBuilder.put(HttpHeaders.Names.CONTENT_TYPE, xContentType.restContentType());
}
for (Map.Entry<String, Template> entry : headers.entrySet()) {
mapBuilder.put(entry.getKey(), engine.render(entry.getValue(), model));
}
@ -148,7 +156,11 @@ public class HttpRequestTemplate implements ToXContent {
.endObject();
}
if (body != null) {
builder.field(Parser.BODY_FIELD.getPreferredName(), body, params);
if (xContentType != null) {
builder.rawField(Parser.XBODY_FIELD.getPreferredName(), new BytesArray(body.getTemplate()));
} else {
builder.field(Parser.BODY_FIELD.getPreferredName(), body, params);
}
}
return builder.endObject();
}
@ -161,20 +173,21 @@ public class HttpRequestTemplate implements ToXContent {
HttpRequestTemplate that = (HttpRequestTemplate) o;
if (port != that.port) return false;
if (auth != null ? !auth.equals(that.auth) : that.auth != null) return false;
if (body != null ? !body.equals(that.body) : that.body != null) return false;
if (headers != null ? !headers.equals(that.headers) : that.headers != null) return false;
if (scheme != that.scheme) return false;
if (host != null ? !host.equals(that.host) : that.host != null) return false;
if (method != that.method) return false;
if (params != null ? !params.equals(that.params) : that.params != null) return false;
if (path != null ? !path.equals(that.path) : that.path != null) return false;
return true;
if (params != null ? !params.equals(that.params) : that.params != null) return false;
if (headers != null ? !headers.equals(that.headers) : that.headers != null) return false;
if (auth != null ? !auth.equals(that.auth) : that.auth != null) return false;
if (body != null ? !body.equals(that.body) : that.body != null) return false;
return xContentType == that.xContentType;
}
@Override
public int hashCode() {
int result = host != null ? host.hashCode() : 0;
int result = scheme != null ? scheme.hashCode() : 0;
result = 31 * result + (host != null ? host.hashCode() : 0);
result = 31 * result + port;
result = 31 * result + (method != null ? method.hashCode() : 0);
result = 31 * result + (path != null ? path.hashCode() : 0);
@ -182,6 +195,7 @@ public class HttpRequestTemplate implements ToXContent {
result = 31 * result + (headers != null ? headers.hashCode() : 0);
result = 31 * result + (auth != null ? auth.hashCode() : 0);
result = 31 * result + (body != null ? body.hashCode() : 0);
result = 31 * result + (xContentType != null ? xContentType.hashCode() : 0);
return result;
}
@ -200,6 +214,7 @@ public class HttpRequestTemplate implements ToXContent {
public static final ParseField HEADERS_FIELD = new ParseField("headers");
public static final ParseField AUTH_FIELD = new ParseField("auth");
public static final ParseField BODY_FIELD = new ParseField("body");
public static final ParseField XBODY_FIELD = new ParseField("xbody");
private final HttpAuthRegistry httpAuthRegistry;
@ -211,6 +226,8 @@ public class HttpRequestTemplate implements ToXContent {
public HttpRequestTemplate parse(XContentParser parser) throws IOException {
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
boolean seenBody = false;
boolean seenXBody = false;
Builder builder = new Builder();
XContentParser.Token token;
String currentFieldName = null;
@ -224,12 +241,24 @@ public class HttpRequestTemplate implements ToXContent {
} else if (PARAMS_FIELD.match(currentFieldName)) {
builder.putParams(parseFieldTemplates(currentFieldName, parser));
} else if (BODY_FIELD.match(currentFieldName)) {
if (seenXBody) {
throw new ParseException("could not parse http request template. both [{}] and [{}] are set, only one of the two is allowed", XBODY_FIELD.getPreferredName(), BODY_FIELD.getPreferredName());
}
seenBody = true;
builder.body(parseFieldTemplate(currentFieldName, parser));
} else if (XBODY_FIELD.match(currentFieldName)) {
if (seenBody) {
throw new ParseException("could not parse http request template. both [{}] and [{}] are set, only one of the two is allowed", XBODY_FIELD.getPreferredName(), BODY_FIELD.getPreferredName());
}
seenXBody = true;
XContentBuilder contentBuilder = XContentBuilder.builder(parser.contentType().xContent());
XContentHelper.copyCurrentStructure(contentBuilder.generator(), parser);
builder.body(contentBuilder);
} else if (token == XContentParser.Token.START_OBJECT) {
if (AUTH_FIELD.match(currentFieldName)) {
builder.auth(httpAuthRegistry.parse(parser));
} else {
throw new ParseException("could not parse http request template. unexpected object field [" + currentFieldName + "]");
throw new ParseException("could not parse http request template. unexpected object field [{}]", currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (SCHEME_FIELD.match(currentFieldName)) {
@ -239,24 +268,24 @@ public class HttpRequestTemplate implements ToXContent {
} else if (HOST_FIELD.match(currentFieldName)) {
builder.host = parser.text();
} else {
throw new ParseException("could not parse http request template. unexpected string field [" + currentFieldName + "]");
throw new ParseException("could not parse http request template. unexpected string field [{}]", currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (PORT_FIELD.match(currentFieldName)) {
builder.port = parser.intValue();
} else {
throw new ParseException("could not parse http request template. unexpected numeric field [" + currentFieldName + "]");
throw new ParseException("could not parse http request template. unexpected numeric field [{}]", currentFieldName);
}
} else {
throw new ParseException("could not parse http request template. unexpected token [" + token + "] for field [" + currentFieldName + "]");
throw new ParseException("could not parse http request template. unexpected token [{}] for field [{}]", token, currentFieldName);
}
}
if (builder.host == null) {
throw new ParseException("could not parse http request template. missing required [host] string field");
throw new ParseException("could not parse http request template. missing required [{}] string field", HOST_FIELD.getPreferredName());
}
if (builder.port <= 0) {
throw new ParseException("could not parse http request template. missing required [port] numeric field");
throw new ParseException("could not parse http request template. missing required [{}] numeric field", PORT_FIELD.getPreferredName());
}
return builder.build();
@ -266,7 +295,7 @@ public class HttpRequestTemplate implements ToXContent {
try {
return Template.parse(parser);
} catch (Template.ParseException pe) {
throw new ParseException("could not parse http request template. could not parse value for [" + field + "] field", pe);
throw new ParseException("could not parse http request template. could not parse value for [{}] field", pe, field);
}
}
@ -289,12 +318,12 @@ public class HttpRequestTemplate implements ToXContent {
public static class ParseException extends WatcherException {
public ParseException(String msg) {
super(msg);
public ParseException(String msg, Object... args) {
super(msg, args);
}
public ParseException(String msg, Throwable cause) {
super(msg, cause);
public ParseException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
@ -309,6 +338,7 @@ public class HttpRequestTemplate implements ToXContent {
private final ImmutableMap.Builder<String, Template> headers = ImmutableMap.builder();
private HttpAuth auth;
private Template body;
private XContentType xContentType;
private Builder() {
}
@ -363,11 +393,20 @@ public class HttpRequestTemplate implements ToXContent {
}
public Builder body(String body) {
return body(new Template(body));
return body(body, null);
}
public Builder body(Template body) {
return body(body, null);
}
public Builder body(String body, XContentType xContentType) {
return body(new Template(body), xContentType);
}
public Builder body(Template body, XContentType xContentType) {
this.body = body;
this.xContentType = xContentType;
return this;
}
@ -380,11 +419,11 @@ public class HttpRequestTemplate implements ToXContent {
}
public Builder body(XContentBuilder content) {
return body(content.bytes().toUtf8());
return body(content.bytes().toUtf8(), content.contentType());
}
public HttpRequestTemplate build() {
return new HttpRequestTemplate(host, port, scheme, method, path, params.build(), headers.build(), auth, body);
return new HttpRequestTemplate(host, port, scheme, method, path, params.build(), headers.build(), auth, body, xContentType);
}
}

View File

@ -84,8 +84,8 @@ public class Template implements ToXContent {
public static Template parse(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.VALUE_STRING) {
return new Template(parser.text());
if (token.isValue()) {
return new Template(String.valueOf(parser.objectText()));
}
if (token != XContentParser.Token.START_OBJECT) {
throw new ParseException("expected a string value or an object, but found [" + token + "] instead");

View File

@ -106,7 +106,7 @@ public class HttpInputTests extends ElasticsearchTestCase {
assertThat(result.payload().data(), equalTo(MapBuilder.<String, Object>newMapBuilder().put("key", "value").map()));
}
@Test //@Repeat(iterations = 20)
@Test @Repeat(iterations = 20)
public void testParser() throws Exception {
final HttpMethod httpMethod = rarely() ? null : randomFrom(HttpMethod.values());
Scheme scheme = randomFrom(Scheme.HTTP, Scheme.HTTPS, null);

View File

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.support.http.auth.HttpAuthFactory;
import org.elasticsearch.watcher.support.http.auth.HttpAuthRegistry;
import org.elasticsearch.watcher.support.http.auth.basic.BasicAuth;
import org.elasticsearch.watcher.support.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.watcher.support.secret.SecretService;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.support.template.TemplateEngine;
import org.junit.Test;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
/**
*
*/
public class HttpRequestTemplateTests extends ElasticsearchTestCase {
@Test @Repeat(iterations = 5)
public void testXBody() throws Exception {
XContentType type = randomFrom(XContentType.JSON, XContentType.YAML);
HttpRequestTemplate template = HttpRequestTemplate.builder("_host", 1234)
.body(XContentBuilder.builder(type.xContent()).startObject().endObject())
.build();
HttpRequest request = template.render(new MockTemplateEngine(), ImmutableMap.<String, Object>of());
assertThat(request.headers, hasEntry(HttpHeaders.Names.CONTENT_TYPE, type.restContentType()));
}
@Test
public void testBody() throws Exception {
HttpRequestTemplate template = HttpRequestTemplate.builder("_host", 1234)
.body("_body")
.build();
HttpRequest request = template.render(new MockTemplateEngine(), ImmutableMap.<String, Object>of());
assertThat(request.headers.size(), is(0));
}
@Test(expected = HttpRequestTemplate.ParseException.class)
public void testParse_BothBodyAndXBody() throws Exception {
XContentBuilder builder = jsonBuilder()
.startObject()
.field("body", "_body")
.field("xbody", "{}")
.endObject();
XContentParser xContentParser = JsonXContent.jsonXContent.createParser(builder.bytes());
HttpAuthRegistry registry = new HttpAuthRegistry(ImmutableMap.<String, HttpAuthFactory>of(BasicAuth.TYPE, new BasicAuthFactory(new SecretService.PlainText())));
HttpRequestTemplate.Parser parser = new HttpRequestTemplate.Parser(registry);
xContentParser.nextToken();
HttpRequestTemplate parsed = parser.parse(xContentParser);
}
@Test @Repeat(iterations = 20)
public void testParse_SelfGenerated() throws Exception {
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("_host", 1234);
if (randomBoolean()) {
builder.method(randomFrom(HttpMethod.values()));
}
if (randomBoolean()) {
builder.path("/path");
}
boolean xbody = randomBoolean();
if (randomBoolean()) {
if (xbody) {
builder.body(jsonBuilder().startObject().endObject());
} else {
builder.body("_body");
}
}
if (randomBoolean()) {
builder.auth(new BasicAuth("_username", "_password".toCharArray()));
}
if (randomBoolean()) {
builder.putParam("_key", new Template("_value"));
}
if (randomBoolean()) {
builder.putHeader("_key", new Template("_value"));
}
HttpRequestTemplate template = builder.build();
HttpAuthRegistry registry = new HttpAuthRegistry(ImmutableMap.<String, HttpAuthFactory>of(BasicAuth.TYPE, new BasicAuthFactory(new SecretService.PlainText())));
HttpRequestTemplate.Parser parser = new HttpRequestTemplate.Parser(registry);
XContentBuilder xContentBuilder = template.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS);
XContentParser xContentParser = JsonXContent.jsonXContent.createParser(xContentBuilder.bytes());
xContentParser.nextToken();
HttpRequestTemplate parsed = parser.parse(xContentParser);
assertThat(parsed, equalTo(template));
}
static class MockTemplateEngine implements TemplateEngine {
@Override
public String render(Template template, Map<String, Object> model) {
return template.getTemplate();
}
}
}