[7.x] Implement ILM policy for .ml-state* indices (#52356) (#53327)

This commit is contained in:
Przemysław Witek 2020-03-10 14:24:18 +01:00 committed by GitHub
parent e23c3f915f
commit d54d7f2be0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 432 additions and 140 deletions

View File

@ -15,8 +15,6 @@ import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import java.util.Collections;
import java.util.Comparator;
import java.util.regex.Pattern;
/**
* Methods for handling index naming related functions
@ -28,31 +26,7 @@ public final class AnomalyDetectorsIndex {
private static final String RESULTS_MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";
private static final String RESOURCE_PATH = "/org/elasticsearch/xpack/core/ml/anomalydetection/";
// Visible for testing
static final Comparator<String> STATE_INDEX_NAME_COMPARATOR = new Comparator<String>() {
private final Pattern HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}");
@Override
public int compare(String index1, String index2) {
String[] index1Parts = index1.split("-");
String index1Suffix = index1Parts[index1Parts.length - 1];
boolean index1HasSixDigitsSuffix = HAS_SIX_DIGIT_SUFFIX.matcher(index1Suffix).matches();
String[] index2Parts = index2.split("-");
String index2Suffix = index2Parts[index2Parts.length - 1];
boolean index2HasSixDigitsSuffix = HAS_SIX_DIGIT_SUFFIX.matcher(index2Suffix).matches();
if (index1HasSixDigitsSuffix && index2HasSixDigitsSuffix) {
return index1Suffix.compareTo(index2Suffix);
} else if (index1HasSixDigitsSuffix != index2HasSixDigitsSuffix) {
return Boolean.compare(index1HasSixDigitsSuffix, index2HasSixDigitsSuffix);
} else {
return index1.compareTo(index2);
}
}
};
private AnomalyDetectorsIndex() {
}
private AnomalyDetectorsIndex() {}
public static String jobResultsIndexPrefix() {
return AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX;
@ -109,8 +83,13 @@ public final class AnomalyDetectorsIndex {
*/
public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, IndexNameExpressionResolver resolver,
final ActionListener<Boolean> finalListener) {
MlIndexAndAlias.createIndexAndAliasIfNecessary(client, state, resolver,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), finalListener);
MlIndexAndAlias.createIndexAndAliasIfNecessary(
client,
state,
resolver,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
finalListener);
}
public static String resultsMapping() {

View File

@ -5,20 +5,27 @@
*/
package org.elasticsearch.xpack.core.ml.utils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Nullable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Pattern;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -29,6 +36,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
*/
public final class MlIndexAndAlias {
private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class);
// Visible for testing
static final Comparator<String> INDEX_NAME_COMPARATOR = new Comparator<String>() {
@ -60,60 +69,114 @@ public final class MlIndexAndAlias {
* or to the index with the highest suffix if the index did not have to be created.
* The listener is notified with a {@code boolean} that informs whether the index or the alias were created.
*/
public static void createIndexAndAliasIfNecessary(Client client, ClusterState clusterState, IndexNameExpressionResolver resolver,
String indexPatternPrefix, String alias, ActionListener<Boolean> listener) {
if (clusterState.getMetaData().getAliasAndIndexLookup().containsKey(alias)) {
listener.onResponse(false);
return;
}
public static void createIndexAndAliasIfNecessary(Client client,
ClusterState clusterState,
IndexNameExpressionResolver resolver,
String indexPatternPrefix,
String alias,
ActionListener<Boolean> listener) {
final ActionListener<String> createAliasListener = ActionListener.wrap(
concreteIndexName -> {
final IndicesAliasesRequest request = client.admin()
.indices()
.prepareAliases()
.addAlias(concreteIndexName, alias)
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
ActionListener.<AcknowledgedResponse>wrap(
resp -> listener.onResponse(resp.isAcknowledged()),
listener::onFailure),
client.admin().indices()::aliases);
},
listener::onFailure
);
String legacyIndexWithoutSuffix = indexPatternPrefix;
String indexPattern = indexPatternPrefix + "*";
// The initial index name must be suitable for rollover functionality.
String firstConcreteIndex = indexPatternPrefix + "-000001";
String[] concreteIndexNames =
resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indexPattern);
Optional<IndexMetaData> indexPointedByCurrentWriteAlias = clusterState.getMetaData().hasAlias(alias)
? clusterState.getMetaData().getAliasAndIndexLookup().get(alias).getIndices().stream().findFirst()
: Optional.empty();
String[] stateIndices = resolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(), indexPatternPrefix + "*");
if (stateIndices.length > 0) {
String latestStateIndex = Arrays.stream(stateIndices).max(INDEX_NAME_COMPARATOR).get();
createAliasListener.onResponse(latestStateIndex);
if (concreteIndexNames.length == 0) {
if (indexPointedByCurrentWriteAlias.isPresent() == false) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener);
return;
}
logger.error(
"There are no indices matching '{}' pattern but '{}' alias points at [{}]. This should never happen.",
indexPattern, alias, indexPointedByCurrentWriteAlias.get());
} else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) {
if (indexPointedByCurrentWriteAlias.isPresent() == false) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener);
return;
}
if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) {
createFirstConcreteIndex(
client,
firstConcreteIndex,
alias,
false,
ActionListener.wrap(
unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, listener),
listener::onFailure)
);
return;
}
logger.error(
"There is exactly one index (i.e. '{}') matching '{}' pattern but '{}' alias points at [{}]. This should never happen.",
legacyIndexWithoutSuffix, indexPattern, alias, indexPointedByCurrentWriteAlias.get());
} else {
// The initial index name must be suitable for rollover functionality.
String initialJobStateIndex = indexPatternPrefix + "-000001";
CreateIndexRequest createIndexRequest = client.admin()
.indices()
.prepareCreate(initialJobStateIndex)
.addAlias(new Alias(alias))
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
createIndexRequest,
ActionListener.<CreateIndexResponse>wrap(
createIndexResponse -> listener.onResponse(true),
createIndexFailure -> {
// If it was created between our last check, and this request being handled, we should add the alias
// Adding an alias that already exists is idempotent. So, no need to double check if the alias exists
// as well.
if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) {
createAliasListener.onResponse(initialJobStateIndex);
} else {
listener.onFailure(createIndexFailure);
}
}),
client.admin().indices()::create);
if (indexPointedByCurrentWriteAlias.isPresent() == false) {
assert concreteIndexNames.length > 0;
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
updateWriteAlias(client, alias, null, latestConcreteIndexName, listener);
return;
}
}
// If the alias is set, there is nothing more to do.
listener.onResponse(false);
}
private static void createFirstConcreteIndex(Client client,
String index,
String alias,
boolean addAlias,
ActionListener<Boolean> listener) {
CreateIndexRequestBuilder requestBuilder = client.admin()
.indices()
.prepareCreate(index);
if (addAlias) {
requestBuilder.addAlias(new Alias(alias));
}
CreateIndexRequest request = requestBuilder.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
ActionListener.<CreateIndexResponse>wrap(
createIndexResponse -> listener.onResponse(true),
createIndexFailure -> {
// If it was created between our last check, and this request being handled, we should add the alias
// Adding an alias that already exists is idempotent. So, no need to double check if the alias exists
// as well.
if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) {
updateWriteAlias(client, alias, null, index, listener);
} else {
listener.onFailure(createIndexFailure);
}
}),
client.admin().indices()::create);
}
private static void updateWriteAlias(Client client,
String alias,
@Nullable String currentIndex,
String newIndex,
ActionListener<Boolean> listener) {
IndicesAliasesRequestBuilder requestBuilder = client.admin()
.indices()
.prepareAliases()
.addAlias(newIndex, alias);
if (currentIndex != null) {
requestBuilder.removeAlias(currentIndex, alias);
}
IndicesAliasesRequest request = requestBuilder.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
ActionListener.<AcknowledgedResponse>wrap(
resp -> listener.onResponse(resp.isAcknowledged()),
listener::onFailure),
client.admin().indices()::aliases);
}
}

