[ML] ensure the ml-config index (#36792) (#36832)

David Kyle 2018-12-19 13:43:43 +00:00 committed by GitHub
parent f2a5373495
commit d43cbdab97
10 changed files with 305 additions and 70 deletions

AnomalyDetectorsIndex.java

@@ -10,6 +10,8 @@ package org.elasticsearch.xpack.core.ml.job.persistence;
*/
public final class AnomalyDetectorsIndex {
public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000;
private AnomalyDetectorsIndex() {
}

MachineLearning.java

@@ -670,7 +670,9 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting))
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
.put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(),
AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW))
.version(Version.CURRENT.id)
.putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(configMapping))
.build();
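
The hunk above pins index.max_result_window on the .ml-config index template at the same constant the config providers use as a search size. A minimal sketch of the invariant this buys, assuming an indexSettings instance for the config index (the check itself is illustrative, not part of the commit):

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.index.IndexSettings;
    import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

    class ConfigIndexResultWindow {
        // A search is rejected when from + size exceeds index.max_result_window,
        // so pinning the window guarantees that size-10_000 config searches stay
        // valid even if the cluster-wide default window is lowered.
        static void checkInvariant(Settings indexSettings) {
            int window = IndexSettings.MAX_RESULT_WINDOW_SETTING.get(indexSettings);
            assert AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW <= window;
        }
    }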

MlConfigMigrationEligibilityCheck.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -14,6 +15,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
/**
* Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds)
@@ -37,10 +39,12 @@ public class MlConfigMigrationEligibilityCheck {
this.isConfigMigrationEnabled = configMigrationEnabled;
}
/**
* Can migration start? Returns:
* False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION}
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the .ml-config index shards are not active
* True otherwise
* @param clusterState The cluster state
* @return A boolean that dictates if config migration can start
@@ -54,12 +58,26 @@ public class MlConfigMigrationEligibilityCheck {
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}
return mlConfigIndexIsAllocated(clusterState);
}
static boolean mlConfigIndexIsAllocated(ClusterState clusterState) {
if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) {
return false;
}
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(AnomalyDetectorsIndex.configIndexName());
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
return false;
}
return true;
}
/**
* Is the job eligible for migration? Returns:
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
* False if the job is not in the cluster state
* False if the {@link Job#isDeleting()}
* False if the job has a persistent task
* True otherwise i.e. the job is present, not deleting
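Reassembled from the two hunks above, the whole gate reads as below. Treat it as a sketch: the first check on isConfigMigrationEnabled is implied by the javadoc rather than shown in this excerpt.

    public boolean canStartMigration(ClusterState clusterState) {
        if (isConfigMigrationEnabled == false) {
            // disabled via the ENABLE_CONFIG_MIGRATION setting
            return false;
        }
        Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
        if (minNodeVersion.before(MIN_NODE_VERSION)) {
            // wait until every node understands the config index
            return false;
        }
        // new in this commit: the .ml-config primaries must be active
        return mlConfigIndexIsAllocated(clusterState);
    }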

MlConfigMigrator.java

@@ -11,6 +11,8 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@@ -21,6 +23,7 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@@ -29,6 +32,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -126,19 +130,11 @@ public class MlConfigMigrator {
* @param listener The success listener
*/
public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {
if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
listener.onResponse(false);
return;
}
if (migrationInProgress.compareAndSet(false, true) == false) {
listener.onResponse(Boolean.FALSE);
return;
}
logger.debug("migrating ml configurations");
ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
response -> {
migrationInProgress.set(false);
@@ -150,19 +146,34 @@
}
);
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);
List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}
List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) {
createConfigIndex(ActionListener.wrap(
response -> {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
},
unMarkMigrationInProgress::onFailure
));
return;
}
if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
}
@@ -296,6 +307,7 @@ public class MlConfigMigrator {
private void addJobIndexRequests(Collection<Job> jobs, BulkRequestBuilder bulkRequestBuilder) {
ToXContent.Params params = new ToXContent.MapParams(JobConfigProvider.TO_XCONTENT_PARAMS);
for (Job job : jobs) {
logger.debug("adding job to migrate: " + job.getId());
bulkRequestBuilder.add(indexRequest(job, Job.documentId(job.getId()), params));
}
}
@@ -303,6 +315,7 @@ public class MlConfigMigrator {
private void addDatafeedIndexRequests(Collection<DatafeedConfig> datafeedConfigs, BulkRequestBuilder bulkRequestBuilder) {
ToXContent.Params params = new ToXContent.MapParams(DatafeedConfigProvider.TO_XCONTENT_PARAMS);
for (DatafeedConfig datafeedConfig : datafeedConfigs) {
logger.debug("adding datafeed to migrate: " + datafeedConfig.getId());
bulkRequestBuilder.add(indexRequest(datafeedConfig, DatafeedConfig.documentId(datafeedConfig.getId()), params));
}
}
@@ -318,7 +331,6 @@ public class MlConfigMigrator {
return indexRequest;
}
// public for testing
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {
@@ -361,6 +373,30 @@ public class MlConfigMigrator {
);
}
private void createConfigIndex(ActionListener<Boolean> listener) {
logger.info("creating the .ml-config index");
CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.configIndexName());
try {
createIndexRequest.settings(
Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
);
createIndexRequest.mapping(ElasticsearchMappings.DOC_TYPE, ElasticsearchMappings.configMapping());
} catch (Exception e) {
logger.error("error writing the .ml-config mappings", e);
listener.onFailure(e);
return;
}
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, createIndexRequest,
ActionListener.<CreateIndexResponse>wrap(
r -> listener.onResponse(r.isAcknowledged()),
listener::onFailure
), client.admin().indices()::create);
}
public static Job updateJobForMigration(Job job) {
Job.Builder builder = new Job.Builder(job);
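
Stepping back from the hunks in this file: migrateConfigsWithoutTasks now runs its checks in a fixed order (claim the in-progress flag, create .ml-config if missing, re-check eligibility, snapshot, migrate), and every early exit funnels through the listener that clears the flag. A stripped-down sketch of that single-flight pattern, with doMigration standing in for the snapshot-and-migrate tail (only migrationInProgress and the wrap() shape are taken from the diff):

    private final AtomicBoolean migrationInProgress = new AtomicBoolean(false);

    void migrateOnce(ActionListener<Boolean> listener) {
        if (migrationInProgress.compareAndSet(false, true) == false) {
            listener.onResponse(Boolean.FALSE);   // a migration is already running
            return;
        }
        // every completion path, success or failure, clears the flag
        ActionListener<Boolean> unMark = ActionListener.wrap(
            response -> { migrationInProgress.set(false); listener.onResponse(response); },
            e -> { migrationInProgress.set(false); listener.onFailure(e); });
        doMigration(unMark);
    }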

DatafeedConfigProvider.java

@@ -73,6 +73,15 @@ import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
* This class implements CRUD operations for the
* datafeed configuration document
*
* The number of datafeeds returned in a search is limited to
* {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}.
* In most cases we expect 10s or 100s of datafeeds to be defined and
* a search for all datafeeds should return them all.
*/
public class DatafeedConfigProvider {
private static final Logger logger = LogManager.getLogger(DatafeedConfigProvider.class);
@@ -87,13 +96,6 @@ public class DatafeedConfigProvider {
TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable);
}
/**
* In most cases we expect 10s or 100s of datafeeds to be defined and
* a search for all datafeeds should return all.
* TODO this is a temporary fix
*/
public int searchSize = 1000;
public DatafeedConfigProvider(Client client, NamedXContentRegistry xContentRegistry) {
this.client = client;
this.xContentRegistry = xContentRegistry;
@@ -368,7 +370,9 @@ public class DatafeedConfigProvider {
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder).request();
.setSource(sourceBuilder)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);
@@ -407,7 +411,6 @@ public class DatafeedConfigProvider {
* wildcard then setting this true will not suppress the exception
* @param listener The expanded datafeed config listener
*/
// NORELEASE datafeed configs should be paged or have a mechanism to return all jobs if there are many of them
public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ActionListener<List<DatafeedConfig.Builder>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens));
@@ -416,7 +419,7 @@ public class DatafeedConfigProvider {
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);
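
Pinning the search size at the result window replaces the temporary searchSize = 1000 field, at the cost that a search can silently cap at 10,000 hits. An illustrative truncation check, not part of this commit (in 6.x SearchHits.getTotalHits() returns a plain long; Logger is the log4j type already used by this class):

    static void warnIfTruncated(SearchResponse response, Logger logger) {
        int returned = response.getHits().getHits().length;
        long total = response.getHits().getTotalHits();
        if (total > returned) {
            // more configs matched than the pinned size allowed back
            logger.warn("config search truncated: [{}] of [{}] matching configs returned", returned, total);
        }
    }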

JobConfigProvider.java

@@ -89,6 +89,11 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
* This class implements CRUD operations for the
* anomaly detector job configuration document
*
* The number of jobs returned in a search is limited to
* {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}.
* In most cases we expect 10s or 100s of jobs to be defined and
* a search for all jobs should return them all.
*/
public class JobConfigProvider {
@@ -101,13 +106,6 @@ public class JobConfigProvider {
TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable);
}
/**
* In most cases we expect 10s or 100s of jobs to be defined and
* a search for all jobs should return all.
* TODO this is a temporary fix
*/
private int searchSize = 1000;
private final Client client;
public JobConfigProvider(Client client) {
@@ -565,7 +563,7 @@ public class JobConfigProvider {
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs);
@@ -599,6 +597,21 @@ public class JobConfigProvider {
}
private SearchRequest makeExpandIdsSearchRequest(String expression, boolean excludeDeleting) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(Job.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT);
sourceBuilder.docValueField(Job.GROUPS.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT);
return client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
}
/**
* The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but
* the full anomaly detector job configuration is returned.
@@ -612,7 +625,6 @@ public class JobConfigProvider {
* @param excludeDeleting If true exclude jobs marked as deleting
* @param listener The expanded jobs listener
*/
// NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them
public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
@@ -621,7 +633,7 @@ public class JobConfigProvider {
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs);
@@ -679,7 +691,7 @@ public class JobConfigProvider {
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
@@ -741,7 +753,7 @@ public class JobConfigProvider {
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
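
The new makeExpandIdsSearchRequest disables _source and reads the job id and groups from doc values, so the hits are cheap to transport. A hedged sketch of how such a response would be unpacked, assuming the 6.x SearchHit API (this handler is not shown in the excerpt):

    static SortedSet<String> extractJobIds(SearchResponse response) {
        SortedSet<String> jobIds = new TreeSet<>();
        for (SearchHit hit : response.getHits().getHits()) {
            // the id arrives as a doc value because fetchSource(false) is set
            jobIds.add((String) hit.field(Job.ID.getPreferredName()).getValue());
        }
        return jobIds;
    }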

AutodetectProcessManager.java

@@ -45,13 +45,12 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@@ -62,6 +61,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
import java.io.IOException;
import java.io.InputStream;
@@ -408,7 +408,6 @@ public class AutodetectProcessManager {
logger.info("Opening job [{}]", jobId);
jobManager.getJob(jobId, ActionListener.wrap(
// NORELEASE JIndex. Should not be doing this work on the network thread
job -> {
if (job.getJobVersion() == null) {
closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId

MlConfigMigrationEligibilityCheckTests.java

@@ -8,13 +8,22 @@ package org.elasticsearch.xpack.ml;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.MlMetadata;
@@ -24,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.junit.Before;
import java.net.InetAddress;
@@ -53,12 +63,18 @@ public class MlConfigMigrationEligibilityCheckTests extends ESTestCase {
}
public void testCanStartMigration_givenNodesNotUpToVersion() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addMlConfigIndex(metaData, routingTable);
// mixed 6.5 and 6.6 nodes
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0))
.add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0)))
.build();
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0))
.add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0)))
.routingTable(routingTable.build())
.metaData(metaData)
.build();
Settings settings = newSettings(true);
givenClusterSettings(settings);
@@ -69,12 +85,18 @@ public class MlConfigMigrationEligibilityCheckTests extends ESTestCase {
}
public void testCanStartMigration_givenNodesNotUpToVersionAndMigrationIsEnabled() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addMlConfigIndex(metaData, routingTable);
// mixed 6.5 and 6.6 nodes
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0))
.add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0)))
.build();
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0))
.add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0)))
.routingTable(routingTable.build())
.metaData(metaData)
.build();
Settings settings = newSettings(true);
givenClusterSettings(settings);
@@ -84,6 +106,52 @@ public class MlConfigMigrationEligibilityCheckTests extends ESTestCase {
assertTrue(check.canStartMigration(clusterState));
}
public void testCanStartMigration_givenMissingIndex() {
Settings settings = newSettings(true);
givenClusterSettings(settings);
ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
.build();
MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService);
assertFalse(check.canStartMigration(clusterState));
}
public void testCanStartMigration_givenInactiveShards() {
Settings settings = newSettings(true);
givenClusterSettings(settings);
// index is present but no routing
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addMlConfigIndex(metaData, routingTable);
ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
.metaData(metaData)
.build();
MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService);
assertFalse(check.canStartMigration(clusterState));
}
private void addMlConfigIndex(MetaData.Builder metaData, RoutingTable.Builder routingTable) {
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName());
indexMetaData.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
);
metaData.put(indexMetaData);
Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid");
ShardId shardId = new ShardId(index, 0);
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
shardRouting = shardRouting.initialize("node_id", null, 0L);
shardRouting = shardRouting.moveToStarted();
routingTable.add(IndexRoutingTable.builder(index)
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
}
public void testJobIsEligibleForMigration_givenNodesNotUpToVersion() {
// mixed 6.5 and 6.6 nodes
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
@@ -185,11 +253,14 @@ public class MlConfigMigrationEligibilityCheckTests extends ESTestCase {
Job closedJob = JobTests.buildJobBuilder("closed-job").build();
MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(closedJob, false);
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addMlConfigIndex(metaData, routingTable);
ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build())
)
.build();
.metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.routingTable(routingTable.build())
.build();
Settings settings = newSettings(true);
givenClusterSettings(settings);
@@ -283,11 +354,14 @@ public class MlConfigMigrationEligibilityCheckTests extends ESTestCase {
mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap());
String datafeedId = "df-" + job.getId();
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addMlConfigIndex(metaData, routingTable);
ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build())
)
.build();
.metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.routingTable(routingTable.build())
.build();
Settings settings = newSettings(true);
givenClusterSettings(settings);

MlConfigMigratorIT.java

@@ -5,12 +5,20 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.Version;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@@ -19,6 +27,8 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -117,9 +127,12 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
builder.setIndices(Collections.singletonList("beats*"));
mlMetadata.putDatafeed(builder.build(), Collections.emptyMap());
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addMlConfigIndex(metaData, routingTable);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.routingTable(routingTable.build())
.build();
doAnswer(invocation -> {
@@ -180,10 +193,13 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
mlMetadata.putDatafeed(builder.build(), Collections.emptyMap());
}
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addMlConfigIndex(metaData, routingTable);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.build();
.metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.routingTable(routingTable.build())
.build();
doAnswer(invocation -> {
ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
@@ -304,6 +320,49 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
assertEquals(expectedMlMetadata, recoveredMeta);
}
}
private void addMlConfigIndex(MetaData.Builder metaData, RoutingTable.Builder routingTable) {
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName());
indexMetaData.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
);
metaData.put(indexMetaData);
Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid");
ShardId shardId = new ShardId(index, 0);
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
shardRouting = shardRouting.initialize("node_id", null, 0L);
shardRouting = shardRouting.moveToStarted();
routingTable.add(IndexRoutingTable.builder(index)
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
}
public void testConfigIndexIsCreated() throws Exception {
// given jobs and datafeeds in the clusterstate
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build()))
.build();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
// if the cluster state has a job config and the index does not
// exist it should be created
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
responseHolder, exceptionHolder);
assertBusy(() -> assertTrue(configIndexExists()));
}
private boolean configIndexExists() {
return client().admin().indices().prepareExists(AnomalyDetectorsIndex.configIndexName()).get().isExists();
}
}

JobManagerTests.java

@@ -8,13 +8,21 @@ package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
@@ -25,7 +33,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@@ -448,9 +458,29 @@ public class JobManagerTests extends ESTestCase {
public void testUpdateJob_notAllowedPreMigration() {
MlMetadata.Builder mlmetadata = new MlMetadata.Builder().putJob(buildJobBuilder("closed-job-not-migrated").build(), false);
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName());
indexMetaData.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
);
metaData.put(indexMetaData);
Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid");
ShardId shardId = new ShardId(index, 0);
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
shardRouting = shardRouting.initialize("node_id", null, 0L);
shardRouting = shardRouting.moveToStarted();
routingTable.add(IndexRoutingTable.builder(index)
.addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, mlmetadata.build()))
.metaData(metaData.putCustom(MlMetadata.TYPE, mlmetadata.build()))
.routingTable(routingTable.build())
.build();
when(clusterService.state()).thenReturn(clusterState);