Added support for headers in HttpClient

- HttpResponse now holds the response headers
- Added specific support for content type of the response, based on which we create the xcontent payload.

Original commit: elastic/x-pack-elasticsearch@beae27f576
This commit is contained in:
uboness 2015-04-29 15:54:17 +02:00
parent 46c111b016
commit 6acc3f2616
6 changed files with 249 additions and 56 deletions

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.watcher.input.http;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
@ -42,17 +41,25 @@ public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Re
HttpRequest request = input.getRequest().render(templateEngine, model);
HttpResponse response = client.execute(request);
final Payload payload;
if (!response.hasContent()) {
payload = Payload.EMPTY;
} else if (input.getExtractKeys() != null) {
XContentParser parser = XContentHelper.createParser(response.body());
return new HttpInput.Result(Payload.EMPTY, request, response.status());
}
XContentType contentType = response.xContentType();
XContentParser parser = contentType != null ?
contentType.xContent().createParser(response.body()) :
XContentHelper.createParser(response.body());
final Payload payload;
if (input.getExtractKeys() != null) {
Map<String, Object> filteredKeys = XContentFilterKeysUtils.filterMapOrdered(input.getExtractKeys(), parser);
payload = new Payload.Simple(filteredKeys);
} else {
Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(response.body(), true);
payload = new Payload.Simple(result.v2());
Map<String, Object> map = parser.mapOrderedAndClose();
payload = new Payload.Simple(map);
}
return new HttpInput.Result(payload, request, response.status());
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
@ -25,6 +26,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
/**
@ -126,16 +128,21 @@ public class HttpClient extends AbstractLifecycleComponent<HttpClient> {
}
urlConnection.connect();
final HttpResponse response;
final int statusCode = urlConnection.getResponseCode();
ImmutableMap.Builder<String, String[]> responseHeaders = ImmutableMap.builder();
for (Map.Entry<String, List<String>> header : urlConnection.getHeaderFields().entrySet()) {
// HttpURLConnection#getHeaderFields returns the first status line as a header
// with a `null` key (facepalm)... so we have to skip that one.
if (header.getKey() != null) {
responseHeaders.put(header.getKey(), header.getValue().toArray(new String[header.getValue().size()]));
}
}
logger.debug("http status code [{}]", statusCode);
if (statusCode < 400) {
byte[] body = Streams.copyToByteArray(urlConnection.getInputStream());
response = new HttpResponse(statusCode, body);
} else {
response = new HttpResponse(statusCode);
return new HttpResponse(statusCode, body, responseHeaders.build());
}
return response;
return new HttpResponse(statusCode, responseHeaders.build());
}
/** SSL Initialization **/

View File

