Watcher: Introduce dedicated reporting attachment type (elastic/elasticsearch#3665)

Instead of using the long running and long blocking single polling HTTP attachment for our reporting,
we should use the async API provided by kibana. The new workflow (all blocking and in a single watch)
looks like this:

1. An initial request is sent to trigger the report generation, which returns a path
2. This path is used to continuously check if the report is done (then it is sent back) or kibana sends another HTTP error code, which will result in watcher to sleep for another interval until the report is finally returned.

Features include configurable interval time and retry count, so that the total amount of waiting can be tweaked into two directions.

This is what the reporting type looks like right now

```
{
   "my-attachment":{
      "reporting":{
         "url":"http://www.example.org/my-dashboard",
         "retries":6, // optional, default 40
         "interval":"1s", // optional, default 15s
         "auth":{
            "basic":{
               "username":"foo",
               "password":"secret"
            }
         }
      }
   }
}
```

The interval/retries can also be configured via settings.

Note, that this is just a temporal workaround until the watcher execution can execute in an asynchronous fashion.

Closes elastic/elasticsearch#3524

Original commit: elastic/x-pack-elasticsearch@d1eaa856b9
This commit is contained in:
Alexander Reelsen 2016-10-19 12:21:25 +02:00 committed by GitHub
parent 1c3baa61fe
commit baf1596418
14 changed files with 868 additions and 56 deletions

View File

@ -5,22 +5,6 @@
*/
package org.elasticsearch.xpack;
import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
@ -33,7 +17,6 @@ import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -85,6 +68,7 @@ import org.elasticsearch.xpack.notification.email.attachment.DataAttachmentParse
import org.elasticsearch.xpack.notification.email.attachment.EmailAttachmentParser;
import org.elasticsearch.xpack.notification.email.attachment.EmailAttachmentsParser;
import org.elasticsearch.xpack.notification.email.attachment.HttpEmailAttachementParser;
import org.elasticsearch.xpack.notification.email.attachment.ReportingAttachmentParser;
import org.elasticsearch.xpack.notification.email.support.BodyPartSource;
import org.elasticsearch.xpack.notification.hipchat.HipChatService;
import org.elasticsearch.xpack.notification.pagerduty.PagerDutyAccount;
@ -104,6 +88,22 @@ import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.WatcherFeatureSet;
import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, IngestPlugin, NetworkPlugin {
public static final String NAME = "x-pack";
@ -246,7 +246,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
components.add(httpClient);
components.addAll(createNotificationComponents(clusterService.getClusterSettings(), httpClient,
httpTemplateParser, scriptService));
httpTemplateParser, scriptService, httpAuthRegistry));
// just create the reloader as it will pull all of the loaded ssl configurations and start watching them
new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService);
@ -254,7 +254,8 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
}
private Collection<Object> createNotificationComponents(ClusterSettings clusterSettings, HttpClient httpClient,
HttpRequestTemplate.Parser httpTemplateParser, ScriptService scriptService) {
HttpRequestTemplate.Parser httpTemplateParser, ScriptService scriptService,
HttpAuthRegistry httpAuthRegistry) {
List<Object> components = new ArrayList<>();
components.add(new EmailService(settings, security.getCryptoService(), clusterSettings));
components.add(new HipChatService(settings, httpClient, clusterSettings));
@ -266,6 +267,8 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
Map<String, EmailAttachmentParser> parsers = new HashMap<>();
parsers.put(HttpEmailAttachementParser.TYPE, new HttpEmailAttachementParser(httpClient, httpTemplateParser, textTemplateEngine));
parsers.put(DataAttachmentParser.TYPE, new DataAttachmentParser());
parsers.put(ReportingAttachmentParser.TYPE, new ReportingAttachmentParser(settings, httpClient, textTemplateEngine,
httpAuthRegistry));
components.add(new EmailAttachmentsParser(parsers));
return components;
@ -316,6 +319,8 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
settings.add(EmailService.EMAIL_ACCOUNT_SETTING);
settings.add(HipChatService.HIPCHAT_ACCOUNT_SETTING);
settings.add(PagerDutyService.PAGERDUTY_ACCOUNT_SETTING);
settings.add(ReportingAttachmentParser.RETRIES_SETTING);
settings.add(ReportingAttachmentParser.INTERVAL_SETTING);
// http settings
settings.add(Setting.simpleString("xpack.http.default_read_timeout", Setting.Property.NodeScope));

View File

@ -252,6 +252,10 @@ public class HttpRequestTemplate implements ToXContent {
return new Builder(host, port);
}
public static Builder builder(String url) {
return new Builder(url);
}
static Builder builder() {
return new Builder();
}
@ -392,6 +396,10 @@ public class HttpRequestTemplate implements ToXContent {
private Builder() {
}
private Builder(String url) {
fromUrl(url);
}
private Builder(String host, int port) {
this.host = host;
this.port = port;

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.common.http.auth;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.common.http.auth.basic;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.common.http.auth.HttpAuthFactory;
import org.elasticsearch.xpack.security.crypto.CryptoService;

View File

@ -10,10 +10,10 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.notification.email.Attachment;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Variables;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.notification.email.Attachment;
import java.io.IOException;
import java.util.Map;
@ -57,7 +57,7 @@ public class DataAttachmentParser implements EmailAttachmentParser<DataAttachmen
}
@Override
public Attachment toAttachment(WatchExecutionContext ctx, Payload payload, DataAttachment attachment) {
public Attachment toAttachment(WatchExecutionContext ctx, Payload payload, DataAttachment attachment) throws IOException {
Map<String, Object> model = Variables.createCtxModel(ctx, payload);
return attachment.getDataAttachment().create(attachment.id(), model);
}

View File

@ -5,12 +5,11 @@
*/
package org.elasticsearch.xpack.notification.email.attachment;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.notification.email.Attachment;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.notification.email.Attachment;
import java.io.IOException;
@ -62,6 +61,6 @@ public interface EmailAttachmentParser<T extends EmailAttachmentParser.EmailAtta
* @param attachment The typed attachment
* @return An attachment that is ready to be used in a MimeMessage
*/
Attachment toAttachment(WatchExecutionContext context, Payload payload, T attachment) throws ElasticsearchException;
Attachment toAttachment(WatchExecutionContext context, Payload payload, T attachment) throws IOException;
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.notification.email.attachment;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;

View File

@ -5,14 +5,12 @@
*/
package org.elasticsearch.xpack.notification.email.attachment;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.common.http.HttpClient;
import org.elasticsearch.xpack.common.http.HttpRequest;
@ -39,14 +37,12 @@ public class HttpEmailAttachementParser implements EmailAttachmentParser<HttpReq
private final HttpClient httpClient;
private HttpRequestTemplate.Parser requestTemplateParser;
private final TextTemplateEngine templateEngine;
private final Logger logger;
public HttpEmailAttachementParser(HttpClient httpClient, HttpRequestTemplate.Parser requestTemplateParser,
TextTemplateEngine templateEngine) {
this.httpClient = httpClient;
this.requestTemplateParser = requestTemplateParser;
this.templateEngine = templateEngine;
this.logger = Loggers.getLogger(getClass());
}
@Override
@ -86,11 +82,10 @@ public class HttpEmailAttachementParser implements EmailAttachmentParser<HttpReq
@Override
public Attachment toAttachment(WatchExecutionContext context, Payload payload,
HttpRequestAttachment attachment) throws ElasticsearchException {
HttpRequestAttachment attachment) throws IOException {
Map<String, Object> model = Variables.createCtxModel(context, payload);
HttpRequest httpRequest = attachment.getRequestTemplate().render(templateEngine, model);
try {
HttpResponse response = httpClient.execute(httpRequest);
// check for status 200, only then append attachment
if (response.status() >= 200 && response.status() < 300) {
@ -111,11 +106,5 @@ public class HttpEmailAttachementParser implements EmailAttachmentParser<HttpReq
context.watch().id(), attachment.id(), httpRequest.host(), httpRequest.port(), httpRequest.method(),
httpRequest.path(), response.status());
}
} catch (IOException e) {
throw new ElasticsearchException("Watch[{}] attachment[{}] Error executing HTTP request host[{}], port[{}], " +
"method[{}], path[{}], exception[{}]",
context.watch().id(), attachment.id(), httpRequest.host(), httpRequest.port(), httpRequest.method(),
httpRequest.path(), e.getMessage());
}
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.notification.email.attachment;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.common.http.auth.HttpAuth;
import java.io.IOException;
import java.util.Objects;
public class ReportingAttachment implements EmailAttachmentParser.EmailAttachment {
private static final ParseField INLINE = new ParseField("inline");
private static final ParseField AUTH = new ParseField("auth");
private static final ParseField INTERVAL = new ParseField("interval");
private static final ParseField RETRIES = new ParseField("retries");
private static final ParseField URL = new ParseField("url");
private final boolean inline;
private final String id;
private final HttpAuth auth;
private final String url;
private final TimeValue interval;
private final Integer retries;
public ReportingAttachment(String id, String url, boolean inline) {
this(id, url, inline, null, null, null);
}
public ReportingAttachment(String id, String url, boolean inline, @Nullable TimeValue interval, @Nullable Integer retries,
@Nullable HttpAuth auth) {
this.id = id;
this.url = url;
this.retries = retries;
this.inline = inline;
this.auth = auth;
this.interval = interval;
if (retries != null && retries < 0) {
throw new IllegalArgumentException("Retries for attachment must be >= 0");
}
}
@Override
public String type() {
return ReportingAttachmentParser.TYPE;
}
@Override
public String id() {
return id;
}
@Override
public boolean inline() {
return inline;
}
public HttpAuth auth() {
return auth;
}
public String url() {
return url;
}
public TimeValue interval() {
return interval;
}
public Integer retries() {
return retries;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(id).startObject(ReportingAttachmentParser.TYPE)
.field(URL.getPreferredName(), url);
if (retries != null) {
builder.field(RETRIES.getPreferredName(), retries);
}
if (interval != null) {
builder.field(INTERVAL.getPreferredName(), interval);
}
if (inline) {
builder.field(INLINE.getPreferredName(), inline);
}
if (auth != null) {
builder.startObject(AUTH.getPreferredName());
builder.field(auth.type(), auth, params);
builder.endObject();
}
return builder.endObject().endObject();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReportingAttachment otherAttachment = (ReportingAttachment) o;
return Objects.equals(id, otherAttachment.id) && Objects.equals(url, otherAttachment.url) &&
Objects.equals(interval, otherAttachment.interval) && Objects.equals(inline, otherAttachment.inline) &&
Objects.equals(retries, otherAttachment.retries) && Objects.equals(auth, otherAttachment.auth);
}
@Override
public int hashCode() {
return Objects.hash(id, url, interval, inline, retries, auth);
}
}

View File

@ -0,0 +1,300 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.notification.email.attachment;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.common.http.HttpClient;
import org.elasticsearch.xpack.common.http.HttpMethod;
import org.elasticsearch.xpack.common.http.HttpRequest;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.http.HttpResponse;
import org.elasticsearch.xpack.common.http.auth.HttpAuth;
import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.notification.email.Attachment;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Variables;
import org.elasticsearch.xpack.watcher.watch.Payload;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
public class ReportingAttachmentParser implements EmailAttachmentParser<ReportingAttachment> {
public static final String TYPE = "reporting";
// total polling of 10 minutes happens this way by default
public static final Setting<TimeValue> INTERVAL_SETTING =
Setting.timeSetting("xpack.notification.reporting.interval", TimeValue.timeValueSeconds(15), Setting.Property.NodeScope);
public static final Setting<Integer> RETRIES_SETTING =
Setting.intSetting("xpack.notification.reporting.retries", 40, 0, Setting.Property.NodeScope);
private static final ObjectParser<Builder, AuthParseFieldMatcher> PARSER = new ObjectParser<>("reporting_attachment");
private static final ObjectParser<KibanaReportingPayload, ParseFieldMatcherSupplier> PAYLOAD_PARSER =
new ObjectParser<>("reporting_attachment_kibana_payload", true, null);
private static final ParseFieldMatcherSupplier STRICT_PARSING = () -> ParseFieldMatcher.STRICT;
static {
PARSER.declareInt(Builder::retries, new ParseField("retries"));
PARSER.declareBoolean(Builder::inline, new ParseField("inline"));
PARSER.declareString(Builder::interval, new ParseField("interval"));
PARSER.declareString(Builder::url, new ParseField("url"));
PARSER.declareObjectOrDefault(Builder::auth, (p, s) -> s.parseAuth(p), () -> null, new ParseField("auth"));
PAYLOAD_PARSER.declareString(KibanaReportingPayload::setPath, new ParseField("path"));
}
private final Logger logger;
private final TimeValue interval;
private final int retries;
private HttpClient httpClient;
private final TextTemplateEngine templateEngine;
private HttpAuthRegistry authRegistry;
public ReportingAttachmentParser(Settings settings, HttpClient httpClient,
TextTemplateEngine templateEngine, HttpAuthRegistry authRegistry) {
this.interval = INTERVAL_SETTING.get(settings);
this.retries = RETRIES_SETTING.get(settings);
this.httpClient = httpClient;
this.templateEngine = templateEngine;
this.authRegistry = authRegistry;
this.logger = Loggers.getLogger(getClass());
}
@Override
public String type() {
return TYPE;
}
@Override
public ReportingAttachment parse(String id, XContentParser parser) throws IOException {
Builder builder = new Builder(id);
PARSER.parse(parser, builder, new AuthParseFieldMatcher(authRegistry));
return builder.build();
}
@Override
public Attachment toAttachment(WatchExecutionContext context, Payload payload, ReportingAttachment attachment) throws IOException {
Map<String, Object> model = Variables.createCtxModel(context, payload);
String initialUrl = templateEngine.render(new TextTemplate(attachment.url()), model);
HttpRequestTemplate requestTemplate = HttpRequestTemplate.builder(initialUrl)
.connectionTimeout(TimeValue.timeValueSeconds(15))
.readTimeout(TimeValue.timeValueSeconds(15))
.method(HttpMethod.POST)
.auth(attachment.auth())
.putHeader("kbn-xsrf", new TextTemplate("reporting"))
.build();
HttpRequest request = requestTemplate.render(templateEngine, model);
HttpResponse reportGenerationResponse = requestReportGeneration(context.watch().id(), attachment.id(), request);
String path = extractIdFromJson(context.watch().id(), attachment.id(), reportGenerationResponse.body());
HttpRequestTemplate pollingRequestTemplate = HttpRequestTemplate.builder(request.host(), request.port())
.connectionTimeout(TimeValue.timeValueSeconds(10))
.readTimeout(TimeValue.timeValueSeconds(10))
.auth(attachment.auth())
.path(path)
.scheme(request.scheme())
.putHeader("kbn-xsrf", new TextTemplate("reporting"))
.build();
HttpRequest pollingRequest = pollingRequestTemplate.render(templateEngine, model);
int maxRetries = attachment.retries() != null ? attachment.retries() : this.retries;
long sleepMillis = getSleepMillis(context, attachment);
int retryCount = 0;
while (retryCount < maxRetries) {
retryCount++;
// IMPORTANT NOTE: This is only a temporary solution until we made the execution of watcher more async
// This still blocks other executions on the thread and we have to get away from that
sleep(sleepMillis, context, attachment);
HttpResponse response = httpClient.execute(pollingRequest);
if (response.status() == 503) {
// requires us to interval another run, no action to take, except logging
logger.trace("Watch[{}] reporting[{}] pdf is not ready, polling in [{}] again", context.watch().id(), attachment.id(),
TimeValue.timeValueMillis(sleepMillis));
} else if (response.status() >= 400) {
throw new ElasticsearchException("Watch[{}] reporting[{}] Error when polling pdf from host[{}], port[{}], " +
"method[{}], path[{}], status[{}]", context.watch().id(), attachment.id(), request.host(), request.port(),
request.method(), request.path(), reportGenerationResponse.status());
} else if (response.status() == 200) {
return new Attachment.Bytes(attachment.id(), BytesReference.toBytes(response.body()),
response.contentType(), attachment.inline());
} else {
String message = LoggerMessageFormat.format("", "Watch[{}] reporting[{}] Unexpected status code host[{}], port[{}], " +
"method[{}], path[{}], status[{}]", context.watch().id(), attachment.id(), request.host(), request.port(),
request.method(), request.path(), reportGenerationResponse.status());
throw new IllegalStateException(message);
}
}
throw new ElasticsearchException("Watch[{}] reporting[{}]: Aborting due to maximum number of retries hit [{}]",
context.watch().id(), attachment.id(), maxRetries);
}
private void sleep(long sleepMillis, WatchExecutionContext context, ReportingAttachment attachment) {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchException("Watch[{}] reporting[{}] thread was interrupted, while waiting for polling. Aborting.",
context.watch().id(), attachment.id());
}
}
/**
* Use the default time to sleep between polls if it was not set
*/
private long getSleepMillis(WatchExecutionContext context, ReportingAttachment attachment) {
long sleepMillis;
if (attachment.interval() == null) {
sleepMillis = interval.millis();
logger.trace("Watch[{}] reporting[{}] invalid interval configuration [{}], using configured default [{}]", context.watch().id(),
attachment.id(), attachment.interval(), this.interval);
} else {
sleepMillis = attachment.interval().millis();
}
return sleepMillis;
}
/**
* Trigger the initial report generation and catch possible exceptions
*/
private HttpResponse requestReportGeneration(String watchId, String attachmentId, HttpRequest request) throws IOException {
HttpResponse response = httpClient.execute(request);
if (response.status() != 200) {
throw new ElasticsearchException("Watch[{}] reporting[{}] Error response when trying to trigger reporting generation " +
"host[{}], port[{}] method[{}], path[{}], status[{}]", watchId, attachmentId, request.host(),
request.port(), request.method(), request.path(), response.status());
}
return response;
}
/**
* Extract the id from JSON payload, so we know which ID to poll for
*/
private String extractIdFromJson(String watchId, String attachmentId, BytesReference body) throws IOException {
try (XContentParser parser = JsonXContent.jsonXContent.createParser(body)) {
KibanaReportingPayload payload = new KibanaReportingPayload();
PAYLOAD_PARSER.parse(parser, payload, STRICT_PARSING);
String path = payload.getPath();
if (Strings.isEmpty(path)) {
throw new ElasticsearchException("Watch[{}] reporting[{}] field path found in JSON payload, payload was {}",
watchId, attachmentId, body.utf8ToString());
}
return path;
}
}
/**
* A helper class to parse the HTTPAuth data, which is read by an old school pull parser, that is handed over in the ctor.
* See the static parser definition at the top
*/
private static class AuthParseFieldMatcher implements ParseFieldMatcherSupplier {
private final HttpAuthRegistry authRegistry;
AuthParseFieldMatcher(HttpAuthRegistry authRegistry) {
this.authRegistry = authRegistry;
}
@Override
public ParseFieldMatcher getParseFieldMatcher() {
return ParseFieldMatcher.EMPTY;
}
public HttpAuth parseAuth(XContentParser parser) {
try {
return authRegistry.parse(parser);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
/**
* Helper class to extract the URL path of the dashboard from the response after a report was triggered
*
* Example JSON: { "path" : "/path/to/dashboard.pdf", ... otherstuff ... }
*/
static class KibanaReportingPayload {
private String path;
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
}
/**
* Builder helper class used by the ObjectParser to create an attachment from xcontent input
*/
static class Builder {
private final String id;
private boolean inline;
private String url;
private TimeValue interval;
private Integer retries;
private HttpAuth auth;
Builder(String id) {
this.id = id;
}
Builder url(String url) {
this.url = url;
return this;
}
// package protected, so it can be used by the object parser in ReportingAttachmentParser
Builder interval(String waitTime) {
this.interval = TimeValue.parseTimeValue(waitTime, "attachment.reporting.interval");
return this;
}
Builder retries(Integer retries) {
this.retries = retries;
return this;
}
Builder inline(boolean inline) {
this.inline = inline;
return this;
}
Builder auth(HttpAuth auth) {
this.auth = auth;
return this;
}
ReportingAttachment build() {
return new ReportingAttachment(id, url, inline, interval, retries, auth);
}
}
}

View File

@ -182,6 +182,7 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
settings.add(Setting.simpleString("xpack.watcher.trigger.schedule.ticker.tick_interval", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.execution.scroll.timeout", Setting.Property.NodeScope));
settings.add(Setting.simpleString("xpack.watcher.start_immediately", Setting.Property.NodeScope));
return settings;
}

View File

@ -20,6 +20,7 @@ import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Variables;
import org.elasticsearch.xpack.watcher.watch.Payload;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -55,7 +56,7 @@ public class ExecutableEmailAction extends ExecutableAction<EmailAction> {
try {
Attachment attachment = parser.toAttachment(ctx, payload, emailAttachment);
attachments.put(attachment.id(), attachment);
} catch (ElasticsearchException e) {
} catch (ElasticsearchException | IOException e) {
return new EmailAction.Result.Failure(action.type(), e.getMessage());
}
}

View File

@ -138,10 +138,9 @@ public class HttpEmailAttachementParserTests extends ESTestCase {
HttpRequestAttachment attachment = new HttpRequestAttachment("someid", requestTemplate, false, null);
WatchExecutionContext ctx = createWatchExecutionContext();
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
IOException exception = expectThrows(IOException.class,
() -> attachmentParsers.get(HttpEmailAttachementParser.TYPE).toAttachment(ctx, new Payload.Simple(), attachment));
assertThat(exception.getMessage(), is("Watch[watch1] attachment[someid] Error executing HTTP request host[localhost], port[80], " +
"method[GET], path[foo], exception[whatever]"));
assertThat(exception.getMessage(), is("whatever"));
}
private WatchExecutionContext createWatchExecutionContext() {

View File

@ -0,0 +1,392 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.notification.email.attachment;
import com.fasterxml.jackson.core.io.JsonEOFException;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.common.http.HttpClient;
import org.elasticsearch.xpack.common.http.HttpMethod;
import org.elasticsearch.xpack.common.http.HttpRequest;
import org.elasticsearch.xpack.common.http.HttpResponse;
import org.elasticsearch.xpack.common.http.auth.HttpAuth;
import org.elasticsearch.xpack.common.http.auth.HttpAuthFactory;
import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.notification.email.Attachment;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.execution.Wid;
import org.elasticsearch.xpack.watcher.test.MockTextTemplateEngine;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContextBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ReportingAttachmentParserTests extends ESTestCase {
private HttpClient httpClient;
private Map<String, EmailAttachmentParser> attachmentParsers = new HashMap<>();
private EmailAttachmentsParser emailAttachmentsParser;
private ReportingAttachmentParser reportingAttachmentParser;
private HttpAuthRegistry authRegistry;
private MockTextTemplateEngine templateEngine = new MockTextTemplateEngine();
private String dashboardUrl = "http://www.example.org/ovb/api/reporting/generate/dashboard/My-Dashboard";
@Before
public void init() throws Exception {
httpClient = mock(HttpClient.class);
Map<String, HttpAuthFactory> factories = MapBuilder.<String, HttpAuthFactory>newMapBuilder()
.put("basic", new BasicAuthFactory(null))
.immutableMap();
authRegistry = new HttpAuthRegistry(factories);
reportingAttachmentParser = new ReportingAttachmentParser(Settings.EMPTY, httpClient, templateEngine, authRegistry);
attachmentParsers.put(ReportingAttachmentParser.TYPE, reportingAttachmentParser);
emailAttachmentsParser = new EmailAttachmentsParser(attachmentParsers);
}
public void testSerializationWorks() throws Exception {
String id = "some-id";
XContentBuilder builder = jsonBuilder().startObject().startObject(id)
.startObject(ReportingAttachmentParser.TYPE)
.field("url", dashboardUrl);
Integer retries = null;
boolean withRetries = randomBoolean();
if (withRetries) {
retries = randomIntBetween(1, 10);
builder.field("retries", retries);
}
TimeValue interval = null;
boolean withInterval = randomBoolean();
if (withInterval) {
builder.field("interval", "1s");
interval = TimeValue.timeValueSeconds(1);
}
boolean isInline = randomBoolean();
if (isInline) {
builder.field("inline", true);
}
HttpAuth auth = null;
boolean withAuth = randomBoolean();
if (withAuth) {
builder.startObject("auth").startObject("basic")
.field("username", "foo")
.field("password", "secret")
.endObject().endObject();
auth = new BasicAuth("foo", "secret".toCharArray());
}
builder.endObject().endObject().endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
EmailAttachments emailAttachments = emailAttachmentsParser.parse(parser);
assertThat(emailAttachments.getAttachments(), hasSize(1));
XContentBuilder toXcontentBuilder = jsonBuilder().startObject();
List<EmailAttachmentParser.EmailAttachment> attachments = new ArrayList<>(emailAttachments.getAttachments());
attachments.get(0).toXContent(toXcontentBuilder, ToXContent.EMPTY_PARAMS);
toXcontentBuilder.endObject();
assertThat(toXcontentBuilder.string(), is(builder.string()));
XContentBuilder attachmentXContentBuilder = jsonBuilder().startObject();
ReportingAttachment attachment = new ReportingAttachment(id, dashboardUrl, isInline, interval, retries, auth);
attachment.toXContent(attachmentXContentBuilder, ToXContent.EMPTY_PARAMS);
attachmentXContentBuilder.endObject();
assertThat(attachmentXContentBuilder.string(), is(builder.string()));
assertThat(attachments.get(0).inline(), is(isInline));
}
public void testGoodCase() throws Exception {
// returns interval HTTP code for five times, then return expected data
String content = randomAsciiOfLength(200);
String path = "/ovb/api/reporting/jobs/download/iu5zfzvk15oa8990bfas9wy2";
String randomContentType = randomAsciiOfLength(20);
Map<String, String[]> headers = Maps.newHashMap();
headers.put("Content-Type", new String[] { randomContentType });
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\""+ path +"\", \"other\":\"content\"}"))
.thenReturn(new HttpResponse(503))
.thenReturn(new HttpResponse(503))
.thenReturn(new HttpResponse(503))
.thenReturn(new HttpResponse(503))
.thenReturn(new HttpResponse(503))
.thenReturn(new HttpResponse(200, content, headers));
ReportingAttachment reportingAttachment =
new ReportingAttachment("foo", dashboardUrl, randomBoolean(), TimeValue.timeValueMillis(1), 10, null);
Attachment attachment = reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, reportingAttachment);
assertThat(attachment, instanceOf(Attachment.Bytes.class));
Attachment.Bytes bytesAttachment = (Attachment.Bytes) attachment;
assertThat(new String(bytesAttachment.bytes(), StandardCharsets.UTF_8), is(content));
assertThat(bytesAttachment.contentType(), is(randomContentType));
ArgumentCaptor<HttpRequest> requestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
verify(httpClient, times(7)).execute(requestArgumentCaptor.capture());
assertThat(requestArgumentCaptor.getAllValues(), hasSize(7));
// first invocation to the original URL
assertThat(requestArgumentCaptor.getAllValues().get(0).path(), is("/ovb/api/reporting/generate/dashboard/My-Dashboard"));
assertThat(requestArgumentCaptor.getAllValues().get(0).method(), is(HttpMethod.POST));
// all other invocations to the redirected urls from the JSON payload
for (int i = 1; i < 7; i++) {
assertThat(requestArgumentCaptor.getAllValues().get(i).path(), is(path));
assertThat(requestArgumentCaptor.getAllValues().get(i).params().keySet(), hasSize(0));
}
// test that the header "kbn-xsrf" has been set to "reporting" in all requests
requestArgumentCaptor.getAllValues().stream().forEach((req) -> {
assertThat(req.headers(), hasEntry("kbn-xsrf", "reporting"));
});
}
public void testInitialRequestFailsWithError() throws Exception {
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(403));
ReportingAttachment attachment = new ReportingAttachment("foo", dashboardUrl, randomBoolean());
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(), containsString("Error response when trying to trigger reporting generation"));
}
public void testInitialRequestThrowsIOException() throws Exception {
when(httpClient.execute(any(HttpRequest.class))).thenThrow(new IOException("Connection timed out"));
ReportingAttachment attachment = new ReportingAttachment("foo", "http://www.example.org/", randomBoolean());
IOException e = expectThrows(IOException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(), containsString("Connection timed out"));
}
public void testInitialRequestContainsInvalidPayload() throws Exception {
when(httpClient.execute(any(HttpRequest.class)))
// closing json bracket is missing
.thenReturn(new HttpResponse(200, "{\"path\":\"anything\""));
ReportingAttachment attachment = new ReportingAttachment("foo", dashboardUrl, randomBoolean());
JsonEOFException e = expectThrows(JsonEOFException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(), containsString("Unexpected end-of-input"));
}
public void testInitialRequestContainsPathAsObject() throws Exception {
when(httpClient.execute(any(HttpRequest.class)))
// closing json bracket is missing
.thenReturn(new HttpResponse(200, "{\"path\": { \"foo\" : \"anything\"}}"));
ReportingAttachment attachment = new ReportingAttachment("foo", "http://www.example.org/", randomBoolean());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(),
containsString("[reporting_attachment_kibana_payload] path doesn't support values of type: START_OBJECT"));
}
public void testInitialRequestDoesNotContainPathInJson() throws Exception {
when(httpClient.execute(any(HttpRequest.class))).thenReturn(new HttpResponse(200, "{\"foo\":\"bar\"}"));
ReportingAttachment attachment = new ReportingAttachment("foo", dashboardUrl, randomBoolean());
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(), containsString("Watch[watch1] reporting[foo] field path found in JSON payload"));
}
public void testPollingRequestIsError() throws Exception {
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\"whatever\"}"))
.thenReturn(new HttpResponse(403));
ReportingAttachment attachment =
new ReportingAttachment("foo", "http://www.example.org/", randomBoolean(), TimeValue.timeValueMillis(1), 10, null);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(), containsString("Error when polling pdf"));
}
public void testPollingRequestRetryIsExceeded() throws Exception {
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\"whatever\"}"))
.thenReturn(new HttpResponse(503))
.thenReturn(new HttpResponse(503));
ReportingAttachment attachment =
new ReportingAttachment("foo", "http://www.example.org/", randomBoolean(), TimeValue.timeValueMillis(1), 1, null);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(), containsString("Aborting due to maximum number of retries hit [1]"));
}
public void testPollingRequestUnknownHTTPError() throws Exception {
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\"whatever\"}"))
.thenReturn(new HttpResponse(1));
ReportingAttachment attachment =
new ReportingAttachment("foo", "http://www.example.org/", randomBoolean(), TimeValue.timeValueMillis(1), null, null);
IllegalStateException e = expectThrows(IllegalStateException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(), containsString("Unexpected status code"));
}
public void testPollingRequestIOException() throws Exception {
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\"whatever\"}"))
.thenThrow(new IOException("whatever"));
ReportingAttachment attachment =
new ReportingAttachment("foo", "http://www.example.org/", randomBoolean(), TimeValue.timeValueMillis(1), null, null);
IOException e = expectThrows(IOException.class,
() -> reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
assertThat(e.getMessage(), containsString("whatever"));
}
public void testWithBasicAuth() throws Exception {
String content = randomAsciiOfLength(200);
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\"whatever\"}"))
.thenReturn(new HttpResponse(503))
.thenReturn(new HttpResponse(200, content));
ReportingAttachment attachment = new ReportingAttachment("foo", dashboardUrl, randomBoolean(),
TimeValue.timeValueMillis(1), 10, new BasicAuth("foo", "bar".toCharArray()));
reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment);
ArgumentCaptor<HttpRequest> requestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
verify(httpClient, times(3)).execute(requestArgumentCaptor.capture());
List<HttpRequest> allRequests = requestArgumentCaptor.getAllValues();
assertThat(allRequests, hasSize(3));
for (HttpRequest request : allRequests) {
assertThat(request.auth(), is(notNullValue()));
assertThat(request.auth().type(), is("basic"));
assertThat(request.auth(), instanceOf(BasicAuth.class));
BasicAuth basicAuth = (BasicAuth) request.auth();
assertThat(basicAuth.getUsername(), is("foo"));
}
}
public void testPollingDefaultsRetries() throws Exception {
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\"whatever\"}"))
.thenReturn(new HttpResponse(503));
ReportingAttachment attachment = new ReportingAttachment("foo", dashboardUrl, randomBoolean(), TimeValue.timeValueMillis(1),
ReportingAttachmentParser.RETRIES_SETTING.getDefault(Settings.EMPTY), new BasicAuth("foo", "bar".toCharArray()));
expectThrows(ElasticsearchException.class, () ->
reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
verify(httpClient, times(ReportingAttachmentParser.RETRIES_SETTING.getDefault(Settings.EMPTY) + 1)).execute(any());
}
public void testPollingDefaultCanBeOverriddenBySettings() throws Exception {
int retries = 10;
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\"whatever\"}"))
.thenReturn(new HttpResponse(503));
ReportingAttachment attachment = new ReportingAttachment("foo", dashboardUrl, randomBoolean());
Settings settings = Settings.builder()
.put(ReportingAttachmentParser.INTERVAL_SETTING.getKey(), "1ms")
.put(ReportingAttachmentParser.RETRIES_SETTING.getKey(), retries)
.build();
reportingAttachmentParser = new ReportingAttachmentParser(settings, httpClient, templateEngine, authRegistry);
expectThrows(ElasticsearchException.class, () ->
reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment));
verify(httpClient, times(retries + 1)).execute(any());
}
public void testThatUrlIsTemplatable() throws Exception {
when(httpClient.execute(any(HttpRequest.class)))
.thenReturn(new HttpResponse(200, "{\"path\":\"whatever\"}"))
.thenReturn(new HttpResponse(503))
.thenReturn(new HttpResponse(200, randomAsciiOfLength(10)));
TextTemplateEngine replaceHttpWithHttpsTemplateEngine = new TextTemplateEngine(Settings.EMPTY, null) {
@Override
public String render(TextTemplate textTemplate, Map<String, Object> model) {
return textTemplate.getTemplate().replaceAll("REPLACEME", "REPLACED");
}
};
ReportingAttachment attachment = new ReportingAttachment("foo", "http://www.example.org/REPLACEME", randomBoolean(),
TimeValue.timeValueMillis(1), 10, new BasicAuth("foo", "bar".toCharArray()));
reportingAttachmentParser = new ReportingAttachmentParser(Settings.EMPTY, httpClient,
replaceHttpWithHttpsTemplateEngine, authRegistry);
reportingAttachmentParser.toAttachment(createWatchExecutionContext(), Payload.EMPTY, attachment);
ArgumentCaptor<HttpRequest> requestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
verify(httpClient, times(3)).execute(requestArgumentCaptor.capture());
List<String> paths = requestArgumentCaptor.getAllValues().stream().map(HttpRequest::path).collect(Collectors.toList());
assertThat(paths, not(hasItem(containsString("REPLACEME"))));
}
public void testRetrySettingCannotBeNegative() throws Exception {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
new ReportingAttachment("foo", "http://www.example.org/REPLACEME", randomBoolean(), null, -10, null));
assertThat(e.getMessage(), is("Retries for attachment must be >= 0"));
Settings invalidSettings = Settings.builder().put("xpack.notification.reporting.retries", -10).build();
e = expectThrows(IllegalArgumentException.class,
() -> new ReportingAttachmentParser(invalidSettings, httpClient, templateEngine, authRegistry));
assertThat(e.getMessage(), is("Failed to parse value [-10] for setting [xpack.notification.reporting.retries] must be >= 0"));
}
private WatchExecutionContext createWatchExecutionContext() {
DateTime now = DateTime.now(DateTimeZone.UTC);
return mockExecutionContextBuilder("watch1")
.wid(new Wid(randomAsciiOfLength(5), randomLong(), now))
.payload(new Payload.Simple())
.time("watch1", now)
.metadata(Collections.emptyMap())
.buildMock();
}
}