Adding {index} option for _xpack/monitoring/_bulk

This adds it so that a system can specify "_data" as the index to index into the
data index (without having to know its name). _Not_ supplying an index will use
the timestamped index. Any other index name (including wrong case) is invalid.

Original commit: elastic/x-pack-elasticsearch@6eeadfb3c8
This commit is contained in:
Chris Earle 2016-05-09 19:10:14 -04:00
parent b5f17a0a40
commit 5c9d18fc34
21 changed files with 362 additions and 63 deletions

View File

@ -14,22 +14,31 @@ import java.io.IOException;
public class MonitoringBulkDoc extends MonitoringDoc { public class MonitoringBulkDoc extends MonitoringDoc {
private String index; private MonitoringIndex index = MonitoringIndex.TIMESTAMPED;
private String type; private String type;
private String id; private String id;
private BytesReference source; private BytesReference source;
public MonitoringBulkDoc(String monitoringId, String monitoringVersion) { public MonitoringBulkDoc(String monitoringId, String monitoringVersion) {
super(monitoringId, monitoringVersion); super(monitoringId, monitoringVersion);
} }
public MonitoringBulkDoc(String monitoringId, String monitoringVersion,
MonitoringIndex index, String type, String id,
BytesReference source) {
super(monitoringId, monitoringVersion);
this.index = index;
this.type = type;
this.id = id;
this.source = source;
}
/** /**
* Read from a stream. * Read from a stream.
*/ */
public MonitoringBulkDoc(StreamInput in) throws IOException { public MonitoringBulkDoc(StreamInput in) throws IOException {
super(in); super(in);
index = in.readOptionalString(); index = MonitoringIndex.readFrom(in);
type = in.readOptionalString(); type = in.readOptionalString();
id = in.readOptionalString(); id = in.readOptionalString();
source = in.readBytesReference(); source = in.readBytesReference();
@ -38,17 +47,17 @@ public class MonitoringBulkDoc extends MonitoringDoc {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeOptionalString(index); index.writeTo(out);
out.writeOptionalString(type); out.writeOptionalString(type);
out.writeOptionalString(id); out.writeOptionalString(id);
out.writeBytesReference(source); out.writeBytesReference(source);
} }
public String getIndex() { public MonitoringIndex getIndex() {
return index; return index;
} }
public void setIndex(String index) { public void setIndex(MonitoringIndex index) {
this.index = index; this.index = index;
} }
@ -75,4 +84,5 @@ public class MonitoringBulkDoc extends MonitoringDoc {
public void setSource(BytesReference source) { public void setSource(BytesReference source) {
this.source = source; this.source = source;
} }
} }

View File

@ -95,11 +95,14 @@ public class MonitoringBulkRequest extends ActionRequest<MonitoringBulkRequest>
IndexRequest indexRequest = (IndexRequest) request; IndexRequest indexRequest = (IndexRequest) request;
// builds a new monitoring document based on the index request // builds a new monitoring document based on the index request
MonitoringBulkDoc doc = new MonitoringBulkDoc(defaultMonitoringId, defaultMonitoringVersion); MonitoringBulkDoc doc =
doc.setIndex(indexRequest.index()); new MonitoringBulkDoc(defaultMonitoringId,
doc.setType(indexRequest.type()); defaultMonitoringVersion,
doc.setId(indexRequest.id()); MonitoringIndex.from(indexRequest.index()),
doc.setSource(indexRequest.source()); indexRequest.type(),
indexRequest.id(),
indexRequest.source());
add(doc); add(doc);
} else { } else {
throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); throw new IllegalArgumentException("monitoring bulk requests should only contain index requests");

View File

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
* {@code MonitoringIndex} represents the receivable index from any request.
* <p>
* This allows external systems to provide details for an index without having to know its exact name.
*/
public enum MonitoringIndex implements Writeable {
/**
* Data that drives information about the "cluster" (e.g., a node or instance).
*/
DATA {
@Override
public boolean matchesIndexName(String indexName) {
return "_data".equals(indexName);
}
},
/**
* Timestamped data that drives the charts (e.g., memory statistics).
*/
TIMESTAMPED {
@Override
public boolean matchesIndexName(String indexName) {
return Strings.isEmpty(indexName);
}
};
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte((byte)ordinal());
}
public static MonitoringIndex readFrom(StreamInput in) throws IOException {
return values()[in.readByte()];
}
/**
* Determine if the {@code indexName} matches {@code this} monitoring index.
*
* @param indexName The name of the index.
* @return {@code true} if {@code this} matches the {@code indexName}
*/
public abstract boolean matchesIndexName(String indexName);
/**
* Find the {@link MonitoringIndex} to use for the request.
*
* @param indexName The name of the index.
* @return Never {@code null}.
* @throws IllegalArgumentException if {@code indexName} is unrecognized
*/
public static MonitoringIndex from(String indexName) {
for (MonitoringIndex index : values()) {
if (index.matchesIndexName(indexName)) {
return index;
}
}
throw new IllegalArgumentException("unrecognized index name [" + indexName + "]");
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.marvel.MonitoredSystem; import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.action.MonitoringBulkDoc; import org.elasticsearch.marvel.action.MonitoringBulkDoc;
import org.elasticsearch.marvel.action.MonitoringIndex;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoMonitoringDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterInfoMonitoringDoc;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMonitoringDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateMonitoringDoc;
import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateNodeMonitoringDoc; import org.elasticsearch.marvel.agent.collector.cluster.ClusterStateNodeMonitoringDoc;
@ -20,7 +21,8 @@ import org.elasticsearch.marvel.agent.collector.indices.IndicesStatsMonitoringDo
import org.elasticsearch.marvel.agent.collector.node.NodeStatsMonitoringDoc; import org.elasticsearch.marvel.agent.collector.node.NodeStatsMonitoringDoc;
import org.elasticsearch.marvel.agent.collector.shards.ShardMonitoringDoc; import org.elasticsearch.marvel.agent.collector.shards.ShardMonitoringDoc;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkResolver; import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkDataResolver;
import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkTimestampedResolver;
import org.elasticsearch.marvel.agent.resolver.cluster.ClusterInfoResolver; import org.elasticsearch.marvel.agent.resolver.cluster.ClusterInfoResolver;
import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateNodeResolver; import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateNodeResolver;
import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateResolver; import org.elasticsearch.marvel.agent.resolver.cluster.ClusterStateResolver;
@ -72,8 +74,10 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
* Registers resolvers for monitored systems * Registers resolvers for monitored systems
*/ */
private void registerMonitoredSystem(MonitoredSystem id, Settings settings) { private void registerMonitoredSystem(MonitoredSystem id, Settings settings) {
final MonitoringBulkResolver resolver = new MonitoringBulkResolver(id, settings); final MonitoringBulkDataResolver dataResolver = new MonitoringBulkDataResolver();
registrations.add(resolveByClassSystemVersion(MonitoringBulkDoc.class, id, Version.CURRENT, resolver)); final MonitoringBulkTimestampedResolver timestampedResolver = new MonitoringBulkTimestampedResolver(id, settings);
registrations.add(resolveByClassSystemVersion(id, dataResolver, MonitoringIndex.DATA, Version.CURRENT));
registrations.add(resolveByClassSystemVersion(id, timestampedResolver, MonitoringIndex.TIMESTAMPED, Version.CURRENT));
} }
/** /**
@ -97,11 +101,11 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
return new Registration(resolver, type::isInstance); return new Registration(resolver, type::isInstance);
} }
static Registration resolveByClassSystemVersion(Class<? extends MonitoringDoc> type, MonitoredSystem system, Version version, static Registration resolveByClassSystemVersion(MonitoredSystem system, MonitoringIndexNameResolver resolver, MonitoringIndex index,
MonitoringIndexNameResolver resolver) { Version version) {
return new Registration(resolver, doc -> { return new Registration(resolver, doc -> {
try { try {
if (type.isInstance(doc) == false) { if (doc instanceof MonitoringBulkDoc == false || index != ((MonitoringBulkDoc)doc).getIndex()) {
return false; return false;
} }
if (system != MonitoredSystem.fromSystem(doc.getMonitoringId())) { if (system != MonitoredSystem.fromSystem(doc.getMonitoringId())) {

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.resolver.bulk;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.marvel.action.MonitoringBulkDoc;
import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver;
import java.io.IOException;
public class MonitoringBulkDataResolver extends MonitoringIndexNameResolver.Data<MonitoringBulkDoc> {
@Override
public String type(MonitoringBulkDoc document) {
return document.getType();
}
@Override
public String id(MonitoringBulkDoc document) {
return document.getId();
}
@Override
protected void buildXContent(MonitoringBulkDoc document, XContentBuilder builder, ToXContent.Params params) throws IOException {
BytesReference source = document.getSource();
if (source != null && source.length() > 0) {
builder.rawField(type(document), source);
}
}
}

View File

@ -15,9 +15,9 @@ import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolver;
import java.io.IOException; import java.io.IOException;
public class MonitoringBulkResolver extends MonitoringIndexNameResolver.Timestamped<MonitoringBulkDoc> { public class MonitoringBulkTimestampedResolver extends MonitoringIndexNameResolver.Timestamped<MonitoringBulkDoc> {
public MonitoringBulkResolver(MonitoredSystem id, Settings settings) { public MonitoringBulkTimestampedResolver(MonitoredSystem id, Settings settings) {
super(id, settings); super(id, settings);
} }

View File

@ -23,6 +23,9 @@ import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestBuilderListener; import org.elasticsearch.rest.action.support.RestBuilderListener;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
public class RestMonitoringBulkAction extends MonitoringRestHandler { public class RestMonitoringBulkAction extends MonitoringRestHandler {
public static final String MONITORING_ID = "system_id"; public static final String MONITORING_ID = "system_id";
@ -31,12 +34,12 @@ public class RestMonitoringBulkAction extends MonitoringRestHandler {
@Inject @Inject
public RestMonitoringBulkAction(Settings settings, RestController controller, Client client) { public RestMonitoringBulkAction(Settings settings, RestController controller, Client client) {
super(settings, client); super(settings, client);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/_bulk", this); controller.registerHandler(POST, URI_BASE + "/_bulk", this);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/_bulk", this); controller.registerHandler(PUT, URI_BASE + "/_bulk", this);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/{index}/_bulk", this); controller.registerHandler(POST, URI_BASE + "/{type}/_bulk", this);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/{index}/_bulk", this); controller.registerHandler(PUT, URI_BASE + "/{type}/_bulk", this);
controller.registerHandler(RestRequest.Method.POST, URI_BASE + "/{index}/{type}/_bulk", this); controller.registerHandler(POST, URI_BASE + "/{index}/{type}/_bulk", this);
controller.registerHandler(RestRequest.Method.PUT, URI_BASE + "/{index}/{type}/_bulk", this); controller.registerHandler(PUT, URI_BASE + "/{index}/{type}/_bulk", this);
} }
@Override @Override

View File

@ -67,7 +67,7 @@ public class MonitoringBulkDocTests extends ESTestCase {
doc.setSource(new BytesArray("{\"key\" : \"value\"}")); doc.setSource(new BytesArray("{\"key\" : \"value\"}"));
} }
if (rarely()) { if (rarely()) {
doc.setIndex(randomAsciiOfLength(5)); doc.setIndex(MonitoringIndex.DATA);
doc.setId(randomAsciiOfLength(2)); doc.setId(randomAsciiOfLength(2));
} }
return doc; return doc;

View File

@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
public class MonitoringBulkRequestTests extends ESTestCase { public class MonitoringBulkRequestTests extends ESTestCase {
@ -122,9 +123,11 @@ public class MonitoringBulkRequestTests extends ESTestCase {
String defaultMonitoringId = randomBoolean() ? randomAsciiOfLength(2) : null; String defaultMonitoringId = randomBoolean() ? randomAsciiOfLength(2) : null;
String defaultMonitoringVersion = randomBoolean() ? randomAsciiOfLength(3) : null; String defaultMonitoringVersion = randomBoolean() ? randomAsciiOfLength(3) : null;
String defaultIndex = randomBoolean() ? randomAsciiOfLength(5) : null; String defaultIndex = randomFrom("_data", null);
String defaultType = randomBoolean() ? randomAsciiOfLength(4) : null; String defaultType = randomBoolean() ? randomAsciiOfLength(4) : null;
MonitoringIndex index = MonitoringIndex.from(defaultIndex);
MonitoringBulkRequest request = new MonitoringBulkRequest(); MonitoringBulkRequest request = new MonitoringBulkRequest();
request.add(content.bytes(), defaultMonitoringId, defaultMonitoringVersion, defaultIndex, defaultType); request.add(content.bytes(), defaultMonitoringId, defaultMonitoringVersion, defaultIndex, defaultType);
assertThat(request.getDocs(), hasSize(nbDocs)); assertThat(request.getDocs(), hasSize(nbDocs));
@ -132,7 +135,7 @@ public class MonitoringBulkRequestTests extends ESTestCase {
for (MonitoringBulkDoc doc : request.getDocs()) { for (MonitoringBulkDoc doc : request.getDocs()) {
assertThat(doc.getMonitoringId(), equalTo(defaultMonitoringId)); assertThat(doc.getMonitoringId(), equalTo(defaultMonitoringId));
assertThat(doc.getMonitoringVersion(), equalTo(defaultMonitoringVersion)); assertThat(doc.getMonitoringVersion(), equalTo(defaultMonitoringVersion));
assertThat(doc.getIndex(), equalTo(defaultIndex)); assertThat(doc.getIndex(), sameInstance(index));
assertThat(doc.getType(), equalTo(defaultType)); assertThat(doc.getType(), equalTo(defaultType));
} }
} }
@ -147,7 +150,7 @@ public class MonitoringBulkRequestTests extends ESTestCase {
doc.setType(randomFrom("type1", "type2", "type3")); doc.setType(randomFrom("type1", "type2", "type3"));
doc.setSource(SOURCE); doc.setSource(SOURCE);
if (randomBoolean()) { if (randomBoolean()) {
doc.setIndex("index"); doc.setIndex(MonitoringIndex.DATA);
} }
if (randomBoolean()) { if (randomBoolean()) {
doc.setId(randomAsciiOfLength(3)); doc.setId(randomAsciiOfLength(3));

View File

@ -10,7 +10,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.marvel.MonitoredSystem; import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkResolver; import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkTimestampedResolver;
import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
@ -58,9 +58,9 @@ public class MonitoringBulkTests extends MarvelIntegTestCase {
for (SearchHit searchHit : searchResponse.getHits()) { for (SearchHit searchHit : searchResponse.getHits()) {
Map<String, Object> source = searchHit.sourceAsMap(); Map<String, Object> source = searchHit.sourceAsMap();
assertNotNull(source.get(MonitoringBulkResolver.Fields.CLUSTER_UUID)); assertNotNull(source.get(MonitoringBulkTimestampedResolver.Fields.CLUSTER_UUID));
assertNotNull(source.get(MonitoringBulkResolver.Fields.TIMESTAMP)); assertNotNull(source.get(MonitoringBulkTimestampedResolver.Fields.TIMESTAMP));
assertNotNull(source.get(MonitoringBulkResolver.Fields.SOURCE_NODE)); assertNotNull(source.get(MonitoringBulkTimestampedResolver.Fields.SOURCE_NODE));
} }
} }

View File

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.action;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
/**
* Tests {@link MonitoringIndex}
*/
public class MonitoringIndexTests extends ESTestCase {
public void testDataMatchesIndexName() {
assertTrue(MonitoringIndex.DATA.matchesIndexName("_data"));
assertFalse(MonitoringIndex.DATA.matchesIndexName("_DATA"));
assertFalse(MonitoringIndex.DATA.matchesIndexName("_dAtA"));
assertFalse(MonitoringIndex.DATA.matchesIndexName("_data "));
assertFalse(MonitoringIndex.DATA.matchesIndexName(" _data "));
assertFalse(MonitoringIndex.DATA.matchesIndexName(""));
assertFalse(MonitoringIndex.DATA.matchesIndexName(null));
}
public void testTimestampMatchesIndexName() {
assertTrue(MonitoringIndex.TIMESTAMPED.matchesIndexName(""));
assertTrue(MonitoringIndex.TIMESTAMPED.matchesIndexName(null));
assertFalse(MonitoringIndex.TIMESTAMPED.matchesIndexName(" "));
assertFalse(MonitoringIndex.TIMESTAMPED.matchesIndexName("_data"));
}
public void testFrom() {
assertSame(MonitoringIndex.DATA, MonitoringIndex.from("_data"));
assertSame(MonitoringIndex.TIMESTAMPED, MonitoringIndex.from(""));
assertSame(MonitoringIndex.TIMESTAMPED, MonitoringIndex.from(null));
}
public void testFromFails() {
String[] invalidNames = { "_DATA", "other", " " };
for (String name : invalidNames) {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> MonitoringIndex.from(name));
assertEquals("unrecognized index name [" + name + "]", e.getMessage());
}
}
public void testStreaming() throws IOException {
MonitoringIndex index = randomFrom(MonitoringIndex.values());
final BytesStreamOutput out = new BytesStreamOutput();
index.writeTo(out);
final StreamInput in = StreamInput.wrap(out.bytes().toBytes());
assertSame(index, MonitoringIndex.readFrom(in));
assertEquals(0, in.available());
in.close();
out.close();
}
}

View File

@ -31,7 +31,7 @@ import org.elasticsearch.marvel.agent.collector.indices.IndexRecoveryMonitoringD
import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils; import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkResolver; import org.elasticsearch.marvel.agent.resolver.bulk.MonitoringBulkTimestampedResolver;
import org.elasticsearch.marvel.test.MarvelIntegTestCase; import org.elasticsearch.marvel.test.MarvelIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
@ -229,7 +229,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
enqueueGetClusterVersionResponse(secondWebServer, Version.CURRENT); enqueueGetClusterVersionResponse(secondWebServer, Version.CURRENT);
for (String template : monitoringTemplates().keySet()) { for (String template : monitoringTemplates().keySet()) {
if (template.contains(MonitoringBulkResolver.Data.DATA)) { if (template.contains(MonitoringBulkTimestampedResolver.Data.DATA)) {
enqueueResponse(secondWebServer, 200, "template [" + template + "] exist"); enqueueResponse(secondWebServer, 200, "template [" + template + "] exist");
} else { } else {
enqueueResponse(secondWebServer, 404, "template [" + template + "] does not exist"); enqueueResponse(secondWebServer, 404, "template [" + template + "] does not exist");
@ -252,7 +252,7 @@ public class HttpExporterTests extends MarvelIntegTestCase {
assertThat(recordedRequest.getMethod(), equalTo("GET")); assertThat(recordedRequest.getMethod(), equalTo("GET"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey())); assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey()));
if (template.getKey().contains(MonitoringBulkResolver.Data.DATA) == false) { if (template.getKey().contains(MonitoringBulkTimestampedResolver.Data.DATA) == false) {
recordedRequest = secondWebServer.takeRequest(); recordedRequest = secondWebServer.takeRequest();
assertThat(recordedRequest.getMethod(), equalTo("PUT")); assertThat(recordedRequest.getMethod(), equalTo("PUT"));
assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey())); assertThat(recordedRequest.getPath(), equalTo("/_template/" + template.getKey()));

View File

@ -30,7 +30,6 @@ import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.isEmptyOrNullString;
@ -48,7 +47,15 @@ public abstract class MonitoringIndexNameResolverTestCase<M extends MonitoringDo
* @return the {@link MonitoringIndexNameResolver} to test * @return the {@link MonitoringIndexNameResolver} to test
*/ */
protected R newResolver() { protected R newResolver() {
return (R) resolversRegistry.getResolver(newMarvelDoc()); return newResolver(newMarvelDoc());
}
/**
* @return the {@link MonitoringIndexNameResolver} to test
*/
@SuppressWarnings("unchecked")
protected R newResolver(M doc) {
return (R) resolversRegistry.getResolver(doc);
} }
/** /**

View File

@ -90,7 +90,7 @@ public class TimestampedResolverTests extends MonitoringIndexNameResolverTestCas
@Override @Override
protected void buildXContent(MonitoringDoc document, XContentBuilder builder, ToXContent.Params params) throws IOException { protected void buildXContent(MonitoringDoc document, XContentBuilder builder, ToXContent.Params params) throws IOException {
return; // noop
} }
}; };
} }

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.agent.resolver.bulk;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.action.MonitoringBulkDoc;
import org.elasticsearch.marvel.action.MonitoringIndex;
import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolverTestCase;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
/**
* Tests {@link MonitoringBulkDataResolver}.
*/
public class MonitoringBulkDataResolverTests extends MonitoringIndexNameResolverTestCase<MonitoringBulkDoc, MonitoringBulkDataResolver> {
private final String id = randomBoolean() ? randomAsciiOfLength(35) : null;
@Override
protected MonitoringBulkDoc newMarvelDoc() {
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString(),
MonitoringIndex.DATA, "kibana", id,
new BytesArray("{\"field1\" : \"value1\"}"));
if (randomBoolean()) {
doc.setClusterUUID(randomAsciiOfLength(5));
}
doc.setClusterUUID(randomAsciiOfLength(5));
doc.setTimestamp(Math.abs(randomLong()));
doc.setSourceNode(new DiscoveryNode("id", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT));
return doc;
}
@Override
public void testId() {
MonitoringBulkDoc doc = newMarvelDoc();
assertThat(newResolver(doc).id(doc), sameInstance(id));
}
@Override
protected boolean checkFilters() {
return false;
}
public void testMonitoringBulkResolver() throws Exception {
MonitoringBulkDoc doc = newMarvelDoc();
MonitoringBulkDataResolver resolver = newResolver(doc);
assertThat(resolver.index(doc), equalTo(".monitoring-data-2"));
assertThat(resolver.type(doc), equalTo(doc.getType()));
assertThat(resolver.id(doc), sameInstance(id));
assertSource(resolver.source(doc, XContentType.JSON),
"cluster_uuid",
"timestamp",
"source_node",
"kibana",
"kibana.field1");
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.marvel.MonitoredSystem; import org.elasticsearch.marvel.MonitoredSystem;
import org.elasticsearch.marvel.action.MonitoringBulkDoc; import org.elasticsearch.marvel.action.MonitoringBulkDoc;
import org.elasticsearch.marvel.action.MonitoringIndex;
import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolverTestCase; import org.elasticsearch.marvel.agent.resolver.MonitoringIndexNameResolverTestCase;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
@ -19,16 +20,28 @@ import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
public class MonitoringBulkResolverTests extends MonitoringIndexNameResolverTestCase<MonitoringBulkDoc, MonitoringBulkResolver> { /**
* Tests {@link MonitoringBulkTimestampedResolver}.
*/
public class MonitoringBulkTimestampedResolverTests
extends MonitoringIndexNameResolverTestCase<MonitoringBulkDoc, MonitoringBulkTimestampedResolver> {
@Override @Override
protected MonitoringBulkDoc newMarvelDoc() { protected MonitoringBulkDoc newMarvelDoc() {
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString()); MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString(),
MonitoringIndex.TIMESTAMPED, "kibana_stats", null,
new BytesArray("{\"field1\" : \"value1\"}"));
doc.setTimestamp(1437580442979L);
if (randomBoolean()) {
doc.setId(randomAsciiOfLength(35));
}
if (randomBoolean()) {
doc.setClusterUUID(randomAsciiOfLength(5));
}
doc.setClusterUUID(randomAsciiOfLength(5)); doc.setClusterUUID(randomAsciiOfLength(5));
doc.setTimestamp(Math.abs(randomLong()));
doc.setSourceNode(new DiscoveryNode("id", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT)); doc.setSourceNode(new DiscoveryNode("id", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT));
doc.setType("kibana_stats");
doc.setSource(new BytesArray("{\"field1\" : \"value1\"}"));
return doc; return doc;
} }
@ -44,18 +57,8 @@ public class MonitoringBulkResolverTests extends MonitoringIndexNameResolverTest
public void testMonitoringBulkResolver() throws Exception { public void testMonitoringBulkResolver() throws Exception {
MonitoringBulkDoc doc = newMarvelDoc(); MonitoringBulkDoc doc = newMarvelDoc();
doc.setTimestamp(1437580442979L);
if (randomBoolean()) {
doc.setIndex(randomAsciiOfLength(5));
}
if (randomBoolean()) {
doc.setId(randomAsciiOfLength(35));
}
if (randomBoolean()) {
doc.setClusterUUID(randomAsciiOfLength(5));
}
MonitoringBulkResolver resolver = newResolver(); MonitoringBulkTimestampedResolver resolver = newResolver(doc);
assertThat(resolver.index(doc), equalTo(".monitoring-kibana-2-2015.07.22")); assertThat(resolver.index(doc), equalTo(".monitoring-kibana-2-2015.07.22"));
assertThat(resolver.type(doc), equalTo(doc.getType())); assertThat(resolver.type(doc), equalTo(doc.getType()));
assertThat(resolver.id(doc), nullValue()); assertThat(resolver.id(doc), nullValue());

View File

@ -26,7 +26,6 @@ setup:
system: 1.51 system: 1.51
iowait: 0.85 iowait: 0.85
idle: 84.20 idle: 84.20
- index: - index:
_type: test_type _type: test_type
- avg-cpu: - avg-cpu:
@ -52,7 +51,6 @@ setup:
monitoring.bulk: monitoring.bulk:
system_id: "kibana" system_id: "kibana"
system_version: $version system_version: $version
index: "default_index"
type: "default_type" type: "default_type"
body: body:
- '{"index": {}}' - '{"index": {}}'
@ -61,6 +59,8 @@ setup:
- '{"field_1": "value_2"}' - '{"field_1": "value_2"}'
- '{"index": {}}' - '{"index": {}}'
- '{"field_1": "value_3"}' - '{"field_1": "value_3"}'
- '{"index": {"_index": "_data", "_type": "kibana"}}'
- '{"field_1": "value_4"}'
- is_false: errors - is_false: errors
@ -80,3 +80,10 @@ setup:
type: custom_type type: custom_type
- match: { hits.total: 1 } - match: { hits.total: 1 }
- do:
search:
index: .monitoring-data-*
type: kibana
- match: { hits.total: 1 }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.shield.authz.permission; package org.elasticsearch.shield.authz.permission;
import org.elasticsearch.marvel.action.MonitoringBulkAction;
import org.elasticsearch.shield.authz.RoleDescriptor; import org.elasticsearch.shield.authz.RoleDescriptor;
import org.elasticsearch.shield.authz.privilege.ClusterPrivilege; import org.elasticsearch.shield.authz.privilege.ClusterPrivilege;
import org.elasticsearch.shield.authz.privilege.Privilege.Name; import org.elasticsearch.shield.authz.privilege.Privilege.Name;
@ -14,7 +15,7 @@ import org.elasticsearch.shield.authz.privilege.Privilege.Name;
*/ */
public class KibanaRole extends Role { public class KibanaRole extends Role {
private static final String[] CLUSTER_PRIVILEGES = new String[] { "monitor" }; private static final String[] CLUSTER_PRIVILEGES = new String[] { "monitor", MonitoringBulkAction.NAME};
private static final RoleDescriptor.IndicesPrivileges[] INDICES_PRIVILEGES = new RoleDescriptor.IndicesPrivileges[] { private static final RoleDescriptor.IndicesPrivileges[] INDICES_PRIVILEGES = new RoleDescriptor.IndicesPrivileges[] {
RoleDescriptor.IndicesPrivileges.builder().indices(".kibana").privileges("all").build() }; RoleDescriptor.IndicesPrivileges.builder().indices(".kibana").privileges("all").build() };

View File

@ -167,4 +167,7 @@ public class KibanaRoleTests extends ShieldIntegTestCase {
.setIndices(index).get(); .setIndices(index).get();
assertThat(response.getIndices(), arrayContaining(index)); assertThat(response.getIndices(), arrayContaining(index));
} }
// TODO: When we have an XPackIntegTestCase, this should test that we can send MonitoringBulkActions
} }

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.marvel.action.MonitoringBulkAction;
import org.elasticsearch.shield.user.KibanaUser; import org.elasticsearch.shield.user.KibanaUser;
import org.elasticsearch.shield.user.User; import org.elasticsearch.shield.user.User;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -34,6 +35,7 @@ public class KibanaRoleTests extends ESTestCase {
assertThat(KibanaRole.INSTANCE.cluster().check(ClusterHealthAction.NAME, request, user), is(true)); assertThat(KibanaRole.INSTANCE.cluster().check(ClusterHealthAction.NAME, request, user), is(true));
assertThat(KibanaRole.INSTANCE.cluster().check(ClusterStateAction.NAME, request, user), is(true)); assertThat(KibanaRole.INSTANCE.cluster().check(ClusterStateAction.NAME, request, user), is(true));
assertThat(KibanaRole.INSTANCE.cluster().check(ClusterStatsAction.NAME, request, user), is(true)); assertThat(KibanaRole.INSTANCE.cluster().check(ClusterStatsAction.NAME, request, user), is(true));
assertThat(KibanaRole.INSTANCE.cluster().check(MonitoringBulkAction.NAME, request, user), is(true));
assertThat(KibanaRole.INSTANCE.cluster().check(PutIndexTemplateAction.NAME, request, user), is(false)); assertThat(KibanaRole.INSTANCE.cluster().check(PutIndexTemplateAction.NAME, request, user), is(false));
assertThat(KibanaRole.INSTANCE.cluster().check(ClusterRerouteAction.NAME, request, user), is(false)); assertThat(KibanaRole.INSTANCE.cluster().check(ClusterRerouteAction.NAME, request, user), is(false));
assertThat(KibanaRole.INSTANCE.cluster().check(ClusterUpdateSettingsAction.NAME, request, user), is(false)); assertThat(KibanaRole.INSTANCE.cluster().check(ClusterUpdateSettingsAction.NAME, request, user), is(false));