[Monitoring] Remove Specific ClusterStateCollector/Resolver and Tests (elastic/x-pack-elasticsearch#1664)

This removes the Cluster State collector and resolver and moves the collection of the cluster state (and cluster health, which is already included in cluster stats).

This makes the tests a little more stable and removes an extra network hop during monitoring data collection.

Original commit: elastic/x-pack-elasticsearch@44851d2dd6
This commit is contained in:
Chris Earle 2017-06-08 15:55:23 -04:00 committed by GitHub
parent 3f5ae2d54f
commit b31c8e2661
16 changed files with 85 additions and 490 deletions

View File

@ -28,7 +28,6 @@ import org.elasticsearch.xpack.monitoring.action.MonitoringBulkAction;
import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction; import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.collector.Collector; import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStateCollector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector; import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector; import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsCollector; import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsCollector;
@ -121,7 +120,6 @@ public class Monitoring implements ActionPlugin {
collectors.add(new IndicesStatsCollector(settings, clusterService, monitoringSettings, licenseState, client)); collectors.add(new IndicesStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new IndexStatsCollector(settings, clusterService, monitoringSettings, licenseState, client)); collectors.add(new IndexStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new ClusterStatsCollector(settings, clusterService, monitoringSettings, licenseState, client, licenseService)); collectors.add(new ClusterStatsCollector(settings, clusterService, monitoringSettings, licenseState, client, licenseService));
collectors.add(new ClusterStateCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new ShardsCollector(settings, clusterService, monitoringSettings, licenseState)); collectors.add(new ShardsCollector(settings, clusterService, monitoringSettings, licenseState));
collectors.add(new NodeStatsCollector(settings, clusterService, monitoringSettings, licenseState, client)); collectors.add(new NodeStatsCollector(settings, clusterService, monitoringSettings, licenseState, client));
collectors.add(new IndexRecoveryCollector(settings, clusterService, monitoringSettings, licenseState, client)); collectors.add(new IndexRecoveryCollector(settings, clusterService, monitoringSettings, licenseState, client));

View File

@ -1,57 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.cluster;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.collector.Collector;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.security.InternalClient;
import java.util.Collection;
import java.util.Collections;
/**
* Collector for cluster state.
* <p>
* This collector runs on the master node only and collects {@link ClusterStateMonitoringDoc}
* document at a given frequency.
*/
public class ClusterStateCollector extends Collector {
private final Client client;
public ClusterStateCollector(Settings settings, ClusterService clusterService,
MonitoringSettings monitoringSettings,
XPackLicenseState licenseState, InternalClient client) {
super(settings, "cluster-state", clusterService, monitoringSettings, licenseState);
this.client = client;
}
@Override
protected boolean shouldCollect() {
return super.shouldCollect() && isLocalNodeMaster();
}
@Override
protected Collection<MonitoringDoc> doCollect() throws Exception {
ClusterState clusterState = clusterService.state();
String clusterUUID = clusterState.metaData().clusterUUID();
long timestamp = System.currentTimeMillis();
ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().get(monitoringSettings.clusterStateTimeout());
// Adds a cluster_state document with associated status
return Collections.singleton(
new ClusterStateMonitoringDoc(monitoringId(), monitoringVersion(), clusterUUID,
timestamp, localNode(), clusterState, clusterHealth.getStatus()));
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.cluster;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
/**
* Monitoring document collected by {@link ClusterStateCollector} that contains the
* current cluster state.
*/
public class ClusterStateMonitoringDoc extends MonitoringDoc {
public static final String TYPE = "cluster_state";
private final ClusterState clusterState;
private final ClusterHealthStatus status;
public ClusterStateMonitoringDoc(String monitoringId, String monitoringVersion,
String clusterUUID, long timestamp, DiscoveryNode node,
ClusterState clusterState, ClusterHealthStatus status) {
super(monitoringId, monitoringVersion, TYPE, null, clusterUUID, timestamp, node);
this.clusterState = clusterState;
this.status = status;
}
public ClusterState getClusterState() {
return clusterState;
}
public ClusterHealthStatus getStatus() {
return status;
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
@ -75,6 +76,7 @@ public class ClusterStatsCollector extends Collector {
final DiscoveryNode sourceNode = localNode(); final DiscoveryNode sourceNode = localNode();
final String clusterName = clusterService.getClusterName().value(); final String clusterName = clusterService.getClusterName().value();
final String version = Version.CURRENT.toString(); final String version = Version.CURRENT.toString();
final ClusterState clusterState = clusterService.state();
final License license = licenseService.getLicense(); final License license = licenseService.getLicense();
final List<XPackFeatureSet.Usage> usage = collect(usageSupplier); final List<XPackFeatureSet.Usage> usage = collect(usageSupplier);
@ -82,7 +84,7 @@ public class ClusterStatsCollector extends Collector {
return Collections.singleton( return Collections.singleton(
new ClusterStatsMonitoringDoc(monitoringId(), monitoringVersion(), new ClusterStatsMonitoringDoc(monitoringId(), monitoringVersion(),
clusterUUID, timestamp, sourceNode, clusterName, version, license, usage, clusterUUID, timestamp, sourceNode, clusterName, version, license, usage,
clusterStats)); clusterStats, clusterState, clusterStats.getStatus()));
} }
@Nullable @Nullable

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.monitoring.collector.cluster; package org.elasticsearch.xpack.monitoring.collector.cluster;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.license.License; import org.elasticsearch.license.License;
import org.elasticsearch.xpack.XPackFeatureSet; import org.elasticsearch.xpack.XPackFeatureSet;
@ -31,18 +33,23 @@ public class ClusterStatsMonitoringDoc extends MonitoringDoc {
private final License license; private final License license;
private final List<XPackFeatureSet.Usage> usage; private final List<XPackFeatureSet.Usage> usage;
private final ClusterStatsResponse clusterStats; private final ClusterStatsResponse clusterStats;
private final ClusterState clusterState;
private final ClusterHealthStatus status;
public ClusterStatsMonitoringDoc(String monitoringId, String monitoringVersion, public ClusterStatsMonitoringDoc(String monitoringId, String monitoringVersion,
String clusterUUID, long timestamp, DiscoveryNode node, String clusterUUID, long timestamp, DiscoveryNode node,
String clusterName, String version, License license, String clusterName, String version, License license,
List<XPackFeatureSet.Usage> usage, List<XPackFeatureSet.Usage> usage,
ClusterStatsResponse clusterStats) { ClusterStatsResponse clusterStats,
ClusterState clusterState, ClusterHealthStatus status) {
super(monitoringId, monitoringVersion, TYPE, null, clusterUUID, timestamp, node); super(monitoringId, monitoringVersion, TYPE, null, clusterUUID, timestamp, node);
this.clusterName = clusterName; this.clusterName = clusterName;
this.version = version; this.version = version;
this.license = license; this.license = license;
this.usage = usage; this.usage = usage;
this.clusterStats = clusterStats; this.clusterStats = clusterStats;
this.clusterState = clusterState;
this.status = status;
} }
public String getClusterName() { public String getClusterName() {
@ -64,4 +71,13 @@ public class ClusterStatsMonitoringDoc extends MonitoringDoc {
public ClusterStatsResponse getClusterStats() { public ClusterStatsResponse getClusterStats() {
return clusterStats; return clusterStats;
} }
public ClusterState getClusterState() {
return clusterState;
}
public ClusterHealthStatus getStatus() {
return status;
}
} }

View File

@ -10,7 +10,6 @@ import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.action.MonitoringBulkDoc; import org.elasticsearch.xpack.monitoring.action.MonitoringBulkDoc;
import org.elasticsearch.xpack.monitoring.action.MonitoringIndex; import org.elasticsearch.xpack.monitoring.action.MonitoringIndex;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStateMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.indices.IndicesStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.indices.IndicesStatsMonitoringDoc;
@ -21,7 +20,6 @@ import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils; import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.resolver.bulk.MonitoringBulkTimestampedResolver; import org.elasticsearch.xpack.monitoring.resolver.bulk.MonitoringBulkTimestampedResolver;
import org.elasticsearch.xpack.monitoring.resolver.cluster.ClusterStatsResolver; import org.elasticsearch.xpack.monitoring.resolver.cluster.ClusterStatsResolver;
import org.elasticsearch.xpack.monitoring.resolver.cluster.ClusterStateResolver;
import org.elasticsearch.xpack.monitoring.resolver.indices.IndexRecoveryResolver; import org.elasticsearch.xpack.monitoring.resolver.indices.IndexRecoveryResolver;
import org.elasticsearch.xpack.monitoring.resolver.indices.IndexStatsResolver; import org.elasticsearch.xpack.monitoring.resolver.indices.IndexStatsResolver;
import org.elasticsearch.xpack.monitoring.resolver.indices.IndicesStatsResolver; import org.elasticsearch.xpack.monitoring.resolver.indices.IndicesStatsResolver;
@ -61,7 +59,6 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
* Registers resolvers for elasticsearch documents collected by the monitoring plugin * Registers resolvers for elasticsearch documents collected by the monitoring plugin
*/ */
private void registerBuiltIn(MonitoredSystem id, Settings settings) { private void registerBuiltIn(MonitoredSystem id, Settings settings) {
registrations.add(resolveByClass(ClusterStateMonitoringDoc.class, new ClusterStateResolver(id, settings)));
registrations.add(resolveByClass(ClusterStatsMonitoringDoc.class, new ClusterStatsResolver(id, settings))); registrations.add(resolveByClass(ClusterStatsMonitoringDoc.class, new ClusterStatsResolver(id, settings)));
registrations.add(resolveByClass(IndexRecoveryMonitoringDoc.class, new IndexRecoveryResolver(id, settings))); registrations.add(resolveByClass(IndexRecoveryMonitoringDoc.class, new IndexRecoveryResolver(id, settings)));
registrations.add(resolveByClass(IndexStatsMonitoringDoc.class, new IndexStatsResolver(id, settings))); registrations.add(resolveByClass(IndexStatsMonitoringDoc.class, new IndexStatsResolver(id, settings)));

View File

@ -1,65 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.resolver.cluster;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStateMonitoringDoc;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import java.io.IOException;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
public class ClusterStateResolver extends MonitoringIndexNameResolver.Timestamped<ClusterStateMonitoringDoc> {
public static final String TYPE = "cluster_state";
static final Set<String> FILTERS;
static {
Set<String> filters = Sets.newHashSet(
"cluster_uuid",
"timestamp",
"type",
"source_node",
"cluster_state.version",
"cluster_state.master_node",
"cluster_state.state_uuid",
"cluster_state.status",
"cluster_state.nodes");
FILTERS = Collections.unmodifiableSet(filters);
}
public ClusterStateResolver(MonitoredSystem id, Settings settings) {
super(id, settings);
}
@Override
public Set<String> filters() {
return FILTERS;
}
@Override
protected void buildXContent(ClusterStateMonitoringDoc document, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(Fields.CLUSTER_STATE);
ClusterState clusterState = document.getClusterState();
if (clusterState != null) {
builder.field(Fields.STATUS, document.getStatus().name().toLowerCase(Locale.ROOT));
clusterState.toXContent(builder, params);
}
builder.endObject();
}
static final class Fields {
static final String CLUSTER_STATE = TYPE;
static final String STATUS = "status";
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.monitoring.resolver.cluster; package org.elasticsearch.xpack.monitoring.resolver.cluster;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -19,11 +20,20 @@ import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolver;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
public class ClusterStatsResolver extends MonitoringIndexNameResolver.Timestamped<ClusterStatsMonitoringDoc> { public class ClusterStatsResolver extends MonitoringIndexNameResolver.Timestamped<ClusterStatsMonitoringDoc> {
private static final ToXContent.MapParams CLUSTER_STATS_PARAMS =
new ToXContent.MapParams(
Collections.singletonMap("metric",
ClusterState.Metric.VERSION + "," +
ClusterState.Metric.MASTER_NODE + "," +
ClusterState.Metric.NODES));
public ClusterStatsResolver(MonitoredSystem system, Settings settings) { public ClusterStatsResolver(MonitoredSystem system, Settings settings) {
super(system, settings); super(system, settings);
} }
@ -52,6 +62,14 @@ public class ClusterStatsResolver extends MonitoringIndexNameResolver.Timestampe
builder.endObject(); builder.endObject();
} }
final ClusterState clusterState = document.getClusterState();
if (clusterState != null) {
builder.startObject("cluster_state");
builder.field("status", document.getStatus().name().toLowerCase(Locale.ROOT));
clusterState.toXContent(builder, CLUSTER_STATS_PARAMS);
builder.endObject();
}
final List<XPackFeatureSet.Usage> usages = document.getUsage(); final List<XPackFeatureSet.Usage> usages = document.getUsage();
if (usages != null) { if (usages != null) {
// in the future we may choose to add other usages under the stack_stats section, but it is only xpack for now // in the future we may choose to add other usages under the stack_stats section, but it is only xpack for now

View File

@ -23,7 +23,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.test.SecuritySettingsSource; import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStateMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.indices.IndicesStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.indices.IndicesStatsMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.node.NodeStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.node.NodeStatsMonitoringDoc;
@ -131,14 +131,10 @@ public class OldMonitoringIndicesBackwardsCompatibilityTests extends AbstractOld
search(indexPattern, IndexStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(10L)); search(indexPattern, IndexStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(10L));
// All the other aliases should have been created by now so we can assert that we have the data we saved in the bwc indexes // All the other aliases should have been created by now so we can assert that we have the data we saved in the bwc indexes
SearchResponse firstShards = SearchResponse firstShards = search(indexPattern, ShardMonitoringDoc.TYPE, greaterThanOrEqualTo(10L));
search(indexPattern, ShardMonitoringDoc.TYPE, greaterThanOrEqualTo(10L)); SearchResponse firstIndices = search(indexPattern, IndicesStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(3L));
SearchResponse firstIndices = SearchResponse firstNode = search(indexPattern, NodeStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(3L));
search(indexPattern, IndicesStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(3L)); SearchResponse firstState = search(indexPattern, ClusterStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(3L));
SearchResponse firstNode =
search(indexPattern, NodeStatsMonitoringDoc.TYPE, greaterThanOrEqualTo(3L));
SearchResponse firstState =
search(indexPattern, ClusterStateMonitoringDoc.TYPE, greaterThanOrEqualTo(3L));
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get(); ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
final String masterNodeId = clusterStateResponse.getState().getNodes().getMasterNodeId(); final String masterNodeId = clusterStateResponse.getState().getNodes().getMasterNodeId();
@ -164,7 +160,7 @@ public class OldMonitoringIndicesBackwardsCompatibilityTests extends AbstractOld
greaterThan(firstIndices.getHits().getTotalHits())), 1, TimeUnit.MINUTES); greaterThan(firstIndices.getHits().getTotalHits())), 1, TimeUnit.MINUTES);
assertBusy(() -> search(indexPattern, NodeStatsMonitoringDoc.TYPE, assertBusy(() -> search(indexPattern, NodeStatsMonitoringDoc.TYPE,
greaterThan(firstNode.getHits().getTotalHits())), 1, TimeUnit.MINUTES); greaterThan(firstNode.getHits().getTotalHits())), 1, TimeUnit.MINUTES);
assertBusy(() -> search(indexPattern, ClusterStateMonitoringDoc.TYPE, assertBusy(() -> search(indexPattern, ClusterStatsMonitoringDoc.TYPE,
greaterThan(firstState.getHits().getTotalHits())), 1, TimeUnit.MINUTES); greaterThan(firstState.getHits().getTotalHits())), 1, TimeUnit.MINUTES);
} finally { } finally {

View File

@ -1,144 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.collector.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.collector.AbstractCollectorTestCase;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import java.util.Collection;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
public class ClusterStateCollectorTests extends AbstractCollectorTestCase {
public void testClusterStateCollectorNoIndices() throws Exception {
// waits for pending tasks before collecing cluster state.
// prevents the collector to read an older cluster state than the one used in assert later
ensureGreen();
assertMonitoringDocs(newClusterStateCollector().doCollect(), 0);
}
public void testClusterStateCollectorOneIndex() throws Exception {
int nbShards = randomIntBetween(1, 5);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, nbShards)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
int nbDocs = randomIntBetween(1, 20);
for (int i = 0; i < nbDocs; i++) {
client().prepareIndex("test", "test").setSource("num", i).get();
}
flush();
refresh();
assertHitCount(client().prepareSearch().setSize(0).get(), nbDocs);
assertMonitoringDocs(newClusterStateCollector().doCollect(), nbShards);
}
public void testClusterStateCollectorMultipleIndices() throws Exception {
int nbIndices = randomIntBetween(1, 5);
int[] docsPerIndex = new int[nbIndices];
int[] shardsPerIndex = new int[nbIndices];
int nbShards = 0;
for (int i = 0; i < nbIndices; i++) {
shardsPerIndex[i] = randomIntBetween(1, 5);
nbShards += shardsPerIndex[i];
assertAcked(prepareCreate("test-" + i).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardsPerIndex[i])
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
docsPerIndex[i] = randomIntBetween(1, 20);
for (int j = 0; j < docsPerIndex[i]; j++) {
client().prepareIndex("test-" + i, "test").setSource("num", i).get();
}
}
refresh();
for (int i = 0; i < nbIndices; i++) {
assertHitCount(client().prepareSearch("test-" + i).setSize(0).get(), docsPerIndex[i]);
}
Collection<MonitoringDoc> results = newClusterStateCollector().doCollect();
assertMonitoringDocs(results, nbShards);
MonitoringDoc monitoringDoc = results.iterator().next();
assertNotNull(monitoringDoc);
assertThat(monitoringDoc, instanceOf(ClusterStateMonitoringDoc.class));
ClusterStateMonitoringDoc clusterStateMonitoringDoc = (ClusterStateMonitoringDoc) monitoringDoc;
assertThat(clusterStateMonitoringDoc.getMonitoringId(), equalTo(MonitoredSystem.ES.getSystem()));
assertThat(clusterStateMonitoringDoc.getMonitoringVersion(), equalTo(Version.CURRENT.toString()));
assertThat(clusterStateMonitoringDoc.getClusterUUID(),
equalTo(client().admin().cluster().prepareState().setMetaData(true).get().getState().metaData().clusterUUID()));
assertThat(clusterStateMonitoringDoc.getTimestamp(), greaterThan(0L));
assertThat(clusterStateMonitoringDoc.getSourceNode(), notNullValue());
assertNotNull(clusterStateMonitoringDoc.getClusterState());
ClusterState clusterState = clusterStateMonitoringDoc.getClusterState();
for (int i = 0; i < nbIndices; i++) {
assertThat(clusterState.getRoutingTable().allShards("test-" + i), hasSize(shardsPerIndex[i]));
}
}
private ClusterStateCollector newClusterStateCollector() {
// This collector runs on master node only
return newClusterStateCollector(internalCluster().getMasterName());
}
private ClusterStateCollector newClusterStateCollector(String nodeId) {
assertNotNull(nodeId);
return new ClusterStateCollector(internalCluster().getInstance(Settings.class, nodeId),
internalCluster().getInstance(ClusterService.class, nodeId),
internalCluster().getInstance(MonitoringSettings.class, nodeId),
internalCluster().getInstance(XPackLicenseState.class, nodeId),
securedClient(nodeId));
}
private void assertMonitoringDocs(Collection<MonitoringDoc> results, final int nbShards) {
assertThat("expecting 1 document for cluster state", results, hasSize(1));
final ClusterState clusterState = securedClient().admin().cluster().prepareState().get().getState();
final String clusterUUID = clusterState.getMetaData().clusterUUID();
for (MonitoringDoc doc : results) {
assertThat(doc.getMonitoringId(), equalTo(MonitoredSystem.ES.getSystem()));
assertThat(doc.getMonitoringVersion(), equalTo(Version.CURRENT.toString()));
assertThat(doc.getClusterUUID(), equalTo(clusterUUID));
assertThat(doc.getTimestamp(), greaterThan(0L));
assertThat(doc.getSourceNode(), notNullValue());
assertThat(doc, instanceOf(ClusterStateMonitoringDoc.class));
if (doc instanceof ClusterStateMonitoringDoc) {
ClusterStateMonitoringDoc clusterStateMonitoringDoc = (ClusterStateMonitoringDoc) doc;
assertThat(clusterStateMonitoringDoc.getClusterState().getRoutingTable().allShards(), hasSize(nbShards));
assertThat(clusterStateMonitoringDoc.getClusterState().getNodes().getSize(), equalTo(internalCluster().size()));
} else {
fail("unknown monitoring document type " + doc);
}
}
}
}

View File

@ -16,7 +16,6 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -37,7 +36,6 @@ import org.elasticsearch.test.http.MockResponse;
import org.elasticsearch.test.http.MockWebServer; import org.elasticsearch.test.http.MockWebServer;
import org.elasticsearch.xpack.monitoring.MonitoredSystem; import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings; import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStateMonitoringDoc;
import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryMonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil; import org.elasticsearch.xpack.monitoring.exporter.ClusterAlertsUtil;
import org.elasticsearch.xpack.monitoring.exporter.Exporter; import org.elasticsearch.xpack.monitoring.exporter.Exporter;
@ -659,13 +657,7 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
DiscoveryNode sourceNode = new DiscoveryNode("id", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); DiscoveryNode sourceNode = new DiscoveryNode("id", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
if (randomBoolean()) { return new IndexRecoveryMonitoringDoc(monitoringId, monitoringVersion, clusterUUID, timestamp, sourceNode, new RecoveryResponse());
return new IndexRecoveryMonitoringDoc(monitoringId, monitoringVersion, clusterUUID,
timestamp, sourceNode, new RecoveryResponse());
} else {
return new ClusterStateMonitoringDoc(monitoringId, monitoringVersion,clusterUUID,
timestamp, sourceNode, ClusterState.EMPTY_STATE, ClusterHealthStatus.GREEN);
}
} }
private List<MonitoringDoc> newRandomMonitoringDocs(int nb) { private List<MonitoringDoc> newRandomMonitoringDocs(int nb) {

View File

@ -148,11 +148,6 @@ public class LocalExporterIntegTests extends LocalExporterIntegTestCase {
assertThat(client().admin().indices().prepareExists(".monitoring-*").get().isExists(), is(true)); assertThat(client().admin().indices().prepareExists(".monitoring-*").get().isExists(), is(true));
ensureYellow(".monitoring-*"); ensureYellow(".monitoring-*");
assertThat(client().prepareSearch(".monitoring-es-*")
.setSize(0)
.setQuery(QueryBuilders.termQuery("type", "cluster_state"))
.get().getHits().getTotalHits(), greaterThan(0L));
assertThat(client().prepareSearch(".monitoring-es-*") assertThat(client().prepareSearch(".monitoring-es-*")
.setSize(0) .setSize(0)
.setQuery(QueryBuilders.termQuery("type", "cluster_stats")) .setQuery(QueryBuilders.termQuery("type", "cluster_stats"))

View File

@ -1,58 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.resolver.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStateMonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.resolver.MonitoringIndexNameResolverTestCase;
import java.io.IOException;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
public class ClusterStateResolverTests extends MonitoringIndexNameResolverTestCase<ClusterStateMonitoringDoc, ClusterStateResolver> {
@Override
protected ClusterStateMonitoringDoc newMonitoringDoc() {
DiscoveryNode masterNode = new DiscoveryNode("master", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), Version.CURRENT);
DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(masterNode).add(otherNode).masterNodeId(masterNode.getId()).build();
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
ClusterStateMonitoringDoc doc = new ClusterStateMonitoringDoc(randomMonitoringId(),
randomAlphaOfLength(2), randomAlphaOfLength(5), 1437580442979L,
new DiscoveryNode("id", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
clusterState, randomFrom(ClusterHealthStatus.values()));
return doc;
}
public void testClusterStateResolver() throws IOException {
ClusterStateMonitoringDoc doc = newMonitoringDoc();
ClusterStateResolver resolver = newResolver();
assertThat(resolver.index(doc), equalTo(".monitoring-es-" + MonitoringTemplateUtils.TEMPLATE_VERSION + "-2015.07.22"));
assertSource(resolver.source(doc, XContentType.JSON),
Sets.newHashSet(
"cluster_uuid",
"timestamp",
"type",
"source_node",
"cluster_state"), XContentType.JSON);
}
}

View File

@ -1,93 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.monitoring.resolver.cluster;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStateCollector;
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStateMonitoringDoc;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.elasticsearch.xpack.security.InternalClient;
import org.junit.After;
import org.junit.Before;
import java.util.Collection;
import java.util.Map;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.ClusterScope(numDataNodes = 1, supportsDedicatedMasters = false)
public class ClusterStateTests extends MonitoringIntegTestCase {
private int randomInt = randomInt();
private ThreadPool threadPool = null;
@Before
public void setupThreadPool() {
threadPool = new TestThreadPool(getTestName());
}
@After
public void removeThreadPool() throws InterruptedException {
if (threadPool != null) {
terminate(threadPool);
}
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MonitoringSettings.INTERVAL.getKey(), "-1")
.put("xpack.monitoring.exporters.default_local.type", "local")
.put("node.attr.custom", randomInt)
.build();
}
public void testClusterState() throws Exception {
final String masterNodeName = internalCluster().getMasterName();
final MonitoringSettings monitoringSettings = new MonitoringSettings(Settings.EMPTY, clusterService().getClusterSettings());
final InternalClient client = new InternalClient(Settings.EMPTY, threadPool, internalCluster().client(masterNodeName));
final ClusterStateCollector collector =
new ClusterStateCollector(Settings.EMPTY,
internalCluster().clusterService(masterNodeName),
monitoringSettings, new XPackLicenseState(), client);
final Collection<MonitoringDoc> monitoringDocs = collector.collect();
// just one cluster state
assertThat(monitoringDocs, hasSize(1));
// get the cluster state document that we fetched
final ClusterStateMonitoringDoc clusterStateDoc = (ClusterStateMonitoringDoc)monitoringDocs.iterator().next();
assertThat(clusterStateDoc.getClusterState(), notNullValue());
assertThat(clusterStateDoc.getStatus(), notNullValue());
// turn the monitoring doc into JSON
final ClusterStateResolver resolver = new ClusterStateResolver(MonitoredSystem.ES, Settings.EMPTY);
final BytesReference jsonBytes = resolver.source(clusterStateDoc, XContentType.JSON);
// parse the JSON to figure out what we just did
final Map<String, Object> fields = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, jsonBytes).map();
// ensure we did what we wanted
for (final String filter : ClusterStateResolver.FILTERS) {
assertContains(filter, fields);
}
}
}

View File

@ -8,8 +8,12 @@ package org.elasticsearch.xpack.monitoring.resolver.cluster;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
@ -25,6 +29,8 @@ import java.util.UUID;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ClusterStatsResolverTests extends MonitoringIndexNameResolverTestCase<ClusterStatsMonitoringDoc, ClusterStatsResolver> { public class ClusterStatsResolverTests extends MonitoringIndexNameResolverTestCase<ClusterStatsMonitoringDoc, ClusterStatsResolver> {
@ -40,6 +46,15 @@ public class ClusterStatsResolverTests extends MonitoringIndexNameResolverTestCa
.issueDate(1437580442979L) .issueDate(1437580442979L)
.expiryDate(1437580442979L + TimeValue.timeValueHours(2).getMillis()); .expiryDate(1437580442979L + TimeValue.timeValueHours(2).getMillis());
final String nodeUuid = "the-master-nodes-uuid";
final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY);
final DiscoveryNodes nodes = mock(DiscoveryNodes.class);
final DiscoveryNode node =
new DiscoveryNode(nodeUuid, new TransportAddress(TransportAddress.META_ADDRESS, 9300), Version.CURRENT);
when(nodes.getMasterNodeId()).thenReturn(nodeUuid);
when(nodes.iterator()).thenReturn(Collections.singleton(node).iterator());
return new ClusterStatsMonitoringDoc(randomMonitoringId(), return new ClusterStatsMonitoringDoc(randomMonitoringId(),
randomAlphaOfLength(2), randomAlphaOfLength(5), randomAlphaOfLength(2), randomAlphaOfLength(5),
Math.abs(randomLong()), Math.abs(randomLong()),
@ -50,9 +65,11 @@ public class ClusterStatsResolverTests extends MonitoringIndexNameResolverTestCa
Collections.singletonList(new MonitoringFeatureSet.Usage(randomBoolean(), randomBoolean(), emptyMap())), Collections.singletonList(new MonitoringFeatureSet.Usage(randomBoolean(), randomBoolean(), emptyMap())),
new ClusterStatsResponse( new ClusterStatsResponse(
Math.abs(randomLong()), Math.abs(randomLong()),
ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY), clusterName,
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()) Collections.emptyList()),
new ClusterState(clusterName, randomLong(), "a-real-state-uuid", null, null, nodes, null, null, false),
randomFrom(ClusterHealthStatus.values())
); );
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException("Failed to generated random ClusterStatsMonitoringDoc", e); throw new IllegalStateException("Failed to generated random ClusterStatsMonitoringDoc", e);
@ -79,6 +96,11 @@ public class ClusterStatsResolverTests extends MonitoringIndexNameResolverTestCa
"version", "version",
"license", "license",
"cluster_stats", "cluster_stats",
"cluster_state.status",
"cluster_state.version",
"cluster_state.state_uuid",
"cluster_state.master_node",
"cluster_state.nodes",
"stack_stats.xpack"), XContentType.JSON); "stack_stats.xpack"), XContentType.JSON);
} }
} }

View File

@ -28,6 +28,7 @@ import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
@ -55,6 +56,7 @@ public class ClusterStatsTests extends MonitoringIntegTestCase {
wipeMonitoringIndices(); wipeMonitoringIndices();
} }
@SuppressWarnings("unchecked")
public void testClusterStats() throws Exception { public void testClusterStats() throws Exception {
ensureGreen(); ensureGreen();
@ -135,5 +137,18 @@ public class ClusterStatsTests extends MonitoringIntegTestCase {
assertNotNull(monitoring); assertNotNull(monitoring);
// we don't make any assumptions about what's in it, only that it's there // we don't make any assumptions about what's in it, only that it's there
assertThat(monitoring, instanceOf(Map.class)); assertThat(monitoring, instanceOf(Map.class));
Object clusterState = source.get("cluster_state");
assertNotNull(clusterState);
assertThat(clusterState, instanceOf(Map.class));
Map<String, Object> clusterStateMap = (Map<String, Object>)clusterState;
assertThat(clusterStateMap.keySet(), hasSize(5));
assertThat(clusterStateMap.remove("status"), notNullValue());
assertThat(clusterStateMap.remove("version"), notNullValue());
assertThat(clusterStateMap.remove("state_uuid"), notNullValue());
assertThat(clusterStateMap.remove("master_node"), notNullValue());
assertThat(clusterStateMap.remove("nodes"), notNullValue());
} }
} }