[Monitoring] Change MonitoringIT into ESIntegTestCase (elastic/x-pack-elasticsearch#3899)
Change MonitoringIT into ESIntegTestCase Original commit: elastic/x-pack-elasticsearch@d06f6cc0b3
This commit is contained in:
parent
f6b0de2536
commit
63c2d3e6db
|
@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.XPackFeatureSet;
|
|||
import org.elasticsearch.xpack.core.XPackField;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
public class MonitoringFeatureSetUsage extends XPackFeatureSet.Usage {
|
||||
|
@ -32,6 +33,10 @@ public class MonitoringFeatureSetUsage extends XPackFeatureSet.Usage {
|
|||
this.exporters = exporters;
|
||||
}
|
||||
|
||||
public Map<String, Object> getExporters() {
|
||||
return exporters == null ? Collections.emptyMap() : Collections.unmodifiableMap(exporters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
|
|
|
@ -84,7 +84,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
public void setUpMocks() throws Exception {
|
||||
public void setUpMocks() {
|
||||
listener = mock(ActionListener.class);
|
||||
exporters = mock(Exporters.class);
|
||||
threadPool = mock(ThreadPool.class);
|
||||
|
@ -113,7 +113,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
assertThat(e, hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]")));
|
||||
}
|
||||
|
||||
public void testExecuteEmptyRequest() throws Exception {
|
||||
public void testExecuteEmptyRequest() {
|
||||
final TransportMonitoringBulkAction action = new TransportMonitoringBulkAction(Settings.EMPTY, threadPool, clusterService,
|
||||
transportService, filters, resolver, exporters);
|
||||
|
||||
|
@ -184,7 +184,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
verify(clusterService).localNode();
|
||||
}
|
||||
|
||||
public void testAsyncActionCreateMonitoringDocsWithNoDocs() throws Exception {
|
||||
public void testAsyncActionCreateMonitoringDocsWithNoDocs() {
|
||||
final Collection<MonitoringBulkDoc> bulkDocs = new ArrayList<>();
|
||||
if (randomBoolean()) {
|
||||
final int nbDocs = randomIntBetween(1, 50);
|
||||
|
@ -202,7 +202,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
assertThat(results.size(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testAsyncActionCreateMonitoringDocs() throws Exception {
|
||||
public void testAsyncActionCreateMonitoringDocs() {
|
||||
final List<MonitoringBulkDoc> docs = new ArrayList<>();
|
||||
|
||||
final MonitoredSystem system = randomFrom(MonitoredSystem.KIBANA, MonitoredSystem.LOGSTASH, MonitoredSystem.BEATS);
|
||||
|
@ -243,7 +243,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
public void testAsyncActionCreateMonitoringDocWithNoTimestamp() throws Exception {
|
||||
public void testAsyncActionCreateMonitoringDocWithNoTimestamp() {
|
||||
final MonitoringBulkDoc monitoringBulkDoc =
|
||||
new MonitoringBulkDoc(MonitoredSystem.LOGSTASH, "_type", "_id", 0L, 0L, BytesArray.EMPTY, XContentType.JSON);
|
||||
|
||||
|
@ -293,7 +293,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAsyncActionExecuteExport() throws Exception {
|
||||
public void testAsyncActionExecuteExport() {
|
||||
final int nbDocs = randomIntBetween(1, 25);
|
||||
final Collection<MonitoringDoc> docs = new ArrayList<>(nbDocs);
|
||||
for (int i = 0; i < nbDocs; i++) {
|
||||
|
@ -318,7 +318,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAsyncActionExportThrowsException() throws Exception {
|
||||
public void testAsyncActionExportThrowsException() {
|
||||
final int nbDocs = randomIntBetween(1, 25);
|
||||
final Collection<MonitoringDoc> docs = new ArrayList<>(nbDocs);
|
||||
for (int i = 0; i < nbDocs; i++) {
|
||||
|
|
|
@ -5,60 +5,71 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.monitoring.integration;
|
||||
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.nio.entity.NStringEntity;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.license.License;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.collapse.CollapseBuilder;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPoolStats;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.action.XPackUsageRequestBuilder;
|
||||
import org.elasticsearch.xpack.core.action.XPackUsageResponse;
|
||||
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkRequestBuilder;
|
||||
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkResponse;
|
||||
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
|
||||
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.xpack.core.monitoring.MonitoringFeatureSetUsage;
|
||||
import org.elasticsearch.xpack.monitoring.LocalStateMonitoring;
|
||||
import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.collector.indices.IndicesStatsMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.collector.node.NodeStatsMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.collector.shards.ShardMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.rest.action.RestMonitoringBulkAction;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.elasticsearch.xpack.monitoring.test.MockIngestPlugin;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.joda.time.format.ISODateTimeFormat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
|
||||
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
|
||||
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
|
||||
import static org.elasticsearch.test.SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING;
|
||||
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.threadpool.ThreadPool.Names.BULK;
|
||||
import static org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsMonitoringDoc.hash;
|
||||
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
|
||||
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.isEmptyOrNullString;
|
||||
import static org.hamcrest.Matchers.isOneOf;
|
||||
|
@ -66,74 +77,35 @@ import static org.hamcrest.Matchers.not;
|
|||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/3728")
|
||||
public class MonitoringIT extends ESRestTestCase {
|
||||
|
||||
private static final String BASIC_AUTH_VALUE = basicAuthHeaderValue("x_pack_rest_user", TEST_PASSWORD_SECURE_STRING);
|
||||
public class MonitoringIT extends ESSingleNodeTestCase {
|
||||
|
||||
private final TimeValue collectionInterval = TimeValue.timeValueSeconds(3);
|
||||
|
||||
@Override
|
||||
protected Settings restClientSettings() {
|
||||
protected Settings nodeSettings() {
|
||||
return Settings.builder()
|
||||
.put(super.restClientSettings())
|
||||
.put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE)
|
||||
.put(super.nodeSettings())
|
||||
.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false)
|
||||
.put("xpack.monitoring.collection.interval", "-1")
|
||||
.put("xpack.monitoring.exporters._local.type", "local")
|
||||
.put("xpack.monitoring.exporters._local.enabled", false)
|
||||
.put("xpack.monitoring.exporters._local.cluster_alerts.management.enabled", false)
|
||||
.build();
|
||||
}
|
||||
|
||||
private HttpEntity createBulkEntity() {
|
||||
final StringBuilder bulk = new StringBuilder();
|
||||
bulk.append("{\"index\":{\"_type\":\"test\"}}\n");
|
||||
bulk.append("{\"foo\":{\"bar\":0}}\n");
|
||||
bulk.append("{\"index\":{\"_type\":\"test\"}}\n");
|
||||
bulk.append("{\"foo\":{\"bar\":1}}\n");
|
||||
bulk.append("{\"index\":{\"_type\":\"test\"}}\n");
|
||||
bulk.append("{\"foo\":{\"bar\":2}}\n");
|
||||
bulk.append("\n");
|
||||
return new NStringEntity(bulk.toString(), ContentType.APPLICATION_JSON);
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return Arrays.asList(LocalStateMonitoring.class, MockIngestPlugin.class, CommonAnalysisPlugin.class);
|
||||
}
|
||||
|
||||
public void testMonitoringBulkApiWithMissingSystemId() throws IOException {
|
||||
final Map<String, String> parameters = parameters(null, TEMPLATE_VERSION, "10s");
|
||||
|
||||
assertBadRequest(parameters, createBulkEntity(), containsString("no [system_id] for monitoring bulk request"));
|
||||
}
|
||||
|
||||
public void testMonitoringBulkApiWithMissingSystemApiVersion() throws IOException {
|
||||
final Map<String, String> parameters = parameters(randomSystemId(), null, "10s");
|
||||
|
||||
assertBadRequest(parameters, createBulkEntity(), containsString("no [system_api_version] for monitoring bulk request"));
|
||||
}
|
||||
|
||||
public void testMonitoringBulkApiWithMissingInterval() throws IOException {
|
||||
final Map<String, String> parameters = parameters(randomSystemId(), TEMPLATE_VERSION, null);
|
||||
|
||||
assertBadRequest(parameters, createBulkEntity(), containsString("no [interval] for monitoring bulk request"));
|
||||
}
|
||||
|
||||
public void testMonitoringBulkApiWithWrongInterval() throws IOException {
|
||||
final Map<String, String> parameters = parameters(randomSystemId(), TEMPLATE_VERSION, "null");
|
||||
|
||||
assertBadRequest(parameters, createBulkEntity(), containsString("failed to parse setting [interval] with value [null]"));
|
||||
}
|
||||
|
||||
public void testMonitoringBulkApiWithMissingContent() throws IOException {
|
||||
final Map<String, String> parameters = parameters(randomSystemId(), TEMPLATE_VERSION, "30s");
|
||||
|
||||
assertBadRequest(parameters, null, containsString("no body content for monitoring bulk request"));
|
||||
}
|
||||
|
||||
public void testMonitoringBulkApiWithUnsupportedSystemVersion() throws IOException {
|
||||
final String systemId = randomSystemId();
|
||||
final String systemApiVersion = randomFrom(TEMPLATE_VERSION, MonitoringTemplateUtils.OLD_TEMPLATE_VERSION);
|
||||
|
||||
Map<String, String> parameters = parameters(MonitoredSystem.UNKNOWN.getSystem(), systemApiVersion, "30s");
|
||||
assertBadRequest(parameters, createBulkEntity(),
|
||||
containsString("system_api_version ["+ systemApiVersion + "] is not supported by system_id [unknown]"));
|
||||
|
||||
parameters = parameters(systemId, "0", "30s");
|
||||
assertBadRequest(parameters, createBulkEntity(),
|
||||
containsString("system_api_version [0] is not supported by system_id [" + systemId + "]"));
|
||||
private String createBulkEntity() {
|
||||
return "{\"index\":{\"_type\":\"test\"}}\n" +
|
||||
"{\"foo\":{\"bar\":0}}\n" +
|
||||
"{\"index\":{\"_type\":\"test\"}}\n" +
|
||||
"{\"foo\":{\"bar\":1}}\n" +
|
||||
"{\"index\":{\"_type\":\"test\"}}\n" +
|
||||
"{\"foo\":{\"bar\":2}}\n" +
|
||||
"\n";
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -148,38 +120,63 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
final MonitoredSystem system = randomSystem();
|
||||
final TimeValue interval = TimeValue.timeValueSeconds(randomIntBetween(1, 20));
|
||||
|
||||
// REST is the realistic way that these operations happen, so it's the most realistic way to integration test it too
|
||||
// Use Monitoring Bulk API to index 3 documents
|
||||
Response bulkResponse = client().performRequest("POST", "/_xpack/monitoring/_bulk",
|
||||
parameters(system.getSystem(), TEMPLATE_VERSION, interval.getStringRep()),
|
||||
createBulkEntity());
|
||||
//final Response bulkResponse = getRestClient().performRequest("POST", "/_xpack/monitoring/_bulk",
|
||||
// parameters, createBulkEntity());
|
||||
|
||||
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_OK));
|
||||
assertThat(toMap(bulkResponse.getEntity()).get("errors"), equalTo(false));
|
||||
final MonitoringBulkResponse bulkResponse =
|
||||
new MonitoringBulkRequestBuilder(client())
|
||||
.add(system, null, new BytesArray(createBulkEntity().getBytes("UTF-8")), XContentType.JSON,
|
||||
System.currentTimeMillis(), interval.millis())
|
||||
.get();
|
||||
|
||||
final String indexPrefix = ".monitoring-" + system.getSystem() + "-" + TEMPLATE_VERSION;
|
||||
assertThat(bulkResponse.status(), is(RestStatus.OK));
|
||||
assertThat(bulkResponse.getError(), nullValue());
|
||||
|
||||
final String monitoringIndex = ".monitoring-" + system.getSystem() + "-" + TEMPLATE_VERSION + "-*";
|
||||
|
||||
// Wait for the monitoring index to be created
|
||||
awaitRestApi("GET", "/" + indexPrefix + "-*/_search", singletonMap("size", "0"), null,
|
||||
resp -> {
|
||||
Number hitsTotal = (Number) extractValue("hits.total", resp);
|
||||
return hitsTotal != null && hitsTotal.intValue() >= 3;
|
||||
}, "Exception when waiting for monitoring bulk documents to be indexed");
|
||||
assertBusy(() -> {
|
||||
assertThat(client().admin().indices().prepareRefresh(monitoringIndex).get().getStatus(), is(RestStatus.OK));
|
||||
|
||||
Response searchResponse = client().performRequest("GET", "/" + indexPrefix + "-*/_search");
|
||||
final Map<String, Object> results = toMap(searchResponse.getEntity());
|
||||
final SearchResponse response =
|
||||
client().prepareSearch(".monitoring-" + system.getSystem() + "-" + TEMPLATE_VERSION + "-*")
|
||||
.get();
|
||||
|
||||
final Map<String, Object> hits = (Map<String, Object>) results.get("hits");
|
||||
final List<Map<String, Object>> searchHits = (List<Map<String,Object>>) hits.get("hits");
|
||||
assertEquals(3, searchHits.size());
|
||||
// exactly 3 results are expected
|
||||
assertThat("No monitoring documents yet", response.getHits().getTotalHits(), equalTo(3L));
|
||||
|
||||
assertEquals("Monitoring documents must have the same timestamp",
|
||||
1, searchHits.stream().map(map -> extractValue("_source.timestamp", map)).distinct().count());
|
||||
final List<Map<String, Object>> sources =
|
||||
Arrays.stream(response.getHits().getHits())
|
||||
.map(SearchHit::getSourceAsMap)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
assertEquals("Monitoring documents must have the same source_node timestamp",
|
||||
1, searchHits.stream().map(map -> extractValue("_source.source_node.timestamp", map)).distinct().count());
|
||||
// find distinct _source.timestamp fields
|
||||
assertThat(sources.stream().map(source -> source.get("timestamp")).distinct().count(), is(1L));
|
||||
// find distinct _source.source_node fields (which is a map)
|
||||
assertThat(sources.stream().map(source -> source.get("source_node")).distinct().count(), is(1L));
|
||||
});
|
||||
|
||||
for (Map<String,Object> searchHit : searchHits) {
|
||||
assertMonitoringDoc(searchHit, system, "test", interval);
|
||||
final SearchResponse response = client().prepareSearch(monitoringIndex).get();
|
||||
final SearchHits hits = response.getHits();
|
||||
|
||||
assertThat(response.getHits().getTotalHits(), equalTo(3L));
|
||||
assertThat("Monitoring documents must have the same timestamp",
|
||||
Arrays.stream(hits.getHits())
|
||||
.map(hit -> extractValue("timestamp", hit.getSourceAsMap()))
|
||||
.distinct()
|
||||
.count(),
|
||||
equalTo(1L));
|
||||
assertThat("Monitoring documents must have the same source_node timestamp",
|
||||
Arrays.stream(hits.getHits())
|
||||
.map(hit -> extractValue("source_node.timestamp", hit.getSourceAsMap()))
|
||||
.distinct()
|
||||
.count(),
|
||||
equalTo(1L));
|
||||
|
||||
for (final SearchHit hit : hits.getHits()) {
|
||||
assertMonitoringDoc(toMap(hit), system, "test", interval);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -194,36 +191,50 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
public void testMonitoringService() throws Exception {
|
||||
final boolean createAPMIndex = randomBoolean();
|
||||
final String indexName = createAPMIndex ? "apm-2017.11.06" : "books";
|
||||
final HttpEntity document = new StringEntity("{\"field\":\"value\"}", ContentType.APPLICATION_JSON);
|
||||
assertThat(client().performRequest("POST", "/" + indexName + "/doc/0", singletonMap("refresh", "true"), document)
|
||||
.getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_CREATED));
|
||||
|
||||
assertThat(client().prepareIndex(indexName, "doc", "0")
|
||||
.setRefreshPolicy("true")
|
||||
.setSource("{\"field\":\"value\"}", XContentType.JSON)
|
||||
.get()
|
||||
.status(),
|
||||
is(RestStatus.CREATED));
|
||||
|
||||
whenExportersAreReady(() -> {
|
||||
final Response searchResponse = client().performRequest("GET", "/.monitoring-es-*/_search", singletonMap("size", "100"));
|
||||
assertThat(searchResponse.getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_OK));
|
||||
final AtomicReference<SearchResponse> searchResponse = new AtomicReference<>();
|
||||
|
||||
final Map<String, Object> results = toMap(searchResponse.getEntity());
|
||||
assertBusy(() -> {
|
||||
final SearchResponse response =
|
||||
client().prepareSearch(".monitoring-es-*")
|
||||
.setCollapse(new CollapseBuilder("type"))
|
||||
.addSort("timestamp", SortOrder.DESC)
|
||||
.get();
|
||||
|
||||
final Map<String, Object> hits = (Map<String, Object>) results.get("hits");
|
||||
assertThat("Expecting a minimum number of 6 docs, one per collector", (Integer) hits.get("total"), greaterThanOrEqualTo(6));
|
||||
assertThat(response.status(), is(RestStatus.OK));
|
||||
assertThat("Expecting a minimum number of 6 docs, one per collector",
|
||||
response.getHits().getHits().length,
|
||||
greaterThanOrEqualTo(6));
|
||||
|
||||
final List<Map<String, Object>> searchHits = (List<Map<String,Object>>) hits.get("hits");
|
||||
searchResponse.set(response);
|
||||
});
|
||||
|
||||
for (Map<String,Object> searchHit : searchHits) {
|
||||
for (final SearchHit hit : searchResponse.get().getHits()) {
|
||||
final Map<String, Object> searchHit = toMap(hit);
|
||||
final String type = (String) extractValue("_source.type", searchHit);
|
||||
|
||||
assertMonitoringDoc(searchHit, MonitoredSystem.ES, type, collectionInterval);
|
||||
|
||||
if (ClusterStatsMonitoringDoc.TYPE.equals(type)) {
|
||||
assertClusterStatsMonitoringDoc(searchHit, collectionInterval, createAPMIndex);
|
||||
assertClusterStatsMonitoringDoc(searchHit, createAPMIndex);
|
||||
} else if (IndexRecoveryMonitoringDoc.TYPE.equals(type)) {
|
||||
assertIndexRecoveryMonitoringDoc(searchHit, collectionInterval);
|
||||
assertIndexRecoveryMonitoringDoc(searchHit);
|
||||
} else if (IndicesStatsMonitoringDoc.TYPE.equals(type)) {
|
||||
assertIndicesStatsMonitoringDoc(searchHit, collectionInterval);
|
||||
assertIndicesStatsMonitoringDoc(searchHit);
|
||||
} else if (IndexStatsMonitoringDoc.TYPE.equals(type)) {
|
||||
assertIndexStatsMonitoringDoc(searchHit, collectionInterval);
|
||||
assertIndexStatsMonitoringDoc(searchHit);
|
||||
} else if (NodeStatsMonitoringDoc.TYPE.equals(type)) {
|
||||
assertNodeStatsMonitoringDoc(searchHit, collectionInterval);
|
||||
assertNodeStatsMonitoringDoc(searchHit);
|
||||
} else if (ShardMonitoringDoc.TYPE.equals(type)) {
|
||||
assertShardMonitoringDoc(searchHit, collectionInterval);
|
||||
assertShardMonitoringDoc(searchHit);
|
||||
} else {
|
||||
fail("Monitoring document of type [" + type + "] is not supported by this test");
|
||||
}
|
||||
|
@ -236,11 +247,11 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
* all monitoring documents must have
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertMonitoringDoc(final Map<String, Object> document,
|
||||
final MonitoredSystem expectedSystem,
|
||||
final String expectedType,
|
||||
final TimeValue interval) throws Exception {
|
||||
assertEquals(5, document.size());
|
||||
private void assertMonitoringDoc(final Map<String, Object> document,
|
||||
final MonitoredSystem expectedSystem,
|
||||
final String expectedType,
|
||||
final TimeValue interval) {
|
||||
assertEquals(document.toString(),4, document.size());
|
||||
|
||||
final String index = (String) document.get("_index");
|
||||
assertThat(index, containsString(".monitoring-" + expectedSystem.getSystem() + "-" + TEMPLATE_VERSION + "-"));
|
||||
|
@ -272,25 +283,20 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
* the current local node information
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertMonitoringDocSourceNode(final Map<String, Object> sourceNode) throws Exception {
|
||||
private void assertMonitoringDocSourceNode(final Map<String, Object> sourceNode) {
|
||||
assertEquals(6, sourceNode.size());
|
||||
|
||||
Map<String, String> filterPath = singletonMap("filter_path", "nodes.*.name,nodes.*.transport_address,nodes.*.host,nodes.*.ip");
|
||||
final Response nodesResponse = client().performRequest("GET", "/_nodes", filterPath);
|
||||
final NodesInfoResponse nodesResponse = client().admin().cluster().prepareNodesInfo().clear().get();
|
||||
|
||||
final Map<String, Object> nodes = (Map<String, Object>) toMap(nodesResponse.getEntity()).get("nodes");
|
||||
assertEquals(1, nodes.size());
|
||||
assertEquals(1, nodesResponse.getNodes().size());
|
||||
|
||||
final String nodeId = nodes.keySet().iterator().next();
|
||||
final DiscoveryNode node = nodesResponse.getNodes().stream().findFirst().get().getNode();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<String, Object> node = (Map<String, Object>) nodes.get(nodeId);
|
||||
|
||||
assertThat(sourceNode.get("uuid"), equalTo(nodeId));
|
||||
assertThat(sourceNode.get("host"), equalTo(node.get("host")));
|
||||
assertThat(sourceNode.get("transport_address"),equalTo(node.get("transport_address")));
|
||||
assertThat(sourceNode.get("ip"), equalTo(node.get("ip")));
|
||||
assertThat(sourceNode.get("name"), equalTo(node.get("name")));
|
||||
assertThat(sourceNode.get("uuid"), equalTo(node.getId()));
|
||||
assertThat(sourceNode.get("host"), equalTo(node.getHostName()));
|
||||
assertThat(sourceNode.get("transport_address"),equalTo(node.getAddress().toString()));
|
||||
assertThat(sourceNode.get("ip"), equalTo(node.getAddress().getAddress()));
|
||||
assertThat(sourceNode.get("name"), equalTo(node.getName()));
|
||||
assertThat((String) sourceNode.get("timestamp"), not(isEmptyOrNullString()));
|
||||
}
|
||||
|
||||
|
@ -298,11 +304,8 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
* Assert that a {@link ClusterStatsMonitoringDoc} contains the expected information
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertClusterStatsMonitoringDoc(final Map<String, Object> document,
|
||||
final TimeValue interval,
|
||||
final boolean apmIndicesExist) throws Exception {
|
||||
assertMonitoringDoc(document, MonitoredSystem.ES, ClusterStatsMonitoringDoc.TYPE, interval);
|
||||
|
||||
private void assertClusterStatsMonitoringDoc(final Map<String, Object> document,
|
||||
final boolean apmIndicesExist) {
|
||||
final Map<String, Object> source = (Map<String, Object>) document.get("_source");
|
||||
assertEquals(11, source.size());
|
||||
|
||||
|
@ -373,9 +376,7 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
* Assert that a {@link IndexRecoveryMonitoringDoc} contains the expected information
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertIndexRecoveryMonitoringDoc(final Map<String, Object> document, final TimeValue interval) throws Exception {
|
||||
assertMonitoringDoc(document, MonitoredSystem.ES, IndexRecoveryMonitoringDoc.TYPE, interval);
|
||||
|
||||
private void assertIndexRecoveryMonitoringDoc(final Map<String, Object> document) {
|
||||
final Map<String, Object> source = (Map<String, Object>) document.get("_source");
|
||||
assertEquals(6, source.size());
|
||||
|
||||
|
@ -390,9 +391,7 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
* Assert that a {@link IndicesStatsMonitoringDoc} contains the expected information
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertIndicesStatsMonitoringDoc(final Map<String, Object> document, final TimeValue interval) throws Exception {
|
||||
assertMonitoringDoc(document, MonitoredSystem.ES, IndicesStatsMonitoringDoc.TYPE, interval);
|
||||
|
||||
private void assertIndicesStatsMonitoringDoc(final Map<String, Object> document) {
|
||||
final Map<String, Object> source = (Map<String, Object>) document.get("_source");
|
||||
assertEquals(6, source.size());
|
||||
|
||||
|
@ -407,9 +406,7 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
* Assert that a {@link IndexStatsMonitoringDoc} contains the expected information
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertIndexStatsMonitoringDoc(final Map<String, Object> document, final TimeValue interval) throws Exception {
|
||||
assertMonitoringDoc(document, MonitoredSystem.ES, IndexStatsMonitoringDoc.TYPE, interval);
|
||||
|
||||
private void assertIndexStatsMonitoringDoc(final Map<String, Object> document) {
|
||||
final Map<String, Object> source = (Map<String, Object>) document.get("_source");
|
||||
assertEquals(6, source.size());
|
||||
|
||||
|
@ -418,7 +415,7 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
assertEquals(8, indexStats.size());
|
||||
assertThat((String) indexStats.get("index"), not(isEmptyOrNullString()));
|
||||
assertThat((String) indexStats.get("uuid"), not(isEmptyOrNullString()));
|
||||
assertThat((Long) indexStats.get("created"), notNullValue());
|
||||
assertThat(indexStats.get("created"), notNullValue());
|
||||
assertThat((String) indexStats.get("status"), not(isEmptyOrNullString()));
|
||||
assertThat(indexStats.get("version"), notNullValue());
|
||||
final Map<String, Object> version = (Map<String, Object>) indexStats.get("version");
|
||||
|
@ -437,9 +434,7 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
* Assert that a {@link NodeStatsMonitoringDoc} contains the expected information
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertNodeStatsMonitoringDoc(final Map<String, Object> document, final TimeValue interval) throws Exception {
|
||||
assertMonitoringDoc(document, MonitoredSystem.ES, NodeStatsMonitoringDoc.TYPE, interval);
|
||||
|
||||
private void assertNodeStatsMonitoringDoc(final Map<String, Object> document) {
|
||||
final Map<String, Object> source = (Map<String, Object>) document.get("_source");
|
||||
assertEquals(6, source.size());
|
||||
|
||||
|
@ -468,9 +463,7 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
* Assert that a {@link ShardMonitoringDoc} contains the expected information
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertShardMonitoringDoc(final Map<String, Object> document, final TimeValue interval) throws Exception {
|
||||
assertMonitoringDoc(document, MonitoredSystem.ES, ShardMonitoringDoc.TYPE, interval);
|
||||
|
||||
private void assertShardMonitoringDoc(final Map<String, Object> document) {
|
||||
final Map<String, Object> source = (Map<String, Object>) document.get("_source");
|
||||
assertEquals(7, source.size());
|
||||
assertThat(source.get("state_uuid"), notNullValue());
|
||||
|
@ -517,72 +510,82 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
* Enable the monitoring service and the Local exporter, waiting for some monitoring documents
|
||||
* to be indexed before it returns.
|
||||
*/
|
||||
public static void enableMonitoring(final TimeValue interval) throws Exception {
|
||||
final Map<String, Object> exporters = callRestApi("GET", "/_xpack/usage?filter_path=monitoring.enabled_exporters", 200);
|
||||
assertNotNull("List of monitoring exporters must not be null", exporters);
|
||||
assertThat("List of enabled exporters must be empty before enabling monitoring",
|
||||
XContentMapValues.extractRawValues("monitoring.enabled_exporters", exporters), hasSize(0));
|
||||
public void enableMonitoring(final TimeValue interval) throws Exception {
|
||||
// delete anything that may happen to already exist
|
||||
assertAcked(client().admin().indices().prepareDelete(".monitoring-*").get());
|
||||
|
||||
final XPackUsageResponse usageResponse = new XPackUsageRequestBuilder(client()).execute().get();
|
||||
final Optional<MonitoringFeatureSetUsage> monitoringUsage =
|
||||
usageResponse.getUsages()
|
||||
.stream()
|
||||
.filter(usage -> usage instanceof MonitoringFeatureSetUsage)
|
||||
.map(usage -> (MonitoringFeatureSetUsage)usage)
|
||||
.findFirst();
|
||||
assertThat("Monitoring feature set does not exist", monitoringUsage.isPresent(), is(true));
|
||||
|
||||
final Map<String, Object> exporters = monitoringUsage.get().getExporters();
|
||||
|
||||
assertThat("List of enabled exporters must be empty before enabling monitoring", exporters.isEmpty(), is(true));
|
||||
|
||||
final Settings settings = Settings.builder()
|
||||
.put("transient.xpack.monitoring.collection.interval", interval.getStringRep())
|
||||
.put("transient.xpack.monitoring.exporters._local.enabled", true)
|
||||
.put("xpack.monitoring.collection.interval", interval.getStringRep())
|
||||
.put("xpack.monitoring.exporters._local.enabled", true)
|
||||
.build();
|
||||
|
||||
final HttpEntity entity =
|
||||
new StringEntity(toXContent(settings, XContentType.JSON, false).utf8ToString(), ContentType.APPLICATION_JSON);
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings));
|
||||
|
||||
awaitRestApi("PUT", "/_cluster/settings", emptyMap(), entity,
|
||||
response -> {
|
||||
Boolean acknowledged = (Boolean) response.get("acknowledged");
|
||||
return acknowledged != null && acknowledged;
|
||||
},"Exception when enabling monitoring");
|
||||
|
||||
awaitRestApi("HEAD", "/.monitoring-es-*", singletonMap("allow_no_indices", "false"), null,
|
||||
response -> true,
|
||||
"Exception when waiting for monitoring-es-* index to be created");
|
||||
|
||||
awaitRestApi("GET", "/.monitoring-es-*/_search", emptyMap(), null,
|
||||
response -> {
|
||||
Number hitsTotal = (Number) XContentMapValues.extractRawValues("hits.total", response).get(0);
|
||||
return hitsTotal != null && hitsTotal.intValue() > 0;
|
||||
},"Exception when waiting for monitoring documents to be indexed");
|
||||
assertBusy(() -> {
|
||||
assertThat("No monitoring documents yet",
|
||||
client().prepareSearch(".monitoring-es-" + TEMPLATE_VERSION + "-*")
|
||||
.setSize(0)
|
||||
.get().getHits().getTotalHits(),
|
||||
greaterThan(0L));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Disable the monitoring service and the Local exporter, waiting for the monitoring indices to
|
||||
* be deleted before it returns.
|
||||
* Disable the monitoring service and the Local exporter.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static void disableMonitoring() throws Exception {
|
||||
public void disableMonitoring() throws Exception {
|
||||
final Settings settings = Settings.builder()
|
||||
.put("transient.xpack.monitoring.collection.interval", (String) null)
|
||||
.put("transient.xpack.monitoring.exporters._local.enabled", (String) null)
|
||||
.put("xpack.monitoring.collection.interval", (String) null)
|
||||
.put("xpack.monitoring.exporters._local.enabled", (String) null)
|
||||
.build();
|
||||
|
||||
final HttpEntity entity =
|
||||
new StringEntity(toXContent(settings, XContentType.JSON, false).utf8ToString(), ContentType.APPLICATION_JSON);
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings));
|
||||
|
||||
awaitRestApi("PUT", "/_cluster/settings", emptyMap(), entity,
|
||||
response -> {
|
||||
Boolean acknowledged = (Boolean) response.get("acknowledged");
|
||||
return acknowledged != null && acknowledged;
|
||||
},"Exception when disabling monitoring");
|
||||
|
||||
awaitBusy(() -> {
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
Map<String, Object> response = callRestApi("GET", "/_xpack/usage?filter_path=monitoring.enabled_exporters", 200);
|
||||
final List<?> exporters = XContentMapValues.extractRawValues("monitoring.enabled_exporters", response);
|
||||
final XPackUsageResponse usageResponse = new XPackUsageRequestBuilder(client()).execute().get();
|
||||
final Optional<MonitoringFeatureSetUsage> monitoringUsage =
|
||||
usageResponse.getUsages()
|
||||
.stream()
|
||||
.filter(usage -> usage instanceof MonitoringFeatureSetUsage)
|
||||
.map(usage -> (MonitoringFeatureSetUsage)usage)
|
||||
.findFirst();
|
||||
assertThat("Monitoring feature set does not exist", monitoringUsage.isPresent(), is(true));
|
||||
|
||||
if (exporters.isEmpty() == false) {
|
||||
return false;
|
||||
final Map<String, Object> exporters = monitoringUsage.get().getExporters();
|
||||
|
||||
assertThat("Exporters are not yet stopped", exporters.isEmpty(), is(true));
|
||||
|
||||
// now wait until Monitoring has actually stopped
|
||||
final NodesStatsResponse response = client().admin().cluster().prepareNodesStats().clear().setThreadPool(true).get();
|
||||
|
||||
for (final NodeStats nodeStats : response.getNodes()) {
|
||||
boolean foundBulkThreads = false;
|
||||
|
||||
for(final ThreadPoolStats.Stats threadPoolStats : nodeStats.getThreadPool()) {
|
||||
if (BULK.equals(threadPoolStats.getName())) {
|
||||
foundBulkThreads = true;
|
||||
assertThat("Still some active _bulk threads!", threadPoolStats.getActive(), equalTo(0));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assertThat("Could not find bulk thread pool", foundBulkThreads, is(true));
|
||||
}
|
||||
response = callRestApi("GET", "/_nodes/_local/stats/thread_pool?filter_path=nodes.*.thread_pool.bulk.active", 200);
|
||||
|
||||
final Map<String, Object> nodes = (Map<String, Object>) response.get("nodes");
|
||||
final Map<String, Object> node = (Map<String, Object>) nodes.values().iterator().next();
|
||||
|
||||
final Number activeBulks = (Number) extractValue("thread_pool.bulk.active", node);
|
||||
return activeBulks != null && activeBulks.longValue() == 0L;
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("Failed to wait for monitoring exporters to stop:", e);
|
||||
}
|
||||
|
@ -590,95 +593,25 @@ public class MonitoringIT extends ESRestTestCase {
|
|||
}
|
||||
|
||||
/**
|
||||
* Executes a request using {@link org.elasticsearch.client.RestClient}, waiting for it to succeed.
|
||||
* Returns the {@link SearchHit} content as a {@link Map} object.
|
||||
*/
|
||||
private static void awaitRestApi(final String method, final String endpoint, final Map<String, String> params, final HttpEntity entity,
|
||||
final CheckedFunction<Map<String, Object>, Boolean, IOException> success,
|
||||
final String error) throws Exception {
|
||||
private static Map<String, Object> toMap(final ToXContentObject xContentObject) throws IOException {
|
||||
final XContentType xContentType = XContentType.JSON;
|
||||
|
||||
final AtomicReference<IOException> exceptionHolder = new AtomicReference<>();
|
||||
awaitBusy(() -> {
|
||||
try {
|
||||
final Response response = client().performRequest(method, endpoint, params, entity);
|
||||
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
|
||||
exceptionHolder.set(null);
|
||||
final Map<String, Object> map = ("HEAD".equals(method) == false) ? toMap(response.getEntity()) : null;
|
||||
return success.apply(map);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
exceptionHolder.set(e);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) {
|
||||
xContentObject.toXContent(builder, EMPTY_PARAMS);
|
||||
|
||||
IOException exception = exceptionHolder.get();
|
||||
if (exception != null) {
|
||||
throw new IllegalStateException(error, exception);
|
||||
final Map<String, Object> map = XContentHelper.convertToMap(xContentType.xContent(), builder.string(), false);
|
||||
|
||||
// remove extraneous fields not actually wanted from the response
|
||||
map.remove("_score");
|
||||
map.remove("fields");
|
||||
map.remove("sort");
|
||||
|
||||
return map;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a request using {@link org.elasticsearch.client.RestClient} in a synchronous manner, asserts that the response code
|
||||
* is equal to {@code expectedCode} and then returns the {@link Response} as a {@link Map}.
|
||||
*/
|
||||
private static Map<String, Object> callRestApi(final String method, final String endpoint, final int expectedCode) throws Exception {
|
||||
final Response response = client().performRequest(method, endpoint);
|
||||
assertEquals("Unexpected HTTP response code", expectedCode, response.getStatusLine().getStatusCode());
|
||||
|
||||
return toMap(response.getEntity());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link HttpEntity} content as a {@link Map} object.
|
||||
*/
|
||||
private static Map<String, Object> toMap(final HttpEntity httpEntity) throws IOException {
|
||||
final String contentType = httpEntity.getContentType().getValue();
|
||||
final XContentType xContentType = XContentType.fromMediaTypeOrFormat(contentType);
|
||||
if (xContentType == null) {
|
||||
throw new IOException("Content-type not supported [" + contentType + "]");
|
||||
}
|
||||
return XContentHelper.convertToMap(xContentType.xContent(), httpEntity.getContent(), false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a Monitoring Bulk request and checks that it returns a 400 error with a given message.
|
||||
*
|
||||
* @param parameters the request parameters
|
||||
* @param httpEntity the request body (can be null)
|
||||
* @param matcher a {@link Matcher} to match the message against
|
||||
*/
|
||||
private static void assertBadRequest(final Map<String, String> parameters, final HttpEntity httpEntity, final Matcher<String> matcher) {
|
||||
ResponseException responseException = expectThrows(ResponseException.class, () ->
|
||||
client().performRequest("POST", "/_xpack/monitoring/_bulk", parameters, httpEntity));
|
||||
|
||||
assertThat(responseException.getMessage(), matcher);
|
||||
assertThat(responseException.getResponse().getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_BAD_REQUEST));
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a map of parameters for the Monitoring Bulk API
|
||||
*/
|
||||
private static Map<String, String> parameters(final String systemId, final String systemApiVersion, final String interval) {
|
||||
final Map<String, String> parameters = new HashMap<>();
|
||||
if (systemId != null) {
|
||||
parameters.put(RestMonitoringBulkAction.MONITORING_ID, systemId);
|
||||
}
|
||||
if (systemApiVersion != null) {
|
||||
parameters.put(RestMonitoringBulkAction.MONITORING_VERSION, systemApiVersion);
|
||||
}
|
||||
if (interval != null) {
|
||||
parameters.put(RestMonitoringBulkAction.INTERVAL, interval);
|
||||
}
|
||||
return parameters;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link String} representing a {@link MonitoredSystem} supported by the Monitoring Bulk API
|
||||
*/
|
||||
private static String randomSystemId() {
|
||||
return randomSystem().getSystem();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link MonitoredSystem} supported by the Monitoring Bulk API
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,223 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.monitoring.rest.action;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.RestResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.action.RestBuilderListener;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.rest.FakeRestRequest;
|
||||
import org.elasticsearch.xpack.core.XPackClient;
|
||||
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkRequestBuilder;
|
||||
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkResponse;
|
||||
import org.elasticsearch.xpack.core.monitoring.client.MonitoringClient;
|
||||
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class RestMonitoringBulkActionTests extends ESTestCase {
|
||||
|
||||
private final RestController controller = mock(RestController.class);
|
||||
|
||||
private final RestMonitoringBulkAction action = new RestMonitoringBulkAction(Settings.EMPTY, controller);
|
||||
|
||||
public void testGetName() {
|
||||
// Are you sure that you want to change the name?
|
||||
assertThat(action.getName(), is("xpack_monitoring_bulk_action"));
|
||||
}
|
||||
|
||||
public void testSupportsContentStream() {
|
||||
// if you change this, it's a very breaking change for Monitoring
|
||||
assertThat(action.supportsContentStream(), is(true));
|
||||
}
|
||||
|
||||
public void testMissingSystemId() {
|
||||
final RestRequest restRequest = createRestRequest(null, TEMPLATE_VERSION, "10s");
|
||||
|
||||
final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest));
|
||||
assertThat(exception.getMessage(), containsString("no [system_id] for monitoring bulk request"));
|
||||
}
|
||||
|
||||
public void testMissingSystemApiVersion() {
|
||||
final RestRequest restRequest = createRestRequest(randomSystem().getSystem(), null, "10s");
|
||||
|
||||
final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest));
|
||||
assertThat(exception.getMessage(), containsString("no [system_api_version] for monitoring bulk request"));
|
||||
}
|
||||
|
||||
public void testMissingInterval() {
|
||||
final RestRequest restRequest = createRestRequest(randomSystem().getSystem(), TEMPLATE_VERSION, null);
|
||||
|
||||
final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest));
|
||||
assertThat(exception.getMessage(), containsString("no [interval] for monitoring bulk request"));
|
||||
}
|
||||
|
||||
public void testWrongInterval() {
|
||||
final RestRequest restRequest = createRestRequest(randomSystem().getSystem(), TEMPLATE_VERSION, "null");
|
||||
|
||||
final ElasticsearchParseException exception = expectThrows(ElasticsearchParseException.class, () -> prepareRequest(restRequest));
|
||||
assertThat(exception.getMessage(), containsString("failed to parse setting [interval] with value [null]"));
|
||||
}
|
||||
|
||||
public void testMissingContent() {
|
||||
final RestRequest restRequest = createRestRequest(0, randomSystem().getSystem(), TEMPLATE_VERSION, "30s");
|
||||
|
||||
final ElasticsearchParseException exception = expectThrows(ElasticsearchParseException.class, () -> prepareRequest(restRequest));
|
||||
assertThat(exception.getMessage(), containsString("no body content for monitoring bulk request"));
|
||||
}
|
||||
|
||||
public void testUnsupportedSystemVersion() {
|
||||
final String systemApiVersion = randomFrom(TEMPLATE_VERSION, MonitoringTemplateUtils.OLD_TEMPLATE_VERSION);
|
||||
final RestRequest restRequest = createRestRequest(MonitoredSystem.UNKNOWN.getSystem(), systemApiVersion, "30s");
|
||||
|
||||
final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest));
|
||||
assertThat(exception.getMessage(),
|
||||
containsString("system_api_version [" + systemApiVersion + "] is not supported by system_id [unknown]"));
|
||||
}
|
||||
|
||||
public void testUnknownSystemVersion() {
|
||||
final MonitoredSystem system = randomSystem();
|
||||
final RestRequest restRequest = createRestRequest(system.getSystem(), "0", "30s");
|
||||
|
||||
final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest));
|
||||
assertThat(exception.getMessage(),
|
||||
containsString("system_api_version [0] is not supported by system_id [" + system.getSystem() + "]"));
|
||||
}
|
||||
|
||||
public void testNoErrors() throws Exception {
|
||||
final MonitoringBulkResponse response = new MonitoringBulkResponse(randomLong());
|
||||
final FakeRestRequest request = createRestRequest(randomSystemId(), TEMPLATE_VERSION, "10s");
|
||||
final RestResponse restResponse = getRestBuilderListener(request).buildResponse(response);
|
||||
|
||||
assertThat(restResponse.status(), is(RestStatus.OK));
|
||||
assertThat(restResponse.content().utf8ToString(), is("{\"took\":" + response.getTookInMillis() + ",\"errors\":false}"));
|
||||
}
|
||||
|
||||
public void testWithErrors() throws Exception {
|
||||
final RuntimeException e = new RuntimeException("TEST - expected");
|
||||
final MonitoringBulkResponse.Error error = new MonitoringBulkResponse.Error(e);
|
||||
final MonitoringBulkResponse response = new MonitoringBulkResponse(randomLong(), error);
|
||||
final String errorJson;
|
||||
|
||||
final FakeRestRequest request = createRestRequest(randomSystemId(), TEMPLATE_VERSION, "10s");
|
||||
final RestResponse restResponse = getRestBuilderListener(request).buildResponse(response);
|
||||
|
||||
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
|
||||
error.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
errorJson = builder.string();
|
||||
}
|
||||
|
||||
assertThat(restResponse.status(), is(RestStatus.INTERNAL_SERVER_ERROR));
|
||||
assertThat(restResponse.content().utf8ToString(),
|
||||
is("{\"took\":" + response.getTookInMillis() + ",\"errors\":true,\"error\":" + errorJson + "}"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link MonitoredSystem} supported by the Monitoring Bulk API
|
||||
*/
|
||||
private static MonitoredSystem randomSystem() {
|
||||
return randomFrom(MonitoredSystem.LOGSTASH, MonitoredSystem.KIBANA, MonitoredSystem.BEATS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link String} representing a {@link MonitoredSystem} supported by the Monitoring Bulk API
|
||||
*/
|
||||
private static String randomSystemId() {
|
||||
return randomSystem().getSystem();
|
||||
}
|
||||
|
||||
private void prepareRequest(final RestRequest restRequest) throws Exception {
|
||||
getRestBuilderListener(restRequest);
|
||||
}
|
||||
|
||||
private RestBuilderListener<MonitoringBulkResponse> getRestBuilderListener(final RestRequest restRequest) throws Exception {
|
||||
final Client client = mock(Client.class);
|
||||
final XPackClient xpackClient = mock(XPackClient.class);
|
||||
final MonitoringClient monitoringClient = mock(MonitoringClient.class);
|
||||
final AtomicReference<RestBuilderListener<MonitoringBulkResponse>> listenerReference = new AtomicReference<>();
|
||||
final MonitoringBulkRequestBuilder builder = new MonitoringBulkRequestBuilder(client){
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void execute(ActionListener<MonitoringBulkResponse> listener) {
|
||||
listenerReference.set((RestBuilderListener)listener);
|
||||
}
|
||||
};
|
||||
when(monitoringClient.prepareMonitoringBulk()).thenReturn(builder);
|
||||
when(xpackClient.monitoring()).thenReturn(monitoringClient);
|
||||
|
||||
final CheckedConsumer<RestChannel, Exception> consumer = action.doPrepareRequest(restRequest, xpackClient);
|
||||
|
||||
final RestChannel channel = mock(RestChannel.class);
|
||||
when(channel.newBuilder()).thenReturn(JsonXContent.contentBuilder());
|
||||
|
||||
// trigger/capture execution
|
||||
consumer.accept(channel);
|
||||
|
||||
assertThat(listenerReference.get(), not(nullValue()));
|
||||
|
||||
return listenerReference.get();
|
||||
}
|
||||
|
||||
private static FakeRestRequest createRestRequest(final String systemId, final String systemApiVersion, final String interval) {
|
||||
return createRestRequest(randomIntBetween(1, 10), systemId, systemApiVersion, interval);
|
||||
}
|
||||
|
||||
private static FakeRestRequest createRestRequest(final int nbDocs,
|
||||
final String systemId,
|
||||
final String systemApiVersion,
|
||||
final String interval) {
|
||||
final FakeRestRequest.Builder builder = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY);
|
||||
if (nbDocs > 0) {
|
||||
final StringBuilder requestBody = new StringBuilder();
|
||||
for (int n = 0; n < nbDocs; n++) {
|
||||
requestBody.append("{\"index\":{\"_type\":\"_doc\"}}\n");
|
||||
requestBody.append("{\"field\":").append(n).append("}\n");
|
||||
}
|
||||
requestBody.append("\n");
|
||||
builder.withContent(new BytesArray(requestBody.toString()), XContentType.JSON);
|
||||
}
|
||||
|
||||
final Map<String, String> parameters = new HashMap<>();
|
||||
if (systemId != null) {
|
||||
parameters.put(RestMonitoringBulkAction.MONITORING_ID, systemId);
|
||||
}
|
||||
if (systemApiVersion != null) {
|
||||
parameters.put(RestMonitoringBulkAction.MONITORING_VERSION, systemApiVersion);
|
||||
}
|
||||
if (interval != null) {
|
||||
parameters.put(RestMonitoringBulkAction.INTERVAL, interval);
|
||||
}
|
||||
builder.withParams(parameters);
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue