[ML] Index template registry with simple versioning (elastic/x-pack-elasticsearch#655)

[ML] * Add MachineLearningTemplateRegistry class

[ML] * Add blocking method to put templates required by tests

[ML]  * Add version check for templates

[ML] * Review comments

Original commit: elastic/x-pack-elasticsearch@07d315e56d
This commit is contained in:
David Kyle 2017-02-28 15:34:05 +00:00 committed by GitHub
parent 6864111acc
commit 59b50bb18c
14 changed files with 702 additions and 502 deletions

View File

@ -304,7 +304,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
jobProvider, jobProvider,
jobManager, jobManager,
dataProcessor, dataProcessor,
new MlInitializationService(settings, threadPool, clusterService, client, jobProvider, auditor), new MlInitializationService(settings, threadPool, clusterService, client, auditor),
new MachineLearningTemplateRegistry(settings, clusterService, client, threadPool),
jobDataCountsPersister, jobDataCountsPersister,
datafeedJobRunner, datafeedJobRunner,
persistentActionService, persistentActionService,

View File

@ -0,0 +1,349 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.AuditActivity;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
/**
* Registry for the ML index templates and settings
*/
public class MachineLearningTemplateRegistry extends AbstractComponent implements ClusterStateListener {
private static final String ASYNC = "async";
private final Client client;
private final ThreadPool threadPool;
final AtomicBoolean putMlNotificationsIndexTemplateCheck = new AtomicBoolean(false);
final AtomicBoolean putMlMetaIndexTemplateCheck = new AtomicBoolean(false);
final AtomicBoolean putStateIndexTemplateCheck = new AtomicBoolean(false);
final AtomicBoolean putResultsIndexTemplateCheck = new AtomicBoolean(false);
// Allows us in test mode to disable the delay of shard allocation, so that in tests we don't have to wait for
// for at least a minute for shards to get allocated.
private final TimeValue delayedNodeTimeOutSetting;
public MachineLearningTemplateRegistry(Settings settings, ClusterService clusterService, Client client,
ThreadPool threadPool) {
super(settings);
this.client = client;
this.threadPool = threadPool;
// Whether we are using native process is a good way to detect whether we are in dev / test mode:
if (MachineLearning.AUTODETECT_PROCESS.get(settings)) {
delayedNodeTimeOutSetting = UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings);
} else {
delayedNodeTimeOutSetting = TimeValue.timeValueNanos(0);
}
clusterService.addListener(this);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) {
// wait until the gateway has recovered from disk,
// otherwise we think may not have the index templates while they actually do exist
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) {
addTemplatesIfMissing(event.state(), () -> {});
}
}
}
/**
* Blocking call adds the registered index templates if missing to the
* cluster waiting until the templates have been updated.
*/
public void addTemplatesIfMissing(ClusterState state) throws InterruptedException {
// to be sure that the templates exist after this method call, we should wait until the put index templates calls
// have returned if the templates were missing
CountDownLatch latch = new CountDownLatch(4);
addTemplatesIfMissing(state, latch::countDown);
latch.await();
}
private void addTemplatesIfMissing(ClusterState state, Runnable callback) {
MetaData metaData = state.metaData();
addMlNotificationsIndexTemplate(metaData, callback);
addMlMetaIndexTemplate(metaData, callback);
addStateIndexTemplate(metaData, callback);
addResultsIndexTemplate(metaData, callback);
}
boolean templateIsPresentAndUpToDate(String templateName, MetaData metaData) {
IndexTemplateMetaData templateMetaData = metaData.templates().get(templateName);
if (templateMetaData == null) {
return false;
}
return templateMetaData.version() != null && templateMetaData.version() >= Version.CURRENT.id;
}
private void addMlNotificationsIndexTemplate(MetaData metaData, Runnable callback) {
if (templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData) == false) {
if (putMlNotificationsIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
putNotificationMessageIndexTemplate((result, error) -> {
putMlNotificationsIndexTemplateCheck.set(false);
if (result) {
logger.info("successfully created {} index template", Auditor.NOTIFICATIONS_INDEX);
} else {
logger.error(
new ParameterizedMessage("not able to create {} index template", Auditor.NOTIFICATIONS_INDEX), error);
}
callback.run();
});
});
} else {
callback.run();
}
} else {
callback.run();
}
}
private void addMlMetaIndexTemplate(MetaData metaData, Runnable callback) {
if (templateIsPresentAndUpToDate(AnomalyDetectorsIndex.ML_META_INDEX, metaData) == false) {
if (putMlMetaIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
putMetaIndexTemplate((result, error) -> {
putMlMetaIndexTemplateCheck.set(false);
if (result) {
logger.info("successfully created {} index template", AnomalyDetectorsIndex.ML_META_INDEX);
} else {
logger.error(new ParameterizedMessage(
"not able to create {} index template", AnomalyDetectorsIndex.ML_META_INDEX), error);
}
callback.run();
});
});
} else {
callback.run();
}
} else {
callback.run();
}
}
private void addStateIndexTemplate(MetaData metaData, Runnable callback) {
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
if (templateIsPresentAndUpToDate(stateIndexName, metaData) == false) {
if (putStateIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
putJobStateIndexTemplate((result, error) -> {
putStateIndexTemplateCheck.set(false);
if (result) {
logger.info("successfully created {} index template", stateIndexName);
} else {
logger.error("not able to create " + stateIndexName + " index template", error);
}
callback.run();
});
});
} else {
callback.run();
}
} else {
callback.run();
}
}
private void addResultsIndexTemplate(MetaData metaData, Runnable callback) {
if (templateIsPresentAndUpToDate(AnomalyDetectorsIndex.jobResultsIndexPrefix(), metaData) == false) {
if (putResultsIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
putJobResultsIndexTemplate((result, error) -> {
putResultsIndexTemplateCheck.set(false);
if (result) {
logger.info("successfully created {} index template", AnomalyDetectorsIndex.jobResultsIndexPrefix());
} else {
logger.error(
new ParameterizedMessage("not able to create {} index template",
AnomalyDetectorsIndex.jobResultsIndexPrefix()), error);
}
callback.run();
});
});
} else {
callback.run();
}
} else {
callback.run();
}
}
/**
* Index template for notifications
*/
void putNotificationMessageIndexTemplate(BiConsumer<Boolean, Exception> listener) {
try {
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(Auditor.NOTIFICATIONS_INDEX);
templateRequest.patterns(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX));
templateRequest.settings(mlNotificationIndexSettings());
templateRequest.mapping(AuditMessage.TYPE.getPreferredName(), ElasticsearchMappings.auditMessageMapping());
templateRequest.mapping(AuditActivity.TYPE.getPreferredName(), ElasticsearchMappings.auditActivityMapping());
templateRequest.version(Version.CURRENT.id);
client.admin().indices().putTemplate(templateRequest,
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
} catch (IOException e) {
logger.warn("Error putting the template for the notification message index", e);
}
}
/**
* Index template for meta data
*/
void putMetaIndexTemplate(BiConsumer<Boolean, Exception> listener) {
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.ML_META_INDEX);
templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.ML_META_INDEX));
templateRequest.settings(mlNotificationIndexSettings());
templateRequest.version(Version.CURRENT.id);
client.admin().indices().putTemplate(templateRequest,
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
}
void putJobStateIndexTemplate(BiConsumer<Boolean, Exception> listener) {
try {
XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping();
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobStateIndexName());
templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()));
templateRequest.settings(mlStateIndexSettings());
templateRequest.mapping(CategorizerState.TYPE, categorizerStateMapping);
templateRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping);
templateRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping);
templateRequest.version(Version.CURRENT.id);
client.admin().indices().putTemplate(templateRequest,
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
} catch (IOException e) {
logger.error("Error creating template mappings for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
}
}
void putJobResultsIndexTemplate(BiConsumer<Boolean, Exception> listener) {
try {
XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping();
XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping();
XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobResultsIndexPrefix());
templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"));
templateRequest.settings(mlResultsIndexSettings());
templateRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
templateRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping);
templateRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
templateRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
templateRequest.version(Version.CURRENT.id);
client.admin().indices().putTemplate(templateRequest,
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
} catch (IOException e) {
logger.error("Error creating template mappings for the " + AnomalyDetectorsIndex.jobResultsIndexPrefix() + " indices", e);
}
}
/**
* Build the index settings that we want to apply to results indexes.
*
* @return Builder initialised with the desired setting for the ML results indices.
*/
Settings.Builder mlResultsIndexSettings() {
return Settings.builder()
// Our indexes are small and one shard puts the
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-2")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
// Sacrifice durability for performance: in the event of power
// failure we can lose the last 5 seconds of changes, but it's
// much faster
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC)
// We need to allow fields not mentioned in the mappings to
// pick up default mappings and be used in queries
.put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), true)
// set the default all search field
.put(IndexSettings.DEFAULT_FIELD_SETTING.getKey(), ElasticsearchMappings.ALL_FIELD_VALUES);
}
/**
* Settings for the notification messages index
*
* @return Builder initialised with the desired setting for the ML index.
*/
Settings.Builder mlNotificationIndexSettings() {
return Settings.builder()
// Our indexes are small and one shard puts the
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-2")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
// We need to allow fields not mentioned in the mappings to
// pick up default mappings and be used in queries
.put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), true);
}
/**
* Settings for the state index
*
* @return Builder initialised with the desired setting for the ML index.
*/
Settings.Builder mlStateIndexSettings() {
// TODO review these settings
return Settings.builder()
// Our indexes are small and one shard puts the
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-2")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
// Sacrifice durability for performance: in the event of power
// failure we can lose the last 5 seconds of changes, but it's
// much faster
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC);
}
}

