diff --git a/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java b/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java index 2a030d4f4fb..cda72a4c5ae 100644 --- a/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java +++ b/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.watcher.input.http; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; @@ -42,17 +41,25 @@ public class ExecutableHttpInput extends ExecutableInput filteredKeys = XContentFilterKeysUtils.filterMapOrdered(input.getExtractKeys(), parser); payload = new Payload.Simple(filteredKeys); } else { - Tuple> result = XContentHelper.convertToMap(response.body(), true); - payload = new Payload.Simple(result.v2()); + Map map = parser.mapOrderedAndClose(); + payload = new Payload.Simple(map); } + return new HttpInput.Result(payload, request, response.status()); } diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java index 4b32afcdfb2..b7287ff67b1 100644 --- a/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpClient.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.base.Charsets; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; @@ -25,6 +26,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.security.KeyStore; import java.security.SecureRandom; +import java.util.List; import java.util.Map; /** @@ -126,16 +128,21 @@ public class HttpClient extends AbstractLifecycleComponent { } urlConnection.connect(); - final HttpResponse response; final int statusCode = urlConnection.getResponseCode(); + ImmutableMap.Builder responseHeaders = ImmutableMap.builder(); + for (Map.Entry> header : urlConnection.getHeaderFields().entrySet()) { + // HttpURLConnection#getHeaderFields returns the first status line as a header + // with a `null` key (facepalm)... so we have to skip that one. + if (header.getKey() != null) { + responseHeaders.put(header.getKey(), header.getValue().toArray(new String[header.getValue().size()])); + } + } logger.debug("http status code [{}]", statusCode); if (statusCode < 400) { byte[] body = Streams.copyToByteArray(urlConnection.getInputStream()); - response = new HttpResponse(statusCode, body); - } else { - response = new HttpResponse(statusCode); + return new HttpResponse(statusCode, body, responseHeaders.build()); } - return response; + return new HttpResponse(statusCode, responseHeaders.build()); } /** SSL Initialization **/ diff --git a/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java b/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java index e113fa4253e..d6a85d3679e 100644 --- a/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java +++ b/src/main/java/org/elasticsearch/watcher/support/http/HttpResponse.java @@ -8,36 +8,54 @@ package org.elasticsearch.watcher.support.http; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.netty.handler.codec.http.HttpHeaders; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.watcher.WatcherException; +import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; public class HttpResponse implements ToXContent { - public static final ParseField STATUS_FIELD = new ParseField("status"); - public static final ParseField BODY_FIELD = new ParseField("body"); - private final int status; + private final ImmutableMap headers; private final BytesReference body; public HttpResponse(int status) { - this(status, (BytesReference) null); + this(status, ImmutableMap.of()); } - public HttpResponse(int status, String body) { - this(status, new BytesArray(body)); + public HttpResponse(int status, ImmutableMap headers) { + this(status, (BytesReference) null, headers); } - public HttpResponse(int status, byte[] body) { - this(status, new BytesArray(body)); + public HttpResponse(int status, @Nullable String body) { + this(status, body != null ? new BytesArray(body) : null, ImmutableMap.of()); } - public HttpResponse(int status, BytesReference body) { + public HttpResponse(int status, @Nullable String body, ImmutableMap headers) { + this(status, body != null ? new BytesArray(body) : null, headers); + } + + public HttpResponse(int status, @Nullable byte[] body) { + this(status, body != null ? new BytesArray(body) : null, ImmutableMap.of()); + } + + public HttpResponse(int status, @Nullable byte[] body, ImmutableMap headers) { + this(status, body != null ? new BytesArray(body) : null, headers); + } + + public HttpResponse(int status, @Nullable BytesReference body, ImmutableMap headers) { this.status = status; this.body = body; + this.headers = headers; } public int status() { @@ -52,30 +70,58 @@ public class HttpResponse implements ToXContent { return body; } + public ImmutableMap headers() { + return headers; + } + + public String contentType() { + String[] values = headers.get(HttpHeaders.Names.CONTENT_TYPE); + if (values == null || values.length == 0) { + return null; + } + return values[0]; + } + + public XContentType xContentType() { + String[] values = headers.get(HttpHeaders.Names.CONTENT_TYPE); + if (values == null || values.length == 0) { + return null; + } + return XContentType.fromRestContentType(values[0]); + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - HttpResponse response = (HttpResponse) o; - - if (status != response.status) return false; - return body.equals(response.body); + HttpResponse that = (HttpResponse) o; + if (status != that.status) return false; + if (!headers.equals(that.headers)) return false; + return !(body != null ? !body.equals(that.body) : that.body != null); } @Override public int hashCode() { int result = status; - result = 31 * result + body.hashCode(); + result = 31 * result + headers.hashCode(); + result = 31 * result + (body != null ? body.hashCode() : 0); return result; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder = builder.startObject().field(STATUS_FIELD.getPreferredName(), status); + builder = builder.startObject().field(Field.STATUS.getPreferredName(), status); + if (!headers.isEmpty()) { + builder.startObject(Field.HEADERS.getPreferredName()); + for (Map.Entry header : headers.entrySet()) { + builder.array(header.getKey(), header.getValue()); + } + builder.endObject(); + } if (hasContent()) { - builder = builder.field(BODY_FIELD.getPreferredName(), body.toUtf8()); + builder = builder.field(Field.BODY.getPreferredName(), body.toUtf8()); } builder.endObject(); return builder; @@ -86,49 +132,72 @@ public class HttpResponse implements ToXContent { int status = -1; String body = null; + ImmutableMap.Builder headers = ImmutableMap.builder(); String currentFieldName = null; XContentParser.Token token; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); - } else { - if (token == XContentParser.Token.VALUE_NUMBER) { - if (STATUS_FIELD.match(currentFieldName)) { - status = parser.intValue(); - } else { - throw new ParseException("could not parse http response. unknown numeric field [" + currentFieldName + "]"); - } - } else if (token == XContentParser.Token.VALUE_STRING) { - if (BODY_FIELD.match(currentFieldName)) { - body = parser.text(); - } else { - throw new ParseException("could not parse http response. unknown string field [" + currentFieldName + "]"); - } + } else if (currentFieldName == null) { + throw new ParseException("could not parse http response. expected a field name but found [{}] instead", token); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (Field.STATUS.match(currentFieldName)) { + status = parser.intValue(); } else { - throw new ParseException("could not parse http response. unknown unexpected token [" + token + "]"); + throw new ParseException("could not parse http response. unknown numeric field [{}]", currentFieldName); } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (Field.BODY.match(currentFieldName)) { + body = parser.text(); + } else { + throw new ParseException("could not parse http response. unknown string field [{}]", currentFieldName); + } + } else if (token == XContentParser.Token.START_OBJECT) { + String headerName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + headerName = parser.currentName(); + } else if (headerName == null){ + throw new ParseException("could not parse http response. expected a header name but found [{}] instead", token); + } else if (token.isValue()) { + headers.put(headerName, new String[] { String.valueOf(parser.objectText()) }); + } else if (token == XContentParser.Token.START_ARRAY) { + List values = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (!token.isValue()) { + throw new ParseException("could not parse http response. expected a header value for header [{}] but found [{}] instead", headerName, token); + } else { + values.add(String.valueOf(parser.objectText())); + } + } + headers.put(headerName, values.toArray(new String[values.size()])); + } + } + } else { + throw new ParseException("could not parse http response. unexpected token [{}]", token); } } if (status < 0) { - throw new ParseException("could not parse http response. missing [status] numeric field holding the response's http status code"); - } - if (body == null) { - return new HttpResponse(status); - } else { - return new HttpResponse(status, body); + throw new ParseException("could not parse http response. missing required numeric [{}] field holding the response's http status code", Field.STATUS.getPreferredName()); } + return new HttpResponse(status, body, headers.build()); } public static class ParseException extends WatcherException { - - public ParseException(String msg) { - super(msg); + public ParseException(String msg, Object... args) { + super(msg, args); } - public ParseException(String msg, Throwable cause) { - super(msg, cause); + public ParseException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); } } + + interface Field { + ParseField STATUS = new ParseField("status"); + ParseField HEADERS = new ParseField("headers"); + ParseField BODY = new ParseField("body"); + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentUtils.java b/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentUtils.java index 0fc05938108..1791b54a4cf 100644 --- a/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentUtils.java +++ b/src/main/java/org/elasticsearch/watcher/support/xcontent/WatcherXContentUtils.java @@ -21,7 +21,7 @@ public class WatcherXContentUtils { // TODO open this up in core public static List readList(XContentParser parser, XContentParser.Token token) throws IOException { - ArrayList list = new ArrayList<>(); + List list = new ArrayList<>(); while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { list.add(readValue(parser, token)); } diff --git a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java index bfb61713678..86bbc9fce31 100644 --- a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java @@ -11,11 +11,13 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.netty.handler.codec.http.HttpHeaders; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.watcher.actions.ActionStatus; import org.elasticsearch.watcher.actions.ActionWrapper; @@ -73,7 +75,7 @@ public class HttpInputTests extends ElasticsearchTestCase { httpParser = new HttpInputFactory(ImmutableSettings.EMPTY, httpClient, templateEngine, new HttpRequest.Parser(registry), new HttpRequestTemplate.Parser(registry)); } - @Test + @Test @Repeat(iterations = 20) public void testExecute() throws Exception { String host = "_host"; int port = 123; @@ -83,7 +85,28 @@ public class HttpInputTests extends ElasticsearchTestCase { HttpInput httpInput = InputBuilders.httpInput(request.build()).build(); ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine); - HttpResponse response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8)); + HttpResponse response; + switch (randomIntBetween(1, 6)) { + case 1: + response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8)); + break; + case 2: + response = new HttpResponse(123, "---\nkey : value".getBytes(UTF8)); + break; + case 3: + response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { XContentType.JSON.restContentType() })); + break; + case 4: + response = new HttpResponse(123, "key: value".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { XContentType.YAML.restContentType() })); + break; + case 5: + response = new HttpResponse(123, "---\nkey: value".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { "unrecognized_content_type" })); + break; + default: + response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { "unrecognized_content_type" })); + break; + } + when(httpClient.execute(any(HttpRequest.class))).thenReturn(response); when(templateEngine.render(eq(Template.inline("_body").build()), any(Map.class))).thenReturn("_body"); diff --git a/src/test/java/org/elasticsearch/watcher/support/http/HttpResponseTests.java b/src/test/java/org/elasticsearch/watcher/support/http/HttpResponseTests.java new file mode 100644 index 00000000000..3f50788424f --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/support/http/HttpResponseTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support.http; + +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.watcher.test.WatcherTestUtils.xContentParser; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +/** + * + */ +public class HttpResponseTests extends ElasticsearchTestCase { + + @Test @Repeat(iterations = 20) + public void testParse_SelfGenerated() throws Exception { + int status = randomIntBetween(200, 600); + ImmutableMap headers = ImmutableMap.of(); + if (randomBoolean()) { + headers = ImmutableMap.of("key", new String[] { "value" }); + } + String body = randomBoolean() ? "body" : null; + final HttpResponse response; + if (randomBoolean() && headers.isEmpty() && body == null) { + response = new HttpResponse(status); + } else if (body != null ){ + switch (randomIntBetween(0, 2)) { + case 0: + response = new HttpResponse(status, body, headers); + break; + case 1: + response = new HttpResponse(status, body.getBytes(UTF8), headers); + break; + default: // 2 + response = new HttpResponse(status, new BytesArray(body), headers); + break; + } + } else { // body is null + switch (randomIntBetween(0, 3)) { + case 0: + response = new HttpResponse(status, (String) null, headers); + break; + case 1: + response = new HttpResponse(status, (byte[]) null, headers); + break; + case 2: + response = new HttpResponse(status, (BytesReference) null, headers); + break; + default: //3 + response = new HttpResponse(status, headers); + break; + } + } + + XContentBuilder builder = jsonBuilder().value(response); + XContentParser parser = xContentParser(builder); + parser.nextToken(); + HttpResponse parsedResponse = HttpResponse.parse(parser); + assertThat(parsedResponse, notNullValue()); + assertThat(parsedResponse.status(), is(status)); + if (body == null) { + assertThat(parsedResponse.body(), nullValue()); + } else { + assertThat(parsedResponse.body().toUtf8(), is(body)); + } + assertThat(parsedResponse.headers().size(), is(headers.size())); + for (Map.Entry header : parsedResponse.headers().entrySet()) { + assertThat(header.getValue(), arrayContaining(headers.get(header.getKey()))); + } + } +}