View File

@ -0,0 +1,11 @@
{
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50GB"
}
}
}
}
}

View File

@ -9,6 +9,7 @@
"auto_expand_replicas" : "0-1",
"hidden": true
}
${xpack.ml.index.lifecycle.settings}
},
"mappings" : {
"_doc": {
@ -18,5 +19,5 @@
"enabled": false
}
},
"aliases" : { }
"aliases" : {}
}

View File

@ -47,6 +47,7 @@ import java.util.stream.Stream;
import static java.util.stream.Collectors.toMap;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
@ -67,7 +68,7 @@ public class MlIndexAndAliasTests extends ESTestCase {
private IndicesAdminClient indicesAdminClient;
private AdminClient adminClient;
private Client client;
private ActionListener<Boolean> finalListener;
private ActionListener<Boolean> listener;
private ArgumentCaptor<CreateIndexRequest> createRequestCaptor;
private ArgumentCaptor<IndicesAliasesRequest> aliasesRequestCaptor;
@ -91,7 +92,7 @@ public class MlIndexAndAliasTests extends ESTestCase {
when(client.threadPool()).thenReturn(threadPool);
when(client.admin()).thenReturn(adminClient);
finalListener = mock(ActionListener.class);
listener = mock(ActionListener.class);
createRequestCaptor = ArgumentCaptor.forClass(CreateIndexRequest.class);
aliasesRequestCaptor = ArgumentCaptor.forClass(IndicesAliasesRequest.class);
@ -99,17 +100,17 @@ public class MlIndexAndAliasTests extends ESTestCase {
@After
public void verifyNoMoreInteractionsWithMocks() {
verifyNoMoreInteractions(indicesAdminClient, finalListener);
verifyNoMoreInteractions(indicesAdminClient, listener);
}
public void testCreateStateIndexAndAliasIfNecessary_CleanState() {
ClusterState clusterState = createClusterState(Collections.emptyMap());
createIndexAndAliasIfNecessary(clusterState);
InOrder inOrder = inOrder(indicesAdminClient, finalListener);
InOrder inOrder = inOrder(indicesAdminClient, listener);
inOrder.verify(indicesAdminClient).prepareCreate(FIRST_CONCRETE_INDEX);
inOrder.verify(indicesAdminClient).create(createRequestCaptor.capture(), any());
inOrder.verify(finalListener).onResponse(true);
inOrder.verify(listener).onResponse(true);
CreateIndexRequest createRequest = createRequestCaptor.getValue();
assertThat(createRequest.index(), equalTo(FIRST_CONCRETE_INDEX));
@ -120,11 +121,7 @@ public class MlIndexAndAliasTests extends ESTestCase {
ClusterState clusterState = createClusterState(Collections.singletonMap(indexName, createIndexMetaDataWithAlias(indexName)));
createIndexAndAliasIfNecessary(clusterState);
verify(finalListener).onResponse(false);
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyIndex() {
assertNoClientInteractionsWhenWriteAliasAlreadyExists(LEGACY_INDEX_WITHOUT_SUFFIX);
verify(listener).onResponse(false);
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtInitialStateIndex() {
@ -132,23 +129,48 @@ public class MlIndexAndAliasTests extends ESTestCase {
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtSubsequentStateIndex() {
assertNoClientInteractionsWhenWriteAliasAlreadyExists(".ml-state-000007");
assertNoClientInteractionsWhenWriteAliasAlreadyExists("test-000007");
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtDummyIndex() {
assertNoClientInteractionsWhenWriteAliasAlreadyExists("dummy-index");
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyStateIndex() {
ClusterState clusterState =
createClusterState(
Collections.singletonMap(LEGACY_INDEX_WITHOUT_SUFFIX, createIndexMetaDataWithAlias(LEGACY_INDEX_WITHOUT_SUFFIX)));
createIndexAndAliasIfNecessary(clusterState);
InOrder inOrder = inOrder(indicesAdminClient, listener);
inOrder.verify(indicesAdminClient).prepareCreate(FIRST_CONCRETE_INDEX);
inOrder.verify(indicesAdminClient).create(createRequestCaptor.capture(), any());
inOrder.verify(indicesAdminClient).prepareAliases();
inOrder.verify(indicesAdminClient).aliases(aliasesRequestCaptor.capture(), any());
inOrder.verify(listener).onResponse(true);
CreateIndexRequest createRequest = createRequestCaptor.getValue();
assertThat(createRequest.index(), equalTo(FIRST_CONCRETE_INDEX));
assertThat(createRequest.aliases(), empty());
IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue();
assertThat(
indicesAliasesRequest.getAliasActions(),
contains(
AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX),
AliasActions.remove().alias(TEST_INDEX_ALIAS).index(LEGACY_INDEX_WITHOUT_SUFFIX)));
}
private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List<String> existingIndexNames, String expectedWriteIndexName) {
ClusterState clusterState =
createClusterState(
existingIndexNames.stream().collect(toMap(Function.identity(), MlIndexAndAliasTests::createIndexMetaData)));
createIndexAndAliasIfNecessary(clusterState);
InOrder inOrder = inOrder(indicesAdminClient, finalListener);
InOrder inOrder = inOrder(indicesAdminClient, listener);
inOrder.verify(indicesAdminClient).prepareAliases();
inOrder.verify(indicesAdminClient).aliases(aliasesRequestCaptor.capture(), any());
inOrder.verify(finalListener).onResponse(true);
inOrder.verify(listener).onResponse(true);
IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue();
assertThat(
@ -156,11 +178,6 @@ public class MlIndexAndAliasTests extends ESTestCase {
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName)));
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyIndexExists() {
assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
Arrays.asList(LEGACY_INDEX_WITHOUT_SUFFIX), LEGACY_INDEX_WITHOUT_SUFFIX);
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButInitialStateIndexExists() {
assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
Arrays.asList(FIRST_CONCRETE_INDEX), FIRST_CONCRETE_INDEX);
@ -176,6 +193,21 @@ public class MlIndexAndAliasTests extends ESTestCase {
Arrays.asList(LEGACY_INDEX_WITHOUT_SUFFIX, "test-000003", "test-000040", "test-000500"), "test-000500");
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyStateIndexExists() {
ClusterState clusterState =
createClusterState(Collections.singletonMap(LEGACY_INDEX_WITHOUT_SUFFIX, createIndexMetaData(LEGACY_INDEX_WITHOUT_SUFFIX)));
createIndexAndAliasIfNecessary(clusterState);
InOrder inOrder = inOrder(indicesAdminClient, listener);
inOrder.verify(indicesAdminClient).prepareCreate(FIRST_CONCRETE_INDEX);
inOrder.verify(indicesAdminClient).create(createRequestCaptor.capture(), any());
inOrder.verify(listener).onResponse(true);
CreateIndexRequest createRequest = createRequestCaptor.getValue();
assertThat(createRequest.index(), equalTo(FIRST_CONCRETE_INDEX));
assertThat(createRequest.aliases(), equalTo(Collections.singleton(new Alias(TEST_INDEX_ALIAS))));
}
public void testIndexNameComparator() {
Comparator<String> comparator = MlIndexAndAlias.INDEX_NAME_COMPARATOR;
assertThat(
@ -202,8 +234,8 @@ public class MlIndexAndAliasTests extends ESTestCase {
}
private void createIndexAndAliasIfNecessary(ClusterState clusterState) {
MlIndexAndAlias.createIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(),
TEST_INDEX_PREFIX, TEST_INDEX_ALIAS, finalListener);
MlIndexAndAlias.createIndexAndAliasIfNecessary(
client, clusterState, new IndexNameExpressionResolver(), TEST_INDEX_PREFIX, TEST_INDEX_ALIAS, listener);
}
@SuppressWarnings("unchecked")

View File

@ -50,6 +50,7 @@ dependencies {
compileOnly project(':modules:lang-painless:spi')
compileOnly project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('ilm'), configuration: 'default')
// This should not be here
testCompile project(path: xpackModule('security'), configuration: 'testArtifacts')

View File

@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
@ -94,12 +95,14 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
@AwaitsFix(bugUrl = "https://github.com/elastic/ml-cpp/pulls/468")
public void testDeleteExpiredData() throws Exception {
// Index some unused state documents (more than 10K to test scrolling works)
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
String mlStateIndexName = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001";
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10010; i++) {
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).id(docId);
indexRequest.source(Collections.emptyMap());
IndexRequest indexRequest =
new IndexRequest(mlStateIndexName)
.id(docId)
.source(Collections.emptyMap());
bulkRequestBuilder.add(indexRequest);
}
ActionFuture<BulkResponse> indexUnusedStateDocsResponse = bulkRequestBuilder.execute();

View File

@ -28,6 +28,7 @@ import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackClientPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
@ -39,6 +40,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.security.SecurityField;
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import java.io.IOException;
import java.net.URISyntaxException;
@ -66,12 +68,21 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
return Arrays.asList(
LocalStateCompositeXPackPlugin.class,
Netty4Plugin.class,
// ILM is required for .ml-state template index settings
IndexLifecycle.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
return Arrays.asList(
XPackClientPlugin.class,
Netty4Plugin.class,
ReindexPlugin.class,
// ILM is required for .ml-state template index settings
IndexLifecycle.class);
}
@Override
@ -88,6 +99,7 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
builder.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false);
builder.put("xpack.security.transport.ssl.enabled", true);
builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
@ -38,10 +39,8 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
private static final IndexTemplateConfig ANOMALY_DETECTION_RESULTS_TEMPLATE = anomalyDetectionResultsTemplate();
private static final IndexTemplateConfig ANOMALY_DETECTION_STATE_TEMPLATE = new IndexTemplateConfig(
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,ANOMALY_DETECTION_PATH + "state_index_template.json",
Version.CURRENT.id, VERSION_PATTERN,
Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)));
private static final IndexTemplateConfig ANOMALY_DETECTION_STATE_TEMPLATE = stateTemplate(true);
private static final IndexTemplateConfig ANOMALY_DETECTION_STATE_TEMPLATE_NO_ILM = stateTemplate(false);
private static final IndexTemplateConfig META_TEMPLATE = new IndexTemplateConfig(MlMetaIndex.INDEX_NAME,
ROOT_RESOURCE_PATH + "meta_index_template.json", Version.CURRENT.id, VERSION_PATTERN,
@ -59,6 +58,10 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
private static final IndexTemplateConfig STATS_TEMPLATE = statsTemplate();
private static final String ML_STATE_ILM_POLICY_NAME = "ml-state-ilm-policy";
private static final LifecyclePolicyConfig ML_STATE_ILM_POLICY =
new LifecyclePolicyConfig(ML_STATE_ILM_POLICY_NAME, ANOMALY_DETECTION_PATH + "state_index_ilm_policy.json");
private static IndexTemplateConfig configTemplate() {
Map<String, String> variables = new HashMap<>();
variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id));
@ -72,6 +75,22 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
variables);
}
private static IndexTemplateConfig stateTemplate(boolean ilmEnabled) {
Map<String, String> variables = new HashMap<>();
variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id));
variables.put(
"xpack.ml.index.lifecycle.settings",
ilmEnabled
? ",\"index.lifecycle.name\": \"" + ML_STATE_ILM_POLICY_NAME + "\"\n" +
",\"index.lifecycle.rollover_alias\": \"" + AnomalyDetectorsIndex.jobStateIndexWriteAlias() + "\"\n"
: "");
return new IndexTemplateConfig(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
ANOMALY_DETECTION_PATH + "state_index_template.json",
Version.CURRENT.id, VERSION_PATTERN,
variables);
}
private static IndexTemplateConfig anomalyDetectionResultsTemplate() {
Map<String, String> variables = new HashMap<>();
variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id));
@ -94,9 +113,20 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
variables);
}
private final List<IndexTemplateConfig> templatesToUse;
public MlIndexTemplateRegistry(Settings nodeSettings, ClusterService clusterService, ThreadPool threadPool, Client client,
NamedXContentRegistry xContentRegistry) {
super(nodeSettings, clusterService, threadPool, client, xContentRegistry);
boolean ilmEnabled = XPackSettings.INDEX_LIFECYCLE_ENABLED.get(settings);
templatesToUse = Arrays.asList(
ANOMALY_DETECTION_RESULTS_TEMPLATE,
ilmEnabled ? ANOMALY_DETECTION_STATE_TEMPLATE : ANOMALY_DETECTION_STATE_TEMPLATE_NO_ILM,
CONFIG_TEMPLATE,
INFERENCE_TEMPLATE,
META_TEMPLATE,
NOTIFICATIONS_TEMPLATE,
STATS_TEMPLATE);
}
@Override
@ -106,20 +136,12 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
@Override
protected List<IndexTemplateConfig> getTemplateConfigs() {
return Arrays.asList(
ANOMALY_DETECTION_RESULTS_TEMPLATE,
ANOMALY_DETECTION_STATE_TEMPLATE,
CONFIG_TEMPLATE,
INFERENCE_TEMPLATE,
META_TEMPLATE,
NOTIFICATIONS_TEMPLATE,
STATS_TEMPLATE
);
return templatesToUse;
}
@Override
protected List<LifecyclePolicyConfig> getPolicyConfigs() {
return Collections.emptyList();
return Collections.singletonList(ML_STATE_ILM_POLICY);
}
@Override

