Merge branch 'master' into feature/sql

Original commit: elastic/x-pack-elasticsearch@360ed34b70
This commit is contained in:
Nik Everett 2017-08-08 14:39:32 -04:00
commit 4c7e02191f
59 changed files with 490 additions and 502 deletions

View File

@ -29,6 +29,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.is;
public class XDocsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
private static final String USER_TOKEN = basicAuthHeaderValue("test_admin", new SecureString("x-pack-test-password".toCharArray()));
@ -78,9 +79,25 @@ public class XDocsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
*/
@After
public void reenableWatcher() throws Exception {
if (isWatcherTest()) {
assertBusy(() -> {
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");
if (state.equals("started") == false || state.equals("starting") == false) {
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
}
assertThat(state, is("started"));
});
}
}
private boolean isWatcherTest() {
String testName = getTestName();
return testName != null && testName.contains("watcher");
}
/**
* Deletes users after every test just in case any test adds any.
*/

View File

@ -154,7 +154,6 @@ public class HttpClient extends AbstractComponent {
// timeouts
if (request.connectionTimeout() != null) {
config.setConnectTimeout(Math.toIntExact(request.connectionTimeout.millis()));
} else {
config.setConnectTimeout(Math.toIntExact(defaultConnectionTimeout.millis()));

View File

@ -155,8 +155,9 @@ public class MachineLearning implements ActionPlugin {
public static final Setting<Boolean> AUTODETECT_PROCESS =
Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope);
public static final Setting<Boolean> ML_ENABLED =
Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Setting.Property.NodeScope);
Setting.boolSetting("node.ml", XPackSettings.MACHINE_LEARNING_ENABLED, Property.NodeScope);
public static final String ML_ENABLED_NODE_ATTR = "ml.enabled";
public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
@ -200,7 +201,10 @@ public class MachineLearning implements ActionPlugin {
Settings.Builder additionalSettings = Settings.builder();
Boolean allocationEnabled = ML_ENABLED.get(settings);
if (allocationEnabled != null && allocationEnabled) {
// TODO: the simple true/false flag will not be required once all supported versions have the number - consider removing in 7.0
additionalSettings.put("node.attr." + ML_ENABLED_NODE_ATTR, "true");
additionalSettings.put("node.attr." + MAX_OPEN_JOBS_NODE_ATTR,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings));
}
return additionalSettings.build();
}

View File