View File

@ -17,8 +17,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
@ -31,24 +29,17 @@ public class MlInitializationService extends AbstractComponent implements Cluste
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ClusterService clusterService; private final ClusterService clusterService;
private final Client client; private final Client client;
private final JobProvider jobProvider;
private final Auditor auditor; private final Auditor auditor;
private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false); private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false);
private final AtomicBoolean putMlNotificationsIndexTemplateCheck = new AtomicBoolean(false);
private final AtomicBoolean putMlMetaIndexTemplateCheck = new AtomicBoolean(false);
private final AtomicBoolean putStateIndexTemplateCheck = new AtomicBoolean(false);
private final AtomicBoolean putResultsIndexTemplateCheck = new AtomicBoolean(false);
private volatile MlDailyManagementService mlDailyManagementService; private volatile MlDailyManagementService mlDailyManagementService;
public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, public MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client,
JobProvider jobProvider, Auditor auditor) { Auditor auditor) {
super(settings); super(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
this.client = client; this.client = client;
this.jobProvider = jobProvider;
this.auditor = auditor; this.auditor = auditor;
clusterService.addListener(this); clusterService.addListener(this);
clusterService.addLifecycleListener(new LifecycleListener() { clusterService.addLifecycleListener(new LifecycleListener() {
@ -64,10 +55,6 @@ public class MlInitializationService extends AbstractComponent implements Cluste
if (event.localNodeMaster()) { if (event.localNodeMaster()) {
MetaData metaData = event.state().metaData(); MetaData metaData = event.state().metaData();
installMlMetadata(metaData); installMlMetadata(metaData);
putMlNoficationsIndexTemplate(metaData);
putMlMetaIndexTemplate(metaData);
putStateIndexTemplate(metaData);
putResultsIndexTemplate(metaData);
installDailyManagementService(); installDailyManagementService();
} else { } else {
uninstallDailyManagementService(); uninstallDailyManagementService();
@ -100,84 +87,6 @@ public class MlInitializationService extends AbstractComponent implements Cluste
} }
} }
private void putMlNoficationsIndexTemplate(MetaData metaData) {
if (metaData.templates().containsKey(Auditor.NOTIFICATIONS_INDEX) == false) {
if (putMlNotificationsIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
jobProvider.putNotificationMessageIndexTemplate((result, error) -> {
if (result) {
logger.info("successfully created {} index template", Auditor.NOTIFICATIONS_INDEX);
} else {
logger.error(
new ParameterizedMessage("not able to create {} index template", Auditor.NOTIFICATIONS_INDEX), error);
}
});
});
} else {
putMlNotificationsIndexTemplateCheck.set(false);
}
}
}
private void putMlMetaIndexTemplate(MetaData metaData) {
if (metaData.templates().containsKey(JobProvider.ML_META_INDEX) == false) {
if (putMlMetaIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
jobProvider.putMetaIndexTemplate((result, error) -> {
if (result) {
logger.info("successfully created {} index template", JobProvider.ML_META_INDEX);
} else {
logger.error(
new ParameterizedMessage("not able to create {} index template", JobProvider.ML_META_INDEX), error);
}
});
});
} else {
putMlMetaIndexTemplateCheck.set(false);
}
}
}
private void putStateIndexTemplate(MetaData metaData) {
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
if (metaData.templates().containsKey(stateIndexName) == false) {
if (putStateIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
jobProvider.putJobStateIndexTemplate((result, error) -> {
if (result) {
logger.info("successfully created {} index template", stateIndexName);
} else {
logger.error("not able to create " + stateIndexName + " index template", error);
}
});
});
} else {
putStateIndexTemplateCheck.set(false);
}
}
}
private void putResultsIndexTemplate(MetaData metaData) {
if (metaData.templates().containsKey(AnomalyDetectorsIndex.jobResultsIndexPrefix()) == false) {
if (putResultsIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
jobProvider.putJobResultsIndexTemplate((result, error) -> {
if (result) {
logger.info("successfully created {} index template", AnomalyDetectorsIndex.jobResultsIndexPrefix());
} else {
logger.error(
new ParameterizedMessage("not able to create {} index template",
AnomalyDetectorsIndex.jobResultsIndexPrefix()), error);
}
});
});
} else {
putResultsIndexTemplateCheck.set(false);
}
}
}
private void installDailyManagementService() { private void installDailyManagementService() {
if (mlDailyManagementService == null) { if (mlDailyManagementService == null) {
mlDailyManagementService = new MlDailyManagementService(threadPool, Arrays.asList((MlDailyManagementService.Listener) mlDailyManagementService = new MlDailyManagementService(threadPool, Arrays.asList((MlDailyManagementService.Listener)

View File

@ -36,7 +36,7 @@ import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException; import java.io.IOException;
@ -188,7 +188,8 @@ public class DeleteFilterAction extends Action<DeleteFilterAction.Request, Delet
+ currentlyUsedBy); + currentlyUsedBy);
} }
DeleteRequest deleteRequest = new DeleteRequest(JobProvider.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId); DeleteRequest deleteRequest = new DeleteRequest(AnomalyDetectorsIndex.ML_META_INDEX,
MlFilter.TYPE.getPreferredName(), filterId);
BulkRequest bulkRequest = new BulkRequest(); BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(deleteRequest); bulkRequest.add(deleteRequest);
transportAction.execute(bulkRequest, new ActionListener<BulkResponse>() { transportAction.execute(bulkRequest, new ActionListener<BulkResponse>() {

View File

@ -44,6 +44,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.action.util.PageParams; import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import java.io.IOException; import java.io.IOException;
@ -262,7 +263,7 @@ public class GetFiltersAction extends Action<GetFiltersAction.Request, GetFilter
} }
private void getFilter(String filterId, ActionListener<Response> listener) { private void getFilter(String filterId, ActionListener<Response> listener) {
GetRequest getRequest = new GetRequest(JobProvider.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId); GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId);
transportGetAction.execute(getRequest, new ActionListener<GetResponse>() { transportGetAction.execute(getRequest, new ActionListener<GetResponse>() {
@Override @Override
public void onResponse(GetResponse getDocResponse) { public void onResponse(GetResponse getDocResponse) {
@ -299,7 +300,7 @@ public class GetFiltersAction extends Action<GetFiltersAction.Request, GetFilter
.from(pageParams.getFrom()) .from(pageParams.getFrom())
.size(pageParams.getSize()); .size(pageParams.getSize());
SearchRequest searchRequest = new SearchRequest(new String[]{JobProvider.ML_META_INDEX}, sourceBuilder) SearchRequest searchRequest = new SearchRequest(new String[]{AnomalyDetectorsIndex.ML_META_INDEX}, sourceBuilder)
.indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS)) .indicesOptions(JobProvider.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS))
.types(MlFilter.TYPE.getPreferredName()); .types(MlFilter.TYPE.getPreferredName());

View File

@ -46,7 +46,6 @@ import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -484,7 +483,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
static String[] indicesOfInterest(Job job) { static String[] indicesOfInterest(Job job) {
String jobResultIndex = AnomalyDetectorsIndex.jobResultsIndexName(job.getResultsIndexName()); String jobResultIndex = AnomalyDetectorsIndex.jobResultsIndexName(job.getResultsIndexName());
return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), jobResultIndex, JobProvider.ML_META_INDEX}; return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), jobResultIndex, AnomalyDetectorsIndex.ML_META_INDEX};
} }
static List<String> verifyIndicesPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) { static List<String> verifyIndicesPrimaryShardsAreActive(Logger logger, String jobId, ClusterState clusterState) {

View File

@ -34,7 +34,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException; import java.io.IOException;
@ -177,7 +177,7 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception { protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
MlFilter filter = request.getFilter(); MlFilter filter = request.getFilter();
final String filterId = filter.getId(); final String filterId = filter.getId();
IndexRequest indexRequest = new IndexRequest(JobProvider.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId); IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.ML_META_INDEX, MlFilter.TYPE.getPreferredName(), filterId);
XContentBuilder builder = XContentFactory.jsonBuilder(); XContentBuilder builder = XContentFactory.jsonBuilder();
indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS)); indexRequest.source(filter.toXContent(builder, ToXContent.EMPTY_PARAMS));
transportIndexAction.execute(indexRequest, new ActionListener<IndexResponse>() { transportIndexAction.execute(indexRequest, new ActionListener<IndexResponse>() {

View File

@ -12,6 +12,12 @@ import org.elasticsearch.xpack.ml.MlMetadata;
* Methods for handling index naming related functions * Methods for handling index naming related functions
*/ */
public final class AnomalyDetectorsIndex { public final class AnomalyDetectorsIndex {
/**
* Where to store the ml info in Elasticsearch - must match what's
* expected by kibana/engineAPI/app/directives/mlLogUsage.js
*/
public static final String ML_META_INDEX = ".ml-meta";
private static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-"; private static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
private static final String STATE_INDEX_NAME = ".ml-state"; private static final String STATE_INDEX_NAME = ".ml-state";

View File

@ -16,7 +16,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetItemResponse;
@ -28,7 +27,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
@ -39,11 +37,9 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.UidFieldMapper;
@ -78,10 +74,6 @@ import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput; import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.AuditActivity;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -95,26 +87,18 @@ import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex.ML_META_INDEX;
public class JobProvider { public class JobProvider {
private static final Logger LOGGER = Loggers.getLogger(JobProvider.class); private static final Logger LOGGER = Loggers.getLogger(JobProvider.class);
/**
* Where to store the ml info in Elasticsearch - must match what's
* expected by kibana/engineAPI/app/directives/mlLogUsage.js
*/
public static final String ML_META_INDEX = ".ml-meta";
private static final String ASYNC = "async";
private static final List<String> SECONDARY_SORT = Arrays.asList( private static final List<String> SECONDARY_SORT = Arrays.asList(
AnomalyRecord.ANOMALY_SCORE.getPreferredName(), AnomalyRecord.ANOMALY_SCORE.getPreferredName(),
AnomalyRecord.OVER_FIELD_VALUE.getPreferredName(), AnomalyRecord.OVER_FIELD_VALUE.getPreferredName(),
@ -128,166 +112,11 @@ public class JobProvider {
private final Client client; private final Client client;
private final int numberOfReplicas;
// Allows us in test mode to disable the delay of shard allocation, so that in tests we don't have to wait for
// for at least a minute for shards to get allocated.
private final TimeValue delayedNodeTimeOutSetting;
private final Settings settings; private final Settings settings;
public JobProvider(Client client, int numberOfReplicas, Settings settings) { public JobProvider(Client client, int numberOfReplicas, Settings settings) {
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.numberOfReplicas = numberOfReplicas;
this.settings = settings; this.settings = settings;
// Whether we are using native process is a good way to detect whether we are in dev / test mode:
if (MachineLearning.AUTODETECT_PROCESS.get(settings)) {
delayedNodeTimeOutSetting = UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings);
} else {
delayedNodeTimeOutSetting = TimeValue.timeValueNanos(0);
}
}
/**
* Index template for notifications
*/
public void putNotificationMessageIndexTemplate(BiConsumer<Boolean, Exception> listener) {
try {
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(Auditor.NOTIFICATIONS_INDEX);
templateRequest.patterns(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX));
templateRequest.settings(mlNotificationIndexSettings());
templateRequest.mapping(AuditMessage.TYPE.getPreferredName(), ElasticsearchMappings.auditMessageMapping());
templateRequest.mapping(AuditActivity.TYPE.getPreferredName(), ElasticsearchMappings.auditActivityMapping());
client.admin().indices().putTemplate(templateRequest,
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
} catch (IOException e) {
LOGGER.warn("Error putting the template for the notification message index", e);
}
}
/**
* Index template for meta data
*/
public void putMetaIndexTemplate(BiConsumer<Boolean, Exception> listener) {
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(ML_META_INDEX);
templateRequest.patterns(Collections.singletonList(ML_META_INDEX));
templateRequest.settings(mlNotificationIndexSettings());
client.admin().indices().putTemplate(templateRequest,
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
}
public void putJobStateIndexTemplate(BiConsumer<Boolean, Exception> listener) {
try {
XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping();
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobStateIndexName());
templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()));
templateRequest.settings(mlStateIndexSettings());
templateRequest.mapping(CategorizerState.TYPE, categorizerStateMapping);
templateRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping);
templateRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping);
client.admin().indices().putTemplate(templateRequest,
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
} catch (Exception e) {
LOGGER.error("Error putting the template for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
}
}
/**
* Build the Elasticsearch index settings that we want to apply to results
* indexes. It's better to do this in code rather than in elasticsearch.yml
* because then the settings can be applied regardless of whether we're
* using our own Elasticsearch to store results or a customer's pre-existing
* Elasticsearch.
*
* @return An Elasticsearch builder initialised with the desired settings
* for Ml indexes.
*/
Settings.Builder mlResultsIndexSettings() {
return Settings.builder()
// Our indexes are small and one shard puts the
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
// Sacrifice durability for performance: in the event of power
// failure we can lose the last 5 seconds of changes, but it's
// much faster
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC)
// We need to allow fields not mentioned in the mappings to
// pick up default mappings and be used in queries
.put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), true)
// set the default all search field
.put(IndexSettings.DEFAULT_FIELD_SETTING.getKey(), ElasticsearchMappings.ALL_FIELD_VALUES);
}
/**
* Build the Elasticsearch index settings that we want to apply to the state
* index. It's better to do this in code rather than in elasticsearch.yml
* because then the settings can be applied regardless of whether we're
* using our own Elasticsearch to store results or a customer's pre-existing
* Elasticsearch.
*
* @return An Elasticsearch builder initialised with the desired settings
* for Ml indexes.
*/
Settings.Builder mlStateIndexSettings() {
// TODO review these settings
return Settings.builder()
// Our indexes are small and one shard puts the
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
// Sacrifice durability for performance: in the event of power
// failure we can lose the last 5 seconds of changes, but it's
// much faster
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC);
}
/**
* Settings for the notification messages index
*
* @return An Elasticsearch builder initialised with the desired settings
* for Ml indexes.
*/
Settings.Builder mlNotificationIndexSettings() {
return Settings.builder()
// Our indexes are small and one shard puts the
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
// We need to allow fields not mentioned in the mappings to
// pick up default mappings and be used in queries
.put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), true);
}
public void putJobResultsIndexTemplate(BiConsumer<Boolean, Exception> listener) {
try {
XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping();
XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping();
XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobResultsIndexPrefix());
templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"));
templateRequest.settings(mlResultsIndexSettings());
templateRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
templateRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping);
templateRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
templateRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
client.admin().indices().putTemplate(templateRequest,
ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
} catch (Exception e) {
LOGGER.error("Error putting the template for the " + AnomalyDetectorsIndex.jobResultsIndexPrefix() + " indices", e);
}
} }
/** /**

View File

@ -0,0 +1,302 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.AuditActivity;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex.ML_META_INDEX;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class MachineLearningTemplateRegistryTests extends ESTestCase {
private static final String CLUSTER_NAME = "clusterMcClusterFace";
private ClusterService clusterService;
private ExecutorService executorService;
private Client client;
private ThreadPool threadPool;
@Before
public void setUpMocks() {
threadPool = mock(ThreadPool.class);
executorService = mock(ExecutorService.class);
clusterService = mock(ClusterService.class);
client = mock(Client.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
}
public void testAddsListener() throws Exception {
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(Settings.EMPTY, clusterService, client, threadPool);
verify(clusterService, times(1)).addListener(templateRegistry);
}
public void testAddTemplatesIfMissing() throws Exception {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(Settings.EMPTY, clusterService, clientBuilder.build(), threadPool);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.metaData(MetaData.builder())
.build();
templateRegistry.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(threadPool, times(4)).executor(anyString());
assertFalse(templateRegistry.putMlNotificationsIndexTemplateCheck.get());
assertFalse(templateRegistry.putMlMetaIndexTemplateCheck.get());
assertFalse(templateRegistry.putMlNotificationsIndexTemplateCheck.get());
assertFalse(templateRegistry.putResultsIndexTemplateCheck.get());
}
public void testAddTemplatesIfMissing_alreadyInitialized() throws Exception {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(Settings.EMPTY, clusterService, clientBuilder.build(), threadPool);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.metaData(MetaData.builder()
.put(IndexMetaData.builder(Auditor.NOTIFICATIONS_INDEX).settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
))
.put(IndexMetaData.builder(AnomalyDetectorsIndex.ML_META_INDEX).settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
))
.put(IndexMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
))
.put(IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX).version(Version.CURRENT.id).build())
.put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.ML_META_INDEX).version(Version.CURRENT.id).build())
.put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).version(Version.CURRENT.id).build())
.put(IndexTemplateMetaData.builder(
AnomalyDetectorsIndex.jobResultsIndexPrefix()).version(Version.CURRENT.id).build())
.putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
.build();
MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
templateRegistry.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(threadPool, times(0)).executor(anyString());
assertFalse(templateRegistry.putMlNotificationsIndexTemplateCheck.get());
assertFalse(templateRegistry.putMlMetaIndexTemplateCheck.get());
assertFalse(templateRegistry.putMlNotificationsIndexTemplateCheck.get());
assertFalse(templateRegistry.putResultsIndexTemplateCheck.get());
}
public void testMlResultsIndexSettings() {
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(createSettings(), clusterService, client, threadPool);
Settings settings = templateRegistry.mlResultsIndexSettings().build();
assertEquals("1", settings.get("index.number_of_shards"));
assertEquals("0-2", settings.get("index.auto_expand_replicas"));
assertEquals("async", settings.get("index.translog.durability"));
assertEquals("true", settings.get("index.mapper.dynamic"));
assertEquals("all_field_values", settings.get("index.query.default_field"));
assertEquals("2s", settings.get("index.unassigned.node_left.delayed_timeout"));
}
public void testMlAuditIndexSettings() {
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(createSettings(), clusterService, client, threadPool);
Settings settings = templateRegistry.mlResultsIndexSettings().build();
assertEquals("1", settings.get("index.number_of_shards"));
assertEquals("0-2", settings.get("index.auto_expand_replicas"));
assertEquals("async", settings.get("index.translog.durability"));
assertEquals("true", settings.get("index.mapper.dynamic"));
assertEquals("2s", settings.get("index.unassigned.node_left.delayed_timeout"));
}
public void testMlStateIndexSettings() {
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(createSettings(), clusterService, client, threadPool);
Settings settings = templateRegistry.mlResultsIndexSettings().build();
assertEquals("1", settings.get("index.number_of_shards"));
assertEquals("0-2", settings.get("index.auto_expand_replicas"));
assertEquals("async", settings.get("index.translog.durability"));
assertEquals("2s", settings.get("index.unassigned.node_left.delayed_timeout"));
}
public void testPutNotificationIndexTemplate() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(createSettings(), clusterService, clientBuilder.build(), threadPool);
templateRegistry.putNotificationMessageIndexTemplate((result, error) -> {
assertTrue(result);
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(templateRegistry.mlNotificationIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey(AuditMessage.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(AuditActivity.TYPE.getPreferredName()));
assertEquals(2, request.mappings().size());
assertEquals(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX), request.patterns());
assertEquals(new Integer(Version.CURRENT.id), request.version());
});
}
public void testPutMetaIndexTemplate() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(createSettings(), clusterService, clientBuilder.build(), threadPool);
templateRegistry.putMetaIndexTemplate((result, error) -> {
assertTrue(result);
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(templateRegistry.mlNotificationIndexSettings().build(), request.settings());
assertEquals(0, request.mappings().size());
assertEquals(Collections.singletonList(ML_META_INDEX), request.patterns());
assertEquals(new Integer(Version.CURRENT.id), request.version());
});
}
public void testPutJobStateIndexTemplate() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(createSettings(), clusterService, clientBuilder.build(), threadPool);
templateRegistry.putJobStateIndexTemplate((result, error) -> {
assertTrue(result);
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(templateRegistry.mlStateIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey(CategorizerState.TYPE));
assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName()));
assertEquals(3, request.mappings().size());
assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()), request.patterns());
assertEquals(new Integer(Version.CURRENT.id), request.version());
});
}
public void testPutJobResultsIndexTemplate() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(createSettings(), clusterService, clientBuilder.build(), threadPool);
templateRegistry.putJobResultsIndexTemplate((result, error) -> {
assertTrue(result);
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(templateRegistry.mlResultsIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey(Result.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(CategoryDefinition.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(DataCounts.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName()));
assertEquals(4, request.mappings().size());
assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"), request.patterns());
assertEquals(new Integer(Version.CURRENT.id), request.version());
});
}
public void testTemplateIsPresentAndUpToDate() {
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(createSettings(), clusterService, client, threadPool);
// missing template
MetaData metaData = MetaData.builder().build();
assertFalse(templateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData));
// old version of template
IndexTemplateMetaData templateMetaData = IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX)
.version(Version.CURRENT.id - 1).build();
metaData = MetaData.builder().put(templateMetaData).build();
assertFalse(templateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData));
// latest template
templateMetaData = IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX)
.version(Version.CURRENT.id).build();
metaData = MetaData.builder().put(templateMetaData).build();
assertTrue(templateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData));
}
private Settings createSettings() {
return Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(2))
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 1001L)
.build();
}
}

View File

@ -10,8 +10,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -20,15 +18,12 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before; import org.junit.Before;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.function.BiConsumer;
import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.mock.orig.Mockito.times;
@ -46,7 +41,6 @@ public class MlInitializationServiceTests extends ESTestCase {
private ExecutorService executorService; private ExecutorService executorService;
private ClusterService clusterService; private ClusterService clusterService;
private Client client; private Client client;
private JobProvider jobProvider;
private Auditor auditor; private Auditor auditor;
@Before @Before
@ -55,7 +49,6 @@ public class MlInitializationServiceTests extends ESTestCase {
executorService = mock(ExecutorService.class); executorService = mock(ExecutorService.class);
clusterService = mock(ClusterService.class); clusterService = mock(ClusterService.class);
client = mock(Client.class); client = mock(Client.class);
jobProvider = mock(JobProvider.class);
auditor = mock(Auditor.class); auditor = mock(Auditor.class);
doAnswer(invocation -> { doAnswer(invocation -> {
@ -63,16 +56,14 @@ public class MlInitializationServiceTests extends ESTestCase {
return null; return null;
}).when(executorService).execute(any(Runnable.class)); }).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
ScheduledFuture scheduledFuture = mock(ScheduledFuture.class); ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture); when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture);
} }
public void testInitialize() throws Exception { public void testInitialize() throws Exception {
JobProvider jobProvider = mockJobProvider();
ClusterService clusterService = mock(ClusterService.class);
MlInitializationService initializationService = MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
ClusterState cs = ClusterState.builder(new ClusterName("_name")) ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder() .nodes(DiscoveryNodes.builder()
@ -85,15 +76,11 @@ public class MlInitializationServiceTests extends ESTestCase {
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
assertThat(initializationService.getDailyManagementService().isStarted(), is(true)); assertThat(initializationService.getDailyManagementService().isStarted(), is(true));
verify(jobProvider, times(1)).putNotificationMessageIndexTemplate(any());
verify(jobProvider, times(1)).putMetaIndexTemplate(any());
verify(jobProvider, times(1)).putJobStateIndexTemplate(any());
verify(jobProvider, times(1)).putJobResultsIndexTemplate(any());
} }
public void testInitialize_noMasterNode() throws Exception { public void testInitialize_noMasterNode() throws Exception {
MlInitializationService initializationService = MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
ClusterState cs = ClusterState.builder(new ClusterName("_name")) ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder() .nodes(DiscoveryNodes.builder()
@ -104,25 +91,12 @@ public class MlInitializationServiceTests extends ESTestCase {
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any()); verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any());
assertThat(initializationService.getDailyManagementService(), is(nullValue())); assertThat(initializationService.getDailyManagementService(), is(nullValue()));
verify(jobProvider, times(0)).putNotificationMessageIndexTemplate(any());
verify(jobProvider, times(0)).putMetaIndexTemplate(any());
verify(jobProvider, times(0)).putJobStateIndexTemplate(any());
verify(jobProvider, times(0)).putJobResultsIndexTemplate(any());
} }
public void testInitialize_alreadyInitialized() throws Exception { public void testInitialize_alreadyInitialized() throws Exception {
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
JobProvider jobProvider = mockJobProvider();
MlInitializationService initializationService = MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
ClusterState cs = ClusterState.builder(new ClusterName("_name")) ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder() .nodes(DiscoveryNodes.builder()
@ -130,25 +104,6 @@ public class MlInitializationServiceTests extends ESTestCase {
.localNodeId("_node_id") .localNodeId("_node_id")
.masterNodeId("_node_id")) .masterNodeId("_node_id"))
.metaData(MetaData.builder() .metaData(MetaData.builder()
.put(IndexMetaData.builder(Auditor.NOTIFICATIONS_INDEX).settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
))
.put(IndexMetaData.builder(JobProvider.ML_META_INDEX).settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
))
.put(IndexMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
))
.put(IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX).build())
.put(IndexTemplateMetaData.builder(JobProvider.ML_META_INDEX).build())
.put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).build())
.put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobResultsIndexPrefix()).build())
.putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build())) .putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
.build(); .build();
MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class); MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
@ -157,25 +112,12 @@ public class MlInitializationServiceTests extends ESTestCase {
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any()); verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any());
assertSame(initialDailyManagementService, initializationService.getDailyManagementService()); assertSame(initialDailyManagementService, initializationService.getDailyManagementService());
verify(jobProvider, times(0)).putNotificationMessageIndexTemplate(any());
verify(jobProvider, times(0)).putMetaIndexTemplate(any());
verify(jobProvider, times(0)).putJobStateIndexTemplate(any());
verify(jobProvider, times(0)).putJobResultsIndexTemplate(any());
} }
public void testInitialize_onlyOnce() throws Exception { public void testInitialize_onlyOnce() throws Exception {
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
JobProvider jobProvider = mockJobProvider();
MlInitializationService initializationService = MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
ClusterState cs = ClusterState.builder(new ClusterName("_name")) ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder() .nodes(DiscoveryNodes.builder()
@ -188,42 +130,11 @@ public class MlInitializationServiceTests extends ESTestCase {
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
verify(jobProvider, times(1)).putNotificationMessageIndexTemplate(any());
verify(jobProvider, times(1)).putMetaIndexTemplate(any());
verify(jobProvider, times(1)).putJobStateIndexTemplate(any());
verify(jobProvider, times(1)).putJobResultsIndexTemplate(any());
}
private JobProvider mockJobProvider() {
JobProvider jobProvider = mock(JobProvider.class);
doAnswer(invocation -> {
BiConsumer<Boolean, Exception> listener = (BiConsumer<Boolean, Exception>)invocation.getArguments()[0];
listener.accept(true, null);
return null;
}).when(jobProvider).putMetaIndexTemplate(any());
doAnswer(invocation -> {
BiConsumer<Boolean, Exception> listener = (BiConsumer<Boolean, Exception>)invocation.getArguments()[0];
listener.accept(true, null);
return null;
}).when(jobProvider).putNotificationMessageIndexTemplate(any());
doAnswer(invocation -> {
BiConsumer<Boolean, Exception> listener = (BiConsumer<Boolean, Exception>)invocation.getArguments()[0];
listener.accept(true, null);
return null;
}).when(jobProvider).putJobStateIndexTemplate(any());
doAnswer(invocation -> {
BiConsumer<Boolean, Exception> listener = (BiConsumer<Boolean, Exception>)invocation.getArguments()[0];
listener.accept(true, null);
return null;
}).when(jobProvider).putJobResultsIndexTemplate(any());
return jobProvider;
} }
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception { public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
MlInitializationService initializationService = MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, jobProvider, auditor); new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, auditor);
MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class); MlDailyManagementService initialDailyManagementService = mock(MlDailyManagementService.class);
initializationService.setDailyManagementService(initialDailyManagementService); initializationService.setDailyManagementService(initialDailyManagementService);

View File

@ -315,7 +315,7 @@ public class OpenJobActionTests extends ESTestCase {
private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) { private void addJobAndIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable, String... jobIds) {
List<String> indices = new ArrayList<>(); List<String> indices = new ArrayList<>();
indices.add(AnomalyDetectorsIndex.jobStateIndexName()); indices.add(AnomalyDetectorsIndex.jobStateIndexName());
indices.add(JobProvider.ML_META_INDEX); indices.add(AnomalyDetectorsIndex.ML_META_INDEX);
indices.add(Auditor.NOTIFICATIONS_INDEX); indices.add(Auditor.NOTIFICATIONS_INDEX);
for (String jobId : jobIds) { for (String jobId : jobIds) {
indices.add(AnomalyDetectorsIndex.jobResultsIndexName(jobId)); indices.add(AnomalyDetectorsIndex.jobResultsIndexName(jobId));

View File

@ -5,15 +5,19 @@
*/ */
package org.elasticsearch.xpack.ml.integration; package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.XPackSingleNodeTestCase; import org.elasticsearch.xpack.XPackSingleNodeTestCase;
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
@ -51,8 +55,11 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -95,7 +102,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
} }
public void testProcessResults() throws Exception { public void testProcessResults() throws Exception {
putResultsIndexMappingTemplate(); putIndexTemplates();
ResultsBuilder builder = new ResultsBuilder(); ResultsBuilder builder = new ResultsBuilder();
Bucket bucket = createBucket(false); Bucket bucket = createBucket(false);
@ -155,7 +162,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
} }
public void testDeleteInterimResults() throws Exception { public void testDeleteInterimResults() throws Exception {
putResultsIndexMappingTemplate(); putIndexTemplates();
Bucket nonInterimBucket = createBucket(false); Bucket nonInterimBucket = createBucket(false);
Bucket interimBucket = createBucket(true); Bucket interimBucket = createBucket(true);
@ -184,7 +191,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
} }
public void testMultipleFlushesBetweenPersisting() throws Exception { public void testMultipleFlushesBetweenPersisting() throws Exception {
putResultsIndexMappingTemplate(); putIndexTemplates();
Bucket finalBucket = createBucket(true); Bucket finalBucket = createBucket(true);
List<AnomalyRecord> finalAnomalyRecords = createRecords(true); List<AnomalyRecord> finalAnomalyRecords = createRecords(true);
@ -214,7 +221,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
} }
public void testEndOfStreamTriggersPersisting() throws Exception { public void testEndOfStreamTriggersPersisting() throws Exception {
putResultsIndexMappingTemplate(); putIndexTemplates();
Bucket bucket = createBucket(false); Bucket bucket = createBucket(false);
List<AnomalyRecord> firstSetOfRecords = createRecords(false); List<AnomalyRecord> firstSetOfRecords = createRecords(false);
List<AnomalyRecord> secondSetOfRecords = createRecords(false); List<AnomalyRecord> secondSetOfRecords = createRecords(false);
@ -236,10 +243,17 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
assertResultsAreSame(allRecords, persistedRecords); assertResultsAreSame(allRecords, persistedRecords);
} }
private void putResultsIndexMappingTemplate() throws InterruptedException { private void putIndexTemplates() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1); ThreadPool threadPool = mock(ThreadPool.class);
jobProvider.putJobResultsIndexTemplate((aBoolean, e) -> {latch.countDown();}); ExecutorService executorService = mock(ExecutorService.class);
latch.await(); doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
new MachineLearningTemplateRegistry(Settings.EMPTY, mock(ClusterService.class), client(), threadPool)
.addTemplatesIfMissing(client().admin().cluster().state(new ClusterStateRequest().all()).actionGet().getState());
} }
private Bucket createBucket(boolean isInterim) { private Bucket createBucket(boolean isInterim) {

View File

@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse;
@ -41,12 +40,10 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
@ -56,9 +53,6 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.AuditActivity;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -76,7 +70,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.ml.job.persistence.JobProvider.ML_META_INDEX;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -138,40 +131,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals("", quantiles.getQuantileState()); assertEquals("", quantiles.getQuantileState());
} }
public void testMlResultsIndexSettings() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
JobProvider provider = createProvider(clientBuilder.build());
Settings settings = provider.mlResultsIndexSettings().build();
assertEquals("1", settings.get("index.number_of_shards"));
assertEquals("0", settings.get("index.number_of_replicas"));
assertEquals("async", settings.get("index.translog.durability"));
assertEquals("true", settings.get("index.mapper.dynamic"));
assertEquals("all_field_values", settings.get("index.query.default_field"));
}
public void testPutJobResultsIndexTemplate() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
Job.Builder job = buildJobBuilder("foo");
JobProvider provider = createProvider(clientBuilder.build());
provider.putJobResultsIndexTemplate((result, error) -> {
assertTrue(result);
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(provider.mlResultsIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey(Result.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(CategoryDefinition.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(DataCounts.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(ModelSnapshot.TYPE.getPreferredName()));
assertEquals(4, request.mappings().size());
assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"), request.patterns());
});
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testCreateJobResultsIndex() { public void testCreateJobResultsIndex() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
@ -353,85 +312,7 @@ public class JobProviderTests extends ESTestCase {
}); });
} }
public void testMlAuditIndexSettings() { public void testDeleteJobRelatedIndices() throws InterruptedException, ExecutionException, IOException {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
JobProvider provider = createProvider(clientBuilder.build());
Settings settings = provider.mlResultsIndexSettings().build();
assertEquals("1", settings.get("index.number_of_shards"));
assertEquals("0", settings.get("index.number_of_replicas"));
assertEquals("async", settings.get("index.translog.durability"));
assertEquals("true", settings.get("index.mapper.dynamic"));
}
public void testPutNotificationIndexTemplate() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
JobProvider provider = createProvider(clientBuilder.build());
provider.putNotificationMessageIndexTemplate((result, error) -> {
assertTrue(result);
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(provider.mlNotificationIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey(AuditMessage.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(AuditActivity.TYPE.getPreferredName()));
assertEquals(2, request.mappings().size());
assertEquals(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX), request.patterns());
});
}
public void testPutMetaIndexTemplate() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
JobProvider provider = createProvider(clientBuilder.build());
provider.putMetaIndexTemplate((result, error) -> {
assertTrue(result);
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(provider.mlNotificationIndexSettings().build(), request.settings());
assertEquals(0, request.mappings().size());
assertEquals(Collections.singletonList(ML_META_INDEX), request.patterns());
});
}
public void testMlStateIndexSettings() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
JobProvider provider = createProvider(clientBuilder.build());
Settings settings = provider.mlResultsIndexSettings().build();
assertEquals("1", settings.get("index.number_of_shards"));
assertEquals("0", settings.get("index.number_of_replicas"));
assertEquals("async", settings.get("index.translog.durability"));
}
public void testPutJobStateIndexTemplate() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<PutIndexTemplateRequest> captor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
clientBuilder.putTemplate(captor);
Job.Builder job = buildJobBuilder("foo");
JobProvider provider = createProvider(clientBuilder.build());
provider.putJobStateIndexTemplate((result, error) -> {
assertTrue(result);
PutIndexTemplateRequest request = captor.getValue();
assertNotNull(request);
assertEquals(provider.mlStateIndexSettings().build(), request.settings());
assertTrue(request.mappings().containsKey(CategorizerState.TYPE));
assertTrue(request.mappings().containsKey(Quantiles.TYPE.getPreferredName()));
assertTrue(request.mappings().containsKey(ModelState.TYPE.getPreferredName()));
assertEquals(3, request.mappings().size());
assertEquals(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()), request.patterns());
});
}
public void testDeleteJobRelatedIndices() throws InterruptedException, ExecutionException, IOException {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
ActionListener<DeleteJobAction.Response> actionListener = mock(ActionListener.class); ActionListener<DeleteJobAction.Response> actionListener = mock(ActionListener.class);
String jobId = "ThisIsMyJob"; String jobId = "ThisIsMyJob";
@ -1258,10 +1139,7 @@ public class JobProviderTests extends ESTestCase {
} }
private JobProvider createProvider(Client client) { private JobProvider createProvider(Client client) {
Settings.Builder builder = Settings.builder() return new JobProvider(client, 0, Settings.EMPTY);
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 1000L);
return new JobProvider(client, 0, builder.build());
} }
private static GetResponse createGetResponse(boolean exists, Map<String, Object> source) throws IOException { private static GetResponse createGetResponse(boolean exists, Map<String, Object> source) throws IOException {