From 59b50bb18c2c3ab4e2c6944ab04afb9f7ca4591e Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 28 Feb 2017 15:34:05 +0000 Subject: [PATCH] [ML] Index template registry with simple versioning (elastic/x-pack-elasticsearch#655) [ML] * Add MachineLearningTemplateRegistry class [ML] * Add blocking method to put templates required by tests [ML] * Add version check for templates [ML] * Review comments Original commit: elastic/x-pack-elasticsearch@07d315e56d4a964241b5af1843ed00d22d5e321a --- .../xpack/ml/MachineLearning.java | 3 +- .../ml/MachineLearningTemplateRegistry.java | 349 ++++++++++++++++++ .../xpack/ml/MlInitializationService.java | 93 +---- .../xpack/ml/action/DeleteFilterAction.java | 5 +- .../xpack/ml/action/GetFiltersAction.java | 5 +- .../xpack/ml/action/OpenJobAction.java | 3 +- .../xpack/ml/action/PutFilterAction.java | 4 +- .../persistence/AnomalyDetectorsIndex.java | 6 + .../xpack/ml/job/persistence/JobProvider.java | 175 +-------- .../MachineLearningTemplateRegistryTests.java | 302 +++++++++++++++ .../ml/MlInitializationServiceTests.java | 101 +---- .../xpack/ml/action/OpenJobActionTests.java | 2 +- .../AutodetectResultProcessorIT.java | 30 +- .../ml/job/persistence/JobProviderTests.java | 126 +------ 14 files changed, 702 insertions(+), 502 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 94135372f7b..4c0ec6a1d2e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -304,7 +304,8 @@ public class MachineLearning extends Plugin implements ActionPlugin { jobProvider, jobManager, dataProcessor, - new MlInitializationService(settings, threadPool, clusterService, client, jobProvider, auditor), + new MlInitializationService(settings, threadPool, clusterService, client, auditor), + new MachineLearningTemplateRegistry(settings, clusterService, client, threadPool), jobDataCountsPersister, datafeedJobRunner, persistentActionService, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java new file mode 100644 index 00000000000..55e36849b97 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistry.java @@ -0,0 +1,349 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; +import org.elasticsearch.xpack.ml.job.results.Result; +import org.elasticsearch.xpack.ml.notifications.AuditActivity; +import org.elasticsearch.xpack.ml.notifications.AuditMessage; +import org.elasticsearch.xpack.ml.notifications.Auditor; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +/** + * Registry for the ML index templates and settings + */ +public class MachineLearningTemplateRegistry extends AbstractComponent implements ClusterStateListener { + private static final String ASYNC = "async"; + + private final Client client; + private final ThreadPool threadPool; + + final AtomicBoolean putMlNotificationsIndexTemplateCheck = new AtomicBoolean(false); + final AtomicBoolean putMlMetaIndexTemplateCheck = new AtomicBoolean(false); + final AtomicBoolean putStateIndexTemplateCheck = new AtomicBoolean(false); + final AtomicBoolean putResultsIndexTemplateCheck = new AtomicBoolean(false); + + // Allows us in test mode to disable the delay of shard allocation, so that in tests we don't have to wait for + // for at least a minute for shards to get allocated. + private final TimeValue delayedNodeTimeOutSetting; + + public MachineLearningTemplateRegistry(Settings settings, ClusterService clusterService, Client client, + ThreadPool threadPool) { + super(settings); + this.client = client; + this.threadPool = threadPool; + // Whether we are using native process is a good way to detect whether we are in dev / test mode: + if (MachineLearning.AUTODETECT_PROCESS.get(settings)) { + delayedNodeTimeOutSetting = UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings); + } else { + delayedNodeTimeOutSetting = TimeValue.timeValueNanos(0); + } + + clusterService.addListener(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.localNodeMaster()) { + + // wait until the gateway has recovered from disk, + // otherwise we think may not have the index templates while they actually do exist + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) { + addTemplatesIfMissing(event.state(), () -> {}); + } + } + } + + /** + * Blocking call adds the registered index templates if missing to the + * cluster waiting until the templates have been updated. + */ + public void addTemplatesIfMissing(ClusterState state) throws InterruptedException { + // to be sure that the templates exist after this method call, we should wait until the put index templates calls + // have returned if the templates were missing + CountDownLatch latch = new CountDownLatch(4); + addTemplatesIfMissing(state, latch::countDown); + + latch.await(); + } + + private void addTemplatesIfMissing(ClusterState state, Runnable callback) { + MetaData metaData = state.metaData(); + addMlNotificationsIndexTemplate(metaData, callback); + addMlMetaIndexTemplate(metaData, callback); + addStateIndexTemplate(metaData, callback); + addResultsIndexTemplate(metaData, callback); + } + + boolean templateIsPresentAndUpToDate(String templateName, MetaData metaData) { + IndexTemplateMetaData templateMetaData = metaData.templates().get(templateName); + if (templateMetaData == null) { + return false; + } + + return templateMetaData.version() != null && templateMetaData.version() >= Version.CURRENT.id; + } + + private void addMlNotificationsIndexTemplate(MetaData metaData, Runnable callback) { + if (templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData) == false) { + if (putMlNotificationsIndexTemplateCheck.compareAndSet(false, true)) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + putNotificationMessageIndexTemplate((result, error) -> { + putMlNotificationsIndexTemplateCheck.set(false); + if (result) { + logger.info("successfully created {} index template", Auditor.NOTIFICATIONS_INDEX); + } else { + logger.error( + new ParameterizedMessage("not able to create {} index template", Auditor.NOTIFICATIONS_INDEX), error); + } + callback.run(); + }); + }); + } else { + callback.run(); + } + } else { + callback.run(); + } + } + + private void addMlMetaIndexTemplate(MetaData metaData, Runnable callback) { + if (templateIsPresentAndUpToDate(AnomalyDetectorsIndex.ML_META_INDEX, metaData) == false) { + if (putMlMetaIndexTemplateCheck.compareAndSet(false, true)) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + putMetaIndexTemplate((result, error) -> { + putMlMetaIndexTemplateCheck.set(false); + if (result) { + logger.info("successfully created {} index template", AnomalyDetectorsIndex.ML_META_INDEX); + } else { + logger.error(new ParameterizedMessage( + "not able to create {} index template", AnomalyDetectorsIndex.ML_META_INDEX), error); + } + callback.run(); + }); + }); + } else { + callback.run(); + } + } else { + callback.run(); + } + } + + private void addStateIndexTemplate(MetaData metaData, Runnable callback) { + String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName(); + if (templateIsPresentAndUpToDate(stateIndexName, metaData) == false) { + if (putStateIndexTemplateCheck.compareAndSet(false, true)) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + putJobStateIndexTemplate((result, error) -> { + putStateIndexTemplateCheck.set(false); + if (result) { + logger.info("successfully created {} index template", stateIndexName); + } else { + logger.error("not able to create " + stateIndexName + " index template", error); + } + callback.run(); + }); + }); + } else { + callback.run(); + } + } else { + callback.run(); + } + } + + private void addResultsIndexTemplate(MetaData metaData, Runnable callback) { + if (templateIsPresentAndUpToDate(AnomalyDetectorsIndex.jobResultsIndexPrefix(), metaData) == false) { + if (putResultsIndexTemplateCheck.compareAndSet(false, true)) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + putJobResultsIndexTemplate((result, error) -> { + putResultsIndexTemplateCheck.set(false); + if (result) { + logger.info("successfully created {} index template", AnomalyDetectorsIndex.jobResultsIndexPrefix()); + } else { + logger.error( + new ParameterizedMessage("not able to create {} index template", + AnomalyDetectorsIndex.jobResultsIndexPrefix()), error); + } + callback.run(); + }); + }); + } else { + callback.run(); + } + } else { + callback.run(); + } + } + + /** + * Index template for notifications + */ + void putNotificationMessageIndexTemplate(BiConsumer listener) { + try { + PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(Auditor.NOTIFICATIONS_INDEX); + templateRequest.patterns(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX)); + templateRequest.settings(mlNotificationIndexSettings()); + templateRequest.mapping(AuditMessage.TYPE.getPreferredName(), ElasticsearchMappings.auditMessageMapping()); + templateRequest.mapping(AuditActivity.TYPE.getPreferredName(), ElasticsearchMappings.auditActivityMapping()); + templateRequest.version(Version.CURRENT.id); + client.admin().indices().putTemplate(templateRequest, + ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e))); + } catch (IOException e) { + logger.warn("Error putting the template for the notification message index", e); + } + } + + /** + * Index template for meta data + */ + void putMetaIndexTemplate(BiConsumer listener) { + PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.ML_META_INDEX); + templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.ML_META_INDEX)); + templateRequest.settings(mlNotificationIndexSettings()); + templateRequest.version(Version.CURRENT.id); + + client.admin().indices().putTemplate(templateRequest, + ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e))); + } + + void putJobStateIndexTemplate(BiConsumer listener) { + try { + XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping(); + XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping(); + XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping(); + + PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobStateIndexName()); + templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName())); + templateRequest.settings(mlStateIndexSettings()); + templateRequest.mapping(CategorizerState.TYPE, categorizerStateMapping); + templateRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping); + templateRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping); + templateRequest.version(Version.CURRENT.id); + + client.admin().indices().putTemplate(templateRequest, + ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e))); + } catch (IOException e) { + logger.error("Error creating template mappings for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e); + } + } + + void putJobResultsIndexTemplate(BiConsumer listener) { + try { + XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping(); + XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping(); + XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping(); + XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping(); + + PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobResultsIndexPrefix()); + templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*")); + templateRequest.settings(mlResultsIndexSettings()); + templateRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping); + templateRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping); + templateRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping); + templateRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping); + templateRequest.version(Version.CURRENT.id); + + client.admin().indices().putTemplate(templateRequest, + ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e))); + } catch (IOException e) { + logger.error("Error creating template mappings for the " + AnomalyDetectorsIndex.jobResultsIndexPrefix() + " indices", e); + } + } + + /** + * Build the index settings that we want to apply to results indexes. + * + * @return Builder initialised with the desired setting for the ML results indices. + */ + Settings.Builder mlResultsIndexSettings() { + return Settings.builder() + // Our indexes are small and one shard puts the + // least possible burden on Elasticsearch + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-2") + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting) + // Sacrifice durability for performance: in the event of power + // failure we can lose the last 5 seconds of changes, but it's + // much faster + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC) + // We need to allow fields not mentioned in the mappings to + // pick up default mappings and be used in queries + .put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), true) + // set the default all search field + .put(IndexSettings.DEFAULT_FIELD_SETTING.getKey(), ElasticsearchMappings.ALL_FIELD_VALUES); + } + + /** + * Settings for the notification messages index + * + * @return Builder initialised with the desired setting for the ML index. + */ + Settings.Builder mlNotificationIndexSettings() { + return Settings.builder() + // Our indexes are small and one shard puts the + // least possible burden on Elasticsearch + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-2") + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting) + // We need to allow fields not mentioned in the mappings to + // pick up default mappings and be used in queries + .put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), true); + } + + /** + * Settings for the state index + * + * @return Builder initialised with the desired setting for the ML index. + */ + Settings.Builder mlStateIndexSettings() { + // TODO review these settings + return Settings.builder() + // Our indexes are small and one shard puts the + // least possible burden on Elasticsearch + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-2") + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting) + // Sacrifice durability for performance: in the event of power + // failure we can lose the last 5 seconds of changes, but it's + // much faster + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index c6477251a25..442d654dd26 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -17,8 +17,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -31,24 +29,17 @@ public class MlInitializationService extends AbstractComponent implements Cluste private final ThreadPool threadPool; private final ClusterService clusterService; private final Client client; - private final JobProvider jobProvider; private final Auditor auditor; private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false); - private final AtomicBoolean putMlNotificationsIndexTemplateCheck = new AtomicBoolean(false); - private final AtomicBoolean putMlMetaIndexTemplateCheck = new AtomicBoolean(false); - private final AtomicBoolean putStateIndexTemplateCheck = new AtomicBoolean(false); - private final AtomicBoolean putResultsIndexTemplateCheck = new AtomicBoolean(false); - private volatile MlDailyManagementService mlDailyManagementService; public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, - JobProvider jobProvider, Auditor auditor) { + Auditor auditor) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; this.client = client; - this.jobProvider = jobProvider; this.auditor = auditor; clusterService.addListener(this); clusterService.addLifecycleListener(new LifecycleListener() { @@ -64,10 +55,6 @@ public class MlInitializationService extends AbstractComponent implements Cluste if (event.localNodeMaster()) { MetaData metaData = event.state().metaData(); installMlMetadata(metaData); - putMlNoficationsIndexTemplate(metaData); - putMlMetaIndexTemplate(metaData); - putStateIndexTemplate(metaData); - putResultsIndexTemplate(metaData); installDailyManagementService(); } else { uninstallDailyManagementService(); @@ -100,84 +87,6 @@ public class MlInitializationService extends AbstractComponent implements Cluste } } - private void putMlNoficationsIndexTemplate(MetaData metaData) { - if (metaData.templates().containsKey(Auditor.NOTIFICATIONS_INDEX) == false) { - if (putMlNotificationsIndexTemplateCheck.compareAndSet(false, true)) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - jobProvider.putNotificationMessageIndexTemplate((result, error) -> { - if (result) { - logger.info("successfully created {} index template", Auditor.NOTIFICATIONS_INDEX); - } else { - logger.error( - new ParameterizedMessage("not able to create {} index template", Auditor.NOTIFICATIONS_INDEX), error); - } - }); - }); - } else { - putMlNotificationsIndexTemplateCheck.set(false); - } - } - } - - private void putMlMetaIndexTemplate(MetaData metaData) { - if (metaData.templates().containsKey(JobProvider.ML_META_INDEX) == false) { - if (putMlMetaIndexTemplateCheck.compareAndSet(false, true)) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - jobProvider.putMetaIndexTemplate((result, error) -> { - if (result) { - logger.info("successfully created {} index template", JobProvider.ML_META_INDEX); - } else { - logger.error( - new ParameterizedMessage("not able to create {} index template", JobProvider.ML_META_INDEX), error); - } - }); - }); - } else { - putMlMetaIndexTemplateCheck.set(false); - } - - } - } - - private void putStateIndexTemplate(MetaData metaData) { - String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName(); - if (metaData.templates().containsKey(stateIndexName) == false) { - if (putStateIndexTemplateCheck.compareAndSet(false, true)) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - jobProvider.putJobStateIndexTemplate((result, error) -> { - if (result) { - logger.info("successfully created {} index template", stateIndexName); - } else { - logger.error("not able to create " + stateIndexName + " index template", error); - } - }); - }); - } else { - putStateIndexTemplateCheck.set(false); - } - } - } - - private void putResultsIndexTemplate(MetaData metaData) { - if (metaData.templates().containsKey(AnomalyDetectorsIndex.jobResultsIndexPrefix()) == false) { - if (putResultsIndexTemplateCheck.compareAndSet(false, true)) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - jobProvider.putJobResultsIndexTemplate((result, error) -> { - if (result) { - logger.info("successfully created {} index template", AnomalyDetectorsIndex.jobResultsIndexPrefix()); - } else { - logger.error( - new ParameterizedMessage("not able to create {} index template", - AnomalyDetectorsIndex.jobResultsIndexPrefix()), error); - } - }); - }); - } else { - putResultsIndexTemplateCheck.set(false); - } - } - } - private void installDailyManagementService() { if (mlDailyManagementService == null) { mlDailyManagementService = new MlDailyManagementService(threadPool, Arrays.asList((MlDailyManagementService.Listener) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java index 60e175232ed..db8cc2bc152 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java @@ -36,7 +36,7 @@ import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.MlFilter; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -188,7 +188,8 @@ public class DeleteFilterAction extends Action() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetFiltersAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetFiltersAction.java index bbca78c646f..030944c7703 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetFiltersAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetFiltersAction.java @@ -44,6 +44,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.action.util.PageParams; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.MlFilter; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import java.io.IOException; @@ -262,7 +263,7 @@ public class GetFiltersAction extends Action listener) { - GetRequest getRequest = new GetRequest(JobProvider.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId); + GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId); transportGetAction.execute(getRequest, new ActionListener() { @Override public void onResponse(GetResponse getDocResponse) { @@ -299,7 +300,7 @@ public class GetFiltersAction extends Action verifyIndicesPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java index d175dbb88a9..c960a1c8350 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutFilterAction.java @@ -34,7 +34,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.job.config.MlFilter; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -177,7 +177,7 @@ public class PutFilterAction extends Action listener) throws Exception { MlFilter filter = request.getFilter(); final String filterId = filter.getId(); - IndexRequest indexRequest = new IndexRequest(JobProvider.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId); + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId); XContentBuilder builder = XContentFactory.jsonBuilder(); indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS)); transportIndexAction.execute(indexRequest, new ActionListener() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java index fe065749d77..aead89933b3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java @@ -12,6 +12,12 @@ import org.elasticsearch.xpack.ml.MlMetadata; * Methods for handling index naming related functions */ public final class AnomalyDetectorsIndex { + /** + * Where to store the ml info in Elasticsearch - must match what's + * expected by kibana/engineAPI/app/directives/mlLogUsage.js + */ + public static final String ML_META_INDEX = ".ml-meta"; + private static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-"; private static final String STATE_INDEX_NAME = ".ml-state"; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index b24f1d932af..da416bc48c2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; @@ -28,7 +27,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Nullable; @@ -39,11 +37,9 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; @@ -78,10 +74,6 @@ import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.Result; -import org.elasticsearch.xpack.ml.notifications.AuditActivity; -import org.elasticsearch.xpack.ml.notifications.AuditMessage; -import org.elasticsearch.xpack.ml.notifications.Auditor; -import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; import java.io.OutputStream; @@ -95,26 +87,18 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Supplier; +import static org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex.ML_META_INDEX; + public class JobProvider { private static final Logger LOGGER = Loggers.getLogger(JobProvider.class); - /** - * Where to store the ml info in Elasticsearch - must match what's - * expected by kibana/engineAPI/app/directives/mlLogUsage.js - */ - public static final String ML_META_INDEX = ".ml-meta"; - - private static final String ASYNC = "async"; - private static final List SECONDARY_SORT = Arrays.asList( AnomalyRecord.ANOMALY_SCORE.getPreferredName(), AnomalyRecord.OVER_FIELD_VALUE.getPreferredName(), @@ -128,166 +112,11 @@ public class JobProvider { private final Client client; - private final int numberOfReplicas; - // Allows us in test mode to disable the delay of shard allocation, so that in tests we don't have to wait for - // for at least a minute for shards to get allocated. - private final TimeValue delayedNodeTimeOutSetting; - private final Settings settings; public JobProvider(Client client, int numberOfReplicas, Settings settings) { this.client = Objects.requireNonNull(client); - this.numberOfReplicas = numberOfReplicas; this.settings = settings; - - // Whether we are using native process is a good way to detect whether we are in dev / test mode: - if (MachineLearning.AUTODETECT_PROCESS.get(settings)) { - delayedNodeTimeOutSetting = UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings); - } else { - delayedNodeTimeOutSetting = TimeValue.timeValueNanos(0); - } - } - - /** - * Index template for notifications - */ - public void putNotificationMessageIndexTemplate(BiConsumer listener) { - try { - PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(Auditor.NOTIFICATIONS_INDEX); - templateRequest.patterns(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX)); - templateRequest.settings(mlNotificationIndexSettings()); - templateRequest.mapping(AuditMessage.TYPE.getPreferredName(), ElasticsearchMappings.auditMessageMapping()); - templateRequest.mapping(AuditActivity.TYPE.getPreferredName(), ElasticsearchMappings.auditActivityMapping()); - - client.admin().indices().putTemplate(templateRequest, - ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e))); - } catch (IOException e) { - LOGGER.warn("Error putting the template for the notification message index", e); - } - } - - /** - * Index template for meta data - */ - public void putMetaIndexTemplate(BiConsumer listener) { - PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(ML_META_INDEX); - templateRequest.patterns(Collections.singletonList(ML_META_INDEX)); - templateRequest.settings(mlNotificationIndexSettings()); - - client.admin().indices().putTemplate(templateRequest, - ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e))); - } - - public void putJobStateIndexTemplate(BiConsumer listener) { - try { - XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping(); - XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping(); - XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping(); - - PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobStateIndexName()); - templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName())); - templateRequest.settings(mlStateIndexSettings()); - templateRequest.mapping(CategorizerState.TYPE, categorizerStateMapping); - templateRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping); - templateRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping); - - client.admin().indices().putTemplate(templateRequest, - ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e))); - } catch (Exception e) { - LOGGER.error("Error putting the template for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e); - } - } - - /** - * Build the Elasticsearch index settings that we want to apply to results - * indexes. It's better to do this in code rather than in elasticsearch.yml - * because then the settings can be applied regardless of whether we're - * using our own Elasticsearch to store results or a customer's pre-existing - * Elasticsearch. - * - * @return An Elasticsearch builder initialised with the desired settings - * for Ml indexes. - */ - Settings.Builder mlResultsIndexSettings() { - return Settings.builder() - // Our indexes are small and one shard puts the - // least possible burden on Elasticsearch - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting) - // Sacrifice durability for performance: in the event of power - // failure we can lose the last 5 seconds of changes, but it's - // much faster - .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC) - // We need to allow fields not mentioned in the mappings to - // pick up default mappings and be used in queries - .put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), true) - // set the default all search field - .put(IndexSettings.DEFAULT_FIELD_SETTING.getKey(), ElasticsearchMappings.ALL_FIELD_VALUES); - } - - /** - * Build the Elasticsearch index settings that we want to apply to the state - * index. It's better to do this in code rather than in elasticsearch.yml - * because then the settings can be applied regardless of whether we're - * using our own Elasticsearch to store results or a customer's pre-existing - * Elasticsearch. - * - * @return An Elasticsearch builder initialised with the desired settings - * for Ml indexes. - */ - Settings.Builder mlStateIndexSettings() { - // TODO review these settings - return Settings.builder() - // Our indexes are small and one shard puts the - // least possible burden on Elasticsearch - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting) - // Sacrifice durability for performance: in the event of power - // failure we can lose the last 5 seconds of changes, but it's - // much faster - .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC); - } - - /** - * Settings for the notification messages index - * - * @return An Elasticsearch builder initialised with the desired settings - * for Ml indexes. - */ - Settings.Builder mlNotificationIndexSettings() { - return Settings.builder() - // Our indexes are small and one shard puts the - // least possible burden on Elasticsearch - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting) - // We need to allow fields not mentioned in the mappings to - // pick up default mappings and be used in queries - .put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), true); - } - - public void putJobResultsIndexTemplate(BiConsumer listener) { - try { - XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping(); - XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping(); - XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping(); - XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping(); - - PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobResultsIndexPrefix()); - templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*")); - templateRequest.settings(mlResultsIndexSettings()); - templateRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping); - templateRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping); - templateRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping); - templateRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping); - - client.admin().indices().putTemplate(templateRequest, - ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e))); - } catch (Exception e) { - LOGGER.error("Error putting the template for the " + AnomalyDetectorsIndex.jobResultsIndexPrefix() + " indices", e); - } } /** diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java new file mode 100644 index 00000000000..30e635bf106 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTemplateRegistryTests.java @@ -0,0 +1,302 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; +import org.elasticsearch.xpack.ml.job.results.Result; +import org.elasticsearch.xpack.ml.notifications.AuditActivity; +import org.elasticsearch.xpack.ml.notifications.AuditMessage; +import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.junit.Before; +import org.mockito.ArgumentCaptor; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.concurrent.ExecutorService; + +import static org.elasticsearch.mock.orig.Mockito.doAnswer; +import static org.elasticsearch.mock.orig.Mockito.times; +import static org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex.ML_META_INDEX; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MachineLearningTemplateRegistryTests extends ESTestCase { + private static final String CLUSTER_NAME = "clusterMcClusterFace"; + + private ClusterService clusterService; + private ExecutorService executorService; + private Client client; + private ThreadPool threadPool; + + @Before + public void setUpMocks() { + threadPool = mock(ThreadPool.class); + executorService = mock(ExecutorService.class); + clusterService = mock(ClusterService.class); + client = mock(Client.class); + + doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); + } + + public void testAddsListener() throws Exception { + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(Settings.EMPTY, clusterService, client, threadPool); + + verify(clusterService, times(1)).addListener(templateRegistry); + } + + public void testAddTemplatesIfMissing() throws Exception { + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); + ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); + clientBuilder.putTemplate(captor); + + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(Settings.EMPTY, clusterService, clientBuilder.build(), threadPool); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) + .metaData(MetaData.builder()) + .build(); + templateRegistry.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + + verify(threadPool, times(4)).executor(anyString()); + assertFalse(templateRegistry.putMlNotificationsIndexTemplateCheck.get()); + assertFalse(templateRegistry.putMlMetaIndexTemplateCheck.get()); + assertFalse(templateRegistry.putMlNotificationsIndexTemplateCheck.get()); + assertFalse(templateRegistry.putResultsIndexTemplateCheck.get()); + } + + public void testAddTemplatesIfMissing_alreadyInitialized() throws Exception { + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); + ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); + clientBuilder.putTemplate(captor); + + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(Settings.EMPTY, clusterService, clientBuilder.build(), threadPool); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(Auditor.NOTIFICATIONS_INDEX).settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + )) + .put(IndexMetaData.builder(AnomalyDetectorsIndex.ML_META_INDEX).settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + )) + .put(IndexMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + )) + .put(IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX).version(Version.CURRENT.id).build()) + .put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.ML_META_INDEX).version(Version.CURRENT.id).build()) + .put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).version(Version.CURRENT.id).build()) + .put(IndexTemplateMetaData.builder( + AnomalyDetectorsIndex.jobResultsIndexPrefix()).version(Version.CURRENT.id).build()) + .putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build())) + .build(); + MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class); + templateRegistry.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + + verify(threadPool, times(0)).executor(anyString()); + assertFalse(templateRegistry.putMlNotificationsIndexTemplateCheck.get()); + assertFalse(templateRegistry.putMlMetaIndexTemplateCheck.get()); + assertFalse(templateRegistry.putMlNotificationsIndexTemplateCheck.get()); + assertFalse(templateRegistry.putResultsIndexTemplateCheck.get()); + } + + public void testMlResultsIndexSettings() { + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(createSettings(), clusterService, client, threadPool); + Settings settings = templateRegistry.mlResultsIndexSettings().build(); + + assertEquals("1", settings.get("index.number_of_shards")); + assertEquals("0-2", settings.get("index.auto_expand_replicas")); + assertEquals("async", settings.get("index.translog.durability")); + assertEquals("true", settings.get("index.mapper.dynamic")); + assertEquals("all_field_values", settings.get("index.query.default_field")); + assertEquals("2s", settings.get("index.unassigned.node_left.delayed_timeout")); + } + + public void testMlAuditIndexSettings() { + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(createSettings(), clusterService, client, threadPool); + Settings settings = templateRegistry.mlResultsIndexSettings().build(); + + assertEquals("1", settings.get("index.number_of_shards")); + assertEquals("0-2", settings.get("index.auto_expand_replicas")); + assertEquals("async", settings.get("index.translog.durability")); + assertEquals("true", settings.get("index.mapper.dynamic")); + assertEquals("2s", settings.get("index.unassigned.node_left.delayed_timeout")); + } + + public void testMlStateIndexSettings() { + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(createSettings(), clusterService, client, threadPool); + Settings settings = templateRegistry.mlResultsIndexSettings().build(); + + assertEquals("1", settings.get("index.number_of_shards")); + assertEquals("0-2", settings.get("index.auto_expand_replicas")); + assertEquals("async", settings.get("index.translog.durability")); + assertEquals("2s", settings.get("index.unassigned.node_left.delayed_timeout")); + } + + public void testPutNotificationIndexTemplate() { + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); + ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); + clientBuilder.putTemplate(captor); + + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(createSettings(), clusterService, clientBuilder.build(), threadPool); + + templateRegistry.putNotificationMessageIndexTemplate((result, error) -> { + assertTrue(result); + PutIndexTemplateRequest request = captor.getValue(); + assertNotNull(request); + assertEquals(templateRegistry.mlNotificationIndexSettings().build(), request.settings()); + assertTrue(request.mappings().containsKey(AuditMessage.TYPE.getPreferredName())); + assertTrue(request.mappings().containsKey(AuditActivity.TYPE.getPreferredName())); + assertEquals(2, request.mappings().size()); + assertEquals(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX), request.patterns()); + assertEquals(new Integer(Version.CURRENT.id), request.version()); + }); + } + + public void testPutMetaIndexTemplate() { + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); + ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); + clientBuilder.putTemplate(captor); + + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(createSettings(), clusterService, clientBuilder.build(), threadPool); + + templateRegistry.putMetaIndexTemplate((result, error) -> { + assertTrue(result); + PutIndexTemplateRequest request = captor.getValue(); + assertNotNull(request); + assertEquals(templateRegistry.mlNotificationIndexSettings().build(), request.settings()); + assertEquals(0, request.mappings().size()); + assertEquals(Collections.singletonList(ML_META_INDEX), request.patterns()); + assertEquals(new Integer(Version.CURRENT.id), request.version()); + }); + } + + public void testPutJobStateIndexTemplate() { + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); + ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); + clientBuilder.putTemplate(captor); + + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(createSettings(), clusterService, clientBuilder.build(), threadPool); + + templateRegistry.putJobStateIndexTemplate((result, error) -> { + assertTrue(result); + PutIndexTemplateRequest request = captor.getValue(); + assertNotNull(request); + assertEquals(templateRegistry.mlStateIndexSettings().build(), request.settings()); + assertTrue(request.mappings().containsKey(CategorizerState.TYPE)); + assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName())); + assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName())); + assertEquals(3, request.mappings().size()); + assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()), request.patterns()); + assertEquals(new Integer(Version.CURRENT.id), request.version()); + }); + } + + public void testPutJobResultsIndexTemplate() { + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); + ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); + clientBuilder.putTemplate(captor); + + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(createSettings(), clusterService, clientBuilder.build(), threadPool); + + templateRegistry.putJobResultsIndexTemplate((result, error) -> { + assertTrue(result); + PutIndexTemplateRequest request = captor.getValue(); + assertNotNull(request); + assertEquals(templateRegistry.mlResultsIndexSettings().build(), request.settings()); + assertTrue(request.mappings().containsKey(Result.TYPE.getPreferredName())); + assertTrue(request.mappings().containsKey(CategoryDefinition.TYPE.getPreferredName())); + assertTrue(request.mappings().containsKey(DataCounts.TYPE.getPreferredName())); + assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName())); + assertEquals(4, request.mappings().size()); + assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"), request.patterns()); + assertEquals(new Integer(Version.CURRENT.id), request.version()); + }); + } + + public void testTemplateIsPresentAndUpToDate() { + MachineLearningTemplateRegistry templateRegistry = + new MachineLearningTemplateRegistry(createSettings(), clusterService, client, threadPool); + + // missing template + MetaData metaData = MetaData.builder().build(); + assertFalse(templateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData)); + + // old version of template + IndexTemplateMetaData templateMetaData = IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX) + .version(Version.CURRENT.id - 1).build(); + metaData = MetaData.builder().put(templateMetaData).build(); + assertFalse(templateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData)); + + // latest template + templateMetaData = IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX) + .version(Version.CURRENT.id).build(); + metaData = MetaData.builder().put(templateMetaData).build(); + assertTrue(templateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData)); + } + + private Settings createSettings() { + return Settings.builder() + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(2)) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 1001L) + .build(); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index d384c88c98e..59e75a33fa2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -10,8 +10,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -20,15 +18,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; import java.net.InetAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.function.BiConsumer; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.times; @@ -46,7 +41,6 @@ public class MlInitializationServiceTests extends ESTestCase { private ExecutorService executorService; private ClusterService clusterService; private Client client; - private JobProvider jobProvider; private Auditor auditor; @Before @@ -55,7 +49,6 @@ public class MlInitializationServiceTests extends ESTestCase { executorService = mock(ExecutorService.class); clusterService = mock(ClusterService.class); client = mock(Client.class); - jobProvider = mock(JobProvider.class); auditor = mock(Auditor.class); doAnswer(invocation -> { @@ -63,16 +56,14 @@ public class MlInitializationServiceTests extends ESTestCase { return null; }).when(executorService).execute(any(Runnable.class)); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); + ScheduledFuture scheduledFuture = mock(ScheduledFuture.class); when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture); } public void testInitialize() throws Exception { - JobProvider jobProvider = mockJobProvider(); - - ClusterService clusterService = mock(ClusterService.class); MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -85,15 +76,11 @@ public class MlInitializationServiceTests extends ESTestCase { verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); assertThat(initializationService.getDailyManagementService().isStarted(), is(true)); - verify(jobProvider, times(1)).putNotificationMessageIndexTemplate(any()); - verify(jobProvider, times(1)).putMetaIndexTemplate(any()); - verify(jobProvider, times(1)).putJobStateIndexTemplate(any()); - verify(jobProvider, times(1)).putJobResultsIndexTemplate(any()); } public void testInitialize_noMasterNode() throws Exception { MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -104,25 +91,12 @@ public class MlInitializationServiceTests extends ESTestCase { verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any()); assertThat(initializationService.getDailyManagementService(), is(nullValue())); - verify(jobProvider, times(0)).putNotificationMessageIndexTemplate(any()); - verify(jobProvider, times(0)).putMetaIndexTemplate(any()); - verify(jobProvider, times(0)).putJobStateIndexTemplate(any()); - verify(jobProvider, times(0)).putJobResultsIndexTemplate(any()); } public void testInitialize_alreadyInitialized() throws Exception { - ThreadPool threadPool = mock(ThreadPool.class); - ExecutorService executorService = mock(ExecutorService.class); - doAnswer(invocation -> { - ((Runnable) invocation.getArguments()[0]).run(); - return null; - }).when(executorService).execute(any(Runnable.class)); - when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); - ClusterService clusterService = mock(ClusterService.class); - JobProvider jobProvider = mockJobProvider(); MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -130,25 +104,6 @@ public class MlInitializationServiceTests extends ESTestCase { .localNodeId("_node_id") .masterNodeId("_node_id")) .metaData(MetaData.builder() - .put(IndexMetaData.builder(Auditor.NOTIFICATIONS_INDEX).settings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - )) - .put(IndexMetaData.builder(JobProvider.ML_META_INDEX).settings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - )) - .put(IndexMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).settings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - )) - .put(IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX).build()) - .put(IndexTemplateMetaData.builder(JobProvider.ML_META_INDEX).build()) - .put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).build()) - .put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobResultsIndexPrefix()).build()) .putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build())) .build(); MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class); @@ -157,25 +112,12 @@ public class MlInitializationServiceTests extends ESTestCase { verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any()); assertSame(initialDailyManagementService, initializationService.getDailyManagementService()); - verify(jobProvider, times(0)).putNotificationMessageIndexTemplate(any()); - verify(jobProvider, times(0)).putMetaIndexTemplate(any()); - verify(jobProvider, times(0)).putJobStateIndexTemplate(any()); - verify(jobProvider, times(0)).putJobResultsIndexTemplate(any()); } public void testInitialize_onlyOnce() throws Exception { - ThreadPool threadPool = mock(ThreadPool.class); - ExecutorService executorService = mock(ExecutorService.class); - doAnswer(invocation -> { - ((Runnable) invocation.getArguments()[0]).run(); - return null; - }).when(executorService).execute(any(Runnable.class)); - when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); - ClusterService clusterService = mock(ClusterService.class); - JobProvider jobProvider = mockJobProvider(); MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .nodes(DiscoveryNodes.builder() @@ -188,42 +130,11 @@ public class MlInitializationServiceTests extends ESTestCase { initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); - verify(jobProvider, times(1)).putNotificationMessageIndexTemplate(any()); - verify(jobProvider, times(1)).putMetaIndexTemplate(any()); - verify(jobProvider, times(1)).putJobStateIndexTemplate(any()); - verify(jobProvider, times(1)).putJobResultsIndexTemplate(any()); - } - - private JobProvider mockJobProvider() { - JobProvider jobProvider = mock(JobProvider.class); - - doAnswer(invocation -> { - BiConsumer listener = (BiConsumer)invocation.getArguments()[0]; - listener.accept(true, null); - return null; - }).when(jobProvider).putMetaIndexTemplate(any()); - doAnswer(invocation -> { - BiConsumer listener = (BiConsumer)invocation.getArguments()[0]; - listener.accept(true, null); - return null; - }).when(jobProvider).putNotificationMessageIndexTemplate(any()); - doAnswer(invocation -> { - BiConsumer listener = (BiConsumer)invocation.getArguments()[0]; - listener.accept(true, null); - return null; - }).when(jobProvider).putJobStateIndexTemplate(any()); - doAnswer(invocation -> { - BiConsumer listener = (BiConsumer)invocation.getArguments()[0]; - listener.accept(true, null); - return null; - }).when(jobProvider).putJobResultsIndexTemplate(any()); - - return jobProvider; } public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception { MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); + new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor); MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class); initializationService.setDailyManagementService(initialDailyManagementService); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index 795cc292b7c..1dc40af66e9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -315,7 +315,7 @@ public class OpenJobActionTests extends ESTestCase { private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) { List indices = new ArrayList<>(); indices.add(AnomalyDetectorsIndex.jobStateIndexName()); - indices.add(JobProvider.ML_META_INDEX); + indices.add(AnomalyDetectorsIndex.ML_META_INDEX); indices.add(Auditor.NOTIFICATIONS_INDEX); for (String jobId : jobIds) { indices.add(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 68e49919e68..76415833dde 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -5,15 +5,19 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.XPackSingleNodeTestCase; +import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; @@ -51,8 +55,11 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -95,7 +102,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { } public void testProcessResults() throws Exception { - putResultsIndexMappingTemplate(); + putIndexTemplates(); ResultsBuilder builder = new ResultsBuilder(); Bucket bucket = createBucket(false); @@ -155,7 +162,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { } public void testDeleteInterimResults() throws Exception { - putResultsIndexMappingTemplate(); + putIndexTemplates(); Bucket nonInterimBucket = createBucket(false); Bucket interimBucket = createBucket(true); @@ -184,7 +191,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { } public void testMultipleFlushesBetweenPersisting() throws Exception { - putResultsIndexMappingTemplate(); + putIndexTemplates(); Bucket finalBucket = createBucket(true); List finalAnomalyRecords = createRecords(true); @@ -214,7 +221,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { } public void testEndOfStreamTriggersPersisting() throws Exception { - putResultsIndexMappingTemplate(); + putIndexTemplates(); Bucket bucket = createBucket(false); List firstSetOfRecords = createRecords(false); List secondSetOfRecords = createRecords(false); @@ -236,10 +243,17 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { assertResultsAreSame(allRecords, persistedRecords); } - private void putResultsIndexMappingTemplate() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - jobProvider.putJobResultsIndexTemplate((aBoolean, e) -> {latch.countDown();}); - latch.await(); + private void putIndexTemplates() throws InterruptedException { + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); + + new MachineLearningTemplateRegistry(Settings.EMPTY, mock(ClusterService.class), client(), threadPool) + .addTemplatesIfMissing(client().admin().cluster().state(new ClusterStateRequest().all()).actionGet().getState()); } private Bucket createBucket(boolean isInterim) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index d4345e15335..e38b287a7c5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchResponse; @@ -41,12 +40,10 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; -import org.elasticsearch.xpack.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; @@ -56,9 +53,6 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.Result; -import org.elasticsearch.xpack.ml.notifications.AuditActivity; -import org.elasticsearch.xpack.ml.notifications.AuditMessage; -import org.elasticsearch.xpack.ml.notifications.Auditor; import org.mockito.ArgumentCaptor; import java.io.ByteArrayOutputStream; @@ -76,7 +70,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; -import static org.elasticsearch.xpack.ml.job.persistence.JobProvider.ML_META_INDEX; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; @@ -138,40 +131,6 @@ public class JobProviderTests extends ESTestCase { assertEquals("", quantiles.getQuantileState()); } - public void testMlResultsIndexSettings() { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); - JobProvider provider = createProvider(clientBuilder.build()); - Settings settings = provider.mlResultsIndexSettings().build(); - - assertEquals("1", settings.get("index.number_of_shards")); - assertEquals("0", settings.get("index.number_of_replicas")); - assertEquals("async", settings.get("index.translog.durability")); - assertEquals("true", settings.get("index.mapper.dynamic")); - assertEquals("all_field_values", settings.get("index.query.default_field")); - } - - public void testPutJobResultsIndexTemplate() { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); - ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); - clientBuilder.putTemplate(captor); - - Job.Builder job = buildJobBuilder("foo"); - JobProvider provider = createProvider(clientBuilder.build()); - - provider.putJobResultsIndexTemplate((result, error) -> { - assertTrue(result); - PutIndexTemplateRequest request = captor.getValue(); - assertNotNull(request); - assertEquals(provider.mlResultsIndexSettings().build(), request.settings()); - assertTrue(request.mappings().containsKey(Result.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(CategoryDefinition.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(DataCounts.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName())); - assertEquals(4, request.mappings().size()); - assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"), request.patterns()); - }); - } - @SuppressWarnings("unchecked") public void testCreateJobResultsIndex() { MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); @@ -353,85 +312,7 @@ public class JobProviderTests extends ESTestCase { }); } - public void testMlAuditIndexSettings() { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); - JobProvider provider = createProvider(clientBuilder.build()); - Settings settings = provider.mlResultsIndexSettings().build(); - - assertEquals("1", settings.get("index.number_of_shards")); - assertEquals("0", settings.get("index.number_of_replicas")); - assertEquals("async", settings.get("index.translog.durability")); - assertEquals("true", settings.get("index.mapper.dynamic")); - } - - public void testPutNotificationIndexTemplate() { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); - ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); - clientBuilder.putTemplate(captor); - - JobProvider provider = createProvider(clientBuilder.build()); - - provider.putNotificationMessageIndexTemplate((result, error) -> { - assertTrue(result); - PutIndexTemplateRequest request = captor.getValue(); - assertNotNull(request); - assertEquals(provider.mlNotificationIndexSettings().build(), request.settings()); - assertTrue(request.mappings().containsKey(AuditMessage.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(AuditActivity.TYPE.getPreferredName())); - assertEquals(2, request.mappings().size()); - assertEquals(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX), request.patterns()); - }); - } - - public void testPutMetaIndexTemplate() { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); - ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); - clientBuilder.putTemplate(captor); - - JobProvider provider = createProvider(clientBuilder.build()); - - provider.putMetaIndexTemplate((result, error) -> { - assertTrue(result); - PutIndexTemplateRequest request = captor.getValue(); - assertNotNull(request); - assertEquals(provider.mlNotificationIndexSettings().build(), request.settings()); - assertEquals(0, request.mappings().size()); - assertEquals(Collections.singletonList(ML_META_INDEX), request.patterns()); - }); - } - - public void testMlStateIndexSettings() { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); - JobProvider provider = createProvider(clientBuilder.build()); - Settings settings = provider.mlResultsIndexSettings().build(); - - assertEquals("1", settings.get("index.number_of_shards")); - assertEquals("0", settings.get("index.number_of_replicas")); - assertEquals("async", settings.get("index.translog.durability")); - } - - public void testPutJobStateIndexTemplate() { - MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); - ArgumentCaptor captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); - clientBuilder.putTemplate(captor); - - Job.Builder job = buildJobBuilder("foo"); - JobProvider provider = createProvider(clientBuilder.build()); - - provider.putJobStateIndexTemplate((result, error) -> { - assertTrue(result); - PutIndexTemplateRequest request = captor.getValue(); - assertNotNull(request); - assertEquals(provider.mlStateIndexSettings().build(), request.settings()); - assertTrue(request.mappings().containsKey(CategorizerState.TYPE)); - assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName())); - assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName())); - assertEquals(3, request.mappings().size()); - assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()), request.patterns()); - }); - } - - public void testDeleteJobRelatedIndices() throws InterruptedException, ExecutionException, IOException { + public void testDeleteJobRelatedIndices() throws InterruptedException, ExecutionException, IOException { @SuppressWarnings("unchecked") ActionListener actionListener = mock(ActionListener.class); String jobId = "ThisIsMyJob"; @@ -1258,10 +1139,7 @@ public class JobProviderTests extends ESTestCase { } private JobProvider createProvider(Client client) { - Settings.Builder builder = Settings.builder() - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1)) - .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 1000L); - return new JobProvider(client, 0, builder.build()); + return new JobProvider(client, 0, Settings.EMPTY); } private static GetResponse createGetResponse(boolean exists, Map source) throws IOException {