@ -42,7 +42,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -566,14 +565,20 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
private final AutodetectProcessManager autodetectProcessManager;
private final int maxNumberOfOpenJobs;
/**
* The maximum number of open jobs can be different on each node. However, nodes on older versions
* won't add their setting to the cluster state, so for backwards compatibility with these nodes we
* assume the older node's setting is the same as that of the node running this code.
* TODO: remove this member in 7.0
*/
private final int fallbackMaxNumberOfOpenJobs;
private volatile int maxConcurrentJobAllocations;
public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
AutodetectProcessManager autodetectProcessManager) {
super(settings, TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.autodetectProcessManager = autodetectProcessManager;
this.maxNumberOfOpenJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
@ -581,7 +586,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
public Assignment getAssignment(JobParams params, ClusterState clusterState) {
return selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations, maxNumberOfOpenJobs, logger);
return selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs,
logger);
}
@Override
@ -591,7 +597,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
OpenJobAction.validate(params.getJobId(), mlMetadata);
Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations,
maxNumberOfOpenJobs, logger);
fallbackMaxNumberOfOpenJobs, logger);
if (assignment.getExecutorNode() == null) {
String msg = "Could not open job because no suitable nodes were found, allocation explanation ["
+ assignment.getExplanation() + "]";
@ -649,7 +655,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
static Assignment selectLeastLoadedMlNode(String jobId, ClusterState clusterState, int maxConcurrentJobAllocations,
long maxNumberOfOpenJobs, Logger logger) {
int fallbackMaxNumberOfOpenJobs, Logger logger) {
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(jobId, clusterState);
if (unavailableIndices.size() != 0) {
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
@ -716,6 +722,20 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
continue;
}
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
int maxNumberOfOpenJobs = fallbackMaxNumberOfOpenJobs;
// TODO: remove leniency and reject the node if the attribute is null in 7.0
if (maxNumberOfOpenJobsStr != null) {
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because " +
MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
logger.trace(reason);
reasons.add(reason);
continue;
}
}
long available = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (available == 0) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because this node is full. " +

View File

@ -694,11 +694,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
public void setGroups(List<String> groups) {
this.groups = groups == null ? Collections.emptyList() : groups;
for (String group : this.groups) {
if (MlStrings.isValidId(group) == false) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_GROUP, group));
}
}
}
public Date getCreateTime() {
@ -994,6 +989,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_ID_TOO_LONG, MAX_JOB_ID_LENGTH));
}
validateGroups();
// Results index name not specified in user input means use the default, so is acceptable in this validation
if (!Strings.isNullOrEmpty(resultsIndexName) && !MlStrings.isValidId(resultsIndexName)) {
throw new IllegalArgumentException(
@ -1003,6 +1000,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
// Creation time is NOT required in user input, hence validated only on build
}
private void validateGroups() {
for (String group : this.groups) {
if (MlStrings.isValidId(group) == false) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_GROUP, group));
}
}
}
/**
* Builds a job with the given {@code createTime} and the current version.
* This should be used when a new job is created as opposed to {@link #build()}.

View File

@ -73,8 +73,11 @@ public class StateProcessor extends AbstractComponent {
// No more zero bytes in this block
break;
}
// Ignore completely empty chunks
if (nextZeroByte > splitFrom) {
// No validation - assume the native process has formatted the state correctly
persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
}
splitFrom = nextZeroByte + 1;
}
if (splitFrom >= bytesRef.length()) {

View File

@ -31,7 +31,6 @@ import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndicesStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.node.NodeStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.shards.ShardsCollector;
@ -117,7 +116,6 @@ public class Monitoring implements ActionPlugin {
final Exporters exporters = new Exporters(settings, exporterFactories, clusterService, licenseState, threadPool.getThreadContext());
Set<Collector> collectors = new HashSet<>();
collectors.add(new IndicesStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new IndexStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new ClusterStatsCollector(settings, clusterService, monitoringSettings, licenseState, client, licenseService));
collectors.add(new ShardsCollector(settings, clusterService, monitoringSettings, licenseState));

View File

@ -57,12 +57,6 @@ public class MonitoringSettings extends AbstractComponent {
public static final Setting<TimeValue> INDEX_STATS_TIMEOUT =
timeSetting(collectionKey("index.stats.timeout"), TimeValue.timeValueSeconds(10), Property.Dynamic, Property.NodeScope);
/**
* Timeout value when collecting total indices statistics (default to 10s)
*/
public static final Setting<TimeValue> INDICES_STATS_TIMEOUT =
timeSetting(collectionKey("indices.stats.timeout"), TimeValue.timeValueSeconds(10), Property.Dynamic, Property.NodeScope);
/**
* List of indices names whose stats will be exported (default to all indices)
*/
@ -127,7 +121,6 @@ public class MonitoringSettings extends AbstractComponent {
INTERVAL,
INDEX_RECOVERY_TIMEOUT,
INDEX_STATS_TIMEOUT,
INDICES_STATS_TIMEOUT,
INDEX_RECOVERY_ACTIVE_ONLY,
CLUSTER_STATE_TIMEOUT,
CLUSTER_STATS_TIMEOUT,
@ -142,7 +135,6 @@ public class MonitoringSettings extends AbstractComponent {
private volatile TimeValue indexStatsTimeout;
private volatile TimeValue indicesStatsTimeout;
private volatile TimeValue clusterStateTimeout;
private volatile TimeValue clusterStatsTimeout;
private volatile TimeValue recoveryTimeout;
@ -155,8 +147,6 @@ public class MonitoringSettings extends AbstractComponent {
setIndexStatsTimeout(INDEX_STATS_TIMEOUT.get(settings));
clusterSettings.addSettingsUpdateConsumer(INDEX_STATS_TIMEOUT, this::setIndexStatsTimeout);
setIndicesStatsTimeout(INDICES_STATS_TIMEOUT.get(settings));
clusterSettings.addSettingsUpdateConsumer(INDICES_STATS_TIMEOUT, this::setIndicesStatsTimeout);
setIndices(INDICES.get(settings));
clusterSettings.addSettingsUpdateConsumer(INDICES, this::setIndices);
setClusterStateTimeout(CLUSTER_STATE_TIMEOUT.get(settings));
@ -175,8 +165,6 @@ public class MonitoringSettings extends AbstractComponent {
return indexStatsTimeout;
}
public TimeValue indicesStatsTimeout() { return indicesStatsTimeout; }
public String[] indices() {
return indices;
}
@ -205,10 +193,6 @@ public class MonitoringSettings extends AbstractComponent {
this.indexStatsTimeout = indexStatsTimeout;
}
private void setIndicesStatsTimeout(TimeValue indicesStatsTimeout) {
this.indicesStatsTimeout = indicesStatsTimeout;
}
private void setClusterStateTimeout(TimeValue clusterStateTimeout) {
this.clusterStateTimeout = clusterStateTimeout;
}

View File

@ -24,10 +24,10 @@ import java.util.Collections;
import java.util.List;
/**
* Collector for indices statistics.
* Collector for indices and singular index statistics.
* <p>
* This collector runs on the master node only and collect a {@link IndexStatsMonitoringDoc}
* document for each existing index in the cluster.
* This collector runs on the master node only and collect a single {@link IndicesStatsMonitoringDoc} for the cluster and a
* {@link IndexStatsMonitoringDoc} document for each existing index in the cluster.
*/
public class IndexStatsCollector extends Collector {
@ -47,8 +47,8 @@ public class IndexStatsCollector extends Collector {
@Override
protected Collection<MonitoringDoc> doCollect() throws Exception {
List<MonitoringDoc> results = new ArrayList<>();
IndicesStatsResponse indicesStats = client.admin().indices().prepareStats()
final List<MonitoringDoc> results = new ArrayList<>();
final IndicesStatsResponse indicesStats = client.admin().indices().prepareStats()
.setIndices(monitoringSettings.indices())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.clear()
@ -64,14 +64,18 @@ public class IndexStatsCollector extends Collector {
.setRequestCache(true)
.get(monitoringSettings.indexStatsTimeout());
long timestamp = System.currentTimeMillis();
String clusterUUID = clusterUUID();
DiscoveryNode sourceNode = localNode();
final long timestamp = System.currentTimeMillis();
final String clusterUUID = clusterUUID();
final DiscoveryNode sourceNode = localNode();
// add the indices stats that we use to collect the index stats
results.add(new IndicesStatsMonitoringDoc(monitoringId(), monitoringVersion(), clusterUUID, timestamp, sourceNode, indicesStats));
// collect each index stats document
for (IndexStats indexStats : indicesStats.getIndices().values()) {
results.add(new IndexStatsMonitoringDoc(monitoringId(), monitoringVersion(),
clusterUUID, timestamp, sourceNode, indexStats));
results.add(new IndexStatsMonitoringDoc(monitoringId(), monitoringVersion(), clusterUUID, timestamp, sourceNode, indexStats));
}
return Collections.unmodifiableCollection(results);
}
}

View File

@ -1,61 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.indices;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.Collection;
import java.util.Collections;
/**
* Collector for indices statistics.
* <p>
* This collector runs on the master node only and collect one {@link IndicesStatsMonitoringDoc}
* document.
*/
public class IndicesStatsCollector extends Collector {
private final Client client;
public IndicesStatsCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings,
XPackLicenseState licenseState, InternalClient client) {
super(settings, "indices-stats", clusterService, monitoringSettings, licenseState);
this.client = client;
}
@Override
protected boolean shouldCollect() {
return super.shouldCollect() && isLocalNodeMaster();
}
@Override
protected Collection<MonitoringDoc> doCollect() throws Exception {
IndicesStatsResponse indicesStats = client.admin().indices().prepareStats()
.setIndices(monitoringSettings.indices())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.clear()
.setDocs(true)
.setIndexing(true)
.setSearch(true)
.setStore(true)
.get(monitoringSettings.indicesStatsTimeout());
IndicesStatsMonitoringDoc indicesStatsDoc = new IndicesStatsMonitoringDoc(monitoringId(),
monitoringVersion(), clusterUUID(), System.currentTimeMillis(), localNode(),
indicesStats);
return Collections.singletonList(indicesStatsDoc);
}
}

View File

@ -10,7 +10,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
/**
* Monitoring document collected by {@link IndicesStatsCollector}
* Monitoring document collected by {@link IndexStatsCollector}
*/
public class IndicesStatsMonitoringDoc extends MonitoringDoc {

View File

@ -90,8 +90,7 @@ public class IntegrationAccount extends HipChatAccount {
response));
} catch (Exception e) {
logger.error("failed to execute hipchat api http request", e);
sentMessages.add(SentMessages.SentMessage.error(room, SentMessages.SentMessage.TargetType.ROOM, message,
ExceptionsHelper.detailedMessage(e)));
sentMessages.add(SentMessages.SentMessage.error(room, SentMessages.SentMessage.TargetType.ROOM, message, e));
}
return new SentMessages(name, sentMessages);
}

View File

@ -5,7 +5,10 @@
*/
package org.elasticsearch.xpack.notification.hipchat;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -20,6 +23,9 @@ import java.util.Locale;
public class SentMessages implements ToXContentObject, Iterable<SentMessages.SentMessage> {
private static final ParseField ACCOUNT = new ParseField("account");
private static final ParseField SENT_MESSAGES = new ParseField("sent_messages");
private String accountName;
private List<SentMessage> messages;
@ -48,8 +54,8 @@ public class SentMessages implements ToXContentObject, Iterable<SentMessages.Sen
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Field.ACCOUNT, accountName);
builder.startArray(Field.SENT_MESSAGES);
builder.field(ACCOUNT.getPreferredName(), accountName);
builder.startArray(SENT_MESSAGES.getPreferredName());
for (SentMessage message : messages) {
message.toXContent(builder, params);
}
@ -59,6 +65,11 @@ public class SentMessages implements ToXContentObject, Iterable<SentMessages.Sen
public static class SentMessage implements ToXContent {
private static final ParseField STATUS = new ParseField("status");
private static final ParseField REQUEST = new ParseField("request");
private static final ParseField RESPONSE = new ParseField("response");
private static final ParseField MESSAGE = new ParseField("message");
public enum TargetType {
ROOM, USER;
@ -70,30 +81,25 @@ public class SentMessages implements ToXContentObject, Iterable<SentMessages.Sen
final HipChatMessage message;
@Nullable final HttpRequest request;
@Nullable final HttpResponse response;
@Nullable final String failureReason;
@Nullable final Exception exception;
public static SentMessage responded(String targetName, TargetType targetType, HipChatMessage message, HttpRequest request,
HttpResponse response) {
String failureReason = resolveFailureReason(response);
return new SentMessage(targetName, targetType, message, request, response, failureReason);
return new SentMessage(targetName, targetType, message, request, response, null);
}
public static SentMessage error(String targetName, TargetType targetType, HipChatMessage message, String reason) {
return new SentMessage(targetName, targetType, message, null, null, reason);
public static SentMessage error(String targetName, TargetType targetType, HipChatMessage message, Exception e) {
return new SentMessage(targetName, targetType, message, null, null, e);
}
private SentMessage(String targetName, TargetType targetType, HipChatMessage message, HttpRequest request, HttpResponse response,
String failureReason) {
Exception exception) {
this.targetName = targetName;
this.targetType = targetType;
this.message = message;
this.request = request;
this.response = response;
this.failureReason = failureReason;
}
public boolean successful() {
return failureReason == null;
this.exception = exception;
}
public HttpRequest getRequest() {
@ -104,60 +110,36 @@ public class SentMessages implements ToXContentObject, Iterable<SentMessages.Sen
return response;
}
public String getFailureReason() {
return failureReason;
public Exception getException() {
return exception;
}
public boolean isSuccess() {
return response != null && response.status() >= 200 && response.status() < 300;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (failureReason != null) {
builder.field(Field.STATUS, "failure");
builder.field(Field.REASON, failureReason);
builder.field(STATUS.getPreferredName(), isSuccess() ? "success" : "failure");
if (isSuccess() == false) {
builder.field(STATUS.getPreferredName(), "failure");
if (request != null) {
builder.field(Field.REQUEST);
builder.field(REQUEST.getPreferredName());
request.toXContent(builder, params);
}
if (response != null) {
builder.field(Field.RESPONSE);
builder.field(RESPONSE.getPreferredName());
response.toXContent(builder, params);
}
} else {
builder.field(Field.STATUS, "success");
if (exception != null) {
ElasticsearchException.generateFailureXContent(builder, params, exception, true);
}
}
builder.field(targetType.fieldName, targetName);
builder.field(Field.MESSAGE);
builder.field(MESSAGE.getPreferredName());
message.toXContent(builder, params, false);
return builder.endObject();
}
private static String resolveFailureReason(HttpResponse response) {
int status = response.status();
if (status < 300) {
return null;
}
switch (status) {
case 400: return "Bad Request";
case 401: return "Unauthorized. The provided authentication token is invalid.";
case 403: return "Forbidden. The account doesn't have permission to send this message.";
case 404: // Not Found
case 405: // Method Not Allowed
case 406: return "The account used invalid HipChat APIs"; // Not Acceptable
case 503:
case 500: return "HipChat Server Error.";
default:
return "Unknown Error";
}
}
}
interface Field {
String ACCOUNT = new String("account");
String SENT_MESSAGES = new String("sent_messages");
String STATUS = new String("status");
String REASON = new String("reason");
String REQUEST = new String("request");
String RESPONSE = new String("response");
String MESSAGE = new String("message");
}
}

View File

@ -88,8 +88,7 @@ public class UserAccount extends HipChatAccount {
response));
} catch (IOException e) {
logger.error("failed to execute hipchat api http request", e);
sentMessages.add(SentMessages.SentMessage.error(room, SentMessages.SentMessage.TargetType.ROOM, message,
ExceptionsHelper.detailedMessage(e)));
sentMessages.add(SentMessages.SentMessage.error(room, SentMessages.SentMessage.TargetType.ROOM, message, e));
}
}
}
@ -102,8 +101,7 @@ public class UserAccount extends HipChatAccount {
response));
} catch (Exception e) {
logger.error("failed to execute hipchat api http request", e);
sentMessages.add(SentMessages.SentMessage.error(user, SentMessages.SentMessage.TargetType.USER, message,
ExceptionsHelper.detailedMessage(e)));
sentMessages.add(SentMessages.SentMessage.error(user, SentMessages.SentMessage.TargetType.USER, message, e));
}
}
}

