diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/MonitoringFeatureSetUsage.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/MonitoringFeatureSetUsage.java index d4235515eda..c3f6020f27a 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/MonitoringFeatureSetUsage.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/MonitoringFeatureSetUsage.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.XPackField; import java.io.IOException; +import java.util.Collections; import java.util.Map; public class MonitoringFeatureSetUsage extends XPackFeatureSet.Usage { @@ -32,6 +33,10 @@ public class MonitoringFeatureSetUsage extends XPackFeatureSet.Usage { this.exporters = exporters; } + public Map getExporters() { + return exporters == null ? Collections.emptyMap() : Collections.unmodifiableMap(exporters); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java index 3217d55cb67..7210fd0f26f 100644 --- a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java +++ b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java @@ -84,7 +84,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { @Before @SuppressWarnings("unchecked") - public void setUpMocks() throws Exception { + public void setUpMocks() { listener = mock(ActionListener.class); exporters = mock(Exporters.class); threadPool = mock(ThreadPool.class); @@ -113,7 +113,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { assertThat(e, hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]"))); } - public void testExecuteEmptyRequest() throws Exception { + public void testExecuteEmptyRequest() { final TransportMonitoringBulkAction action = new TransportMonitoringBulkAction(Settings.EMPTY, threadPool, clusterService, transportService, filters, resolver, exporters); @@ -184,7 +184,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { verify(clusterService).localNode(); } - public void testAsyncActionCreateMonitoringDocsWithNoDocs() throws Exception { + public void testAsyncActionCreateMonitoringDocsWithNoDocs() { final Collection bulkDocs = new ArrayList<>(); if (randomBoolean()) { final int nbDocs = randomIntBetween(1, 50); @@ -202,7 +202,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { assertThat(results.size(), equalTo(0)); } - public void testAsyncActionCreateMonitoringDocs() throws Exception { + public void testAsyncActionCreateMonitoringDocs() { final List docs = new ArrayList<>(); final MonitoredSystem system = randomFrom(MonitoredSystem.KIBANA, MonitoredSystem.LOGSTASH, MonitoredSystem.BEATS); @@ -243,7 +243,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { }); } - public void testAsyncActionCreateMonitoringDocWithNoTimestamp() throws Exception { + public void testAsyncActionCreateMonitoringDocWithNoTimestamp() { final MonitoringBulkDoc monitoringBulkDoc = new MonitoringBulkDoc(MonitoredSystem.LOGSTASH, "_type", "_id", 0L, 0L, BytesArray.EMPTY, XContentType.JSON); @@ -293,7 +293,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { } @SuppressWarnings("unchecked") - public void testAsyncActionExecuteExport() throws Exception { + public void testAsyncActionExecuteExport() { final int nbDocs = randomIntBetween(1, 25); final Collection docs = new ArrayList<>(nbDocs); for (int i = 0; i < nbDocs; i++) { @@ -318,7 +318,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { } @SuppressWarnings("unchecked") - public void testAsyncActionExportThrowsException() throws Exception { + public void testAsyncActionExportThrowsException() { final int nbDocs = randomIntBetween(1, 25); final Collection docs = new ArrayList<>(nbDocs); for (int i = 0; i < nbDocs; i++) { diff --git a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/integration/MonitoringIT.java b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/integration/MonitoringIT.java index 713d1d4637d..845b128cf38 100644 --- a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/integration/MonitoringIT.java +++ b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/integration/MonitoringIT.java @@ -5,60 +5,71 @@ */ package org.elasticsearch.xpack.monitoring.integration; -import org.apache.http.HttpEntity; -import org.apache.http.HttpStatus; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.nio.entity.NStringEntity; import org.apache.lucene.util.Constants; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseException; -import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.analysis.common.CommonAnalysisPlugin; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.license.License; -import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.collapse.CollapseBuilder; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPoolStats; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.action.XPackUsageRequestBuilder; +import org.elasticsearch.xpack.core.action.XPackUsageResponse; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkRequestBuilder; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkResponse; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils; +import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; +import org.elasticsearch.xpack.core.monitoring.MonitoringFeatureSetUsage; +import org.elasticsearch.xpack.monitoring.LocalStateMonitoring; import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.indices.IndicesStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.node.NodeStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.collector.shards.ShardMonitoringDoc; -import org.elasticsearch.xpack.monitoring.rest.action.RestMonitoringBulkAction; -import org.hamcrest.Matcher; +import org.elasticsearch.xpack.monitoring.test.MockIngestPlugin; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.ISODateTimeFormat; import java.io.IOException; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; -import static java.util.Collections.emptyMap; -import static java.util.Collections.singletonMap; -import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; +import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; -import static org.elasticsearch.test.SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING; -import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.threadpool.ThreadPool.Names.BULK; import static org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsMonitoringDoc.hash; -import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.isOneOf; @@ -66,74 +77,35 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/3728") -public class MonitoringIT extends ESRestTestCase { - - private static final String BASIC_AUTH_VALUE = basicAuthHeaderValue("x_pack_rest_user", TEST_PASSWORD_SECURE_STRING); +public class MonitoringIT extends ESSingleNodeTestCase { private final TimeValue collectionInterval = TimeValue.timeValueSeconds(3); @Override - protected Settings restClientSettings() { + protected Settings nodeSettings() { return Settings.builder() - .put(super.restClientSettings()) - .put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE) + .put(super.nodeSettings()) + .put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false) + .put("xpack.monitoring.collection.interval", "-1") + .put("xpack.monitoring.exporters._local.type", "local") + .put("xpack.monitoring.exporters._local.enabled", false) + .put("xpack.monitoring.exporters._local.cluster_alerts.management.enabled", false) .build(); } - private HttpEntity createBulkEntity() { - final StringBuilder bulk = new StringBuilder(); - bulk.append("{\"index\":{\"_type\":\"test\"}}\n"); - bulk.append("{\"foo\":{\"bar\":0}}\n"); - bulk.append("{\"index\":{\"_type\":\"test\"}}\n"); - bulk.append("{\"foo\":{\"bar\":1}}\n"); - bulk.append("{\"index\":{\"_type\":\"test\"}}\n"); - bulk.append("{\"foo\":{\"bar\":2}}\n"); - bulk.append("\n"); - return new NStringEntity(bulk.toString(), ContentType.APPLICATION_JSON); + @Override + protected Collection> getPlugins() { + return Arrays.asList(LocalStateMonitoring.class, MockIngestPlugin.class, CommonAnalysisPlugin.class); } - public void testMonitoringBulkApiWithMissingSystemId() throws IOException { - final Map parameters = parameters(null, TEMPLATE_VERSION, "10s"); - - assertBadRequest(parameters, createBulkEntity(), containsString("no [system_id] for monitoring bulk request")); - } - - public void testMonitoringBulkApiWithMissingSystemApiVersion() throws IOException { - final Map parameters = parameters(randomSystemId(), null, "10s"); - - assertBadRequest(parameters, createBulkEntity(), containsString("no [system_api_version] for monitoring bulk request")); - } - - public void testMonitoringBulkApiWithMissingInterval() throws IOException { - final Map parameters = parameters(randomSystemId(), TEMPLATE_VERSION, null); - - assertBadRequest(parameters, createBulkEntity(), containsString("no [interval] for monitoring bulk request")); - } - - public void testMonitoringBulkApiWithWrongInterval() throws IOException { - final Map parameters = parameters(randomSystemId(), TEMPLATE_VERSION, "null"); - - assertBadRequest(parameters, createBulkEntity(), containsString("failed to parse setting [interval] with value [null]")); - } - - public void testMonitoringBulkApiWithMissingContent() throws IOException { - final Map parameters = parameters(randomSystemId(), TEMPLATE_VERSION, "30s"); - - assertBadRequest(parameters, null, containsString("no body content for monitoring bulk request")); - } - - public void testMonitoringBulkApiWithUnsupportedSystemVersion() throws IOException { - final String systemId = randomSystemId(); - final String systemApiVersion = randomFrom(TEMPLATE_VERSION, MonitoringTemplateUtils.OLD_TEMPLATE_VERSION); - - Map parameters = parameters(MonitoredSystem.UNKNOWN.getSystem(), systemApiVersion, "30s"); - assertBadRequest(parameters, createBulkEntity(), - containsString("system_api_version ["+ systemApiVersion + "] is not supported by system_id [unknown]")); - - parameters = parameters(systemId, "0", "30s"); - assertBadRequest(parameters, createBulkEntity(), - containsString("system_api_version [0] is not supported by system_id [" + systemId + "]")); + private String createBulkEntity() { + return "{\"index\":{\"_type\":\"test\"}}\n" + + "{\"foo\":{\"bar\":0}}\n" + + "{\"index\":{\"_type\":\"test\"}}\n" + + "{\"foo\":{\"bar\":1}}\n" + + "{\"index\":{\"_type\":\"test\"}}\n" + + "{\"foo\":{\"bar\":2}}\n" + + "\n"; } /** @@ -148,38 +120,63 @@ public class MonitoringIT extends ESRestTestCase { final MonitoredSystem system = randomSystem(); final TimeValue interval = TimeValue.timeValueSeconds(randomIntBetween(1, 20)); + // REST is the realistic way that these operations happen, so it's the most realistic way to integration test it too // Use Monitoring Bulk API to index 3 documents - Response bulkResponse = client().performRequest("POST", "/_xpack/monitoring/_bulk", - parameters(system.getSystem(), TEMPLATE_VERSION, interval.getStringRep()), - createBulkEntity()); + //final Response bulkResponse = getRestClient().performRequest("POST", "/_xpack/monitoring/_bulk", + // parameters, createBulkEntity()); - assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_OK)); - assertThat(toMap(bulkResponse.getEntity()).get("errors"), equalTo(false)); + final MonitoringBulkResponse bulkResponse = + new MonitoringBulkRequestBuilder(client()) + .add(system, null, new BytesArray(createBulkEntity().getBytes("UTF-8")), XContentType.JSON, + System.currentTimeMillis(), interval.millis()) + .get(); - final String indexPrefix = ".monitoring-" + system.getSystem() + "-" + TEMPLATE_VERSION; + assertThat(bulkResponse.status(), is(RestStatus.OK)); + assertThat(bulkResponse.getError(), nullValue()); + + final String monitoringIndex = ".monitoring-" + system.getSystem() + "-" + TEMPLATE_VERSION + "-*"; // Wait for the monitoring index to be created - awaitRestApi("GET", "/" + indexPrefix + "-*/_search", singletonMap("size", "0"), null, - resp -> { - Number hitsTotal = (Number) extractValue("hits.total", resp); - return hitsTotal != null && hitsTotal.intValue() >= 3; - }, "Exception when waiting for monitoring bulk documents to be indexed"); + assertBusy(() -> { + assertThat(client().admin().indices().prepareRefresh(monitoringIndex).get().getStatus(), is(RestStatus.OK)); - Response searchResponse = client().performRequest("GET", "/" + indexPrefix + "-*/_search"); - final Map results = toMap(searchResponse.getEntity()); + final SearchResponse response = + client().prepareSearch(".monitoring-" + system.getSystem() + "-" + TEMPLATE_VERSION + "-*") + .get(); - final Map hits = (Map) results.get("hits"); - final List> searchHits = (List>) hits.get("hits"); - assertEquals(3, searchHits.size()); + // exactly 3 results are expected + assertThat("No monitoring documents yet", response.getHits().getTotalHits(), equalTo(3L)); - assertEquals("Monitoring documents must have the same timestamp", - 1, searchHits.stream().map(map -> extractValue("_source.timestamp", map)).distinct().count()); + final List> sources = + Arrays.stream(response.getHits().getHits()) + .map(SearchHit::getSourceAsMap) + .collect(Collectors.toList()); - assertEquals("Monitoring documents must have the same source_node timestamp", - 1, searchHits.stream().map(map -> extractValue("_source.source_node.timestamp", map)).distinct().count()); + // find distinct _source.timestamp fields + assertThat(sources.stream().map(source -> source.get("timestamp")).distinct().count(), is(1L)); + // find distinct _source.source_node fields (which is a map) + assertThat(sources.stream().map(source -> source.get("source_node")).distinct().count(), is(1L)); + }); - for (Map searchHit : searchHits) { - assertMonitoringDoc(searchHit, system, "test", interval); + final SearchResponse response = client().prepareSearch(monitoringIndex).get(); + final SearchHits hits = response.getHits(); + + assertThat(response.getHits().getTotalHits(), equalTo(3L)); + assertThat("Monitoring documents must have the same timestamp", + Arrays.stream(hits.getHits()) + .map(hit -> extractValue("timestamp", hit.getSourceAsMap())) + .distinct() + .count(), + equalTo(1L)); + assertThat("Monitoring documents must have the same source_node timestamp", + Arrays.stream(hits.getHits()) + .map(hit -> extractValue("source_node.timestamp", hit.getSourceAsMap())) + .distinct() + .count(), + equalTo(1L)); + + for (final SearchHit hit : hits.getHits()) { + assertMonitoringDoc(toMap(hit), system, "test", interval); } }); } @@ -194,36 +191,50 @@ public class MonitoringIT extends ESRestTestCase { public void testMonitoringService() throws Exception { final boolean createAPMIndex = randomBoolean(); final String indexName = createAPMIndex ? "apm-2017.11.06" : "books"; - final HttpEntity document = new StringEntity("{\"field\":\"value\"}", ContentType.APPLICATION_JSON); - assertThat(client().performRequest("POST", "/" + indexName + "/doc/0", singletonMap("refresh", "true"), document) - .getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_CREATED)); + + assertThat(client().prepareIndex(indexName, "doc", "0") + .setRefreshPolicy("true") + .setSource("{\"field\":\"value\"}", XContentType.JSON) + .get() + .status(), + is(RestStatus.CREATED)); whenExportersAreReady(() -> { - final Response searchResponse = client().performRequest("GET", "/.monitoring-es-*/_search", singletonMap("size", "100")); - assertThat(searchResponse.getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_OK)); + final AtomicReference searchResponse = new AtomicReference<>(); - final Map results = toMap(searchResponse.getEntity()); + assertBusy(() -> { + final SearchResponse response = + client().prepareSearch(".monitoring-es-*") + .setCollapse(new CollapseBuilder("type")) + .addSort("timestamp", SortOrder.DESC) + .get(); - final Map hits = (Map) results.get("hits"); - assertThat("Expecting a minimum number of 6 docs, one per collector", (Integer) hits.get("total"), greaterThanOrEqualTo(6)); + assertThat(response.status(), is(RestStatus.OK)); + assertThat("Expecting a minimum number of 6 docs, one per collector", + response.getHits().getHits().length, + greaterThanOrEqualTo(6)); - final List> searchHits = (List>) hits.get("hits"); + searchResponse.set(response); + }); - for (Map searchHit : searchHits) { + for (final SearchHit hit : searchResponse.get().getHits()) { + final Map searchHit = toMap(hit); final String type = (String) extractValue("_source.type", searchHit); + assertMonitoringDoc(searchHit, MonitoredSystem.ES, type, collectionInterval); + if (ClusterStatsMonitoringDoc.TYPE.equals(type)) { - assertClusterStatsMonitoringDoc(searchHit, collectionInterval, createAPMIndex); + assertClusterStatsMonitoringDoc(searchHit, createAPMIndex); } else if (IndexRecoveryMonitoringDoc.TYPE.equals(type)) { - assertIndexRecoveryMonitoringDoc(searchHit, collectionInterval); + assertIndexRecoveryMonitoringDoc(searchHit); } else if (IndicesStatsMonitoringDoc.TYPE.equals(type)) { - assertIndicesStatsMonitoringDoc(searchHit, collectionInterval); + assertIndicesStatsMonitoringDoc(searchHit); } else if (IndexStatsMonitoringDoc.TYPE.equals(type)) { - assertIndexStatsMonitoringDoc(searchHit, collectionInterval); + assertIndexStatsMonitoringDoc(searchHit); } else if (NodeStatsMonitoringDoc.TYPE.equals(type)) { - assertNodeStatsMonitoringDoc(searchHit, collectionInterval); + assertNodeStatsMonitoringDoc(searchHit); } else if (ShardMonitoringDoc.TYPE.equals(type)) { - assertShardMonitoringDoc(searchHit, collectionInterval); + assertShardMonitoringDoc(searchHit); } else { fail("Monitoring document of type [" + type + "] is not supported by this test"); } @@ -236,11 +247,11 @@ public class MonitoringIT extends ESRestTestCase { * all monitoring documents must have */ @SuppressWarnings("unchecked") - private static void assertMonitoringDoc(final Map document, - final MonitoredSystem expectedSystem, - final String expectedType, - final TimeValue interval) throws Exception { - assertEquals(5, document.size()); + private void assertMonitoringDoc(final Map document, + final MonitoredSystem expectedSystem, + final String expectedType, + final TimeValue interval) { + assertEquals(document.toString(),4, document.size()); final String index = (String) document.get("_index"); assertThat(index, containsString(".monitoring-" + expectedSystem.getSystem() + "-" + TEMPLATE_VERSION + "-")); @@ -272,25 +283,20 @@ public class MonitoringIT extends ESRestTestCase { * the current local node information */ @SuppressWarnings("unchecked") - private static void assertMonitoringDocSourceNode(final Map sourceNode) throws Exception { + private void assertMonitoringDocSourceNode(final Map sourceNode) { assertEquals(6, sourceNode.size()); - Map filterPath = singletonMap("filter_path", "nodes.*.name,nodes.*.transport_address,nodes.*.host,nodes.*.ip"); - final Response nodesResponse = client().performRequest("GET", "/_nodes", filterPath); + final NodesInfoResponse nodesResponse = client().admin().cluster().prepareNodesInfo().clear().get(); - final Map nodes = (Map) toMap(nodesResponse.getEntity()).get("nodes"); - assertEquals(1, nodes.size()); + assertEquals(1, nodesResponse.getNodes().size()); - final String nodeId = nodes.keySet().iterator().next(); + final DiscoveryNode node = nodesResponse.getNodes().stream().findFirst().get().getNode(); - @SuppressWarnings("unchecked") - final Map node = (Map) nodes.get(nodeId); - - assertThat(sourceNode.get("uuid"), equalTo(nodeId)); - assertThat(sourceNode.get("host"), equalTo(node.get("host"))); - assertThat(sourceNode.get("transport_address"),equalTo(node.get("transport_address"))); - assertThat(sourceNode.get("ip"), equalTo(node.get("ip"))); - assertThat(sourceNode.get("name"), equalTo(node.get("name"))); + assertThat(sourceNode.get("uuid"), equalTo(node.getId())); + assertThat(sourceNode.get("host"), equalTo(node.getHostName())); + assertThat(sourceNode.get("transport_address"),equalTo(node.getAddress().toString())); + assertThat(sourceNode.get("ip"), equalTo(node.getAddress().getAddress())); + assertThat(sourceNode.get("name"), equalTo(node.getName())); assertThat((String) sourceNode.get("timestamp"), not(isEmptyOrNullString())); } @@ -298,11 +304,8 @@ public class MonitoringIT extends ESRestTestCase { * Assert that a {@link ClusterStatsMonitoringDoc} contains the expected information */ @SuppressWarnings("unchecked") - private static void assertClusterStatsMonitoringDoc(final Map document, - final TimeValue interval, - final boolean apmIndicesExist) throws Exception { - assertMonitoringDoc(document, MonitoredSystem.ES, ClusterStatsMonitoringDoc.TYPE, interval); - + private void assertClusterStatsMonitoringDoc(final Map document, + final boolean apmIndicesExist) { final Map source = (Map) document.get("_source"); assertEquals(11, source.size()); @@ -373,9 +376,7 @@ public class MonitoringIT extends ESRestTestCase { * Assert that a {@link IndexRecoveryMonitoringDoc} contains the expected information */ @SuppressWarnings("unchecked") - private static void assertIndexRecoveryMonitoringDoc(final Map document, final TimeValue interval) throws Exception { - assertMonitoringDoc(document, MonitoredSystem.ES, IndexRecoveryMonitoringDoc.TYPE, interval); - + private void assertIndexRecoveryMonitoringDoc(final Map document) { final Map source = (Map) document.get("_source"); assertEquals(6, source.size()); @@ -390,9 +391,7 @@ public class MonitoringIT extends ESRestTestCase { * Assert that a {@link IndicesStatsMonitoringDoc} contains the expected information */ @SuppressWarnings("unchecked") - private static void assertIndicesStatsMonitoringDoc(final Map document, final TimeValue interval) throws Exception { - assertMonitoringDoc(document, MonitoredSystem.ES, IndicesStatsMonitoringDoc.TYPE, interval); - + private void assertIndicesStatsMonitoringDoc(final Map document) { final Map source = (Map) document.get("_source"); assertEquals(6, source.size()); @@ -407,9 +406,7 @@ public class MonitoringIT extends ESRestTestCase { * Assert that a {@link IndexStatsMonitoringDoc} contains the expected information */ @SuppressWarnings("unchecked") - private static void assertIndexStatsMonitoringDoc(final Map document, final TimeValue interval) throws Exception { - assertMonitoringDoc(document, MonitoredSystem.ES, IndexStatsMonitoringDoc.TYPE, interval); - + private void assertIndexStatsMonitoringDoc(final Map document) { final Map source = (Map) document.get("_source"); assertEquals(6, source.size()); @@ -418,7 +415,7 @@ public class MonitoringIT extends ESRestTestCase { assertEquals(8, indexStats.size()); assertThat((String) indexStats.get("index"), not(isEmptyOrNullString())); assertThat((String) indexStats.get("uuid"), not(isEmptyOrNullString())); - assertThat((Long) indexStats.get("created"), notNullValue()); + assertThat(indexStats.get("created"), notNullValue()); assertThat((String) indexStats.get("status"), not(isEmptyOrNullString())); assertThat(indexStats.get("version"), notNullValue()); final Map version = (Map) indexStats.get("version"); @@ -437,9 +434,7 @@ public class MonitoringIT extends ESRestTestCase { * Assert that a {@link NodeStatsMonitoringDoc} contains the expected information */ @SuppressWarnings("unchecked") - private static void assertNodeStatsMonitoringDoc(final Map document, final TimeValue interval) throws Exception { - assertMonitoringDoc(document, MonitoredSystem.ES, NodeStatsMonitoringDoc.TYPE, interval); - + private void assertNodeStatsMonitoringDoc(final Map document) { final Map source = (Map) document.get("_source"); assertEquals(6, source.size()); @@ -468,9 +463,7 @@ public class MonitoringIT extends ESRestTestCase { * Assert that a {@link ShardMonitoringDoc} contains the expected information */ @SuppressWarnings("unchecked") - private static void assertShardMonitoringDoc(final Map document, final TimeValue interval) throws Exception { - assertMonitoringDoc(document, MonitoredSystem.ES, ShardMonitoringDoc.TYPE, interval); - + private void assertShardMonitoringDoc(final Map document) { final Map source = (Map) document.get("_source"); assertEquals(7, source.size()); assertThat(source.get("state_uuid"), notNullValue()); @@ -517,72 +510,82 @@ public class MonitoringIT extends ESRestTestCase { * Enable the monitoring service and the Local exporter, waiting for some monitoring documents * to be indexed before it returns. */ - public static void enableMonitoring(final TimeValue interval) throws Exception { - final Map exporters = callRestApi("GET", "/_xpack/usage?filter_path=monitoring.enabled_exporters", 200); - assertNotNull("List of monitoring exporters must not be null", exporters); - assertThat("List of enabled exporters must be empty before enabling monitoring", - XContentMapValues.extractRawValues("monitoring.enabled_exporters", exporters), hasSize(0)); + public void enableMonitoring(final TimeValue interval) throws Exception { + // delete anything that may happen to already exist + assertAcked(client().admin().indices().prepareDelete(".monitoring-*").get()); + + final XPackUsageResponse usageResponse = new XPackUsageRequestBuilder(client()).execute().get(); + final Optional monitoringUsage = + usageResponse.getUsages() + .stream() + .filter(usage -> usage instanceof MonitoringFeatureSetUsage) + .map(usage -> (MonitoringFeatureSetUsage)usage) + .findFirst(); + assertThat("Monitoring feature set does not exist", monitoringUsage.isPresent(), is(true)); + + final Map exporters = monitoringUsage.get().getExporters(); + + assertThat("List of enabled exporters must be empty before enabling monitoring", exporters.isEmpty(), is(true)); final Settings settings = Settings.builder() - .put("transient.xpack.monitoring.collection.interval", interval.getStringRep()) - .put("transient.xpack.monitoring.exporters._local.enabled", true) + .put("xpack.monitoring.collection.interval", interval.getStringRep()) + .put("xpack.monitoring.exporters._local.enabled", true) .build(); - final HttpEntity entity = - new StringEntity(toXContent(settings, XContentType.JSON, false).utf8ToString(), ContentType.APPLICATION_JSON); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings)); - awaitRestApi("PUT", "/_cluster/settings", emptyMap(), entity, - response -> { - Boolean acknowledged = (Boolean) response.get("acknowledged"); - return acknowledged != null && acknowledged; - },"Exception when enabling monitoring"); - - awaitRestApi("HEAD", "/.monitoring-es-*", singletonMap("allow_no_indices", "false"), null, - response -> true, - "Exception when waiting for monitoring-es-* index to be created"); - - awaitRestApi("GET", "/.monitoring-es-*/_search", emptyMap(), null, - response -> { - Number hitsTotal = (Number) XContentMapValues.extractRawValues("hits.total", response).get(0); - return hitsTotal != null && hitsTotal.intValue() > 0; - },"Exception when waiting for monitoring documents to be indexed"); + assertBusy(() -> { + assertThat("No monitoring documents yet", + client().prepareSearch(".monitoring-es-" + TEMPLATE_VERSION + "-*") + .setSize(0) + .get().getHits().getTotalHits(), + greaterThan(0L)); + }); } /** - * Disable the monitoring service and the Local exporter, waiting for the monitoring indices to - * be deleted before it returns. + * Disable the monitoring service and the Local exporter. */ @SuppressWarnings("unchecked") - public static void disableMonitoring() throws Exception { + public void disableMonitoring() throws Exception { final Settings settings = Settings.builder() - .put("transient.xpack.monitoring.collection.interval", (String) null) - .put("transient.xpack.monitoring.exporters._local.enabled", (String) null) + .put("xpack.monitoring.collection.interval", (String) null) + .put("xpack.monitoring.exporters._local.enabled", (String) null) .build(); - final HttpEntity entity = - new StringEntity(toXContent(settings, XContentType.JSON, false).utf8ToString(), ContentType.APPLICATION_JSON); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings)); - awaitRestApi("PUT", "/_cluster/settings", emptyMap(), entity, - response -> { - Boolean acknowledged = (Boolean) response.get("acknowledged"); - return acknowledged != null && acknowledged; - },"Exception when disabling monitoring"); - - awaitBusy(() -> { + assertBusy(() -> { try { - Map response = callRestApi("GET", "/_xpack/usage?filter_path=monitoring.enabled_exporters", 200); - final List exporters = XContentMapValues.extractRawValues("monitoring.enabled_exporters", response); + final XPackUsageResponse usageResponse = new XPackUsageRequestBuilder(client()).execute().get(); + final Optional monitoringUsage = + usageResponse.getUsages() + .stream() + .filter(usage -> usage instanceof MonitoringFeatureSetUsage) + .map(usage -> (MonitoringFeatureSetUsage)usage) + .findFirst(); + assertThat("Monitoring feature set does not exist", monitoringUsage.isPresent(), is(true)); - if (exporters.isEmpty() == false) { - return false; + final Map exporters = monitoringUsage.get().getExporters(); + + assertThat("Exporters are not yet stopped", exporters.isEmpty(), is(true)); + + // now wait until Monitoring has actually stopped + final NodesStatsResponse response = client().admin().cluster().prepareNodesStats().clear().setThreadPool(true).get(); + + for (final NodeStats nodeStats : response.getNodes()) { + boolean foundBulkThreads = false; + + for(final ThreadPoolStats.Stats threadPoolStats : nodeStats.getThreadPool()) { + if (BULK.equals(threadPoolStats.getName())) { + foundBulkThreads = true; + assertThat("Still some active _bulk threads!", threadPoolStats.getActive(), equalTo(0)); + break; + } + } + + assertThat("Could not find bulk thread pool", foundBulkThreads, is(true)); } - response = callRestApi("GET", "/_nodes/_local/stats/thread_pool?filter_path=nodes.*.thread_pool.bulk.active", 200); - - final Map nodes = (Map) response.get("nodes"); - final Map node = (Map) nodes.values().iterator().next(); - - final Number activeBulks = (Number) extractValue("thread_pool.bulk.active", node); - return activeBulks != null && activeBulks.longValue() == 0L; } catch (Exception e) { throw new ElasticsearchException("Failed to wait for monitoring exporters to stop:", e); } @@ -590,95 +593,25 @@ public class MonitoringIT extends ESRestTestCase { } /** - * Executes a request using {@link org.elasticsearch.client.RestClient}, waiting for it to succeed. + * Returns the {@link SearchHit} content as a {@link Map} object. */ - private static void awaitRestApi(final String method, final String endpoint, final Map params, final HttpEntity entity, - final CheckedFunction, Boolean, IOException> success, - final String error) throws Exception { + private static Map toMap(final ToXContentObject xContentObject) throws IOException { + final XContentType xContentType = XContentType.JSON; - final AtomicReference exceptionHolder = new AtomicReference<>(); - awaitBusy(() -> { - try { - final Response response = client().performRequest(method, endpoint, params, entity); - if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { - exceptionHolder.set(null); - final Map map = ("HEAD".equals(method) == false) ? toMap(response.getEntity()) : null; - return success.apply(map); - } - } catch (IOException e) { - exceptionHolder.set(e); - } - return false; - }); + try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { + xContentObject.toXContent(builder, EMPTY_PARAMS); - IOException exception = exceptionHolder.get(); - if (exception != null) { - throw new IllegalStateException(error, exception); + final Map map = XContentHelper.convertToMap(xContentType.xContent(), builder.string(), false); + + // remove extraneous fields not actually wanted from the response + map.remove("_score"); + map.remove("fields"); + map.remove("sort"); + + return map; } } - /** - * Executes a request using {@link org.elasticsearch.client.RestClient} in a synchronous manner, asserts that the response code - * is equal to {@code expectedCode} and then returns the {@link Response} as a {@link Map}. - */ - private static Map callRestApi(final String method, final String endpoint, final int expectedCode) throws Exception { - final Response response = client().performRequest(method, endpoint); - assertEquals("Unexpected HTTP response code", expectedCode, response.getStatusLine().getStatusCode()); - - return toMap(response.getEntity()); - } - - /** - * Returns the {@link HttpEntity} content as a {@link Map} object. - */ - private static Map toMap(final HttpEntity httpEntity) throws IOException { - final String contentType = httpEntity.getContentType().getValue(); - final XContentType xContentType = XContentType.fromMediaTypeOrFormat(contentType); - if (xContentType == null) { - throw new IOException("Content-type not supported [" + contentType + "]"); - } - return XContentHelper.convertToMap(xContentType.xContent(), httpEntity.getContent(), false); - } - - /** - * Execute a Monitoring Bulk request and checks that it returns a 400 error with a given message. - * - * @param parameters the request parameters - * @param httpEntity the request body (can be null) - * @param matcher a {@link Matcher} to match the message against - */ - private static void assertBadRequest(final Map parameters, final HttpEntity httpEntity, final Matcher matcher) { - ResponseException responseException = expectThrows(ResponseException.class, () -> - client().performRequest("POST", "/_xpack/monitoring/_bulk", parameters, httpEntity)); - - assertThat(responseException.getMessage(), matcher); - assertThat(responseException.getResponse().getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_BAD_REQUEST)); - } - - /** - * Builds a map of parameters for the Monitoring Bulk API - */ - private static Map parameters(final String systemId, final String systemApiVersion, final String interval) { - final Map parameters = new HashMap<>(); - if (systemId != null) { - parameters.put(RestMonitoringBulkAction.MONITORING_ID, systemId); - } - if (systemApiVersion != null) { - parameters.put(RestMonitoringBulkAction.MONITORING_VERSION, systemApiVersion); - } - if (interval != null) { - parameters.put(RestMonitoringBulkAction.INTERVAL, interval); - } - return parameters; - } - - /** - * Returns a {@link String} representing a {@link MonitoredSystem} supported by the Monitoring Bulk API - */ - private static String randomSystemId() { - return randomSystem().getSystem(); - } - /** * Returns a {@link MonitoredSystem} supported by the Monitoring Bulk API */ diff --git a/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringBulkActionTests.java b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringBulkActionTests.java new file mode 100644 index 00000000000..5cb49831adc --- /dev/null +++ b/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringBulkActionTests.java @@ -0,0 +1,223 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.monitoring.rest.action; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.xpack.core.XPackClient; +import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkRequestBuilder; +import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkResponse; +import org.elasticsearch.xpack.core.monitoring.client.MonitoringClient; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RestMonitoringBulkActionTests extends ESTestCase { + + private final RestController controller = mock(RestController.class); + + private final RestMonitoringBulkAction action = new RestMonitoringBulkAction(Settings.EMPTY, controller); + + public void testGetName() { + // Are you sure that you want to change the name? + assertThat(action.getName(), is("xpack_monitoring_bulk_action")); + } + + public void testSupportsContentStream() { + // if you change this, it's a very breaking change for Monitoring + assertThat(action.supportsContentStream(), is(true)); + } + + public void testMissingSystemId() { + final RestRequest restRequest = createRestRequest(null, TEMPLATE_VERSION, "10s"); + + final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest)); + assertThat(exception.getMessage(), containsString("no [system_id] for monitoring bulk request")); + } + + public void testMissingSystemApiVersion() { + final RestRequest restRequest = createRestRequest(randomSystem().getSystem(), null, "10s"); + + final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest)); + assertThat(exception.getMessage(), containsString("no [system_api_version] for monitoring bulk request")); + } + + public void testMissingInterval() { + final RestRequest restRequest = createRestRequest(randomSystem().getSystem(), TEMPLATE_VERSION, null); + + final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest)); + assertThat(exception.getMessage(), containsString("no [interval] for monitoring bulk request")); + } + + public void testWrongInterval() { + final RestRequest restRequest = createRestRequest(randomSystem().getSystem(), TEMPLATE_VERSION, "null"); + + final ElasticsearchParseException exception = expectThrows(ElasticsearchParseException.class, () -> prepareRequest(restRequest)); + assertThat(exception.getMessage(), containsString("failed to parse setting [interval] with value [null]")); + } + + public void testMissingContent() { + final RestRequest restRequest = createRestRequest(0, randomSystem().getSystem(), TEMPLATE_VERSION, "30s"); + + final ElasticsearchParseException exception = expectThrows(ElasticsearchParseException.class, () -> prepareRequest(restRequest)); + assertThat(exception.getMessage(), containsString("no body content for monitoring bulk request")); + } + + public void testUnsupportedSystemVersion() { + final String systemApiVersion = randomFrom(TEMPLATE_VERSION, MonitoringTemplateUtils.OLD_TEMPLATE_VERSION); + final RestRequest restRequest = createRestRequest(MonitoredSystem.UNKNOWN.getSystem(), systemApiVersion, "30s"); + + final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest)); + assertThat(exception.getMessage(), + containsString("system_api_version [" + systemApiVersion + "] is not supported by system_id [unknown]")); + } + + public void testUnknownSystemVersion() { + final MonitoredSystem system = randomSystem(); + final RestRequest restRequest = createRestRequest(system.getSystem(), "0", "30s"); + + final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> prepareRequest(restRequest)); + assertThat(exception.getMessage(), + containsString("system_api_version [0] is not supported by system_id [" + system.getSystem() + "]")); + } + + public void testNoErrors() throws Exception { + final MonitoringBulkResponse response = new MonitoringBulkResponse(randomLong()); + final FakeRestRequest request = createRestRequest(randomSystemId(), TEMPLATE_VERSION, "10s"); + final RestResponse restResponse = getRestBuilderListener(request).buildResponse(response); + + assertThat(restResponse.status(), is(RestStatus.OK)); + assertThat(restResponse.content().utf8ToString(), is("{\"took\":" + response.getTookInMillis() + ",\"errors\":false}")); + } + + public void testWithErrors() throws Exception { + final RuntimeException e = new RuntimeException("TEST - expected"); + final MonitoringBulkResponse.Error error = new MonitoringBulkResponse.Error(e); + final MonitoringBulkResponse response = new MonitoringBulkResponse(randomLong(), error); + final String errorJson; + + final FakeRestRequest request = createRestRequest(randomSystemId(), TEMPLATE_VERSION, "10s"); + final RestResponse restResponse = getRestBuilderListener(request).buildResponse(response); + + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + error.toXContent(builder, ToXContent.EMPTY_PARAMS); + errorJson = builder.string(); + } + + assertThat(restResponse.status(), is(RestStatus.INTERNAL_SERVER_ERROR)); + assertThat(restResponse.content().utf8ToString(), + is("{\"took\":" + response.getTookInMillis() + ",\"errors\":true,\"error\":" + errorJson + "}")); + } + + /** + * Returns a {@link MonitoredSystem} supported by the Monitoring Bulk API + */ + private static MonitoredSystem randomSystem() { + return randomFrom(MonitoredSystem.LOGSTASH, MonitoredSystem.KIBANA, MonitoredSystem.BEATS); + } + + /** + * Returns a {@link String} representing a {@link MonitoredSystem} supported by the Monitoring Bulk API + */ + private static String randomSystemId() { + return randomSystem().getSystem(); + } + + private void prepareRequest(final RestRequest restRequest) throws Exception { + getRestBuilderListener(restRequest); + } + + private RestBuilderListener getRestBuilderListener(final RestRequest restRequest) throws Exception { + final Client client = mock(Client.class); + final XPackClient xpackClient = mock(XPackClient.class); + final MonitoringClient monitoringClient = mock(MonitoringClient.class); + final AtomicReference> listenerReference = new AtomicReference<>(); + final MonitoringBulkRequestBuilder builder = new MonitoringBulkRequestBuilder(client){ + @SuppressWarnings("unchecked") + @Override + public void execute(ActionListener listener) { + listenerReference.set((RestBuilderListener)listener); + } + }; + when(monitoringClient.prepareMonitoringBulk()).thenReturn(builder); + when(xpackClient.monitoring()).thenReturn(monitoringClient); + + final CheckedConsumer consumer = action.doPrepareRequest(restRequest, xpackClient); + + final RestChannel channel = mock(RestChannel.class); + when(channel.newBuilder()).thenReturn(JsonXContent.contentBuilder()); + + // trigger/capture execution + consumer.accept(channel); + + assertThat(listenerReference.get(), not(nullValue())); + + return listenerReference.get(); + } + + private static FakeRestRequest createRestRequest(final String systemId, final String systemApiVersion, final String interval) { + return createRestRequest(randomIntBetween(1, 10), systemId, systemApiVersion, interval); + } + + private static FakeRestRequest createRestRequest(final int nbDocs, + final String systemId, + final String systemApiVersion, + final String interval) { + final FakeRestRequest.Builder builder = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY); + if (nbDocs > 0) { + final StringBuilder requestBody = new StringBuilder(); + for (int n = 0; n < nbDocs; n++) { + requestBody.append("{\"index\":{\"_type\":\"_doc\"}}\n"); + requestBody.append("{\"field\":").append(n).append("}\n"); + } + requestBody.append("\n"); + builder.withContent(new BytesArray(requestBody.toString()), XContentType.JSON); + } + + final Map parameters = new HashMap<>(); + if (systemId != null) { + parameters.put(RestMonitoringBulkAction.MONITORING_ID, systemId); + } + if (systemApiVersion != null) { + parameters.put(RestMonitoringBulkAction.MONITORING_VERSION, systemApiVersion); + } + if (interval != null) { + parameters.put(RestMonitoringBulkAction.INTERVAL, interval); + } + builder.withParams(parameters); + + return builder.build(); + } + +}