View File

@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ilm.LifecycleAction;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
public class MlIndexTemplateRegistryTests extends ESTestCase {
private final DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
private final DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
private NamedXContentRegistry xContentRegistry;
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private ArgumentCaptor<PutIndexTemplateRequest> putIndexTemplateRequestCaptor;
@Before
public void setUpMocks() {
threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService());
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
AdminClient adminClient = mock(AdminClient.class);
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(client.admin()).thenReturn(adminClient);
doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).putTemplate(any(), any());
clusterService = mock(ClusterService.class);
List<NamedXContentRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedXWriteables());
entries.add(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse));
xContentRegistry = new NamedXContentRegistry(entries);
putIndexTemplateRequestCaptor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
}
public void testStateTemplateWithIlm() {
MlIndexTemplateRegistry registry =
new MlIndexTemplateRegistry(
Settings.builder()
.put(XPackSettings.INDEX_LIFECYCLE_ENABLED.getKey(), true)
.build(),
clusterService, threadPool, client, xContentRegistry);
registry.clusterChanged(createClusterChangedEvent(nodes));
verify(client.admin().indices(), times(7)).putTemplate(putIndexTemplateRequestCaptor.capture(), anyObject());
PutIndexTemplateRequest req = putIndexTemplateRequestCaptor.getAllValues().stream()
.filter(r -> r.name().equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX))
.findFirst()
.orElseThrow(() -> new AssertionError("expected the ml state index template to be put"));
assertThat(req.settings().get("index.lifecycle.name"), equalTo("ml-state-ilm-policy"));
assertThat(req.settings().get("index.lifecycle.rollover_alias"), equalTo(".ml-state-write"));
}
public void testStateTemplateWithNoIlm() {
MlIndexTemplateRegistry registry =
new MlIndexTemplateRegistry(
Settings.builder()
.put(XPackSettings.INDEX_LIFECYCLE_ENABLED.getKey(), false)
.build(),
clusterService, threadPool, client, xContentRegistry);
registry.clusterChanged(createClusterChangedEvent(nodes));
verify(client.admin().indices(), times(7)).putTemplate(putIndexTemplateRequestCaptor.capture(), anyObject());
PutIndexTemplateRequest req = putIndexTemplateRequestCaptor.getAllValues().stream()
.filter(r -> r.name().equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX))
.findFirst()
.orElseThrow(() -> new AssertionError("expected the ml state index template to be put"));
assertThat(req.settings().get("index.lifecycle.name"), is(nullValue()));
assertThat(req.settings().get("index.lifecycle.rollover_alias"), is(nullValue()));
}
@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[1];
listener.onResponse(response);
return null;
};
}
private static ClusterChangedEvent createClusterChangedEvent(DiscoveryNodes nodes) {
return new ClusterChangedEvent(
"created-from-test",
ClusterState.builder(new ClusterName("test")).nodes(nodes).build(),
ClusterState.builder(new ClusterName("test")).build());
}
}

