Watcher: Expose HTTP response headers in payload
This exposes the headers (all lower-cased) in the payload, so that the can be accessed in the conditions. Closes elastic/elasticsearch#1560 Original commit: elastic/x-pack-elasticsearch@c9b08558fe
This commit is contained in:
parent
74edbe6332
commit
23ebbed95a
|
@ -21,6 +21,8 @@ import org.elasticsearch.xpack.watcher.support.http.HttpResponse;
|
|||
import org.elasticsearch.xpack.watcher.support.text.TextTemplateEngine;
|
||||
import org.elasticsearch.xpack.watcher.watch.Payload;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -50,9 +52,11 @@ public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Re
|
|||
|
||||
HttpInput.Result doExecute(WatchExecutionContext ctx, HttpRequest request) throws Exception {
|
||||
HttpResponse response = client.execute(request);
|
||||
Map<String, List<String>> headers = response.headers();
|
||||
|
||||
if (!response.hasContent()) {
|
||||
return new HttpInput.Result(request, response.status(), Payload.EMPTY);
|
||||
Payload payload = headers.size() > 0 ? new Payload.Simple("_headers", headers) : Payload.EMPTY;
|
||||
return new HttpInput.Result(request, -1, payload);
|
||||
}
|
||||
|
||||
XContentType contentType = response.xContentType();
|
||||
|
@ -81,18 +85,19 @@ public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Re
|
|||
}
|
||||
}
|
||||
|
||||
final Payload payload;
|
||||
final Map<String, Object> payloadMap = new HashMap<>();
|
||||
if (input.getExtractKeys() != null) {
|
||||
Map<String, Object> filteredKeys = XContentFilterKeysUtils.filterMapOrdered(input.getExtractKeys(), parser);
|
||||
payload = new Payload.Simple(filteredKeys);
|
||||
payloadMap.putAll(XContentFilterKeysUtils.filterMapOrdered(input.getExtractKeys(), parser));
|
||||
} else {
|
||||
if (parser != null) {
|
||||
Map<String, Object> map = parser.mapOrdered();
|
||||
payload = new Payload.Simple(map);
|
||||
payloadMap.putAll(parser.mapOrdered());
|
||||
} else {
|
||||
payload = new Payload.Simple("_value", response.body().toUtf8());
|
||||
payloadMap.put("_value", response.body().toUtf8());
|
||||
}
|
||||
}
|
||||
return new HttpInput.Result(request, response.status(), payload);
|
||||
if (headers.size() > 0) {
|
||||
payloadMap.put("_headers", headers);
|
||||
}
|
||||
return new HttpInput.Result(request, response.status(), new Payload.Simple(payloadMap));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -161,10 +161,6 @@ public class HttpInput implements Input {
|
|||
return request;
|
||||
}
|
||||
|
||||
public int statusCode() {
|
||||
return statusCode;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
if (request == null) {
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.watcher.support.http;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
|
@ -81,6 +82,18 @@ public class HttpResponse implements ToXContent {
|
|||
return body;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all the headers, with keys being lowercased, so they are always consistent
|
||||
* in the payload
|
||||
*/
|
||||
public Map<String, List<String>> headers() {
|
||||
ImmutableMap.Builder<String, List<String>> builder = ImmutableMap.builder();
|
||||
for (Map.Entry<String, String[]> entry : headers.entrySet()) {
|
||||
builder.put(entry.getKey().toLowerCase(Locale.ROOT), Arrays.asList(entry.getValue()));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public String[] header(String header) {
|
||||
return headers.get(header.toLowerCase(Locale.ROOT));
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ import org.elasticsearch.common.xcontent.XContentHelper;
|
|||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
|
||||
import org.elasticsearch.xpack.watcher.actions.ExecutableActions;
|
||||
import org.elasticsearch.xpack.watcher.condition.always.ExecutableAlwaysCondition;
|
||||
import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext;
|
||||
|
@ -48,6 +47,9 @@ import org.junit.Before;
|
|||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
@ -55,6 +57,7 @@ import static java.util.Collections.singletonMap;
|
|||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.joda.time.DateTimeZone.UTC;
|
||||
|
@ -121,27 +124,13 @@ public class HttpInputTests extends ESTestCase {
|
|||
}
|
||||
|
||||
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
|
||||
|
||||
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
|
||||
|
||||
when(templateEngine.render(eq(TextTemplate.inline("_body").build()), any(Map.class))).thenReturn("_body");
|
||||
|
||||
Watch watch = new Watch("test-watch",
|
||||
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
|
||||
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
|
||||
new ExecutableAlwaysCondition(logger),
|
||||
null,
|
||||
null,
|
||||
new ExecutableActions(new ArrayList<ActionWrapper>()),
|
||||
null,
|
||||
new WatchStatus(new DateTime(0, UTC), emptyMap()));
|
||||
WatchExecutionContext ctx = new TriggeredExecutionContext(watch,
|
||||
new DateTime(0, UTC),
|
||||
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
|
||||
TimeValue.timeValueSeconds(5));
|
||||
WatchExecutionContext ctx = createWatchExecutionContext();
|
||||
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
|
||||
assertThat(result.type(), equalTo(HttpInput.TYPE));
|
||||
assertThat(result.payload().data(), equalTo(MapBuilder.<String, Object>newMapBuilder().put("key", "value").map()));
|
||||
assertThat(result.payload().data(), hasEntry("key", "value"));
|
||||
}
|
||||
|
||||
public void testExecuteNonJson() throws Exception {
|
||||
|
@ -156,19 +145,8 @@ public class HttpInputTests extends ESTestCase {
|
|||
HttpResponse response = new HttpResponse(123, notJson.getBytes(StandardCharsets.UTF_8));
|
||||
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
|
||||
when(templateEngine.render(eq(TextTemplate.inline("_body").build()), any(Map.class))).thenReturn("_body");
|
||||
Watch watch = new Watch("test-watch",
|
||||
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
|
||||
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
|
||||
new ExecutableAlwaysCondition(logger),
|
||||
null,
|
||||
null,
|
||||
new ExecutableActions(new ArrayList<ActionWrapper>()),
|
||||
null,
|
||||
new WatchStatus(new DateTime(0, UTC), emptyMap()));
|
||||
WatchExecutionContext ctx = new TriggeredExecutionContext(watch,
|
||||
new DateTime(0, UTC),
|
||||
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
|
||||
TimeValue.timeValueSeconds(5));
|
||||
|
||||
WatchExecutionContext ctx = createWatchExecutionContext();
|
||||
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
|
||||
assertThat(result.type(), equalTo(HttpInput.TYPE));
|
||||
assertThat(result.payload().data().get("_value").toString(), equalTo(notJson));
|
||||
|
@ -242,7 +220,7 @@ public class HttpInputTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testParser_invalidHttpMethod() throws Exception {
|
||||
public void testParserInvalidHttpMethod() throws Exception {
|
||||
XContentBuilder builder = jsonBuilder().startObject()
|
||||
.startObject("request")
|
||||
.field("method", "_method")
|
||||
|
@ -258,4 +236,56 @@ public class HttpInputTests extends ESTestCase {
|
|||
assertThat(e.getMessage(), is("unsupported http method [_METHOD]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testThatHeadersAreIncludedInPayload() throws Exception {
|
||||
String headerName = randomAsciiOfLength(10);
|
||||
String headerValue = randomAsciiOfLength(10);
|
||||
boolean responseHasContent = randomBoolean();
|
||||
|
||||
HttpRequestTemplate.Builder request = HttpRequestTemplate.builder("localhost", 8080);
|
||||
HttpInput httpInput = InputBuilders.httpInput(request.build()).build();
|
||||
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
|
||||
|
||||
Map<String, String[]> responseHeaders = new HashMap<>();
|
||||
responseHeaders.put(headerName, new String[] { headerValue });
|
||||
HttpResponse response;
|
||||
if (responseHasContent) {
|
||||
response = new HttpResponse(200, "body".getBytes(StandardCharsets.UTF_8), responseHeaders);
|
||||
} else {
|
||||
BytesReference bytesReference = null;
|
||||
response = new HttpResponse(200, bytesReference, responseHeaders);
|
||||
}
|
||||
|
||||
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
|
||||
|
||||
when(templateEngine.render(eq(TextTemplate.inline("_body").build()), any(Map.class))).thenReturn("_body");
|
||||
|
||||
WatchExecutionContext ctx = createWatchExecutionContext();
|
||||
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
|
||||
|
||||
assertThat(result.type(), equalTo(HttpInput.TYPE));
|
||||
List<String> expectedHeaderValues = new ArrayList<>();
|
||||
expectedHeaderValues.add(headerValue);
|
||||
Map<String, Object> expectedHeaderMap = MapBuilder.<String, Object>newMapBuilder()
|
||||
.put(headerName.toLowerCase(Locale.ROOT), expectedHeaderValues)
|
||||
.map();
|
||||
assertThat(result.payload().data(), hasKey("_headers"));
|
||||
assertThat(result.payload().data().get("_headers"), equalTo(expectedHeaderMap));
|
||||
}
|
||||
|
||||
private WatchExecutionContext createWatchExecutionContext() {
|
||||
Watch watch = new Watch("test-watch",
|
||||
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
|
||||
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
|
||||
new ExecutableAlwaysCondition(logger),
|
||||
null,
|
||||
null,
|
||||
new ExecutableActions(new ArrayList<>()),
|
||||
null,
|
||||
new WatchStatus(new DateTime(0, UTC), emptyMap()));
|
||||
return new TriggeredExecutionContext(watch,
|
||||
new DateTime(0, UTC),
|
||||
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
|
||||
TimeValue.timeValueSeconds(5));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue