diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 82f26ca88bc..3dab221dd88 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -15,8 +15,6 @@ import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; import org.elasticsearch.xpack.core.template.TemplateUtils; import java.util.Collections; -import java.util.Comparator; -import java.util.regex.Pattern; /** * Methods for handling index naming related functions @@ -28,31 +26,7 @@ public final class AnomalyDetectorsIndex { private static final String RESULTS_MAPPINGS_VERSION_VARIABLE = "xpack.ml.version"; private static final String RESOURCE_PATH = "/org/elasticsearch/xpack/core/ml/anomalydetection/"; - // Visible for testing - static final Comparator STATE_INDEX_NAME_COMPARATOR = new Comparator() { - - private final Pattern HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}"); - - @Override - public int compare(String index1, String index2) { - String[] index1Parts = index1.split("-"); - String index1Suffix = index1Parts[index1Parts.length - 1]; - boolean index1HasSixDigitsSuffix = HAS_SIX_DIGIT_SUFFIX.matcher(index1Suffix).matches(); - String[] index2Parts = index2.split("-"); - String index2Suffix = index2Parts[index2Parts.length - 1]; - boolean index2HasSixDigitsSuffix = HAS_SIX_DIGIT_SUFFIX.matcher(index2Suffix).matches(); - if (index1HasSixDigitsSuffix && index2HasSixDigitsSuffix) { - return index1Suffix.compareTo(index2Suffix); - } else if (index1HasSixDigitsSuffix != index2HasSixDigitsSuffix) { - return Boolean.compare(index1HasSixDigitsSuffix, index2HasSixDigitsSuffix); - } else { - return index1.compareTo(index2); - } - } - }; - - private AnomalyDetectorsIndex() { - } + private AnomalyDetectorsIndex() {} public static String jobResultsIndexPrefix() { return AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX; @@ -109,8 +83,13 @@ public final class AnomalyDetectorsIndex { */ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, IndexNameExpressionResolver resolver, final ActionListener finalListener) { - MlIndexAndAlias.createIndexAndAliasIfNecessary(client, state, resolver, - AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), finalListener); + MlIndexAndAlias.createIndexAndAliasIfNecessary( + client, + state, + resolver, + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, + AnomalyDetectorsIndex.jobStateIndexWriteAlias(), + finalListener); } public static String resultsMapping() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index ebf4fbd26f0..3678d1a6b97 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -5,20 +5,27 @@ */ package org.elasticsearch.xpack.core.ml.utils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.Nullable; import java.util.Arrays; import java.util.Comparator; +import java.util.Optional; import java.util.regex.Pattern; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -29,6 +36,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; */ public final class MlIndexAndAlias { + private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class); + // Visible for testing static final Comparator INDEX_NAME_COMPARATOR = new Comparator() { @@ -60,60 +69,114 @@ public final class MlIndexAndAlias { * or to the index with the highest suffix if the index did not have to be created. * The listener is notified with a {@code boolean} that informs whether the index or the alias were created. */ - public static void createIndexAndAliasIfNecessary(Client client, ClusterState clusterState, IndexNameExpressionResolver resolver, - String indexPatternPrefix, String alias, ActionListener listener) { - if (clusterState.getMetaData().getAliasAndIndexLookup().containsKey(alias)) { - listener.onResponse(false); - return; - } + public static void createIndexAndAliasIfNecessary(Client client, + ClusterState clusterState, + IndexNameExpressionResolver resolver, + String indexPatternPrefix, + String alias, + ActionListener listener) { - final ActionListener createAliasListener = ActionListener.wrap( - concreteIndexName -> { - final IndicesAliasesRequest request = client.admin() - .indices() - .prepareAliases() - .addAlias(concreteIndexName, alias) - .request(); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - request, - ActionListener.wrap( - resp -> listener.onResponse(resp.isAcknowledged()), - listener::onFailure), - client.admin().indices()::aliases); - }, - listener::onFailure - ); + String legacyIndexWithoutSuffix = indexPatternPrefix; + String indexPattern = indexPatternPrefix + "*"; + // The initial index name must be suitable for rollover functionality. + String firstConcreteIndex = indexPatternPrefix + "-000001"; + String[] concreteIndexNames = + resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indexPattern); + Optional indexPointedByCurrentWriteAlias = clusterState.getMetaData().hasAlias(alias) + ? clusterState.getMetaData().getAliasAndIndexLookup().get(alias).getIndices().stream().findFirst() + : Optional.empty(); - String[] stateIndices = resolver.concreteIndexNames(clusterState, - IndicesOptions.lenientExpandOpen(), indexPatternPrefix + "*"); - if (stateIndices.length > 0) { - String latestStateIndex = Arrays.stream(stateIndices).max(INDEX_NAME_COMPARATOR).get(); - createAliasListener.onResponse(latestStateIndex); + if (concreteIndexNames.length == 0) { + if (indexPointedByCurrentWriteAlias.isPresent() == false) { + createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener); + return; + } + logger.error( + "There are no indices matching '{}' pattern but '{}' alias points at [{}]. This should never happen.", + indexPattern, alias, indexPointedByCurrentWriteAlias.get()); + } else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) { + if (indexPointedByCurrentWriteAlias.isPresent() == false) { + createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener); + return; + } + if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) { + createFirstConcreteIndex( + client, + firstConcreteIndex, + alias, + false, + ActionListener.wrap( + unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, listener), + listener::onFailure) + ); + return; + } + logger.error( + "There is exactly one index (i.e. '{}') matching '{}' pattern but '{}' alias points at [{}]. This should never happen.", + legacyIndexWithoutSuffix, indexPattern, alias, indexPointedByCurrentWriteAlias.get()); } else { - // The initial index name must be suitable for rollover functionality. - String initialJobStateIndex = indexPatternPrefix + "-000001"; - CreateIndexRequest createIndexRequest = client.admin() - .indices() - .prepareCreate(initialJobStateIndex) - .addAlias(new Alias(alias)) - .request(); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - createIndexRequest, - ActionListener.wrap( - createIndexResponse -> listener.onResponse(true), - createIndexFailure -> { - // If it was created between our last check, and this request being handled, we should add the alias - // Adding an alias that already exists is idempotent. So, no need to double check if the alias exists - // as well. - if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) { - createAliasListener.onResponse(initialJobStateIndex); - } else { - listener.onFailure(createIndexFailure); - } - }), - client.admin().indices()::create); + if (indexPointedByCurrentWriteAlias.isPresent() == false) { + assert concreteIndexNames.length > 0; + String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get(); + updateWriteAlias(client, alias, null, latestConcreteIndexName, listener); + return; + } } + // If the alias is set, there is nothing more to do. + listener.onResponse(false); + } + + private static void createFirstConcreteIndex(Client client, + String index, + String alias, + boolean addAlias, + ActionListener listener) { + CreateIndexRequestBuilder requestBuilder = client.admin() + .indices() + .prepareCreate(index); + if (addAlias) { + requestBuilder.addAlias(new Alias(alias)); + } + CreateIndexRequest request = requestBuilder.request(); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ML_ORIGIN, + request, + ActionListener.wrap( + createIndexResponse -> listener.onResponse(true), + createIndexFailure -> { + // If it was created between our last check, and this request being handled, we should add the alias + // Adding an alias that already exists is idempotent. So, no need to double check if the alias exists + // as well. + if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) { + updateWriteAlias(client, alias, null, index, listener); + } else { + listener.onFailure(createIndexFailure); + } + }), + client.admin().indices()::create); + } + + private static void updateWriteAlias(Client client, + String alias, + @Nullable String currentIndex, + String newIndex, + ActionListener listener) { + IndicesAliasesRequestBuilder requestBuilder = client.admin() + .indices() + .prepareAliases() + .addAlias(newIndex, alias); + if (currentIndex != null) { + requestBuilder.removeAlias(currentIndex, alias); + } + IndicesAliasesRequest request = requestBuilder.request(); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ML_ORIGIN, + request, + ActionListener.wrap( + resp -> listener.onResponse(resp.isAcknowledged()), + listener::onFailure), + client.admin().indices()::aliases); } } diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/state_index_ilm_policy.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/state_index_ilm_policy.json new file mode 100644 index 00000000000..09a8db4e1ee --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/state_index_ilm_policy.json @@ -0,0 +1,11 @@ +{ + "phases": { + "hot": { + "actions": { + "rollover": { + "max_size": "50GB" + } + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/state_index_template.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/state_index_template.json index fb0b7ad6c2b..e3cd7f51dc4 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/state_index_template.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/anomalydetection/state_index_template.json @@ -9,6 +9,7 @@ "auto_expand_replicas" : "0-1", "hidden": true } + ${xpack.ml.index.lifecycle.settings} }, "mappings" : { "_doc": { @@ -18,5 +19,5 @@ "enabled": false } }, - "aliases" : { } + "aliases" : {} } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 80a55394a40..850d197aa6d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -47,6 +47,7 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toMap; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -67,7 +68,7 @@ public class MlIndexAndAliasTests extends ESTestCase { private IndicesAdminClient indicesAdminClient; private AdminClient adminClient; private Client client; - private ActionListener finalListener; + private ActionListener listener; private ArgumentCaptor createRequestCaptor; private ArgumentCaptor aliasesRequestCaptor; @@ -91,7 +92,7 @@ public class MlIndexAndAliasTests extends ESTestCase { when(client.threadPool()).thenReturn(threadPool); when(client.admin()).thenReturn(adminClient); - finalListener = mock(ActionListener.class); + listener = mock(ActionListener.class); createRequestCaptor = ArgumentCaptor.forClass(CreateIndexRequest.class); aliasesRequestCaptor = ArgumentCaptor.forClass(IndicesAliasesRequest.class); @@ -99,17 +100,17 @@ public class MlIndexAndAliasTests extends ESTestCase { @After public void verifyNoMoreInteractionsWithMocks() { - verifyNoMoreInteractions(indicesAdminClient, finalListener); + verifyNoMoreInteractions(indicesAdminClient, listener); } public void testCreateStateIndexAndAliasIfNecessary_CleanState() { ClusterState clusterState = createClusterState(Collections.emptyMap()); createIndexAndAliasIfNecessary(clusterState); - InOrder inOrder = inOrder(indicesAdminClient, finalListener); + InOrder inOrder = inOrder(indicesAdminClient, listener); inOrder.verify(indicesAdminClient).prepareCreate(FIRST_CONCRETE_INDEX); inOrder.verify(indicesAdminClient).create(createRequestCaptor.capture(), any()); - inOrder.verify(finalListener).onResponse(true); + inOrder.verify(listener).onResponse(true); CreateIndexRequest createRequest = createRequestCaptor.getValue(); assertThat(createRequest.index(), equalTo(FIRST_CONCRETE_INDEX)); @@ -120,11 +121,7 @@ public class MlIndexAndAliasTests extends ESTestCase { ClusterState clusterState = createClusterState(Collections.singletonMap(indexName, createIndexMetaDataWithAlias(indexName))); createIndexAndAliasIfNecessary(clusterState); - verify(finalListener).onResponse(false); - } - - public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyIndex() { - assertNoClientInteractionsWhenWriteAliasAlreadyExists(LEGACY_INDEX_WITHOUT_SUFFIX); + verify(listener).onResponse(false); } public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtInitialStateIndex() { @@ -132,23 +129,48 @@ public class MlIndexAndAliasTests extends ESTestCase { } public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtSubsequentStateIndex() { - assertNoClientInteractionsWhenWriteAliasAlreadyExists(".ml-state-000007"); + assertNoClientInteractionsWhenWriteAliasAlreadyExists("test-000007"); } public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtDummyIndex() { assertNoClientInteractionsWhenWriteAliasAlreadyExists("dummy-index"); } + public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyStateIndex() { + ClusterState clusterState = + createClusterState( + Collections.singletonMap(LEGACY_INDEX_WITHOUT_SUFFIX, createIndexMetaDataWithAlias(LEGACY_INDEX_WITHOUT_SUFFIX))); + createIndexAndAliasIfNecessary(clusterState); + + InOrder inOrder = inOrder(indicesAdminClient, listener); + inOrder.verify(indicesAdminClient).prepareCreate(FIRST_CONCRETE_INDEX); + inOrder.verify(indicesAdminClient).create(createRequestCaptor.capture(), any()); + inOrder.verify(indicesAdminClient).prepareAliases(); + inOrder.verify(indicesAdminClient).aliases(aliasesRequestCaptor.capture(), any()); + inOrder.verify(listener).onResponse(true); + + CreateIndexRequest createRequest = createRequestCaptor.getValue(); + assertThat(createRequest.index(), equalTo(FIRST_CONCRETE_INDEX)); + assertThat(createRequest.aliases(), empty()); + + IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue(); + assertThat( + indicesAliasesRequest.getAliasActions(), + contains( + AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX), + AliasActions.remove().alias(TEST_INDEX_ALIAS).index(LEGACY_INDEX_WITHOUT_SUFFIX))); + } + private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List existingIndexNames, String expectedWriteIndexName) { ClusterState clusterState = createClusterState( existingIndexNames.stream().collect(toMap(Function.identity(), MlIndexAndAliasTests::createIndexMetaData))); createIndexAndAliasIfNecessary(clusterState); - InOrder inOrder = inOrder(indicesAdminClient, finalListener); + InOrder inOrder = inOrder(indicesAdminClient, listener); inOrder.verify(indicesAdminClient).prepareAliases(); inOrder.verify(indicesAdminClient).aliases(aliasesRequestCaptor.capture(), any()); - inOrder.verify(finalListener).onResponse(true); + inOrder.verify(listener).onResponse(true); IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue(); assertThat( @@ -156,11 +178,6 @@ public class MlIndexAndAliasTests extends ESTestCase { contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName))); } - public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyIndexExists() { - assertMlStateWriteAliasAddedToMostRecentMlStateIndex( - Arrays.asList(LEGACY_INDEX_WITHOUT_SUFFIX), LEGACY_INDEX_WITHOUT_SUFFIX); - } - public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButInitialStateIndexExists() { assertMlStateWriteAliasAddedToMostRecentMlStateIndex( Arrays.asList(FIRST_CONCRETE_INDEX), FIRST_CONCRETE_INDEX); @@ -176,6 +193,21 @@ public class MlIndexAndAliasTests extends ESTestCase { Arrays.asList(LEGACY_INDEX_WITHOUT_SUFFIX, "test-000003", "test-000040", "test-000500"), "test-000500"); } + public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyStateIndexExists() { + ClusterState clusterState = + createClusterState(Collections.singletonMap(LEGACY_INDEX_WITHOUT_SUFFIX, createIndexMetaData(LEGACY_INDEX_WITHOUT_SUFFIX))); + createIndexAndAliasIfNecessary(clusterState); + + InOrder inOrder = inOrder(indicesAdminClient, listener); + inOrder.verify(indicesAdminClient).prepareCreate(FIRST_CONCRETE_INDEX); + inOrder.verify(indicesAdminClient).create(createRequestCaptor.capture(), any()); + inOrder.verify(listener).onResponse(true); + + CreateIndexRequest createRequest = createRequestCaptor.getValue(); + assertThat(createRequest.index(), equalTo(FIRST_CONCRETE_INDEX)); + assertThat(createRequest.aliases(), equalTo(Collections.singleton(new Alias(TEST_INDEX_ALIAS)))); + } + public void testIndexNameComparator() { Comparator comparator = MlIndexAndAlias.INDEX_NAME_COMPARATOR; assertThat( @@ -202,8 +234,8 @@ public class MlIndexAndAliasTests extends ESTestCase { } private void createIndexAndAliasIfNecessary(ClusterState clusterState) { - MlIndexAndAlias.createIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), - TEST_INDEX_PREFIX, TEST_INDEX_ALIAS, finalListener); + MlIndexAndAlias.createIndexAndAliasIfNecessary( + client, clusterState, new IndexNameExpressionResolver(), TEST_INDEX_PREFIX, TEST_INDEX_ALIAS, listener); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/ml/build.gradle b/x-pack/plugin/ml/build.gradle index bd3bb8d1b1e..8854b2f5204 100644 --- a/x-pack/plugin/ml/build.gradle +++ b/x-pack/plugin/ml/build.gradle @@ -50,6 +50,7 @@ dependencies { compileOnly project(':modules:lang-painless:spi') compileOnly project(path: xpackModule('core'), configuration: 'default') testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') + testCompile project(path: xpackModule('ilm'), configuration: 'default') // This should not be here testCompile project(path: xpackModule('security'), configuration: 'testArtifacts') diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 1bffd5db52e..5ce22d4604d 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; @@ -94,12 +95,14 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { @AwaitsFix(bugUrl = "https://github.com/elastic/ml-cpp/pulls/468") public void testDeleteExpiredData() throws Exception { // Index some unused state documents (more than 10K to test scrolling works) - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + String mlStateIndexName = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"; + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (int i = 0; i < 10010; i++) { String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i); - IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).id(docId); - indexRequest.source(Collections.emptyMap()); + IndexRequest indexRequest = + new IndexRequest(mlStateIndexName) + .id(docId) + .source(Collections.emptyMap()); bulkRequestBuilder.add(indexRequest); } ActionFuture indexUnusedStateDocsResponse = bulkRequestBuilder.execute(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index 127aee53a62..dffb4ca073e 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -28,6 +28,7 @@ import org.elasticsearch.transport.Netty4Plugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.XPackClientPlugin; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; @@ -39,6 +40,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.security.SecurityField; import org.elasticsearch.xpack.core.security.authc.TokenMetaData; +import org.elasticsearch.xpack.ilm.IndexLifecycle; import java.io.IOException; import java.net.URISyntaxException; @@ -66,12 +68,21 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class); + return Arrays.asList( + LocalStateCompositeXPackPlugin.class, + Netty4Plugin.class, + // ILM is required for .ml-state template index settings + IndexLifecycle.class); } @Override protected Collection> transportClientPlugins() { - return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class); + return Arrays.asList( + XPackClientPlugin.class, + Netty4Plugin.class, + ReindexPlugin.class, + // ILM is required for .ml-state template index settings + IndexLifecycle.class); } @Override @@ -88,6 +99,7 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase { builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4); builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING); builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true); + builder.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false); builder.put("xpack.security.transport.ssl.enabled", true); builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString()); builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java index ae516b4f8ce..6c205f23bb2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlStatsIndex; @@ -38,10 +39,8 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { private static final IndexTemplateConfig ANOMALY_DETECTION_RESULTS_TEMPLATE = anomalyDetectionResultsTemplate(); - private static final IndexTemplateConfig ANOMALY_DETECTION_STATE_TEMPLATE = new IndexTemplateConfig( - AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,ANOMALY_DETECTION_PATH + "state_index_template.json", - Version.CURRENT.id, VERSION_PATTERN, - Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id))); + private static final IndexTemplateConfig ANOMALY_DETECTION_STATE_TEMPLATE = stateTemplate(true); + private static final IndexTemplateConfig ANOMALY_DETECTION_STATE_TEMPLATE_NO_ILM = stateTemplate(false); private static final IndexTemplateConfig META_TEMPLATE = new IndexTemplateConfig(MlMetaIndex.INDEX_NAME, ROOT_RESOURCE_PATH + "meta_index_template.json", Version.CURRENT.id, VERSION_PATTERN, @@ -59,6 +58,10 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { private static final IndexTemplateConfig STATS_TEMPLATE = statsTemplate(); + private static final String ML_STATE_ILM_POLICY_NAME = "ml-state-ilm-policy"; + private static final LifecyclePolicyConfig ML_STATE_ILM_POLICY = + new LifecyclePolicyConfig(ML_STATE_ILM_POLICY_NAME, ANOMALY_DETECTION_PATH + "state_index_ilm_policy.json"); + private static IndexTemplateConfig configTemplate() { Map variables = new HashMap<>(); variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)); @@ -72,6 +75,22 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { variables); } + private static IndexTemplateConfig stateTemplate(boolean ilmEnabled) { + Map variables = new HashMap<>(); + variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)); + variables.put( + "xpack.ml.index.lifecycle.settings", + ilmEnabled + ? ",\"index.lifecycle.name\": \"" + ML_STATE_ILM_POLICY_NAME + "\"\n" + + ",\"index.lifecycle.rollover_alias\": \"" + AnomalyDetectorsIndex.jobStateIndexWriteAlias() + "\"\n" + : ""); + + return new IndexTemplateConfig(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, + ANOMALY_DETECTION_PATH + "state_index_template.json", + Version.CURRENT.id, VERSION_PATTERN, + variables); + } + private static IndexTemplateConfig anomalyDetectionResultsTemplate() { Map variables = new HashMap<>(); variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)); @@ -94,9 +113,20 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { variables); } + private final List templatesToUse; + public MlIndexTemplateRegistry(Settings nodeSettings, ClusterService clusterService, ThreadPool threadPool, Client client, NamedXContentRegistry xContentRegistry) { super(nodeSettings, clusterService, threadPool, client, xContentRegistry); + boolean ilmEnabled = XPackSettings.INDEX_LIFECYCLE_ENABLED.get(settings); + templatesToUse = Arrays.asList( + ANOMALY_DETECTION_RESULTS_TEMPLATE, + ilmEnabled ? ANOMALY_DETECTION_STATE_TEMPLATE : ANOMALY_DETECTION_STATE_TEMPLATE_NO_ILM, + CONFIG_TEMPLATE, + INFERENCE_TEMPLATE, + META_TEMPLATE, + NOTIFICATIONS_TEMPLATE, + STATS_TEMPLATE); } @Override @@ -106,20 +136,12 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { @Override protected List getTemplateConfigs() { - return Arrays.asList( - ANOMALY_DETECTION_RESULTS_TEMPLATE, - ANOMALY_DETECTION_STATE_TEMPLATE, - CONFIG_TEMPLATE, - INFERENCE_TEMPLATE, - META_TEMPLATE, - NOTIFICATIONS_TEMPLATE, - STATS_TEMPLATE - ); + return templatesToUse; } @Override protected List getPolicyConfigs() { - return Collections.emptyList(); + return Collections.singletonList(ML_STATE_ILM_POLICY); } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java new file mode 100644 index 00000000000..a4bffadd429 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistryTests.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.RolloverAction; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.mock.orig.Mockito.verify; +import static org.elasticsearch.mock.orig.Mockito.when; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +public class MlIndexTemplateRegistryTests extends ESTestCase { + + private final DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + private final DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + private NamedXContentRegistry xContentRegistry; + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + private ArgumentCaptor putIndexTemplateRequestCaptor; + + @Before + public void setUpMocks() { + threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService()); + + client = mock(Client.class); + when(client.threadPool()).thenReturn(threadPool); + AdminClient adminClient = mock(AdminClient.class); + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(client.admin()).thenReturn(adminClient); + doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).putTemplate(any(), any()); + + clusterService = mock(ClusterService.class); + + List entries = new ArrayList<>(ClusterModule.getNamedXWriteables()); + entries.add(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse)); + xContentRegistry = new NamedXContentRegistry(entries); + + putIndexTemplateRequestCaptor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class); + } + + public void testStateTemplateWithIlm() { + MlIndexTemplateRegistry registry = + new MlIndexTemplateRegistry( + Settings.builder() + .put(XPackSettings.INDEX_LIFECYCLE_ENABLED.getKey(), true) + .build(), + clusterService, threadPool, client, xContentRegistry); + + registry.clusterChanged(createClusterChangedEvent(nodes)); + + verify(client.admin().indices(), times(7)).putTemplate(putIndexTemplateRequestCaptor.capture(), anyObject()); + + PutIndexTemplateRequest req = putIndexTemplateRequestCaptor.getAllValues().stream() + .filter(r -> r.name().equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)) + .findFirst() + .orElseThrow(() -> new AssertionError("expected the ml state index template to be put")); + assertThat(req.settings().get("index.lifecycle.name"), equalTo("ml-state-ilm-policy")); + assertThat(req.settings().get("index.lifecycle.rollover_alias"), equalTo(".ml-state-write")); + } + + public void testStateTemplateWithNoIlm() { + MlIndexTemplateRegistry registry = + new MlIndexTemplateRegistry( + Settings.builder() + .put(XPackSettings.INDEX_LIFECYCLE_ENABLED.getKey(), false) + .build(), + clusterService, threadPool, client, xContentRegistry); + + registry.clusterChanged(createClusterChangedEvent(nodes)); + + verify(client.admin().indices(), times(7)).putTemplate(putIndexTemplateRequestCaptor.capture(), anyObject()); + + PutIndexTemplateRequest req = putIndexTemplateRequestCaptor.getAllValues().stream() + .filter(r -> r.name().equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)) + .findFirst() + .orElseThrow(() -> new AssertionError("expected the ml state index template to be put")); + assertThat(req.settings().get("index.lifecycle.name"), is(nullValue())); + assertThat(req.settings().get("index.lifecycle.rollover_alias"), is(nullValue())); + } + + @SuppressWarnings("unchecked") + private static Answer withResponse(Response response) { + return invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(response); + return null; + }; + } + + private static ClusterChangedEvent createClusterChangedEvent(DiscoveryNodes nodes) { + return new ClusterChangedEvent( + "created-from-test", + ClusterState.builder(new ClusterName("test")).nodes(nodes).build(), + ClusterState.builder(new ClusterName("test")).build()); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java index bdb1be97bd2..7243be63068 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java @@ -15,7 +15,9 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ml.MachineLearningField; +import org.elasticsearch.xpack.ilm.IndexLifecycle; import java.util.Arrays; import java.util.Collection; @@ -44,6 +46,8 @@ public abstract class MlSingleNodeTestCase extends ESSingleNodeTestCase { newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + // Disable ILM history index so that the tests don't have to clean it up + newSettings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false); return newSettings.build(); } @@ -55,7 +59,10 @@ public abstract class MlSingleNodeTestCase extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList(LocalStateMachineLearning.class); + return pluginList( + LocalStateMachineLearning.class, + // ILM is required for .ml-state template index settings + IndexLifecycle.class); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java index 76d62e175ca..9b1ee7f9089 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java @@ -10,15 +10,12 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; -import org.elasticsearch.xpack.ml.LocalStateMachineLearning; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.junit.Before; -import java.util.Collection; import java.util.List; public class AnnotationIndexIT extends MlSingleNodeTestCase { @@ -33,11 +30,6 @@ public class AnnotationIndexIT extends MlSingleNodeTestCase { return newSettings.build(); } - @Override - protected Collection> getPlugins() { - return pluginList(LocalStateMachineLearning.class); - } - @Before public void createComponents() throws Exception { waitForMlTemplates(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 9c1c0f8aa2f..d13fcad9f44 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; +import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; @@ -94,7 +95,11 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList(LocalStateMachineLearning.class, ReindexPlugin.class); + return pluginList( + LocalStateMachineLearning.class, + ReindexPlugin.class, + // ILM is required for .ml-state template index settings + IndexLifecycle.class); } @Before diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 92e18a6e643..996ffec02af 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -6,14 +6,17 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -37,6 +40,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -70,8 +74,6 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -82,6 +84,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doReturn; import static org.elasticsearch.mock.orig.Mockito.doThrow; @@ -158,10 +163,21 @@ public class AutodetectProcessManagerTests extends ESTestCase { new HashSet<>(Arrays.asList(MachineLearning.MAX_OPEN_JOBS_PER_NODE, ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES))); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - MetaData metaData = mock(MetaData.class); - SortedMap aliasOrIndexSortedMap = new TreeMap<>(); - aliasOrIndexSortedMap.put(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), mock(AliasOrIndex.Alias.class)); - when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap); + MetaData metaData = MetaData.builder() + .indices(ImmutableOpenMap.builder() + .fPut( + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001", + IndexMetaData.builder(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001") + .settings( + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .build()) + .putAlias(AliasMetaData.builder(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).build()) + .build()) + .build()) + .build(); clusterState = mock(ClusterState.class); when(clusterState.getMetaData()).thenReturn(metaData); when(clusterState.metaData()).thenReturn(metaData); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 0586f1286a8..37bbee6a6b2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -30,6 +30,8 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction; @@ -55,6 +57,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; import org.elasticsearch.xpack.ml.MachineLearning; import org.junit.After; @@ -94,6 +97,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false); return settings.build(); } @@ -110,8 +114,12 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(LocalStateMachineLearning.class, CommonAnalysisPlugin.class, - ReindexPlugin.class); + return Arrays.asList( + LocalStateMachineLearning.class, + CommonAnalysisPlugin.class, + ReindexPlugin.class, + // ILM is required for .ml-state template index settings + IndexLifecycle.class); } @Override