View File

@ -15,7 +15,9 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import java.util.Arrays;
import java.util.Collection;
@ -44,6 +46,8 @@ public abstract class MlSingleNodeTestCase extends ESSingleNodeTestCase {
newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
// Disable ILM history index so that the tests don't have to clean it up
newSettings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false);
return newSettings.build();
}
@ -55,7 +59,10 @@ public abstract class MlSingleNodeTestCase extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(LocalStateMachineLearning.class);
return pluginList(
LocalStateMachineLearning.class,
// ILM is required for .ml-state template index settings
IndexLifecycle.class);
}
/**

View File

@ -10,15 +10,12 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.junit.Before;
import java.util.Collection;
import java.util.List;
public class AnnotationIndexIT extends MlSingleNodeTestCase {
@ -33,11 +30,6 @@ public class AnnotationIndexIT extends MlSingleNodeTestCase {
return newSettings.build();
}
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(LocalStateMachineLearning.class);
}
@Before
public void createComponents() throws Exception {
waitForMlTemplates();

View File

@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor;
@ -94,7 +95,11 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(LocalStateMachineLearning.class, ReindexPlugin.class);
return pluginList(
LocalStateMachineLearning.class,
ReindexPlugin.class,
// ILM is required for .ml-state template index settings
IndexLifecycle.class);
}
@Before

View File

@ -6,14 +6,17 @@
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -37,6 +40,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
@ -70,8 +74,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -82,6 +84,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.doReturn;
import static org.elasticsearch.mock.orig.Mockito.doThrow;
@ -158,10 +163,21 @@ public class AutodetectProcessManagerTests extends ESTestCase {
new HashSet<>(Arrays.asList(MachineLearning.MAX_OPEN_JOBS_PER_NODE,
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES)));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
MetaData metaData = mock(MetaData.class);
SortedMap<String, AliasOrIndex> aliasOrIndexSortedMap = new TreeMap<>();
aliasOrIndexSortedMap.put(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), mock(AliasOrIndex.Alias.class));
when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap);
MetaData metaData = MetaData.builder()
.indices(ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001",
IndexMetaData.builder(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001")
.settings(
Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.build())
.putAlias(AliasMetaData.builder(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).build())
.build())
.build())
.build();
clusterState = mock(ClusterState.class);
when(clusterState.getMetaData()).thenReturn(metaData);
when(clusterState.metaData()).thenReturn(metaData);

View File

@ -30,6 +30,8 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
@ -55,6 +57,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.junit.After;
@ -94,6 +97,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false);
return settings.build();
}
@ -110,8 +114,12 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateMachineLearning.class, CommonAnalysisPlugin.class,
ReindexPlugin.class);
return Arrays.asList(
LocalStateMachineLearning.class,
CommonAnalysisPlugin.class,
ReindexPlugin.class,
// ILM is required for .ml-state template index settings
IndexLifecycle.class);
}
@Override