View File

@ -84,8 +84,7 @@ public class V1Account extends HipChatAccount {
response));
} catch (Exception e) {
logger.error("failed to execute hipchat api http request", e);
sentMessages.add(SentMessages.SentMessage.error(room, SentMessages.SentMessage.TargetType.ROOM, message,
ExceptionsHelper.detailedMessage(e)));
sentMessages.add(SentMessages.SentMessage.error(room, SentMessages.SentMessage.TargetType.ROOM, message, e));
}
}
}

View File

@ -5,7 +5,10 @@
*/
package org.elasticsearch.xpack.notification.slack;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -20,6 +23,9 @@ import java.util.List;
public class SentMessages implements ToXContentObject, Iterable<SentMessages.SentMessage> {
private static final ParseField ACCOUNT = new ParseField("account");
private static final ParseField SENT_MESSAGES = new ParseField("sent_messages");
private String accountName;
private List<SentMessage> messages;
@ -48,8 +54,8 @@ public class SentMessages implements ToXContentObject, Iterable<SentMessages.Sen
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Field.ACCOUNT, accountName);
builder.startArray(Field.SENT_MESSAGES);
builder.field(ACCOUNT.getPreferredName(), accountName);
builder.startArray(SENT_MESSAGES.getPreferredName());
for (SentMessage message : messages) {
message.toXContent(builder, params);
}
@ -59,31 +65,32 @@ public class SentMessages implements ToXContentObject, Iterable<SentMessages.Sen
public static class SentMessage implements ToXContent {
private static final ParseField STATUS = new ParseField("status");
private static final ParseField REQUEST = new ParseField("request");
private static final ParseField RESPONSE = new ParseField("response");
private static final ParseField TO = new ParseField("to");
private static final ParseField MESSAGE = new ParseField("message");
final String to;
final SlackMessage message;
@Nullable final HttpRequest request;
@Nullable final HttpResponse response;
@Nullable final String failureReason;
@Nullable final Exception exception;
public static SentMessage responded(String to, SlackMessage message, HttpRequest request, HttpResponse response) {
String failureReason = resolveFailureReason(response);
return new SentMessage(to, message, request, response, failureReason);
return new SentMessage(to, message, request, response, null);
}
public static SentMessage error(String to, SlackMessage message, String reason) {
return new SentMessage(to, message, null, null, reason);
public static SentMessage error(String to, SlackMessage message, Exception e) {
return new SentMessage(to, message, null, null, e);
}
private SentMessage(String to, SlackMessage message, HttpRequest request, HttpResponse response, String failureReason) {
private SentMessage(String to, SlackMessage message, HttpRequest request, HttpResponse response, Exception exception) {
this.to = to;
this.message = message;
this.request = request;
this.response = response;
this.failureReason = failureReason;
}
public boolean successful() {
return failureReason == null;
this.exception = exception;
}
public HttpRequest getRequest() {
@ -94,54 +101,37 @@ public class SentMessages implements ToXContentObject, Iterable<SentMessages.Sen
return response;
}
public Exception getException() {
return exception;
}
public boolean isSuccess() {
return response != null && response.status() >= 200 && response.status() < 300;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (failureReason != null) {
builder.field(Field.STATUS, "failure");
builder.field(Field.REASON, failureReason);
builder.field(STATUS.getPreferredName(), isSuccess() ? "success" : "failure");
if (isSuccess() == false) {
if (request != null) {
builder.field(Field.REQUEST);
builder.field(REQUEST.getPreferredName());
request.toXContent(builder, params);
}
if (response != null) {
builder.field(Field.RESPONSE);
builder.field(RESPONSE.getPreferredName());
response.toXContent(builder, params);
}
} else {
builder.field(Field.STATUS, "success");
if (exception != null) {
ElasticsearchException.generateFailureXContent(builder, params, exception, true);
}
}
if (to != null) {
builder.field(Field.TO, to);
builder.field(TO.getPreferredName(), to);
}
builder.field(Field.MESSAGE);
builder.field(MESSAGE.getPreferredName());
message.toXContent(builder, params, false);
return builder.endObject();
}
private static String resolveFailureReason(HttpResponse response) {
int status = response.status();
if (status < 300) {
return null;
}
if (status > 399 && status < 500) {
return "Bad Request";
}
if (status > 499) {
return "Slack Server Error";
}
return "Unknown Error";
}
}
interface Field {
String ACCOUNT = new String("account");
String SENT_MESSAGES = new String("sent_messages");
String STATUS = new String("status");
String REASON = new String("reason");
String REQUEST = new String("request");
String RESPONSE = new String("response");
String MESSAGE = new String("message");
String TO = new String("to");
}
}

View File

@ -111,7 +111,7 @@ public class SlackAccount {
return SentMessages.SentMessage.responded(to, message, request, response);
} catch (Exception e) {
logger.error("failed to execute slack api http request", e);
return SentMessages.SentMessage.error(to, message, ExceptionsHelper.detailedMessage(e));
return SentMessages.SentMessage.error(to, message, e);
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
@ -290,13 +291,22 @@ public class Upgrade implements ActionPlugin {
private static ActionListener<DeleteIndexTemplateResponse> deleteIndexTemplateListener(String name, ActionListener<Boolean> listener,
Runnable runnable) {
return ActionListener.wrap(r -> {
return ActionListener.wrap(
r -> {
if (r.isAcknowledged()) {
runnable.run();
} else {
listener.onFailure(new ElasticsearchException("Deleting [{}] template was not acknowledged", name));
}
}, listener::onFailure);
},
// if the index template we tried to delete is gone already, no need to worry
e -> {
if (e instanceof IndexTemplateMissingException) {
runnable.run();
} else {
listener.onFailure(e);
}
});
}
private static void startWatcherIfNeeded(Boolean shouldStartWatcher, Client client, ActionListener<TransportResponse.Empty> listener) {

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.watcher.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.xcontent.ToXContent;
@ -58,6 +59,8 @@ public interface Action extends ToXContentObject {
*/
public static class StoppedResult extends Result {
private static ParseField REASON = new ParseField("reason");
private final String reason;
protected StoppedResult(String type, Status status, String reason, Object... args) {
@ -71,7 +74,7 @@ public interface Action extends ToXContentObject {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(Field.REASON.getPreferredName(), reason);
return builder.field(REASON.getPreferredName(), reason);
}
}
@ -85,7 +88,26 @@ public interface Action extends ToXContentObject {
public Failure(String type, String reason, Object... args) {
super(type, Status.FAILURE, reason, args);
}
}
public static class FailureWithException extends Result {
private final Exception exception;
public FailureWithException(String type, Exception exception) {
super(type, Status.FAILURE);
this.exception = exception;
}
public Exception getException() {
return exception;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
ElasticsearchException.generateFailureXContent(builder, params, exception, true);
return builder;
}
}
/**
@ -127,8 +149,4 @@ public interface Action extends ToXContentObject {
A build();
}
interface Field {
ParseField REASON = new ParseField("reason");
}
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.watcher.actions;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
@ -138,9 +137,7 @@ public class ActionWrapper implements ToXContentObject {
action.logger().error(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to execute action [{}/{}]. failed to transform payload.", ctx.watch().id(), id), e);
return new ActionWrapper.Result(id, conditionResult, null,
new Action.Result.Failure(action.type(), "Failed to transform payload. error: {}",
ExceptionsHelper.detailedMessage(e)));
return new ActionWrapper.Result(id, conditionResult, null, new Action.Result.FailureWithException(action.type(), e));
}
}
try {
@ -149,7 +146,7 @@ public class ActionWrapper implements ToXContentObject {
} catch (Exception e) {
action.logger().error(
(Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e);
return new ActionWrapper.Result(id, new Action.Result.Failure(action.type(), ExceptionsHelper.detailedMessage(e)));
return new ActionWrapper.Result(id, new Action.Result.FailureWithException(action.type(), e));
}
}
@ -160,19 +157,15 @@ public class ActionWrapper implements ToXContentObject {
ActionWrapper that = (ActionWrapper) o;
if (!id.equals(that.id)) return false;
if (condition != null ? !condition.equals(that.condition) : that.condition != null) return false;
if (transform != null ? !transform.equals(that.transform) : that.transform != null) return false;
return action.equals(that.action);
return Objects.equals(id, that.id) &&
Objects.equals(condition, that.condition) &&
Objects.equals(transform, that.transform) &&
Objects.equals(action, that.action);
}
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + (condition != null ? condition.hashCode() : 0);
result = 31 * result + (transform != null ? transform.hashCode() : 0);
result = 31 * result + action.hashCode();
return result;
return Objects.hash(id, condition, transform, action);
}
@Override
@ -189,7 +182,7 @@ public class ActionWrapper implements ToXContentObject {
.endObject();
}
if (transform != null) {
builder.startObject(Transform.Field.TRANSFORM.getPreferredName())
builder.startObject(Transform.TRANSFORM.getPreferredName())
.field(transform.type(), transform, params)
.endObject();
}
@ -215,7 +208,7 @@ public class ActionWrapper implements ToXContentObject {
} else {
if (Watch.Field.CONDITION.match(currentFieldName)) {
condition = actionRegistry.getConditionRegistry().parseExecutable(watchId, parser);
} else if (Transform.Field.TRANSFORM.match(currentFieldName)) {
} else if (Transform.TRANSFORM.match(currentFieldName)) {
transform = actionRegistry.getTransformRegistry().parse(watchId, parser);
} else if (Throttler.Field.THROTTLE_PERIOD.match(currentFieldName)) {
throttlePeriod = timeValueMillis(parser.longValue());
@ -309,7 +302,7 @@ public class ActionWrapper implements ToXContentObject {
builder.field(Watch.Field.CONDITION.getPreferredName(), condition, params);
}
if (transform != null) {
builder.field(Transform.Field.TRANSFORM.getPreferredName(), transform, params);
builder.field(Transform.TRANSFORM.getPreferredName(), transform, params);
}
action.toXContent(builder, params);
return builder.endObject();

View File

@ -282,7 +282,7 @@ public class EmailAction implements Action {
}
}
interface Field extends Action.Field {
interface Field {
// common fields
ParseField ACCOUNT = new ParseField("account");

View File

@ -57,7 +57,7 @@ public class ExecutableEmailAction extends ExecutableAction<EmailAction> {
Attachment attachment = parser.toAttachment(ctx, payload, emailAttachment);
attachments.put(attachment.id(), attachment);
} catch (ElasticsearchException | IOException e) {
return new EmailAction.Result.Failure(action.type(), e.getMessage());
return new EmailAction.Result.FailureWithException(action.type(), e);
}
}
}

View File

@ -137,7 +137,7 @@ public class HipChatAction implements Action {
boolean hasSuccesses = false;
boolean hasFailures = false;
for (SentMessages.SentMessage message : sentMessages) {
if (message.successful()) {
if (message.isSuccess()) {
hasSuccesses = true;
} else {
hasFailures = true;

View File

@ -287,7 +287,7 @@ public class IndexAction implements Action {
}
}
interface Field extends Action.Field {
interface Field {
ParseField INDEX = new ParseField("index");
ParseField DOC_TYPE = new ParseField("doc_type");
ParseField DOC_ID = new ParseField("doc_id");

View File

@ -186,7 +186,7 @@ public class LoggingAction implements Action {
}
}
interface Field extends Action.Field {
interface Field {
ParseField CATEGORY = new ParseField("category");
ParseField LEVEL = new ParseField("level");
ParseField TEXT = new ParseField("text");

View File

@ -136,7 +136,7 @@ public class SlackAction implements Action {
boolean hasSuccesses = false;
boolean hasFailures = false;
for (SentMessages.SentMessage message : sentMessages) {
if (message.successful()) {
if (message.isSuccess()) {
hasSuccesses = true;
} else {
hasFailures = true;

View File

@ -41,11 +41,10 @@ public class ExecutableWebhookAction extends ExecutableAction<WebhookAction> {
HttpResponse response = httpClient.execute(request);
int status = response.status();
if (status >= 400) {
logger.warn("received http status [{}] when connecting to watch action [{}/{}/{}]", status, ctx.watch().id(), type(), actionId);
if (response.status() >= 400) {
return new WebhookAction.Result.Failure(request, response);
}
} else {
return new WebhookAction.Result.Success(request, response);
}
}
}

View File

@ -9,10 +9,10 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.common.http.HttpRequest;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.http.HttpResponse;
import org.elasticsearch.xpack.watcher.actions.Action;
import java.io.IOException;
@ -170,7 +170,7 @@ public class WebhookAction implements Action {
}
}
interface Field extends Action.Field {
interface Field {
ParseField REQUEST = new ParseField("request");
ParseField RESPONSE = new ParseField("response");
}

View File

@ -195,7 +195,7 @@ public class WatchSourceBuilder extends ToXContentToBytes implements ToXContent
.endObject();
}
if (transform != null) {
builder.startObject(Transform.Field.TRANSFORM.getPreferredName())
builder.startObject(Transform.TRANSFORM.getPreferredName())
.field(transform.type(), transform, params)
.endObject();
}

View File

@ -200,8 +200,7 @@ public class ExecutionService extends AbstractComponent {
e -> {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof EsRejectedExecutionException) {
logger.debug("failed to store watch records due to overloaded threadpool [{}]",
ExceptionsHelper.detailedMessage(e));
logger.debug("failed to store watch records due to filled up watcher threadpool");
} else {
logger.warn("failed to store watch records", e);
}

View File

@ -82,7 +82,7 @@ public class WatchExecutionResult implements ToXContentObject {
builder.field(Field.CONDITION.getPreferredName(), conditionResult, params);
}
if (transformResult != null) {
builder.field(Transform.Field.TRANSFORM.getPreferredName(), transformResult, params);
builder.field(Transform.TRANSFORM.getPreferredName(), transformResult, params);
}
builder.startArray(Field.ACTIONS.getPreferredName());
for (ActionWrapper.Result result : actionsResults.values()) {

View File

@ -5,7 +5,8 @@
*/
package org.elasticsearch.xpack.watcher.input;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -20,27 +21,31 @@ public interface Input extends ToXContentObject {
abstract class Result implements ToXContentObject {
private static final ParseField STATUS = new ParseField("status");
private static final ParseField TYPE = new ParseField("type");
private static final ParseField PAYLOAD = new ParseField("payload");
public enum Status {
SUCCESS, FAILURE
}
protected final String type;
protected final Status status;
private final String reason;
private final Payload payload;
@Nullable private final Exception exception;
protected Result(String type, Payload payload) {
this.status = Status.SUCCESS;
this.type = type;
this.payload = payload;
this.reason = null;
this.exception = null;
}
protected Result(String type, Exception e) {
this.status = Status.FAILURE;
this.type = type;
this.reason = ExceptionsHelper.detailedMessage(e);
this.payload = Payload.EMPTY;
this.exception = e;
}
public String type() {
@ -55,24 +60,24 @@ public interface Input extends ToXContentObject {
return payload;
}
public String reason() {
public Exception getException() {
assert status == Status.FAILURE;
return reason;
return exception;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Field.TYPE.getPreferredName(), type);
builder.field(Field.STATUS.getPreferredName(), status.name().toLowerCase(Locale.ROOT));
builder.field(TYPE.getPreferredName(), type);
builder.field(STATUS.getPreferredName(), status.name().toLowerCase(Locale.ROOT));
switch (status) {
case SUCCESS:
assert payload != null;
builder.field(Field.PAYLOAD.getPreferredName(), payload, params);
builder.field(PAYLOAD.getPreferredName(), payload, params);
break;
case FAILURE:
assert reason != null;
builder.field(Field.REASON.getPreferredName(), reason);
assert exception != null;
ElasticsearchException.generateFailureXContent(builder, params, exception, true);
break;
default:
assert false;
@ -87,11 +92,4 @@ public interface Input extends ToXContentObject {
interface Builder<I extends Input> {
I build();
}
interface Field {
ParseField STATUS = new ParseField("status");
ParseField TYPE = new ParseField("type");
ParseField PAYLOAD = new ParseField("payload");
ParseField REASON = new ParseField("reason");
}
}

View File

@ -202,7 +202,7 @@ public class HttpInput implements Input {
}
}
interface Field extends Input.Field {
interface Field {
ParseField REQUEST = new ParseField("request");
ParseField EXTRACT = new ParseField("extract");
ParseField STATUS_CODE = new ParseField("status_code");

View File

@ -233,7 +233,7 @@ public class SearchInput implements Input {
}
}
public interface Field extends Input.Field {
public interface Field {
ParseField REQUEST = new ParseField("request");
ParseField EXTRACT = new ParseField("extract");
ParseField TIMEOUT = new ParseField("timeout_in_millis");

View File

@ -36,8 +36,9 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
// version 2: added mappings for jira action
// version 3: include watch status in history
// version 6: upgrade to ES 6, removal of _status field
// version 7: add full exception stack traces for better debugging
// Note: if you change this, also inform the kibana team around the watcher-ui
public static final String INDEX_TEMPLATE_VERSION = "6";
public static final String INDEX_TEMPLATE_VERSION = "7";
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.xpack.watcher.transform;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
@ -18,10 +18,17 @@ import java.util.Locale;
public interface Transform extends ToXContent {
ParseField TRANSFORM = new ParseField("transform");
String type();
abstract class Result implements ToXContentObject {
private static final ParseField TYPE = new ParseField("type");
private static final ParseField STATUS = new ParseField("status");
private static final ParseField PAYLOAD = new ParseField("payload");
private static final ParseField REASON = new ParseField("reason");
public enum Status {
SUCCESS, FAILURE
}
@ -30,23 +37,30 @@ public interface Transform extends ToXContent {
protected final Status status;
@Nullable protected final Payload payload;
@Nullable protected final String reason;
@Nullable protected final Exception exception;
public Result(String type, Payload payload) {
this.type = type;
this.status = Status.SUCCESS;
this.payload = payload;
this.reason = null;
this.exception = null;
}
public Result(String type, String reason) {
this.type = type;
this.status = Status.FAILURE;
this.reason = reason;
this.payload = null;
this.exception = null;
}
public Result(String type, Exception e) {
this(type, ExceptionsHelper.detailedMessage(e));
}
public Result(String type, String errorMessage) {
this.type = type;
this.status = Status.FAILURE;
this.reason = errorMessage;
this.reason = e.getMessage();
this.payload = null;
this.exception = e;
}
public String type() {
@ -70,16 +84,17 @@ public interface Transform extends ToXContent {
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Field.TYPE.getPreferredName(), type);
builder.field(Field.STATUS.getPreferredName(), status.name().toLowerCase(Locale.ROOT));
builder.field(TYPE.getPreferredName(), type);
builder.field(STATUS.getPreferredName(), status.name().toLowerCase(Locale.ROOT));
switch (status) {
case SUCCESS:
assert reason == null;
builder.field(Field.PAYLOAD.getPreferredName(), payload, params);
assert exception == null;
builder.field(PAYLOAD.getPreferredName(), payload, params);
break;
case FAILURE:
assert payload == null;
builder.field(Field.REASON.getPreferredName(), reason);
builder.field(REASON.getPreferredName(), reason);
ElasticsearchException.generateFailureXContent(builder, params, exception, true);
break;
default:
assert false;
@ -96,14 +111,4 @@ public interface Transform extends ToXContent {
T build();
}
interface Field {
ParseField TRANSFORM = new ParseField("transform");
ParseField TYPE = new ParseField("type");
ParseField STATUS = new ParseField("status");
ParseField PAYLOAD = new ParseField("payload");
ParseField REASON = new ParseField("reason");
}
}

View File

@ -162,7 +162,7 @@ public class ChainTransform implements Transform {
}
}
interface Field extends Transform.Field {
interface Field {
ParseField RESULTS = new ParseField("results");
}
}

View File

@ -189,7 +189,7 @@ public class SearchTransform implements Transform {
}
}
public interface Field extends Transform.Field {
public interface Field {
ParseField REQUEST = new ParseField("request");
ParseField TIMEOUT = new ParseField("timeout_in_millis");
ParseField TIMEOUT_HUMAN = new ParseField("timeout");

View File

@ -138,7 +138,7 @@
},
"transform": {
"script": {
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.defaultAdminEmail : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def state = ctx.payload.check.hits.hits[0]._source.cluster_state.status;if (ctx.vars.not_resolved){ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check == false) {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = ['timestamp': ctx.execution_time, 'metadata': ctx.metadata.xpack];}if (ctx.vars.fails_check) {ctx.payload.prefix = 'Elasticsearch cluster status is ' + state + '.';if (state == 'red') {ctx.payload.message = 'Allocate missing primary shards and replica shards.';ctx.payload.metadata.severity = 2100;} else {ctx.payload.message = 'Allocate missing replica shards.';ctx.payload.metadata.severity = 1100;}}ctx.vars.state = state.toUpperCase();ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.default_admin_email : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def state = ctx.payload.check.hits.hits[0]._source.cluster_state.status;if (ctx.vars.not_resolved){ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check == false) {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = ['timestamp': ctx.execution_time, 'metadata': ctx.metadata.xpack];}if (ctx.vars.fails_check) {ctx.payload.prefix = 'Elasticsearch cluster status is ' + state + '.';if (state == 'red') {ctx.payload.message = 'Allocate missing primary shards and replica shards.';ctx.payload.metadata.severity = 2100;} else {ctx.payload.message = 'Allocate missing replica shards.';ctx.payload.metadata.severity = 1100;}}ctx.vars.state = state.toUpperCase();ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
}
},
"actions": {

View File

@ -134,7 +134,7 @@
},
"transform": {
"script": {
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.defaultAdminEmail : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def versionMessage = null;if (ctx.vars.fails_check) {def versions = new ArrayList(ctx.payload.check.hits.hits[0]._source.cluster_stats.nodes.versions);Collections.sort(versions);versionMessage = 'Versions: [' + String.join(', ', versions) + '].';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check) {ctx.payload.message = versionMessage;} else {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = [ 'timestamp': ctx.execution_time, 'prefix': 'This cluster is running with multiple versions of Elasticsearch.', 'message': versionMessage, 'metadata': ctx.metadata.xpack ];}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.default_admin_email : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def versionMessage = null;if (ctx.vars.fails_check) {def versions = new ArrayList(ctx.payload.check.hits.hits[0]._source.cluster_stats.nodes.versions);Collections.sort(versions);versionMessage = 'Versions: [' + String.join(', ', versions) + '].';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check) {ctx.payload.message = versionMessage;} else {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = [ 'timestamp': ctx.execution_time, 'prefix': 'This cluster is running with multiple versions of Elasticsearch.', 'message': versionMessage, 'metadata': ctx.metadata.xpack ];}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
}
},
"actions": {

View File

@ -161,7 +161,7 @@
},
"transform": {
"script": {
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.defaultAdminEmail : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def versionMessage = null;if (ctx.vars.fails_check) {versionMessage = 'Versions: [' + String.join(', ', ctx.vars.versions) + '].';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check) {ctx.payload.message = versionMessage;} else {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = [ 'timestamp': ctx.execution_time, 'prefix': 'This cluster is running with multiple versions of Kibana.', 'message': versionMessage, 'metadata': ctx.metadata.xpack ];}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.default_admin_email : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def versionMessage = null;if (ctx.vars.fails_check) {versionMessage = 'Versions: [' + String.join(', ', ctx.vars.versions) + '].';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check) {ctx.payload.message = versionMessage;} else {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = [ 'timestamp': ctx.execution_time, 'prefix': 'This cluster is running with multiple versions of Kibana.', 'message': versionMessage, 'metadata': ctx.metadata.xpack ];}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
}
},
"actions": {

View File

@ -161,7 +161,7 @@
},
"transform": {
"script": {
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.defaultAdminEmail : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def versionMessage = null;if (ctx.vars.fails_check) {versionMessage = 'Versions: [' + String.join(', ', ctx.vars.versions) + '].';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check) {ctx.payload.message = versionMessage;} else {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = [ 'timestamp': ctx.execution_time, 'prefix': 'This cluster is running with multiple versions of Logstash.', 'message': versionMessage, 'metadata': ctx.metadata.xpack ];}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.default_admin_email : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def versionMessage = null;if (ctx.vars.fails_check) {versionMessage = 'Versions: [' + String.join(', ', ctx.vars.versions) + '].';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;if (ctx.vars.fails_check) {ctx.payload.message = versionMessage;} else {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = [ 'timestamp': ctx.execution_time, 'prefix': 'This cluster is running with multiple versions of Logstash.', 'message': versionMessage, 'metadata': ctx.metadata.xpack ];}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
}
},
"actions": {

View File

@ -127,7 +127,7 @@
},
"transform": {
"script": {
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.defaultAdminEmail : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def alertMessage = null;if (ctx.vars.fails_check) {alertMessage = 'Update your license.';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;ctx.payload.metadata = ctx.metadata.xpack;if (ctx.vars.fails_check == false) {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = ['timestamp': ctx.execution_time,'prefix': 'This cluster\\'s license is going to expire in {{#relativeTime}}metadata.time{{/relativeTime}} at {{#absoluteTime}}metadata.time{{/absoluteTime}}.','message': alertMessage,'metadata': ctx.metadata.xpack];}if (ctx.vars.fails_check) {ctx.payload.metadata.time = ctx.vars.expiry.toString();}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
"source": "ctx.vars.email_recipient = (ctx.payload.kibana_settings.hits.total > 0) ? ctx.payload.kibana_settings.hits.hits[0]._source.kibana_settings.xpack.default_admin_email : null;ctx.vars.is_new = ctx.vars.fails_check && !ctx.vars.not_resolved;ctx.vars.is_resolved = !ctx.vars.fails_check && ctx.vars.not_resolved;def alertMessage = null;if (ctx.vars.fails_check) {alertMessage = 'Update your license.';}if (ctx.vars.not_resolved) {ctx.payload = ctx.payload.alert.hits.hits[0]._source;ctx.payload.metadata = ctx.metadata.xpack;if (ctx.vars.fails_check == false) {ctx.payload.resolved_timestamp = ctx.execution_time;}} else {ctx.payload = ['timestamp': ctx.execution_time,'prefix': 'This cluster\\'s license is going to expire in {{#relativeTime}}metadata.time{{/relativeTime}} at {{#absoluteTime}}metadata.time{{/absoluteTime}}.','message': alertMessage,'metadata': ctx.metadata.xpack];}if (ctx.vars.fails_check) {ctx.payload.metadata.time = ctx.vars.expiry.toString();}ctx.payload.update_timestamp = ctx.execution_time;return ctx.payload;"
}
},
"actions": {

View File

@ -30,6 +30,16 @@
}
}
},
{
"disabled_exception_fields": {
"path_match": "result\\.(input(\\..+)*|(transform(\\..+)*)|(actions\\.transform(\\..+)*)|actions)\\.error",
"match_pattern": "regex",
"mapping": {
"type": "object",
"enabled": false
}
}
},
{
"disabled_jira_custom_fields": {
"path_match": "result.actions.jira.fields.customfield_*",

View File

@ -215,6 +215,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
assertEquals(expectedNodeAttr, node.getAttributes());
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
assertNotNull(jobTaskStatus);
@ -402,6 +403,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
Map<String, String> expectedNodeAttr = new HashMap<>();
expectedNodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
expectedNodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
assertEquals(expectedNodeAttr, node.getAttributes());
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();

View File

@ -470,15 +470,17 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
public void testEmptyGroup() {
Job.Builder builder = buildJobBuilder("foo");
builder.setGroups(Arrays.asList("foo-group", ""));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> builder.setGroups(Arrays.asList("foo-group", "")));
() -> builder.build());
assertThat(e.getMessage(), containsString("Invalid group id ''"));
}
public void testInvalidGroup() {
Job.Builder builder = buildJobBuilder("foo");
builder.setGroups(Arrays.asList("foo-group", "$$$"));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> builder.setGroups(Arrays.asList("foo-group", "$$$")));
() -> builder.build());
assertThat(e.getMessage(), containsString("Invalid group id '$$$'"));
}

View File

@ -26,6 +26,7 @@ import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -51,13 +52,13 @@ public class StateProcessorTests extends ESTestCase {
private static final int LARGE_DOC_SIZE = 1000000;
private Client client;
private ActionFuture<BulkResponse> bulkResponseFuture;
private StateProcessor stateProcessor;
@Before
public void initialize() throws IOException {
client = mock(Client.class);
bulkResponseFuture = mock(ActionFuture.class);
@SuppressWarnings("unchecked")
ActionFuture<BulkResponse> bulkResponseFuture = mock(ActionFuture.class);
stateProcessor = spy(new StateProcessor(Settings.EMPTY, client));
when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture);
}
@ -87,12 +88,12 @@ public class StateProcessorTests extends ESTestCase {
stateProcessor.process("_id", stream);
verify(stateProcessor, times(6)).persist(eq("_id"), any());
verify(stateProcessor, never()).persist(eq("_id"), any());
Mockito.verifyNoMoreInteractions(client);
}
public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException {
String zeroBytes = " \0";
String zeroBytes = " \n\0";
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
stateProcessor.process("_id", stream);

View File

@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.equalTo;
public class MonitoringSettingsIntegTests extends MonitoringIntegTestCase {
private final TimeValue interval = newRandomTimeValue();
private final TimeValue indexStatsTimeout = newRandomTimeValue();
private final TimeValue indicesStatsTimeout = newRandomTimeValue();
private final String[] indices = randomStringArray();
private final TimeValue clusterStateTimeout = newRandomTimeValue();
private final TimeValue clusterStatsTimeout = newRandomTimeValue();
@ -55,7 +54,6 @@ public class MonitoringSettingsIntegTests extends MonitoringIntegTestCase {
return Settings.builder()
.put(MonitoringSettings.INTERVAL.getKey(), interval)
.put(MonitoringSettings.INDEX_STATS_TIMEOUT.getKey(), indexStatsTimeout)
.put(MonitoringSettings.INDICES_STATS_TIMEOUT.getKey(), indicesStatsTimeout)
.putArray(MonitoringSettings.INDICES.getKey(), indices)
.put(MonitoringSettings.CLUSTER_STATE_TIMEOUT.getKey(), clusterStateTimeout)
.put(MonitoringSettings.CLUSTER_STATS_TIMEOUT.getKey(), clusterStatsTimeout)
@ -68,7 +66,6 @@ public class MonitoringSettingsIntegTests extends MonitoringIntegTestCase {
public void testMonitoringSettings() throws Exception {
for (final MonitoringSettings monitoringSettings : internalCluster().getInstances(MonitoringSettings.class)) {
assertThat(monitoringSettings.indexStatsTimeout().millis(), equalTo(indexStatsTimeout.millis()));
assertThat(monitoringSettings.indicesStatsTimeout().millis(), equalTo(indicesStatsTimeout.millis()));
assertArrayEquals(monitoringSettings.indices(), indices);
assertThat(monitoringSettings.clusterStateTimeout().millis(), equalTo(clusterStateTimeout.millis()));
assertThat(monitoringSettings.clusterStatsTimeout().millis(), equalTo(clusterStatsTimeout.millis()));
@ -89,7 +86,6 @@ public class MonitoringSettingsIntegTests extends MonitoringIntegTestCase {
MonitoringSettings.INTERVAL,
MonitoringSettings.INDEX_RECOVERY_TIMEOUT,
MonitoringSettings.INDEX_STATS_TIMEOUT,
MonitoringSettings.INDICES_STATS_TIMEOUT,
MonitoringSettings.INDEX_RECOVERY_ACTIVE_ONLY,
MonitoringSettings.CLUSTER_STATE_TIMEOUT,
MonitoringSettings.CLUSTER_STATS_TIMEOUT,
@ -129,8 +125,6 @@ public class MonitoringSettingsIntegTests extends MonitoringIntegTestCase {
for (final MonitoringSettings monitoringSettings1 : internalCluster().getInstances(MonitoringSettings.class)) {
if (setting == MonitoringSettings.INDEX_STATS_TIMEOUT) {
assertEquals(monitoringSettings1.indexStatsTimeout(), setting.get(updatedSettings));
} else if (setting == MonitoringSettings.INDICES_STATS_TIMEOUT) {
assertEquals(monitoringSettings1.indicesStatsTimeout(), setting.get(updatedSettings));
} else if (setting == MonitoringSettings.CLUSTER_STATS_TIMEOUT) {
assertEquals(monitoringSettings1.clusterStatsTimeout(), setting.get(updatedSettings));
} else if (setting == MonitoringSettings.JOB_STATS_TIMEOUT) {

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.monitoring.collector.indices;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -19,8 +20,12 @@ import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -40,19 +45,19 @@ public class IndexStatsCollectorTests extends AbstractCollectorTestCase {
public void testEmptyCluster() throws Exception {
final String node = internalCluster().startNode();
waitForNoBlocksOnNode(node);
assertThat(newIndexStatsCollector(node).doCollect(), hasSize(0));
assertThat(newIndexStatsCollector(node).doCollect(), hasSize(1));
}
public void testEmptyClusterAllIndices() throws Exception {
final String node = internalCluster().startNode(Settings.builder().put(MonitoringSettings.INDICES.getKey(), MetaData.ALL));
waitForNoBlocksOnNode(node);
assertThat(newIndexStatsCollector(node).doCollect(), hasSize(0));
assertThat(newIndexStatsCollector(node).doCollect(), hasSize(1));
}
public void testEmptyClusterMissingIndex() throws Exception {
final String node = internalCluster().startNode(Settings.builder().put(MonitoringSettings.INDICES.getKey(), "unknown"));
waitForNoBlocksOnNode(node);
assertThat(newIndexStatsCollector(node).doCollect(), hasSize(0));
assertThat(newIndexStatsCollector(node).doCollect(), hasSize(1));
}
public void testIndexStatsCollectorOneIndex() throws Exception {
@ -68,19 +73,40 @@ public class IndexStatsCollectorTests extends AbstractCollectorTestCase {
client().prepareIndex(indexName, "test").setSource("num", i).get();
}
flush();
refresh();
assertHitCount(client().prepareSearch().setSize(0).get(), nbDocs);
Collection<MonitoringDoc> results = newIndexStatsCollector().doCollect();
assertThat(results, hasSize(1));
assertThat(results, hasSize(2));
MonitoringDoc monitoringDoc = results.iterator().next();
assertNotNull(monitoringDoc);
assertThat(monitoringDoc, instanceOf(IndexStatsMonitoringDoc.class));
// indices stats
final Optional<IndicesStatsMonitoringDoc> indicesStatsDoc =
results.stream().filter(doc -> doc instanceof IndicesStatsMonitoringDoc).map(doc -> (IndicesStatsMonitoringDoc)doc).findFirst();
IndexStatsMonitoringDoc indexStatsMonitoringDoc = (IndexStatsMonitoringDoc) monitoringDoc;
assertThat(indicesStatsDoc.isPresent(), is(true));
IndicesStatsMonitoringDoc indicesStatsMonitoringDoc = indicesStatsDoc.get();
assertThat(indicesStatsMonitoringDoc.getClusterUUID(), equalTo(client().admin().cluster().
prepareState().setMetaData(true).get().getState().metaData().clusterUUID()));
assertThat(indicesStatsMonitoringDoc.getTimestamp(), greaterThan(0L));
assertThat(indicesStatsMonitoringDoc.getSourceNode(), notNullValue());
IndicesStatsResponse indicesStats = indicesStatsMonitoringDoc.getIndicesStats();
assertNotNull(indicesStats);
assertThat(indicesStats.getIndices().keySet(), hasSize(1));
assertThat(indicesStats.getIndex(indexName).getShards(), arrayWithSize(getNumShards(indexName).totalNumShards));
// index stats
final Optional<IndexStatsMonitoringDoc> indexStatsDoc =
results.stream()
.filter(doc -> doc instanceof IndexStatsMonitoringDoc)
.map(doc -> (IndexStatsMonitoringDoc)doc)
.findFirst();
assertThat(indexStatsDoc.isPresent(), is(true));
IndexStatsMonitoringDoc indexStatsMonitoringDoc = indexStatsDoc.get();
assertThat(indexStatsMonitoringDoc.getMonitoringId(), equalTo(MonitoredSystem.ES.getSystem()));
assertThat(indexStatsMonitoringDoc.getMonitoringVersion(), equalTo(Version.CURRENT.toString()));
assertThat(indexStatsMonitoringDoc.getClusterUUID(),
@ -118,7 +144,6 @@ public class IndexStatsCollectorTests extends AbstractCollectorTestCase {
}
}
flush();
refresh();
for (int i = 0; i < nbIndices; i++) {
@ -128,18 +153,45 @@ public class IndexStatsCollectorTests extends AbstractCollectorTestCase {
String clusterUUID = client().admin().cluster().prepareState().setMetaData(true).get().getState().metaData().clusterUUID();
Collection<MonitoringDoc> results = newIndexStatsCollector().doCollect();
assertThat(results, hasSize(nbIndices));
// extra document is for the IndicesStatsMonitoringDoc
assertThat(results, hasSize(nbIndices + 1));
// indices stats
final Optional<IndicesStatsMonitoringDoc> indicesStatsDoc =
results.stream()
.filter(doc -> doc instanceof IndicesStatsMonitoringDoc)
.map(doc -> (IndicesStatsMonitoringDoc)doc)
.findFirst();
assertThat(indicesStatsDoc.isPresent(), is(true));
IndicesStatsMonitoringDoc indicesStatsMonitoringDoc = indicesStatsDoc.get();
assertThat(indicesStatsMonitoringDoc.getMonitoringId(), equalTo(MonitoredSystem.ES.getSystem()));
assertThat(indicesStatsMonitoringDoc.getMonitoringVersion(), equalTo(Version.CURRENT.toString()));
assertThat(indicesStatsMonitoringDoc.getClusterUUID(),
equalTo(client().admin().cluster().prepareState().setMetaData(true).get().getState().metaData().clusterUUID()));
assertThat(indicesStatsMonitoringDoc.getTimestamp(), greaterThan(0L));
IndicesStatsResponse indicesStats = indicesStatsMonitoringDoc.getIndicesStats();
assertNotNull(indicesStats);
assertThat(indicesStats.getIndices().keySet(), hasSize(nbIndices));
// index stats
final List<IndexStatsMonitoringDoc> indexStatsDocs =
results.stream()
.filter(doc -> doc instanceof IndexStatsMonitoringDoc)
.map(doc -> (IndexStatsMonitoringDoc)doc)
.collect(Collectors.toList());
assertThat(indexStatsDocs, hasSize(nbIndices));
for (int i = 0; i < nbIndices; i++) {
String indexName = indexPrefix + i;
boolean found = false;
Iterator<MonitoringDoc> it = results.iterator();
Iterator<IndexStatsMonitoringDoc> it = indexStatsDocs.iterator();
while (!found && it.hasNext()) {
MonitoringDoc monitoringDoc = it.next();
assertThat(monitoringDoc, instanceOf(IndexStatsMonitoringDoc.class));
IndexStatsMonitoringDoc indexStatsMonitoringDoc = (IndexStatsMonitoringDoc) monitoringDoc;
IndexStatsMonitoringDoc indexStatsMonitoringDoc = it.next();
IndexStats indexStats = indexStatsMonitoringDoc.getIndexStats();
assertNotNull(indexStats);

View File

@ -1,156 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.indices;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.collector.AbstractCollectorTestCase;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.hamcrest.Matchers;
import java.util.Collection;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
@ClusterScope(numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
public class IndicesStatsCollectorTests extends AbstractCollectorTestCase {
@Override
protected int numberOfReplicas() {
return 0;
}
public void testEmptyCluster() throws Exception {
final String node = internalCluster().startNode();
waitForNoBlocksOnNode(node);
assertThat(newIndicesStatsCollector(node).doCollect(), hasSize(1));
}
public void testEmptyClusterAllIndices() throws Exception {
final String node = internalCluster().startNode(Settings.builder().put(MonitoringSettings.INDICES.getKey(), MetaData.ALL));
waitForNoBlocksOnNode(node);
assertThat(newIndicesStatsCollector(node).doCollect(), hasSize(1));
}
public void testEmptyClusterMissingIndex() throws Exception {
final String node = internalCluster().startNode(Settings.builder().put(MonitoringSettings.INDICES.getKey(), "unknown"));
waitForNoBlocksOnNode(node);
assertThat(newIndicesStatsCollector(node).doCollect(), hasSize(1));
}
public void testIndicesStatsCollectorOneIndex() throws Exception {
final String node = internalCluster().startNode();
waitForNoBlocksOnNode(node);
final String indexName = "one-index";
createIndex(indexName);
ensureGreen(indexName);
final int nbDocs = randomIntBetween(1, 20);
for (int i = 0; i < nbDocs; i++) {
client().prepareIndex(indexName, "test").setSource("num", i).get();
}
flush();
refresh();
assertHitCount(client().prepareSearch().setSize(0).get(), nbDocs);
Collection<MonitoringDoc> results = newIndicesStatsCollector().doCollect();
assertThat(results, hasSize(1));
MonitoringDoc monitoringDoc = results.iterator().next();
assertThat(monitoringDoc, instanceOf(IndicesStatsMonitoringDoc.class));
IndicesStatsMonitoringDoc indicesStatsMonitoringDoc = (IndicesStatsMonitoringDoc) monitoringDoc;
assertThat(indicesStatsMonitoringDoc.getClusterUUID(), equalTo(client().admin().cluster().
prepareState().setMetaData(true).get().getState().metaData().clusterUUID()));
assertThat(indicesStatsMonitoringDoc.getTimestamp(), greaterThan(0L));
assertThat(indicesStatsMonitoringDoc.getSourceNode(), notNullValue());
IndicesStatsResponse indicesStats = indicesStatsMonitoringDoc.getIndicesStats();
assertNotNull(indicesStats);
assertThat(indicesStats.getIndices().keySet(), hasSize(1));
IndexStats indexStats = indicesStats.getIndex(indexName);
assertThat(indexStats.getShards(), Matchers.arrayWithSize(getNumShards(indexName).totalNumShards));
}
public void testIndicesStatsCollectorMultipleIndices() throws Exception {
final String node = internalCluster().startNode();
waitForNoBlocksOnNode(node);
final String indexPrefix = "multi-indices-";
final int nbIndices = randomIntBetween(1, 5);
int[] docsPerIndex = new int[nbIndices];
for (int i = 0; i < nbIndices; i++) {
String index = indexPrefix + i;
createIndex(index);
ensureGreen(index);
docsPerIndex[i] = randomIntBetween(1, 20);
for (int j = 0; j < docsPerIndex[i]; j++) {
client().prepareIndex(index, "test").setSource("num", i).get();
}
}
flush();
refresh();
for (int i = 0; i < nbIndices; i++) {
assertHitCount(client().prepareSearch(indexPrefix + i).setSize(0).get(), docsPerIndex[i]);
}
Collection<MonitoringDoc> results = newIndicesStatsCollector().doCollect();
assertThat(results, hasSize(1));
MonitoringDoc monitoringDoc = results.iterator().next();
assertThat(monitoringDoc, instanceOf(IndicesStatsMonitoringDoc.class));
IndicesStatsMonitoringDoc indicesStatsMonitoringDoc = (IndicesStatsMonitoringDoc) monitoringDoc;
assertThat(indicesStatsMonitoringDoc.getMonitoringId(), equalTo(MonitoredSystem.ES.getSystem()));
assertThat(indicesStatsMonitoringDoc.getMonitoringVersion(), equalTo(Version.CURRENT.toString()));
assertThat(indicesStatsMonitoringDoc.getClusterUUID(),
equalTo(client().admin().cluster().prepareState().setMetaData(true).get().getState().metaData().clusterUUID()));
assertThat(indicesStatsMonitoringDoc.getTimestamp(), greaterThan(0L));
IndicesStatsResponse indicesStats = indicesStatsMonitoringDoc.getIndicesStats();
assertNotNull(indicesStats);
assertThat(indicesStats.getIndices().keySet(), hasSize(nbIndices));
}
private IndicesStatsCollector newIndicesStatsCollector() {
// This collector runs on master node only
return newIndicesStatsCollector(internalCluster().getMasterName());
}
private IndicesStatsCollector newIndicesStatsCollector(String nodeId) {
if (!Strings.hasText(nodeId)) {
nodeId = randomFrom(internalCluster().getNodeNames());
}
return new IndicesStatsCollector(internalCluster().getInstance(Settings.class, nodeId),
internalCluster().getInstance(ClusterService.class, nodeId),
internalCluster().getInstance(MonitoringSettings.class, nodeId),
internalCluster().getInstance(XPackLicenseState.class, nodeId),
securedClient(nodeId));
}
}

View File

@ -553,9 +553,9 @@ public class EmailActionTests extends ESTestCase {
.buildMock();
Action.Result result = executableEmailAction.execute("test", ctx, new Payload.Simple());
assertThat(result, instanceOf(EmailAction.Result.Failure.class));
EmailAction.Result.Failure failure = (EmailAction.Result.Failure) result;
assertThat(failure.reason(),
assertThat(result, instanceOf(EmailAction.Result.FailureWithException.class));
EmailAction.Result.FailureWithException failure = (EmailAction.Result.FailureWithException) result;
assertThat(failure.getException().getMessage(),
is("Watch[watch1] attachment[second] HTTP error status host[localhost], port[80], method[GET], path[/second], " +
"status[403]"));
}

View File

@ -110,9 +110,10 @@ public class SlackActionTests extends ESTestCase {
for (int i = 0; i < count; i++) {
HttpResponse response = mock(HttpResponse.class);
HttpRequest request = mock(HttpRequest.class);
switch (randomIntBetween(0, 2)) {
int randomInt = randomIntBetween(0, 2);
switch (randomInt) {
case 0:
messages.add(SentMessages.SentMessage.error(randomAlphaOfLength(10), message, "unknown error"));
messages.add(SentMessages.SentMessage.error(randomAlphaOfLength(10), message, new Exception("unknown error")));
hasError = true;
break;
case 1:

View File

@ -45,6 +45,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.junit.Before;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
@ -229,7 +230,7 @@ public class ExecutionServiceTests extends ESTestCase {
input = mock(ExecutableInput.class);
Input.Result inputResult = mock(Input.Result.class);
when(inputResult.status()).thenReturn(Input.Result.Status.FAILURE);
when(inputResult.reason()).thenReturn("_reason");
when(inputResult.getException()).thenReturn(new IOException());
when(input.execute(eq(context), any(Payload.class))).thenReturn(inputResult);
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;

View File

@ -5,7 +5,12 @@
*/
package org.elasticsearch.xpack.watcher.history;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.test.http.MockResponse;
@ -15,20 +20,31 @@ import org.elasticsearch.xpack.common.http.HttpMethod;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.webhookAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.httpInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
/**
@ -111,4 +127,64 @@ public class HistoryTemplateHttpMappingsTests extends AbstractWatcherIntegration
assertThat(webServer.requests().get(0).getUri().getPath(), is("/input/path"));
assertThat(webServer.requests().get(1).getUri().getPath(), is("/webhook/path"));
}
public void testExceptionMapping() {
// delete all history indices to ensure that we start with a fresh mapping
assertAcked(client().admin().indices().prepareDelete(HistoryStore.INDEX_PREFIX + "*"));
String id = randomAlphaOfLength(10);
// switch between delaying the input or the action http request
boolean abortAtInput = randomBoolean();
if (abortAtInput) {
webServer.enqueue(new MockResponse().setBeforeReplyDelay(TimeValue.timeValueSeconds(5)));
} else {
webServer.enqueue(new MockResponse().setBody("{}"));
webServer.enqueue(new MockResponse().setBeforeReplyDelay(TimeValue.timeValueSeconds(5)));
}
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(id).setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(httpInput(HttpRequestTemplate.builder("localhost", webServer.getPort())
.path("/")
.readTimeout(TimeValue.timeValueMillis(10))))
.condition(AlwaysCondition.INSTANCE)
.addAction("_webhook", webhookAction(HttpRequestTemplate.builder("localhost", webServer.getPort())
.readTimeout(TimeValue.timeValueMillis(10))
.path("/webhook/path")
.method(HttpMethod.POST)
.body("_body"))))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
watcherClient().prepareExecuteWatch(id).setRecordExecution(true).get();
// ensure watcher history index has been written with this id
flushAndRefresh(HistoryStore.INDEX_PREFIX + "*");
SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*")
.setQuery(QueryBuilders.termQuery("watch_id", id))
.get();
assertHitCount(searchResponse, 1L);
// ensure that enabled is set to false
List<Boolean> indexed = new ArrayList<>();
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings(HistoryStore.INDEX_PREFIX + "*").get();
Iterator<ImmutableOpenMap<String, MappingMetaData>> iterator = mappingsResponse.getMappings().valuesIt();
while (iterator.hasNext()) {
ImmutableOpenMap<String, MappingMetaData> mapping = iterator.next();
assertThat(mapping.containsKey("doc"), is(true));
Map<String, Object> docMapping = mapping.get("doc").getSourceAsMap();
if (abortAtInput) {
Boolean enabled = ObjectPath.eval("properties.result.properties.input.properties.error.enabled", docMapping);
indexed.add(enabled);
} else {
Boolean enabled = ObjectPath.eval("properties.result.properties.actions.properties.error.enabled", docMapping);
indexed.add(enabled);
}
}
assertThat(indexed, hasSize(greaterThanOrEqualTo(1)));
logger.info("GOT [{}]", indexed);
assertThat(indexed, hasItem(false));
assertThat(indexed, not(hasItem(true)));
}
}

View File

@ -55,6 +55,7 @@ import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.joda.time.DateTimeZone.UTC;
public class ChainInputTests extends ESTestCase {
@ -165,7 +166,8 @@ public class ChainInputTests extends ESTestCase {
XContentBuilder builder = jsonBuilder();
chainedResult.toXContent(builder, ToXContent.EMPTY_PARAMS);
assertThat(builder.bytes().utf8ToString(), containsString("\"reason\":\"ElasticsearchException[foo]\""));
assertThat(builder.bytes().utf8ToString(), containsString("\"type\":\"exception\""));
assertThat(builder.bytes().utf8ToString(), containsString("\"reason\":\"foo\""));
}
/* https://github.com/elastic/x-plugins/issues/3736

View File

@ -10,7 +10,10 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
@ -33,6 +36,7 @@ import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.InputBuilders;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
@ -42,6 +46,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.junit.Before;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
@ -60,7 +65,9 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@ -325,6 +332,32 @@ public class HttpInputTests extends ESTestCase {
assertThat(data.get(1).get("foo"), is("second"));
}
public void testExceptionCase() throws Exception {
when(httpClient.execute(any(HttpRequest.class))).thenThrow(new IOException("could not connect"));
HttpRequestTemplate.Builder request = HttpRequestTemplate.builder("localhost", 8080);
HttpInput httpInput = InputBuilders.httpInput(request.build()).build();
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
WatchExecutionContext ctx = createWatchExecutionContext();
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.getException(), is(notNullValue()));
assertThat(result.getException(), is(instanceOf(IOException.class)));
assertThat(result.getException().getMessage(), is("could not connect"));
try (XContentBuilder builder = jsonBuilder()) {
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
BytesReference bytes = builder.bytes();
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, bytes)) {
Map<String, Object> data = parser.map();
String reason = ObjectPath.eval("error.reason", data);
assertThat(reason, is("could not connect"));
String type = ObjectPath.eval("error.type", data);
assertThat(type, is("i_o_exception"));
}
}
}
private WatchExecutionContext createWatchExecutionContext() {
Watch watch = new Watch("test-watch",

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.watcher.test.integration;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.junit.annotations.Network;
@ -201,7 +202,10 @@ public class HipChatServiceTests extends AbstractWatcherIntegrationTestCase {
for (SentMessages.SentMessage message : messages) {
logger.info("Request: [{}]", message.getRequest());
logger.info("Response: [{}]", message.getResponse());
assertThat("Expected no failures, but got [" + message.getFailureReason() + "]", message.successful(), is(true));
if (message.getException() != null) {
logger.info("Exception stacktrace: [{}]", ExceptionsHelper.stackTrace(message.getException()));
}
assertThat(message.isSuccess(), is(true));
assertThat(message.getRequest(), notNullValue());
assertThat(message.getResponse(), notNullValue());
assertThat(message.getResponse().status(), lessThan(300));

View File

@ -38,6 +38,7 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interva
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@Network
public class SlackServiceTests extends AbstractWatcherIntegrationTestCase {
@ -78,7 +79,7 @@ public class SlackServiceTests extends AbstractWatcherIntegrationTestCase {
assertThat(messages.count(), is(2));
for (SentMessages.SentMessage sentMessage : messages) {
try {
assertThat(sentMessage.successful(), is(true));
assertThat(sentMessage.getException(), is(nullValue()));
assertThat(sentMessage.getRequest(), notNullValue());
assertThat(sentMessage.getResponse(), notNullValue());
assertThat(sentMessage.getResponse().status(), lessThan(300));

View File

@ -192,4 +192,5 @@ setup:
- match: { watch_record.trigger_event.type: "manual" }
- match: { watch_record.state: "executed" }
- match: { watch_record.result.transform.status: "failure" }
- match: { watch_record.result.transform.reason: "ParsingException[no [query] registered for [does_not_exist]]" }
- match: { watch_record.result.transform.reason: "no [query] registered for [does_not_exist]" }
- is_true: watch_record.result.transform.error