@ -8,36 +8,54 @@ package org.elasticsearch.watcher.support.http;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.watcher.WatcherException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class HttpResponse implements ToXContent {
public static final ParseField STATUS_FIELD = new ParseField("status");
public static final ParseField BODY_FIELD = new ParseField("body");
private final int status;
private final ImmutableMap<String, String[]> headers;
private final BytesReference body;
public HttpResponse(int status) {
this(status, (BytesReference) null);
this(status, ImmutableMap.<String, String[]>of());
}
public HttpResponse(int status, String body) {
this(status, new BytesArray(body));
public HttpResponse(int status, ImmutableMap<String, String[]> headers) {
this(status, (BytesReference) null, headers);
}
public HttpResponse(int status, byte[] body) {
this(status, new BytesArray(body));
public HttpResponse(int status, @Nullable String body) {
this(status, body != null ? new BytesArray(body) : null, ImmutableMap.<String, String[]>of());
}
public HttpResponse(int status, BytesReference body) {
public HttpResponse(int status, @Nullable String body, ImmutableMap<String, String[]> headers) {
this(status, body != null ? new BytesArray(body) : null, headers);
}
public HttpResponse(int status, @Nullable byte[] body) {
this(status, body != null ? new BytesArray(body) : null, ImmutableMap.<String, String[]>of());
}
public HttpResponse(int status, @Nullable byte[] body, ImmutableMap<String, String[]> headers) {
this(status, body != null ? new BytesArray(body) : null, headers);
}
public HttpResponse(int status, @Nullable BytesReference body, ImmutableMap<String, String[]> headers) {
this.status = status;
this.body = body;
this.headers = headers;
}
public int status() {
@ -52,30 +70,58 @@ public class HttpResponse implements ToXContent {
return body;
}
public ImmutableMap<String, String[]> headers() {
return headers;
}
public String contentType() {
String[] values = headers.get(HttpHeaders.Names.CONTENT_TYPE);
if (values == null || values.length == 0) {
return null;
}
return values[0];
}
public XContentType xContentType() {
String[] values = headers.get(HttpHeaders.Names.CONTENT_TYPE);
if (values == null || values.length == 0) {
return null;
}
return XContentType.fromRestContentType(values[0]);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HttpResponse response = (HttpResponse) o;
if (status != response.status) return false;
return body.equals(response.body);
HttpResponse that = (HttpResponse) o;
if (status != that.status) return false;
if (!headers.equals(that.headers)) return false;
return !(body != null ? !body.equals(that.body) : that.body != null);
}
@Override
public int hashCode() {
int result = status;
result = 31 * result + body.hashCode();
result = 31 * result + headers.hashCode();
result = 31 * result + (body != null ? body.hashCode() : 0);
return result;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder = builder.startObject().field(STATUS_FIELD.getPreferredName(), status);
builder = builder.startObject().field(Field.STATUS.getPreferredName(), status);
if (!headers.isEmpty()) {
builder.startObject(Field.HEADERS.getPreferredName());
for (Map.Entry<String, String[]> header : headers.entrySet()) {
builder.array(header.getKey(), header.getValue());
}
builder.endObject();
}
if (hasContent()) {
builder = builder.field(BODY_FIELD.getPreferredName(), body.toUtf8());
builder = builder.field(Field.BODY.getPreferredName(), body.toUtf8());
}
builder.endObject();
return builder;
@ -86,49 +132,72 @@ public class HttpResponse implements ToXContent {
int status = -1;
String body = null;
ImmutableMap.Builder<String, String[]> headers = ImmutableMap.builder();
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else {
if (token == XContentParser.Token.VALUE_NUMBER) {
if (STATUS_FIELD.match(currentFieldName)) {
status = parser.intValue();
} else {
throw new ParseException("could not parse http response. unknown numeric field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (BODY_FIELD.match(currentFieldName)) {
body = parser.text();
} else {
throw new ParseException("could not parse http response. unknown string field [" + currentFieldName + "]");
}
} else if (currentFieldName == null) {
throw new ParseException("could not parse http response. expected a field name but found [{}] instead", token);
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (Field.STATUS.match(currentFieldName)) {
status = parser.intValue();
} else {
throw new ParseException("could not parse http response. unknown unexpected token [" + token + "]");
throw new ParseException("could not parse http response. unknown numeric field [{}]", currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (Field.BODY.match(currentFieldName)) {
body = parser.text();
} else {
throw new ParseException("could not parse http response. unknown string field [{}]", currentFieldName);
}
} else if (token == XContentParser.Token.START_OBJECT) {
String headerName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
headerName = parser.currentName();
} else if (headerName == null){
throw new ParseException("could not parse http response. expected a header name but found [{}] instead", token);
} else if (token.isValue()) {
headers.put(headerName, new String[] { String.valueOf(parser.objectText()) });
} else if (token == XContentParser.Token.START_ARRAY) {
List<String> values = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (!token.isValue()) {
throw new ParseException("could not parse http response. expected a header value for header [{}] but found [{}] instead", headerName, token);
} else {
values.add(String.valueOf(parser.objectText()));
}
}
headers.put(headerName, values.toArray(new String[values.size()]));
}
}
} else {
throw new ParseException("could not parse http response. unexpected token [{}]", token);
}
}
if (status < 0) {
throw new ParseException("could not parse http response. missing [status] numeric field holding the response's http status code");
}
if (body == null) {
return new HttpResponse(status);
} else {
return new HttpResponse(status, body);
throw new ParseException("could not parse http response. missing required numeric [{}] field holding the response's http status code", Field.STATUS.getPreferredName());
}
return new HttpResponse(status, body, headers.build());
}
public static class ParseException extends WatcherException {
public ParseException(String msg) {
super(msg);
public ParseException(String msg, Object... args) {
super(msg, args);
}
public ParseException(String msg, Throwable cause) {
super(msg, cause);
public ParseException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
interface Field {
ParseField STATUS = new ParseField("status");
ParseField HEADERS = new ParseField("headers");
ParseField BODY = new ParseField("body");
}
}

View File

@ -21,7 +21,7 @@ public class WatcherXContentUtils {
// TODO open this up in core
public static List<Object> readList(XContentParser parser, XContentParser.Token token) throws IOException {
ArrayList<Object> list = new ArrayList<>();
List<Object> list = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
list.add(readValue(parser, token));
}

View File

@ -11,11 +11,13 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.actions.ActionStatus;
import org.elasticsearch.watcher.actions.ActionWrapper;
@ -73,7 +75,7 @@ public class HttpInputTests extends ElasticsearchTestCase {
httpParser = new HttpInputFactory(ImmutableSettings.EMPTY, httpClient, templateEngine, new HttpRequest.Parser(registry), new HttpRequestTemplate.Parser(registry));
}
@Test
@Test @Repeat(iterations = 20)
public void testExecute() throws Exception {
String host = "_host";
int port = 123;
@ -83,7 +85,28 @@ public class HttpInputTests extends ElasticsearchTestCase {
HttpInput httpInput = InputBuilders.httpInput(request.build()).build();
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
HttpResponse response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8));
HttpResponse response;
switch (randomIntBetween(1, 6)) {
case 1:
response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8));
break;
case 2:
response = new HttpResponse(123, "---\nkey : value".getBytes(UTF8));
break;
case 3:
response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { XContentType.JSON.restContentType() }));
break;
case 4:
response = new HttpResponse(123, "key: value".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { XContentType.YAML.restContentType() }));
break;
case 5:
response = new HttpResponse(123, "---\nkey: value".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { "unrecognized_content_type" }));
break;
default:
response = new HttpResponse(123, "{\"key\" : \"value\"}".getBytes(UTF8), ImmutableMap.of(HttpHeaders.Names.CONTENT_TYPE, new String[] { "unrecognized_content_type" }));
break;
}
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
when(templateEngine.render(eq(Template.inline("_body").build()), any(Map.class))).thenReturn("_body");

View File

@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.http;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.watcher.test.WatcherTestUtils.xContentParser;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
*
*/
public class HttpResponseTests extends ElasticsearchTestCase {
@Test @Repeat(iterations = 20)
public void testParse_SelfGenerated() throws Exception {
int status = randomIntBetween(200, 600);
ImmutableMap<String, String[]> headers = ImmutableMap.of();
if (randomBoolean()) {
headers = ImmutableMap.of("key", new String[] { "value" });
}
String body = randomBoolean() ? "body" : null;
final HttpResponse response;
if (randomBoolean() && headers.isEmpty() && body == null) {
response = new HttpResponse(status);
} else if (body != null ){
switch (randomIntBetween(0, 2)) {
case 0:
response = new HttpResponse(status, body, headers);
break;
case 1:
response = new HttpResponse(status, body.getBytes(UTF8), headers);
break;
default: // 2
response = new HttpResponse(status, new BytesArray(body), headers);
break;
}
} else { // body is null
switch (randomIntBetween(0, 3)) {
case 0:
response = new HttpResponse(status, (String) null, headers);
break;
case 1:
response = new HttpResponse(status, (byte[]) null, headers);
break;
case 2:
response = new HttpResponse(status, (BytesReference) null, headers);
break;
default: //3
response = new HttpResponse(status, headers);
break;
}
}
XContentBuilder builder = jsonBuilder().value(response);
XContentParser parser = xContentParser(builder);
parser.nextToken();
HttpResponse parsedResponse = HttpResponse.parse(parser);
assertThat(parsedResponse, notNullValue());
assertThat(parsedResponse.status(), is(status));
if (body == null) {
assertThat(parsedResponse.body(), nullValue());
} else {
assertThat(parsedResponse.body().toUtf8(), is(body));
}
assertThat(parsedResponse.headers().size(), is(headers.size()));
for (Map.Entry<String, String[]> header : parsedResponse.headers().entrySet()) {
assertThat(header.getValue(), arrayContaining(headers.get(header.getKey())));
}
}
}