mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-05 20:48:22 +00:00
[Monitoring] Decouple Kibana Stats Publishing from Application Version
This changes the way that Kibana (and future applications) send their monitoring stats to Elasticsearch. Instead of sending their payloads with the System ID (e.g., "kibana") and System Version (e.g., "5.0.0-alpha4"), it now expects the System ID and System _API_ Version (e.g., "2"). This means a few things: - Future releases are automatically compatible with previous releases as long as the API version doesn't change. - Users don't have to update Kibana at the exact same time as their cluster (which technically means rolling updates were temporarily blockers of Kibana monitoring before). - We can accept old API versions (if we need to make a breaking change) and automatically up-convert them to the latest API version. (We are in full control of how far back we choose to accept) In general, this change implies that users should be updating their Monitoring cluster before their _monitored_ cluster(s) to get the best opportunity of monitoring backwards compatibility. That way if any API change does occur, then it can up-convert as needed. Then, any ES node should be updated, and only then should Kibana be updated. This is not required in any way, but it will give the smoothest experience. Original commit: elastic/x-pack-elasticsearch@d3c24936e1
This commit is contained in:
parent
acaefe89fa
commit
dbe189b064
@ -58,7 +58,7 @@ public class MonitoringBulkRequest extends ActionRequest<MonitoringBulkRequest>
|
||||
validationException);
|
||||
}
|
||||
if (Strings.hasLength(doc.getMonitoringVersion()) == false) {
|
||||
validationException = addValidationError("monitored system version is missing for monitoring document [" + i + "]",
|
||||
validationException = addValidationError("monitored system API version is missing for monitoring document [" + i + "]",
|
||||
validationException);
|
||||
}
|
||||
if (Strings.hasLength(doc.getType()) == false) {
|
||||
@ -84,7 +84,7 @@ public class MonitoringBulkRequest extends ActionRequest<MonitoringBulkRequest>
|
||||
/**
|
||||
* Parses a monitoring bulk request and builds the list of documents to be indexed.
|
||||
*/
|
||||
public MonitoringBulkRequest add(BytesReference content, String defaultMonitoringId, String defaultMonitoringVersion,
|
||||
public MonitoringBulkRequest add(BytesReference content, String defaultMonitoringId, String defaultMonitoringApiVersion,
|
||||
String defaultType) throws Exception {
|
||||
// MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest:
|
||||
// instead of duplicating the parsing logic here we use a new BulkRequest instance to parse the content.
|
||||
@ -97,7 +97,7 @@ public class MonitoringBulkRequest extends ActionRequest<MonitoringBulkRequest>
|
||||
// builds a new monitoring document based on the index request
|
||||
MonitoringBulkDoc doc =
|
||||
new MonitoringBulkDoc(defaultMonitoringId,
|
||||
defaultMonitoringVersion,
|
||||
defaultMonitoringApiVersion,
|
||||
MonitoringIndex.from(indexRequest.index()),
|
||||
indexRequest.type(),
|
||||
indexRequest.id(),
|
||||
|
@ -21,9 +21,9 @@ public class MonitoringBulkRequestBuilder
|
||||
return this;
|
||||
}
|
||||
|
||||
public MonitoringBulkRequestBuilder add(BytesReference content, String defaultId, String defaultVersion, String defaultType)
|
||||
public MonitoringBulkRequestBuilder add(BytesReference content, String defaultId, String defaultApiVersion, String defaultType)
|
||||
throws Exception {
|
||||
request.add(content, defaultId, defaultVersion, defaultType);
|
||||
request.add(content, defaultId, defaultApiVersion, defaultType);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
@ -16,13 +16,13 @@ public final class MonitoringTemplateUtils {
|
||||
private static final String TEMPLATE_VERSION_PROPERTY = Pattern.quote("${monitoring.template.version}");
|
||||
|
||||
/** Current version of es and data templates **/
|
||||
public static final Integer TEMPLATE_VERSION = 2;
|
||||
public static final String TEMPLATE_VERSION = "2";
|
||||
|
||||
private MonitoringTemplateUtils() {
|
||||
}
|
||||
|
||||
public static String loadTemplate(String id) {
|
||||
String resource = String.format(Locale.ROOT, TEMPLATE_FILE, id);
|
||||
return TemplateUtils.loadTemplate(resource, String.valueOf(TEMPLATE_VERSION), TEMPLATE_VERSION_PROPERTY);
|
||||
return TemplateUtils.loadTemplate(resource, TEMPLATE_VERSION, TEMPLATE_VERSION_PROPERTY);
|
||||
}
|
||||
}
|
||||
|
@ -137,8 +137,8 @@ public abstract class MonitoringIndexNameResolver<T extends MonitoringDoc> {
|
||||
}
|
||||
|
||||
// Used in tests
|
||||
protected Data(Integer version) {
|
||||
this.index = String.join(DELIMITER, PREFIX, DATA, String.valueOf(version));
|
||||
protected Data(String version) {
|
||||
this.index = String.join(DELIMITER, PREFIX, DATA, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -153,7 +153,7 @@ public abstract class MonitoringIndexNameResolver<T extends MonitoringDoc> {
|
||||
|
||||
@Override
|
||||
public String templateName() {
|
||||
return String.format(Locale.ROOT, "%s-%s-%d", PREFIX, DATA, MonitoringTemplateUtils.TEMPLATE_VERSION);
|
||||
return String.format(Locale.ROOT, "%s-%s-%s", PREFIX, DATA, MonitoringTemplateUtils.TEMPLATE_VERSION);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -180,9 +180,9 @@ public abstract class MonitoringIndexNameResolver<T extends MonitoringDoc> {
|
||||
}
|
||||
|
||||
// Used in tests
|
||||
protected Timestamped(MonitoredSystem system, Settings settings, Integer version) {
|
||||
protected Timestamped(MonitoredSystem system, Settings settings, String version) {
|
||||
this.system = system;
|
||||
this.index = String.join(DELIMITER, PREFIX, system.getSystem(), String.valueOf(version));
|
||||
this.index = String.join(DELIMITER, PREFIX, system.getSystem(), version);
|
||||
String format = INDEX_NAME_TIME_FORMAT_SETTING.get(settings);
|
||||
try {
|
||||
this.formatter = DateTimeFormat.forPattern(format).withZoneUTC();
|
||||
@ -209,7 +209,7 @@ public abstract class MonitoringIndexNameResolver<T extends MonitoringDoc> {
|
||||
|
||||
@Override
|
||||
public String templateName() {
|
||||
return String.format(Locale.ROOT, "%s-%s-%d", PREFIX, getId(), MonitoringTemplateUtils.TEMPLATE_VERSION);
|
||||
return String.format(Locale.ROOT, "%s-%s-%s", PREFIX, getId(), MonitoringTemplateUtils.TEMPLATE_VERSION);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -5,7 +5,6 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.monitoring.agent.resolver;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.xpack.monitoring.action.MonitoringBulkDoc;
|
||||
@ -21,6 +20,7 @@ import org.elasticsearch.xpack.monitoring.agent.collector.indices.IndicesStatsMo
|
||||
import org.elasticsearch.xpack.monitoring.agent.collector.node.NodeStatsMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.agent.collector.shards.ShardMonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringDoc;
|
||||
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringTemplateUtils;
|
||||
import org.elasticsearch.xpack.monitoring.agent.resolver.bulk.MonitoringBulkDataResolver;
|
||||
import org.elasticsearch.xpack.monitoring.agent.resolver.bulk.MonitoringBulkTimestampedResolver;
|
||||
import org.elasticsearch.xpack.monitoring.agent.resolver.cluster.ClusterInfoResolver;
|
||||
@ -74,8 +74,13 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
|
||||
private void registerMonitoredSystem(MonitoredSystem id, Settings settings) {
|
||||
final MonitoringBulkDataResolver dataResolver = new MonitoringBulkDataResolver();
|
||||
final MonitoringBulkTimestampedResolver timestampedResolver = new MonitoringBulkTimestampedResolver(id, settings);
|
||||
registrations.add(resolveByClassSystemVersion(id, dataResolver, MonitoringIndex.DATA, Version.CURRENT));
|
||||
registrations.add(resolveByClassSystemVersion(id, timestampedResolver, MonitoringIndex.TIMESTAMPED, Version.CURRENT));
|
||||
|
||||
final String currentApiVersion = MonitoringTemplateUtils.TEMPLATE_VERSION;
|
||||
|
||||
// Note: We resolve requests by the API version that is supplied; this allows us to translate and up-convert any older
|
||||
// requests that come through the _xpack/monitoring/_bulk endpoint
|
||||
registrations.add(resolveByClassSystemVersion(id, dataResolver, MonitoringIndex.DATA, currentApiVersion));
|
||||
registrations.add(resolveByClassSystemVersion(id, timestampedResolver, MonitoringIndex.TIMESTAMPED, currentApiVersion));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -100,7 +105,7 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
|
||||
}
|
||||
|
||||
static Registration resolveByClassSystemVersion(MonitoredSystem system, MonitoringIndexNameResolver resolver, MonitoringIndex index,
|
||||
Version version) {
|
||||
String apiVersion) {
|
||||
return new Registration(resolver, doc -> {
|
||||
try {
|
||||
if (doc instanceof MonitoringBulkDoc == false || index != ((MonitoringBulkDoc)doc).getIndex()) {
|
||||
@ -109,7 +114,7 @@ public class ResolversRegistry implements Iterable<MonitoringIndexNameResolver>
|
||||
if (system != MonitoredSystem.fromSystem(doc.getMonitoringId())) {
|
||||
return false;
|
||||
}
|
||||
return version == Version.fromString(doc.getMonitoringVersion());
|
||||
return apiVersion.equals(doc.getMonitoringVersion());
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ import static org.elasticsearch.rest.RestRequest.Method.PUT;
|
||||
public class RestMonitoringBulkAction extends MonitoringRestHandler {
|
||||
|
||||
public static final String MONITORING_ID = "system_id";
|
||||
public static final String MONITORING_VERSION = "system_version";
|
||||
public static final String MONITORING_VERSION = "system_api_version";
|
||||
|
||||
@Inject
|
||||
public RestMonitoringBulkAction(Settings settings, RestController controller) {
|
||||
|
@ -36,13 +36,13 @@ public class MonitoringBulkRequestTests extends ESTestCase {
|
||||
MonitoringBulkDoc doc = new MonitoringBulkDoc(null, null);
|
||||
|
||||
assertValidationErrors(new MonitoringBulkRequest().add(doc), hasItems("monitored system id is missing for monitoring document [0]",
|
||||
"monitored system version is missing for monitoring document [0]",
|
||||
"monitored system API version is missing for monitoring document [0]",
|
||||
"type is missing for monitoring document [0]",
|
||||
"source is missing for monitoring document [0]"));
|
||||
|
||||
doc = new MonitoringBulkDoc("id", null);
|
||||
assertValidationErrors(new MonitoringBulkRequest().add(doc),
|
||||
hasItems("monitored system version is missing for monitoring document [0]",
|
||||
hasItems("monitored system API version is missing for monitoring document [0]",
|
||||
"type is missing for monitoring document [0]",
|
||||
"source is missing for monitoring document [0]"));
|
||||
|
||||
@ -92,7 +92,7 @@ public class MonitoringBulkRequestTests extends ESTestCase {
|
||||
|
||||
assertValidationErrors(request, hasItems("type is missing for monitoring document [1]",
|
||||
"source is missing for monitoring document [2]",
|
||||
"monitored system version is missing for monitoring document [3]",
|
||||
"monitored system API version is missing for monitoring document [3]",
|
||||
"monitored system id is missing for monitoring document [4]"));
|
||||
|
||||
}
|
||||
|
@ -5,13 +5,12 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.monitoring.action;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringTemplateUtils;
|
||||
import org.elasticsearch.xpack.monitoring.agent.resolver.bulk.MonitoringBulkTimestampedResolver;
|
||||
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
|
||||
|
||||
@ -29,7 +28,6 @@ import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@TestLogging("_root:DEBUG")
|
||||
public class MonitoringBulkTests extends MonitoringIntegTestCase {
|
||||
|
||||
@Override
|
||||
@ -43,7 +41,7 @@ public class MonitoringBulkTests extends MonitoringIntegTestCase {
|
||||
|
||||
int numDocs = scaledRandomIntBetween(100, 5000);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString());
|
||||
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), MonitoringTemplateUtils.TEMPLATE_VERSION);
|
||||
doc.setType(randomFrom(types));
|
||||
doc.setSource(jsonBuilder().startObject().field("num", numDocs).endObject().bytes());
|
||||
requestBuilder.add(doc);
|
||||
@ -95,7 +93,8 @@ public class MonitoringBulkTests extends MonitoringIntegTestCase {
|
||||
|
||||
int numDocs = scaledRandomIntBetween(10, 50);
|
||||
for (int k = 0; k < numDocs; k++) {
|
||||
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString());
|
||||
MonitoringBulkDoc doc =
|
||||
new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), MonitoringTemplateUtils.TEMPLATE_VERSION);
|
||||
doc.setType("concurrent");
|
||||
doc.setSource(jsonBuilder().startObject().field("num", k).endObject().bytes());
|
||||
requestBuilder.add(doc);
|
||||
@ -133,10 +132,10 @@ public class MonitoringBulkTests extends MonitoringIntegTestCase {
|
||||
for (int i = 0; i < totalDocs; i++) {
|
||||
MonitoringBulkDoc doc;
|
||||
if (randomBoolean()) {
|
||||
doc = new MonitoringBulkDoc("unknown", Version.CURRENT.toString());
|
||||
doc = new MonitoringBulkDoc("unknown", MonitoringTemplateUtils.TEMPLATE_VERSION);
|
||||
unsupportedDocs++;
|
||||
} else {
|
||||
doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString());
|
||||
doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), MonitoringTemplateUtils.TEMPLATE_VERSION);
|
||||
}
|
||||
doc.setType(randomFrom(types));
|
||||
doc.setSource(jsonBuilder().startObject().field("num", i).endObject().bytes());
|
||||
|
@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.xpack.monitoring.action.MonitoringBulkDoc;
|
||||
import org.elasticsearch.xpack.monitoring.action.MonitoringIndex;
|
||||
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringTemplateUtils;
|
||||
import org.elasticsearch.xpack.monitoring.agent.resolver.MonitoringIndexNameResolverTestCase;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
@ -29,7 +30,7 @@ public class MonitoringBulkDataResolverTests extends MonitoringIndexNameResolver
|
||||
|
||||
@Override
|
||||
protected MonitoringBulkDoc newMonitoringDoc() {
|
||||
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString(),
|
||||
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), MonitoringTemplateUtils.TEMPLATE_VERSION,
|
||||
MonitoringIndex.DATA, "kibana", id,
|
||||
new BytesArray("{\"field1\" : \"value1\"}"));
|
||||
|
||||
|
@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.xpack.monitoring.MonitoredSystem;
|
||||
import org.elasticsearch.xpack.monitoring.action.MonitoringBulkDoc;
|
||||
import org.elasticsearch.xpack.monitoring.action.MonitoringIndex;
|
||||
import org.elasticsearch.xpack.monitoring.agent.exporter.MonitoringTemplateUtils;
|
||||
import org.elasticsearch.xpack.monitoring.agent.resolver.MonitoringIndexNameResolverTestCase;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
@ -28,7 +29,7 @@ public class MonitoringBulkTimestampedResolverTests
|
||||
|
||||
@Override
|
||||
protected MonitoringBulkDoc newMonitoringDoc() {
|
||||
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), Version.CURRENT.toString(),
|
||||
MonitoringBulkDoc doc = new MonitoringBulkDoc(MonitoredSystem.KIBANA.getSystem(), MonitoringTemplateUtils.TEMPLATE_VERSION,
|
||||
MonitoringIndex.TIMESTAMPED, "kibana_stats", null,
|
||||
new BytesArray("{\"field1\" : \"value1\"}"));
|
||||
|
||||
|
@ -77,8 +77,8 @@ public abstract class AbstractIndicesCleanerTestCase extends MonitoringIntegTest
|
||||
|
||||
// Won't be deleted
|
||||
createIndex(MonitoringSettings.LEGACY_DATA_INDEX_NAME, now().minusYears(1));
|
||||
createDataIndex(now().minusDays(10), 0);
|
||||
createDataIndex(now().minusDays(10), 1);
|
||||
createDataIndex(now().minusDays(10), "0");
|
||||
createDataIndex(now().minusDays(10), "1");
|
||||
assertIndicesCount(4);
|
||||
|
||||
CleanerService.Listener listener = getListener();
|
||||
@ -108,9 +108,9 @@ public abstract class AbstractIndicesCleanerTestCase extends MonitoringIntegTest
|
||||
createTimestampedIndex(now().minusDays(10));
|
||||
|
||||
// Won't be deleted
|
||||
createTimestampedIndex(now().minusDays(10), 0);
|
||||
createTimestampedIndex(now().minusDays(10), 1);
|
||||
createTimestampedIndex(now().minusDays(10), Integer.MAX_VALUE);
|
||||
createTimestampedIndex(now().minusDays(10), "0");
|
||||
createTimestampedIndex(now().minusDays(10), "1");
|
||||
createTimestampedIndex(now().minusDays(10), String.valueOf(Integer.MAX_VALUE));
|
||||
assertIndicesCount(4);
|
||||
|
||||
CleanerService.Listener listener = getListener();
|
||||
@ -198,7 +198,7 @@ public abstract class AbstractIndicesCleanerTestCase extends MonitoringIntegTest
|
||||
/**
|
||||
* Creates a monitoring data index in a given version.
|
||||
*/
|
||||
protected void createDataIndex(DateTime creationDate, int version) {
|
||||
protected void createDataIndex(DateTime creationDate, String version) {
|
||||
createIndex(new MockDataIndexNameResolver(version).index(randomMonitoringDoc()), creationDate);
|
||||
}
|
||||
|
||||
@ -212,7 +212,7 @@ public abstract class AbstractIndicesCleanerTestCase extends MonitoringIntegTest
|
||||
/**
|
||||
* Creates a monitoring timestamped index using a given template version.
|
||||
*/
|
||||
protected void createTimestampedIndex(DateTime creationDate, int version) {
|
||||
protected void createTimestampedIndex(DateTime creationDate, String version) {
|
||||
MonitoringDoc monitoringDoc = randomMonitoringDoc();
|
||||
monitoringDoc.setTimestamp(creationDate.getMillis());
|
||||
|
||||
|
@ -440,7 +440,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
|
||||
|
||||
public class MockDataIndexNameResolver extends MonitoringIndexNameResolver.Data<MonitoringDoc> {
|
||||
|
||||
public MockDataIndexNameResolver(Integer version) {
|
||||
public MockDataIndexNameResolver(String version) {
|
||||
super(version);
|
||||
}
|
||||
|
||||
@ -462,7 +462,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
|
||||
|
||||
protected class MockTimestampedIndexNameResolver extends MonitoringIndexNameResolver.Timestamped<MonitoringDoc> {
|
||||
|
||||
public MockTimestampedIndexNameResolver(MonitoredSystem system, Settings settings, Integer version) {
|
||||
public MockTimestampedIndexNameResolver(MonitoredSystem system, Settings settings, String version) {
|
||||
super(system, settings, version);
|
||||
}
|
||||
|
||||
|
@ -16,9 +16,9 @@
|
||||
"type": "string",
|
||||
"description" : "Identifier of the monitored system"
|
||||
},
|
||||
"system_version" : {
|
||||
"system_api_version" : {
|
||||
"type" : "string",
|
||||
"description" : "Version of the monitored system"
|
||||
"description" : "API Version of the monitored system"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -13,14 +13,11 @@ setup:
|
||||
|
||||
---
|
||||
"Bulk indexing of monitoring data":
|
||||
# Get the current version
|
||||
- do: {info: {}}
|
||||
- set: {version.number: version}
|
||||
|
||||
- do:
|
||||
xpack.monitoring.bulk:
|
||||
system_id: "kibana"
|
||||
system_version: $version
|
||||
system_api_version: "2"
|
||||
body:
|
||||
- index:
|
||||
_type: test_type
|
||||
@ -54,7 +51,7 @@ setup:
|
||||
- do:
|
||||
xpack.monitoring.bulk:
|
||||
system_id: "kibana"
|
||||
system_version: $version
|
||||
system_api_version: "2"
|
||||
type: "default_type"
|
||||
body:
|
||||
- '{"index": {}}'
|
||||
|
Loading…
x
Reference in New Issue
Block a user