[Monitoring] _xpack/monitoring/_bulk action support content stream (elastic/elasticsearch#4916)

This commit marks the monitoring rest bulk action as supporting a content stream. This endpoint takes the same format as the normal bulk endpoint so we need to also accept the newline delimited JSON content type header here.

Closes elastic/elasticsearch#4913

Original commit: elastic/x-pack-elasticsearch@a312cd1256
This commit is contained in:
Jay Modi 2017-02-09 06:27:52 -05:00 committed by GitHub
parent 5185e06631
commit 8c2ce2c504
5 changed files with 43 additions and 5 deletions

View File

@ -80,10 +80,14 @@ public class RestMonitoringBulkAction extends MonitoringRestHandler {
});
}
@Override
public boolean supportsContentStream() {
return true;
}
static final class Fields {
static final String TOOK = "took";
static final String ERRORS = "errors";
static final String ERROR = "error";
}
}

View File

@ -5,15 +5,27 @@
*/
package org.elasticsearch.xpack.monitoring.action;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.monitoring.resolver.bulk.MonitoringBulkTimestampedResolver;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken;
import java.util.List;
import java.util.Map;
@ -31,6 +43,11 @@ import static org.hamcrest.Matchers.nullValue;
public class MonitoringBulkTests extends MonitoringIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(NetworkModule.HTTP_ENABLED.getKey(), securityEnabled).build();
}
@Override
protected Settings transportClientSettings() {
return super.transportClientSettings();
@ -154,4 +171,20 @@ public class MonitoringBulkTests extends MonitoringIntegTestCase {
SearchResponse countResponse = client().prepareSearch().setTypes(types).setSize(0).get();
assertHitCount(countResponse, totalDocs - unsupportedDocs);
}
public void testOverHttp() throws Exception {
final String contentType = randomFrom("application/json", "application/x-ndjson");
assumeTrue("security needs to be enabled for the http transport type to be set", securityEnabled);
RestClient restClient = getRestClient();
final Header authorization = new BasicHeader(HttpHeaders.AUTHORIZATION,
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettings.TEST_USERNAME,
new SecuredString(SecuritySettings.TEST_PASSWORD.toCharArray())));
Response response = restClient.performRequest("POST", "/_xpack/monitoring/_bulk",
MapBuilder.<String, String>newMapBuilder().put("system_id", MonitoredSystem.KIBANA.getSystem())
.put("system_api_version", MonitoringTemplateUtils.TEMPLATE_VERSION)
.put("interval", "10s").immutableMap(),
new StringEntity("{\"index\":{\"_index\":\"\",\"_type\":\"kibana\",\"_id\":\"1\"} }\n" +
"{ \"field1\" : \"value1\" }\n", ContentType.create(contentType)), authorization);
assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode());
}
}

View File

@ -46,7 +46,6 @@ import org.junit.Before;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -726,7 +725,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
}
private void assertBulkRequest(String requestBody, int numberOfActions) throws Exception {
BulkRequest bulkRequest = Requests.bulkRequest().add(new BytesArray(requestBody.getBytes(StandardCharsets.UTF_8)), null, null);
BulkRequest bulkRequest = Requests.bulkRequest()
.add(new BytesArray(requestBody.getBytes(StandardCharsets.UTF_8)), null, null, XContentType.JSON);
assertThat(bulkRequest.numberOfActions(), equalTo(numberOfActions));
for (DocWriteRequest actionRequest : bulkRequest.requests()) {
assertThat(actionRequest, instanceOf(IndexRequest.class));

View File

@ -212,7 +212,7 @@ public class LocalExporterTemplateTests extends MonitoringIntegTestCase {
private void putTemplate(String name) throws Exception {
waitNoPendingTasksOnAll();
assertAcked(client().admin().indices().preparePutTemplate(name).setSource(generateTemplateSource(name)).get());
assertAcked(client().admin().indices().preparePutTemplate(name).setSource(generateTemplateSource(name), XContentType.JSON).get());
}
private void putPipeline(String name) throws Exception {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.actions.throttler;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.common.http.HttpMethod;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.text.TextTemplate;
@ -301,7 +302,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase {
.endObject()
.endObject().string();
client().admin().indices().prepareCreate("foo").addMapping("bar", mapping).get();
client().admin().indices().prepareCreate("foo").addMapping("bar", mapping, XContentType.JSON).get();
TimeValue throttlePeriod = new TimeValue(60, TimeUnit.MINUTES);