diff --git a/client/rest-high-level/build.gradle b/client/rest-high-level/build.gradle index 82c15c0c544..55f1f9e56c9 100644 --- a/client/rest-high-level/build.gradle +++ b/client/rest-high-level/build.gradle @@ -135,6 +135,7 @@ testClusters.all { setting 'xpack.security.authc.realms.pki.pki1.delegation.enabled', 'true' setting 'indices.lifecycle.poll_interval', '1000ms' + setting 'index.lifecycle.history_index_enabled', 'false' keystore 'xpack.security.transport.ssl.truststore.secure_password', 'testnode' extraConfigFile 'roles.yml', file('roles.yml') user username: System.getProperty('tests.rest.cluster.username', 'test_user'), diff --git a/docs/build.gradle b/docs/build.gradle index f88d68d65f1..fece645fa27 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -59,6 +59,7 @@ testClusters.integTest { extraConfigFile 'hunspell/en_US/en_US.dic', project(":server").file('src/test/resources/indices/analyze/conf_dir/hunspell/en_US/en_US.dic') // Whitelist reindexing from the local node so we can test it. 
setting 'reindex.remote.whitelist', '127.0.0.1:*' + setting 'index.lifecycle.history_index_enabled', 'false' // TODO: remove this once cname is prepended to transport.publish_address by default in 8.0 systemProperty 'es.transport.cname_in_publish_address', 'true' diff --git a/docs/plugins/analysis-icu.asciidoc b/docs/plugins/analysis-icu.asciidoc index 19a8625f604..d0b95ae7f49 100644 --- a/docs/plugins/analysis-icu.asciidoc +++ b/docs/plugins/analysis-icu.asciidoc @@ -368,7 +368,7 @@ PUT my_index } } -GET _search <3> +GET /my_index/_search <3> { "query": { "match": { diff --git a/docs/reference/indices/rollover-index.asciidoc b/docs/reference/indices/rollover-index.asciidoc index cbc0dfc081e..46a1ae10f59 100644 --- a/docs/reference/indices/rollover-index.asciidoc +++ b/docs/reference/indices/rollover-index.asciidoc @@ -431,7 +431,7 @@ PUT logs/_doc/2 <2> ////////////////////////// [source,console] -------------------------------------------------- -GET _alias +GET my_logs_index-000001,my_logs_index-000002/_alias -------------------------------------------------- // TEST[continued] ////////////////////////// diff --git a/docs/reference/settings/ilm-settings.asciidoc b/docs/reference/settings/ilm-settings.asciidoc index 8781581a6e5..946848017bf 100644 --- a/docs/reference/settings/ilm-settings.asciidoc +++ b/docs/reference/settings/ilm-settings.asciidoc @@ -14,6 +14,11 @@ ILM REST API endpoints and functionality. Defaults to `true`. (<>) How often {ilm} checks for indices that meet policy criteria. Defaults to `10m`. +`index.lifecycle.history_index_enabled`:: +Whether ILM's history index is enabled. If enabled, ILM will record the +history of actions taken as part of ILM policies to the `ilm-history-*` +indices. Defaults to `true`. + ==== Index level settings These index-level {ilm-init} settings are typically configured through index templates. For more information, see <>. 
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 677d7c22cea..5468451066d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -75,6 +75,7 @@ import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -472,6 +473,13 @@ public abstract class ESRestTestCase extends ESTestCase { return false; } + /** + * A set of ILM policies that should be preserved between runs. + */ + protected Set preserveILMPolicyIds() { + return Collections.singleton("ilm-history-ilm-policy"); + } + /** * Returns whether to preserve auto-follow patterns. Defaults to not * preserving them. 
Only runs at all if xpack is installed on the cluster @@ -560,7 +568,7 @@ public abstract class ESRestTestCase extends ESTestCase { } if (hasXPack && false == preserveILMPoliciesUponCompletion()) { - deleteAllILMPolicies(); + deleteAllILMPolicies(preserveILMPolicyIds()); } if (hasXPack && false == preserveAutoFollowPatternsUponCompletion()) { @@ -686,7 +694,7 @@ public abstract class ESRestTestCase extends ESTestCase { waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false); } - private static void deleteAllILMPolicies() throws IOException { + private static void deleteAllILMPolicies(Set exclusions) throws IOException { Map policies; try { @@ -704,9 +712,15 @@ public abstract class ESRestTestCase extends ESTestCase { return; } - for (String policyName : policies.keySet()) { - adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName)); - } + policies.keySet().stream() + .filter(p -> exclusions.contains(p) == false) + .forEach(policyName -> { + try { + adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName)); + } catch (IOException e) { + throw new RuntimeException("failed to delete policy: " + policyName, e); + } + }); } private static void deleteAllSLMPolicies() throws IOException { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java index 62797eb34a0..3d186892ec9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java @@ -19,6 +19,7 @@ public class LifecycleSettings { public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete"; public static final String LIFECYCLE_ORIGINATION_DATE = "index.lifecycle.origination_date"; public static final String 
LIFECYCLE_PARSE_ORIGINATION_DATE = "index.lifecycle.parse_origination_date"; + public static final String LIFECYCLE_HISTORY_INDEX_ENABLED = "index.lifecycle.history_index_enabled"; public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled"; public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule"; @@ -35,6 +36,8 @@ public class LifecycleSettings { Setting.longSetting(LIFECYCLE_ORIGINATION_DATE, -1, -1, Setting.Property.Dynamic, Setting.Property.IndexScope); public static final Setting LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING = Setting.boolSetting(LIFECYCLE_PARSE_ORIGINATION_DATE, false, Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(LIFECYCLE_HISTORY_INDEX_ENABLED, + true, Setting.Property.NodeScope); public static final Setting SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true, diff --git a/x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json b/x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json new file mode 100644 index 00000000000..febae00bc36 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/ilm-history-ilm-policy.json @@ -0,0 +1,18 @@ +{ + "phases": { + "hot": { + "actions": { + "rollover": { + "max_size": "50GB", + "max_age": "30d" + } + } + }, + "delete": { + "min_age": "90d", + "actions": { + "delete": {} + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/ilm-history.json b/x-pack/plugin/core/src/main/resources/ilm-history.json new file mode 100644 index 00000000000..ae9c50552b3 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/ilm-history.json @@ -0,0 +1,83 @@ +{ + "index_patterns": [ + "ilm-history-${xpack.ilm_history.template.version}*" + ], + "order": 2147483647, + "settings": { + "index.number_of_shards": 1, + "index.number_of_replicas": 0, + "index.auto_expand_replicas": "0-1", + "index.lifecycle.name": 
"ilm-history-ilm-policy", + "index.lifecycle.rollover_alias": "ilm-history-${xpack.ilm_history.template.version}", + "index.format": 1 + }, + "mappings": { + "_doc": { + "dynamic": false, + "properties": { + "@timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "policy": { + "type": "keyword" + }, + "index": { + "type": "keyword" + }, + "index_age":{ + "type": "long" + }, + "success": { + "type": "boolean" + }, + "state": { + "type": "object", + "dynamic": true, + "properties": { + "phase": { + "type": "keyword" + }, + "action": { + "type": "keyword" + }, + "step": { + "type": "keyword" + }, + "failed_step": { + "type": "keyword" + }, + "is_auto-retryable_error": { + "type": "keyword" + }, + "creation_date": { + "type": "date", + "format": "epoch_millis" + }, + "phase_time": { + "type": "date", + "format": "epoch_millis" + }, + "action_time": { + "type": "date", + "format": "epoch_millis" + }, + "step_time": { + "type": "date", + "format": "epoch_millis" + }, + "phase_definition": { + "type": "text" + }, + "step_info": { + "type": "text" + } + } + }, + "error_details": { + "type": "text" + } + } + } + } +} diff --git a/x-pack/plugin/ilm/qa/multi-node/build.gradle b/x-pack/plugin/ilm/qa/multi-node/build.gradle index 6703f281b93..bb0c247b5c8 100644 --- a/x-pack/plugin/ilm/qa/multi-node/build.gradle +++ b/x-pack/plugin/ilm/qa/multi-node/build.gradle @@ -27,4 +27,6 @@ testClusters.integTest { setting 'xpack.ml.enabled', 'false' setting 'xpack.license.self_generated.type', 'trial' setting 'indices.lifecycle.poll_interval', '1000ms' + setting 'logger.org.elasticsearch.xpack.core.ilm', 'TRACE' + setting 'logger.org.elasticsearch.xpack.ilm', 'TRACE' } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 169b613f22a..682e13b6d37 100644 --- 
a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -8,13 +8,14 @@ package org.elasticsearch.xpack.ilm; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -37,8 +38,10 @@ import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.SetPriorityAction; import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.ShrinkStep; +import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; +import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; import org.hamcrest.Matchers; import org.junit.Before; @@ -1010,6 +1013,171 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase { assertBusy(() -> assertTrue(indexExists(thirdIndex))); } + public void testHistoryIsWrittenWithSuccess() throws Exception { + String index = "index"; + + createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "-*\"], \n" + + " \"settings\": {\n" + + " 
\"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\",\n" + + " \"index.lifecycle.rollover_alias\": \"alias\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + createIndexWithSettings(index + "-1", + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), + true); + + // Index a document + index(client(), index + "-1", "1", "foo", "bar"); + Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); + client().performRequest(refreshIndex); + + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-indexing-complete"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-follow-shard-tasks"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "pause-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "close-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "unfollow-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "open-follower-index"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "wait-for-yellow-step"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "check-rollover-ready"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "attempt-rollover"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "update-rollover-lifecycle-date"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, "set-indexing-complete"), 30, TimeUnit.SECONDS); + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", true, 
"completed"), 30, TimeUnit.SECONDS); + + assertBusy(() -> assertHistoryIsPresent(policy, index + "-000002", true, "check-rollover-ready"), 30, TimeUnit.SECONDS); + } + + public void testHistoryIsWrittenWithFailure() throws Exception { + String index = "index"; + + createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L)); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "-*\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + createIndexWithSettings(index + "-1", + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), + false); + + // Index a document + index(client(), index + "-1", "1", "foo", "bar"); + Request refreshIndex = new Request("POST", "/" + index + "-1/_refresh"); + client().performRequest(refreshIndex); + + assertBusy(() -> assertHistoryIsPresent(policy, index + "-1", false, "ERROR"), 30, TimeUnit.SECONDS); + } + + public void testHistoryIsWrittenWithDeletion() throws Exception { + String index = "index"; + + createNewSingletonPolicy("delete", new DeleteAction()); + Request createIndexTemplate = new Request("PUT", "_template/delete_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\""+ index + "\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy+ "\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + // Index should be created and then deleted by ILM + createIndexWithSettings(index, Settings.builder(), false); + + assertBusy(() -> { + logger.info("--> checking for index deletion..."); + Request existCheck = new Request("HEAD", "/" + index); + Response 
resp = client().performRequest(existCheck); + assertThat(resp.getStatusLine().getStatusCode(), equalTo(404)); + }); + + assertBusy(() -> { + assertHistoryIsPresent(policy, index, true, "delete", "delete", "wait-for-shard-history-leases"); + assertHistoryIsPresent(policy, index, true, "delete", "delete", "complete"); + }, 30, TimeUnit.SECONDS); + } + + // This method should be called inside an assertBusy, it has no retry logic of its own + private void assertHistoryIsPresent(String policyName, String indexName, boolean success, String stepName) throws IOException { + assertHistoryIsPresent(policyName, indexName, success, null, null, stepName); + } + + // This method should be called inside an assertBusy, it has no retry logic of its own + private void assertHistoryIsPresent(String policyName, String indexName, boolean success, + @Nullable String phase, @Nullable String action, String stepName) throws IOException { + logger.info("--> checking for history item [{}], [{}], success: [{}], phase: [{}], action: [{}], step: [{}]", + policyName, indexName, success, phase, action, stepName); + final Request historySearchRequest = new Request("GET", "ilm-history*/_search"); + historySearchRequest.setJsonEntity("{\n" + + " \"query\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"term\": {\n" + + " \"policy\": \"" + policyName + "\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"success\": " + success + "\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"index\": \"" + indexName + "\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"term\": {\n" + + " \"state.step\": \"" + stepName + "\"\n" + + " }\n" + + " }\n" + + (phase == null ? "" : ",{\"term\": {\"state.phase\": \"" + phase + "\"}}") + + (action == null ? 
"" : ",{\"term\": {\"state.action\": \"" + action + "\"}}") + + " ]\n" + + " }\n" + + " }\n" + + "}"); + Response historyResponse; + try { + historyResponse = client().performRequest(historySearchRequest); + Map historyResponseMap; + try (InputStream is = historyResponse.getEntity().getContent()) { + historyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + logger.info("--> history response: {}", historyResponseMap); + assertThat((int)((Map) ((Map) historyResponseMap.get("hits")).get("total")).get("value"), + greaterThanOrEqualTo(1)); + } catch (ResponseException e) { + // Throw AssertionError instead of an exception if the search fails so that assertBusy works as expected + logger.error(e); + fail("failed to perform search:" + e.getMessage()); + } + + // Finally, check that the history index is in a good state + Step.StepKey stepKey = getStepKeyForIndex("ilm-history-1-000001"); + assertEquals("hot", stepKey.getPhase()); + assertEquals(RolloverAction.NAME, stepKey.getAction()); + assertEquals(WaitForRolloverReadyStep.NAME, stepKey.getName()); + } + private void createFullPolicy(TimeValue hotTime) throws IOException { Map hotActions = new HashMap<>(); hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100)); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java index 46364f7cb40..b97944fe67b 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java @@ -12,10 +12,13 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; import 
org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.ilm.ClusterStateActionStep; import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep; +import org.elasticsearch.xpack.core.ilm.ErrorStep; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; @@ -29,8 +32,9 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { private final Step startStep; private final PolicyStepsRegistry policyStepsRegistry; private final IndexLifecycleRunner lifecycleRunner; - private LongSupplier nowSupplier; + private final LongSupplier nowSupplier; private Step.StepKey nextStepKey = null; + private Exception failure = null; public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry, IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) { @@ -115,7 +119,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { // wait for the next trigger to evaluate the // condition again logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]", - index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); + index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), nextStepKey); ClusterStateWaitStep.Result result; try { result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state); @@ -124,22 +128,25 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { } if (result.isComplete()) { logger.trace("[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}", - index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); - if (currentStep.getNextStepKey() == null) { + index.getName(), currentStep.getClass().getSimpleName(), 
currentStep.getKey(), nextStepKey); + if (nextStepKey == null) { return state; } else { state = IndexLifecycleTransition.moveClusterStateToStep(index, state, - currentStep.getNextStepKey(), nowSupplier, policyStepsRegistry,false); + nextStepKey, nowSupplier, policyStepsRegistry,false); } } else { - logger.trace("[{}] condition not met ({}) [{}], returning existing state", - index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey()); + final ToXContentObject stepInfo = result.getInfomationContext(); + if (logger.isTraceEnabled()) { + logger.trace("[{}] condition not met ({}) [{}], returning existing state (info: {})", + index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), + Strings.toString(stepInfo)); + } // We may have executed a step and set "nextStepKey" to // a value, but in this case, since the condition was // not met, we can't advance any way, so don't attempt // to run the current step nextStepKey = null; - ToXContentObject stepInfo = result.getInfomationContext(); if (stepInfo == null) { return state; } else { @@ -169,13 +176,23 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (oldState.equals(newState) == false) { IndexMetaData indexMetaData = newState.metaData().index(index); - if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { - logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action", - index.getName(), startStep.getKey(), nextStepKey); - // After the cluster state has been processed and we have moved - // to a new step, we need to conditionally execute the step iff - // it is an `AsyncAction` so that it is executed exactly once. 
- lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey); + if (indexMetaData != null) { + + LifecycleExecutionState exState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); + if (ErrorStep.NAME.equals(exState.getStep()) && this.failure != null) { + lifecycleRunner.registerFailedOperation(indexMetaData, failure); + } else { + lifecycleRunner.registerSuccessfulOperation(indexMetaData); + } + + if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) { + logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action", + index.getName(), startStep.getKey(), nextStepKey); + // After the cluster state has been processed and we have moved + // to a new step, we need to conditionally execute the step iff + // it is an `AsyncAction` so that it is executed exactly once. + lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey); + } } } } @@ -187,10 +204,9 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { } private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) throws IOException { + this.failure = cause; logger.error("policy [{}] for index [{}] failed on cluster state step [{}]. 
Moving to ERROR step", policy, index.getName(), currentStepKey); - MoveToErrorStepUpdateTask moveToErrorStepUpdateTask = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, - nowSupplier, policyStepsRegistry::getStep); - return moveToErrorStepUpdateTask.execute(state); + return IndexLifecycleTransition.moveClusterStateToErrorStep(index, state, cause, nowSupplier, policyStepsRegistry::getStep); } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 8143f05d770..947d1752dad 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -94,6 +94,8 @@ import org.elasticsearch.xpack.ilm.action.TransportRemoveIndexLifecyclePolicyAct import org.elasticsearch.xpack.ilm.action.TransportRetryAction; import org.elasticsearch.xpack.ilm.action.TransportStartILMAction; import org.elasticsearch.xpack.ilm.action.TransportStopILMAction; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; +import org.elasticsearch.xpack.ilm.history.ILMHistoryTemplateRegistry; import org.elasticsearch.xpack.slm.SLMFeatureSet; import org.elasticsearch.xpack.slm.SnapshotLifecycleService; import org.elasticsearch.xpack.slm.SnapshotLifecycleTask; @@ -131,6 +133,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN; public class IndexLifecycle extends Plugin implements ActionPlugin { private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); + private final SetOnce ilmHistoryStore = new SetOnce<>(); private final SetOnce snapshotLifecycleService = new SetOnce<>(); private final SetOnce snapshotRetentionService = new SetOnce<>(); private final SetOnce snapshotHistoryStore = new SetOnce<>(); @@ -172,6 +175,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin { 
LifecycleSettings.LIFECYCLE_ORIGINATION_DATE_SETTING, LifecycleSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING, LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING, + LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING, RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING, LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING, LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING, @@ -188,8 +192,14 @@ public class IndexLifecycle extends Plugin implements ActionPlugin { } final List components = new ArrayList<>(); if (ilmEnabled) { + // This registers a cluster state listener, so appears unused but is not. + @SuppressWarnings("unused") + ILMHistoryTemplateRegistry ilmTemplateRegistry = + new ILMHistoryTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry); + ilmHistoryStore.set(new ILMHistoryStore(settings, new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN), + clusterService, threadPool)); indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, - getClock(), System::currentTimeMillis, xContentRegistry)); + getClock(), System::currentTimeMillis, xContentRegistry, ilmHistoryStore.get())); components.add(indexLifecycleInitialisationService.get()); } if (slmEnabled) { @@ -317,7 +327,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin { @Override public void close() { try { - IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotRetentionService.get()); + IOUtils.close(indexLifecycleInitialisationService.get(), ilmHistoryStore.get(), + snapshotLifecycleService.get(), snapshotRetentionService.get()); } catch (IOException e) { throw new ElasticsearchException("unable to close index lifecycle services", e); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index d8eaf2c53c5..15bde507c3a 
100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; @@ -23,10 +24,13 @@ import org.elasticsearch.xpack.core.ilm.ClusterStateActionStep; import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep; import org.elasticsearch.xpack.core.ilm.ErrorStep; import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; +import org.elasticsearch.xpack.ilm.history.ILMHistoryItem; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import java.util.function.LongSupplier; @@ -35,13 +39,15 @@ import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_ORIGI class IndexLifecycleRunner { private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class); private final ThreadPool threadPool; - private PolicyStepsRegistry stepRegistry; - private ClusterService clusterService; - private LongSupplier nowSupplier; + private final ClusterService clusterService; + private final PolicyStepsRegistry stepRegistry; + private final ILMHistoryStore ilmHistoryStore; + private final LongSupplier nowSupplier; - IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService, + 
IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ILMHistoryStore ilmHistoryStore, ClusterService clusterService, ThreadPool threadPool, LongSupplier nowSupplier) { this.stepRegistry = stepRegistry; + this.ilmHistoryStore = ilmHistoryStore; this.clusterService = clusterService; this.nowSupplier = nowSupplier; this.threadPool = threadPool; @@ -62,17 +68,29 @@ class IndexLifecycleRunner { } /** - * Return true or false depending on whether the index is ready to be in {@code phase} + * Calculate the index's origination time (in milliseconds) based on its + * metadata. Returns null if there is no lifecycle date and the origination + * date is not set. */ - boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetaData indexMetaData, final String phase) { + @Nullable + private static Long calculateOriginationMillis(final IndexMetaData indexMetaData) { LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); Long originationDate = indexMetaData.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, -1L); if (lifecycleState.getLifecycleDate() == null && originationDate == -1L) { - logger.trace("no index creation or origination date has been set yet"); + return null; + } + return originationDate == -1L ? lifecycleState.getLifecycleDate() : originationDate; + } + + /** + * Return true or false depending on whether the index is ready to be in {@code phase} + */ + boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetaData indexMetaData, final String phase) { + final Long lifecycleDate = calculateOriginationMillis(indexMetaData); + if (lifecycleDate == null) { + logger.trace("[{}] no index creation or origination date has been set yet", indexMetaData.getIndex().getName()); return true; } - final Long lifecycleDate = originationDate != -1L ? 
originationDate : lifecycleState.getLifecycleDate(); - assert lifecycleDate != null && lifecycleDate >= 0 : "expected index to have a lifecycle date but it did not"; final TimeValue after = stepRegistry.getIndexAgeForPhase(policy, phase); final long now = nowSupplier.getAsLong(); final TimeValue age = new TimeValue(now - lifecycleDate); @@ -221,19 +239,26 @@ class IndexLifecycleRunner { ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()), new AsyncActionStep.Listener() { - @Override - public void onResponse(boolean complete) { - logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey()); - if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { - moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + @Override + public void onResponse(boolean complete) { + logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey()); + if (complete) { + if (((AsyncActionStep) currentStep).indexSurvives()) { + moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + } else { + // Delete needs special handling, because after this step we + // will no longer have access to any information about the + // index since it will be... deleted. 
+ registerDeleteOperation(indexMetaData); + } + } } - } - @Override - public void onFailure(Exception e) { - moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); - } - }); + @Override + public void onFailure(Exception e) { + moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); + } + }); } else { logger.trace("[{}] ignoring non async action step execution from step transition [{}]", index, currentStep.getKey()); } @@ -298,6 +323,7 @@ class IndexLifecycleRunner { new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState -> { IndexMetaData indexMetaData = clusterState.metaData().index(index); + registerSuccessfulOperation(indexMetaData); if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { maybeRunAsyncAction(clusterState, indexMetaData, policy, newStepKey); } @@ -311,7 +337,10 @@ class IndexLifecycleRunner { logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. 
Moving to ERROR step", policy, index.getName(), currentStepKey), e); clusterService.submitStateUpdateTask("ilm-move-to-error-step", - new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep)); + new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> { + IndexMetaData indexMetaData = clusterState.metaData().index(index); + registerFailedOperation(indexMetaData, e); + })); } /** @@ -343,4 +372,61 @@ class IndexLifecycleRunner { setStepInfo(index, policyName, LifecycleExecutionState.getCurrentStepKey(executionState), new SetStepInfoUpdateTask.ExceptionWrapper(e)); } + + /** + * For the given index metadata, register (index a document) that the index has transitioned + * successfully into this new state using the {@link ILMHistoryStore} + */ + void registerSuccessfulOperation(IndexMetaData indexMetaData) { + if (indexMetaData == null) { + // This index may have been deleted and has no metadata, so ignore it + return; + } + Long origination = calculateOriginationMillis(indexMetaData); + ilmHistoryStore.putAsync( + ILMHistoryItem.success(indexMetaData.getIndex().getName(), + LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings()), + nowSupplier.getAsLong(), + origination == null ? 
null : (nowSupplier.getAsLong() - origination), + LifecycleExecutionState.fromIndexMetadata(indexMetaData))); + } + + /** + * For the given index metadata, register (index a document) that the index + * has been deleted by ILM using the {@link ILMHistoryStore} + */ + void registerDeleteOperation(IndexMetaData metadataBeforeDeletion) { + if (metadataBeforeDeletion == null) { + throw new IllegalStateException("cannot register deletion of an index that did not previously exist"); + } + Long origination = calculateOriginationMillis(metadataBeforeDeletion); + ilmHistoryStore.putAsync( + ILMHistoryItem.success(metadataBeforeDeletion.getIndex().getName(), + LifecycleSettings.LIFECYCLE_NAME_SETTING.get(metadataBeforeDeletion.getSettings()), + nowSupplier.getAsLong(), + origination == null ? null : (nowSupplier.getAsLong() - origination), + LifecycleExecutionState.builder(LifecycleExecutionState.fromIndexMetadata(metadataBeforeDeletion)) + // Register that the delete phase is now "complete" + .setStep(PhaseCompleteStep.NAME) + .build())); + } + + /** + * For the given index metadata, register (index a document) that the index has transitioned + * into the ERROR state using the {@link ILMHistoryStore} + */ + void registerFailedOperation(IndexMetaData indexMetaData, Exception failure) { + if (indexMetaData == null) { + // This index may have been deleted and has no metadata, so ignore it + return; + } + Long origination = calculateOriginationMillis(indexMetaData); + ilmHistoryStore.putAsync( + ILMHistoryItem.failure(indexMetaData.getIndex().getName(), + LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings()), + nowSupplier.getAsLong(), + origination == null ? 
null : (nowSupplier.getAsLong() - origination), + LifecycleExecutionState.fromIndexMetadata(indexMetaData), + failure)); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index f116f9de087..a5ae4d4673a 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.ilm.ShrinkStep; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import java.io.Closeable; import java.time.Clock; @@ -59,21 +60,24 @@ public class IndexLifecycleService private final Clock clock; private final PolicyStepsRegistry policyRegistry; private final IndexLifecycleRunner lifecycleRunner; + private final ILMHistoryStore ilmHistoryStore; private final Settings settings; private ClusterService clusterService; private LongSupplier nowSupplier; private SchedulerEngine.Job scheduledJob; public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock, - LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) { + LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry, + ILMHistoryStore ilmHistoryStore) { super(); this.settings = settings; this.clusterService = clusterService; this.clock = clock; this.nowSupplier = nowSupplier; this.scheduledJob = null; + this.ilmHistoryStore = ilmHistoryStore; this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client); - this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier); + this.lifecycleRunner = new 
IndexLifecycleRunner(policyRegistry, ilmHistoryStore, clusterService, threadPool, nowSupplier); this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings); clusterService.addStateApplier(this); clusterService.addListener(this); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java index 1b80e070c55..ae05d210213 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTask.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ilm.Step; import java.io.IOException; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.LongSupplier; public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask { @@ -24,17 +25,20 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask { private final String policy; private final Step.StepKey currentStepKey; private final BiFunction stepLookupFunction; + private final Consumer stateChangeConsumer; private LongSupplier nowSupplier; private Exception cause; public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Exception cause, LongSupplier nowSupplier, - BiFunction stepLookupFunction) { + BiFunction stepLookupFunction, + Consumer stateChangeConsumer) { this.index = index; this.policy = policy; this.currentStepKey = currentStepKey; this.cause = cause; this.nowSupplier = nowSupplier; this.stepLookupFunction = stepLookupFunction; + this.stateChangeConsumer = stateChangeConsumer; } Index getIndex() { @@ -73,6 +77,13 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask { } } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (newState.equals(oldState) == 
false) { + stateChangeConsumer.accept(newState); + } + } + @Override public void onFailure(String source, Exception e) { throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java new file mode 100644 index 00000000000..e972613e3e9 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItem.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; + +import java.io.IOException; +import java.util.Collections; +import java.util.Objects; + +import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; + +/** + * The {@link ILMHistoryItem} class encapsulates the state of an index at a point in time. It should + * be constructed when an index has transitioned into a new step. Construction is done through the + * {@link #success(String, String, long, Long, LifecycleExecutionState)} and + * {@link #failure(String, String, long, Long, LifecycleExecutionState, Exception)} methods. 
+ */ +public class ILMHistoryItem implements ToXContentObject { + private static final ParseField INDEX = new ParseField("index"); + private static final ParseField POLICY = new ParseField("policy"); + private static final ParseField TIMESTAMP = new ParseField("@timestamp"); + private static final ParseField INDEX_AGE = new ParseField("index_age"); + private static final ParseField SUCCESS = new ParseField("success"); + private static final ParseField EXECUTION_STATE = new ParseField("state"); + private static final ParseField ERROR = new ParseField("error_details"); + + private final String index; + private final String policyId; + private final long timestamp; + @Nullable + private final Long indexAge; + private final boolean success; + @Nullable + private final LifecycleExecutionState executionState; + @Nullable + private final String errorDetails; + + private ILMHistoryItem(String index, String policyId, long timestamp, @Nullable Long indexAge, boolean success, + @Nullable LifecycleExecutionState executionState, @Nullable String errorDetails) { + this.index = index; + this.policyId = policyId; + this.timestamp = timestamp; + this.indexAge = indexAge; + this.success = success; + this.executionState = executionState; + this.errorDetails = errorDetails; + } + + public static ILMHistoryItem success(String index, String policyId, long timestamp, @Nullable Long indexAge, + @Nullable LifecycleExecutionState executionState) { + return new ILMHistoryItem(index, policyId, timestamp, indexAge, true, executionState, null); + } + + public static ILMHistoryItem failure(String index, String policyId, long timestamp, @Nullable Long indexAge, + @Nullable LifecycleExecutionState executionState, Exception error) { + Objects.requireNonNull(error, "ILM failures require an attached exception"); + return new ILMHistoryItem(index, policyId, timestamp, indexAge, false, executionState, exceptionToString(error)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder 
builder, Params params) throws IOException { + builder.startObject(); + builder.field(INDEX.getPreferredName(), index); + builder.field(POLICY.getPreferredName(), policyId); + builder.field(TIMESTAMP.getPreferredName(), timestamp); + if (indexAge != null) { + builder.field(INDEX_AGE.getPreferredName(), indexAge); + } + builder.field(SUCCESS.getPreferredName(), success); + if (executionState != null) { + builder.field(EXECUTION_STATE.getPreferredName(), executionState.asMap()); + } + if (errorDetails != null) { + builder.field(ERROR.getPreferredName(), errorDetails); + } + builder.endObject(); + return builder; + } + + private static String exceptionToString(Exception exception) { + Params stacktraceParams = new MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false")); + String exceptionString; + try (XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder()) { + causeXContentBuilder.startObject(); + ElasticsearchException.generateThrowableXContent(causeXContentBuilder, stacktraceParams, exception); + causeXContentBuilder.endObject(); + exceptionString = BytesReference.bytes(causeXContentBuilder).utf8ToString(); + } catch (IOException e) { + // In the unlikely case that we cannot generate an exception string, + // try the best way can to encapsulate the error(s) with at least + // the message + exceptionString = "unable to generate the ILM error details due to: " + e.getMessage() + + "; the ILM error was: " + exception.getMessage(); + } + return exceptionString; + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java new file mode 100644 index 00000000000..96c54e5adfc --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java @@ -0,0 +1,226 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static 
org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN; +import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING; +import static org.elasticsearch.xpack.ilm.history.ILMHistoryTemplateRegistry.INDEX_TEMPLATE_VERSION; + +/** + * The {@link ILMHistoryStore} handles indexing {@link ILMHistoryItem} documents into the + * appropriate index. It sets up a {@link BulkProcessor} for indexing in bulk, and handles creation + * of the index/alias as needed for ILM policies. + */ +public class ILMHistoryStore implements Closeable { + private static final Logger logger = LogManager.getLogger(ILMHistoryStore.class); + + public static final String ILM_HISTORY_INDEX_PREFIX = "ilm-history-" + INDEX_TEMPLATE_VERSION + "-"; + public static final String ILM_HISTORY_ALIAS = "ilm-history-" + INDEX_TEMPLATE_VERSION; + + private final boolean ilmHistoryEnabled; + private final BulkProcessor processor; + private final ThreadPool threadPool; + + public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService, ThreadPool threadPool) { + this.ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + this.threadPool = threadPool; + + this.processor = BulkProcessor.builder( + new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN)::bulk, + new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + // Prior to actually performing the bulk, we should ensure the index exists, and + // if we were unable to create it or it was in a bad state, we should not + // attempt to index documents. 
+ try { + final CompletableFuture indexCreated = new CompletableFuture<>(); + ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(indexCreated::complete, + ex -> { + logger.warn("failed to create ILM history store index prior to issuing bulk request", ex); + indexCreated.completeExceptionally(ex); + })); + indexCreated.get(2, TimeUnit.MINUTES); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("unable to index the following ILM history items:\n{}", + request.requests().stream() + .filter(dwr -> (dwr instanceof IndexRequest)) + .map(dwr -> ((IndexRequest) dwr)) + .map(IndexRequest::sourceAsMap) + .map(Object::toString) + .collect(Collectors.joining("\n"))), e); + throw new ElasticsearchException(e); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + long items = request.numberOfActions(); + if (logger.isTraceEnabled()) { + logger.trace("indexed [{}] items into ILM history index [{}]", items, + Arrays.stream(response.getItems()) + .map(BulkItemResponse::getIndex) + .distinct() + .collect(Collectors.joining(","))); + } + if (response.hasFailures()) { + Map failures = Arrays.stream(response.getItems()) + .filter(BulkItemResponse::isFailed) + .collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage)); + logger.error("failures: [{}]", failures); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + long items = request.numberOfActions(); + logger.error(new ParameterizedMessage("failed to index {} items into ILM history index", items), failure); + } + }) + .setBulkActions(100) + .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) + .setFlushInterval(TimeValue.timeValueSeconds(5)) + .setConcurrentRequests(1) + .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3)) + .build(); + } + + /** + * Attempts to asynchronously index an ILM history entry + */ + public void 
putAsync(ILMHistoryItem item) { + if (ilmHistoryEnabled == false) { + logger.trace("not recording ILM history item because [{}] is [false]: [{}]", + LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), item); + return; + } + logger.trace("queueing ILM history item for indexing [{}]: [{}]", ILM_HISTORY_ALIAS, item); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + item.toXContent(builder, ToXContent.EMPTY_PARAMS); + IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder); + // TODO: remove the threadpool wrapping when the .add call is non-blocking + // (it can currently execute the bulk request occasionally) + // see: https://github.com/elastic/elasticsearch/issues/50440 + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + processor.add(request); + } catch (Exception e) { + logger.error(new ParameterizedMessage("failed to add ILM history item to queue for index [{}]: [{}]", + ILM_HISTORY_ALIAS, item), e); + } + }); + } catch (IOException exception) { + logger.error(new ParameterizedMessage("failed to queue ILM history item in index [{}]: [{}]", + ILM_HISTORY_ALIAS, item), exception); + } + } + + /** + * Checks if the ILM history index exists, and if not, creates it. + * + * @param client The client to use to create the index if needed + * @param state The current cluster state, to determine if the alias exists + * @param listener Called after the index has been created. `onResponse` called with `true` if the index was created, + * `false` if it already existed. 
+ */ + static void ensureHistoryIndex(Client client, ClusterState state, ActionListener listener) { + final String initialHistoryIndexName = ILM_HISTORY_INDEX_PREFIX + "000001"; + final AliasOrIndex ilmHistory = state.metaData().getAliasAndIndexLookup().get(ILM_HISTORY_ALIAS); + final AliasOrIndex initialHistoryIndex = state.metaData().getAliasAndIndexLookup().get(initialHistoryIndexName); + + if (ilmHistory == null && initialHistoryIndex == null) { + // No alias or index exists with the expected names, so create the index with appropriate alias + logger.debug("creating ILM history index [{}]", initialHistoryIndexName); + client.admin().indices().prepareCreate(initialHistoryIndexName) + .setWaitForActiveShards(1) + .addAlias(new Alias(ILM_HISTORY_ALIAS) + .writeIndex(true)) + .execute(new ActionListener() { + @Override + public void onResponse(CreateIndexResponse response) { + listener.onResponse(true); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + // The index didn't exist before we made the call, there was probably a race - just ignore this + logger.debug("index [{}] was created after checking for its existence, likely due to a concurrent call", + initialHistoryIndexName); + listener.onResponse(false); + } else { + listener.onFailure(e); + } + } + }); + } else if (ilmHistory == null) { + // alias does not exist but initial index does, something is broken + listener.onFailure(new IllegalStateException("ILM history index [" + initialHistoryIndexName + + "] already exists but does not have alias [" + ILM_HISTORY_ALIAS + "]")); + } else if (ilmHistory.isAlias() && ilmHistory instanceof AliasOrIndex.Alias) { + if (((AliasOrIndex.Alias) ilmHistory).getWriteIndex() != null) { + // The alias exists and has a write index, so we're good + listener.onResponse(false); + } else { + // The alias does not have a write index, so we can't index into it + listener.onFailure(new IllegalStateException("ILM history 
alias [" + ILM_HISTORY_ALIAS + "] does not have a write index")); + } + } else if (ilmHistory.isAlias() == false) { + // This is not an alias, error out + listener.onFailure(new IllegalStateException("ILM history alias [" + ILM_HISTORY_ALIAS + + "] already exists as concrete index")); + } else { + logger.error("unexpected IndexOrAlias for [{}]: [{}]", ILM_HISTORY_ALIAS, ilmHistory); + assert false : ILM_HISTORY_ALIAS + " cannot be both an alias and not an alias simultaneously"; + } + } + + @Override + public void close() { + try { + processor.awaitClose(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("failed to shut down ILM history bulk processor after 10 seconds", e); + } + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java new file mode 100644 index 00000000000..21b2d16afdc --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryTemplateRegistry.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.template.IndexTemplateConfig; +import org.elasticsearch.xpack.core.template.IndexTemplateRegistry; +import org.elasticsearch.xpack.core.template.LifecyclePolicyConfig; + +import java.util.Collections; +import java.util.List; + +/** + * The {@link ILMHistoryTemplateRegistry} class sets up and configures an ILM policy and index + * template for the ILM history indices (ilm-history-N-00000M). + */ +public class ILMHistoryTemplateRegistry extends IndexTemplateRegistry { + // history (please add a comment why you increased the version here) + // version 1: initial + public static final String INDEX_TEMPLATE_VERSION = "1"; + + public static final String ILM_TEMPLATE_VERSION_VARIABLE = "xpack.ilm_history.template.version"; + public static final String ILM_TEMPLATE_NAME = "ilm-history"; + + public static final String ILM_POLICY_NAME = "ilm-history-ilm-policy"; + + public static final IndexTemplateConfig TEMPLATE_ILM_HISTORY = new IndexTemplateConfig( + ILM_TEMPLATE_NAME, + "/ilm-history.json", + INDEX_TEMPLATE_VERSION, + ILM_TEMPLATE_VERSION_VARIABLE + ); + + public static final LifecyclePolicyConfig ILM_HISTORY_POLICY = new LifecyclePolicyConfig( + ILM_POLICY_NAME, + "/ilm-history-ilm-policy.json" + ); + + private final boolean ilmHistoryEnabled; + + public ILMHistoryTemplateRegistry(Settings nodeSettings, ClusterService clusterService, + ThreadPool threadPool, Client client, + NamedXContentRegistry xContentRegistry) { + super(nodeSettings, clusterService, threadPool, client, xContentRegistry); + 
this.ilmHistoryEnabled = LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + } + + @Override + protected List getTemplateConfigs() { + if (this.ilmHistoryEnabled) { + return Collections.singletonList(TEMPLATE_ILM_HISTORY); + } else { + return Collections.emptyList(); + } + } + + @Override + protected List getPolicyConfigs() { + if (this.ilmHistoryEnabled) { + return Collections.singletonList(ILM_HISTORY_POLICY); + } else { + return Collections.emptyList(); + } + } + + @Override + protected String getOrigin() { + return ClientHelper.INDEX_LIFECYCLE_ORIGIN; + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java index d768714ba65..2179898b5a4 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleInitialisationTests.java @@ -107,7 +107,8 @@ public class IndexLifecycleInitialisationTests extends ESIntegTestCase { settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); settings.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s"); - // This is necessary to prevent SLM installing a lifecycle policy, these tests assume a blank slate + // This is necessary to prevent ILM and SLM installing a lifecycle policy, these tests assume a blank slate + settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false); settings.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false); return settings.build(); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index 2580e2970e5..faa1270f479 100644 --- 
a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -55,6 +55,8 @@ import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; +import org.elasticsearch.xpack.ilm.history.ILMHistoryItem; +import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import org.junit.After; import org.junit.Before; import org.mockito.ArgumentMatcher; @@ -91,6 +93,8 @@ import static org.mockito.Mockito.when; public class IndexLifecycleRunnerTests extends ESTestCase { private static final NamedXContentRegistry REGISTRY; private ThreadPool threadPool; + private Client noopClient; + private NoOpHistoryStore historyStore; static { try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) { @@ -100,12 +104,16 @@ public class IndexLifecycleRunnerTests extends ESTestCase { } @Before - public void prepareThreadPool() { + public void prepare() { threadPool = new TestThreadPool("test"); + noopClient = new NoOpClient(threadPool); + historyStore = new NoOpHistoryStore(); } @After public void shutdown() { + historyStore.close(); + noopClient.close(); threadPool.shutdownNow(); } @@ -114,7 +122,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { TerminalPolicyStep step = TerminalPolicyStep.INSTANCE; PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) 
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -136,7 +144,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); newState.setFailedStep(stepKey.getName()); newState.setIsAutoRetryableError(false); @@ -176,7 +184,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, waitForRolloverStep); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); newState.setFailedStep(stepKey.getName()); newState.setIsAutoRetryableError(true); @@ -221,7 +229,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -282,7 +290,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .build(); ClusterServiceUtils.setState(clusterService, state); long 
stepTime = randomLong(); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, + clusterService, threadPool, () -> stepTime); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -303,6 +312,14 @@ public class IndexLifecycleRunnerTests extends ESTestCase { assertThat(nextStep.getExecuteCount(), equalTo(1L)); clusterService.close(); threadPool.shutdownNow(); + + ILMHistoryItem historyItem = historyStore.getItems().stream() + .findFirst() + .orElseThrow(() -> new AssertionError("failed to register ILM history")); + assertThat(historyItem.toString(), + containsString("{\"index\":\"test\",\"policy\":\"foo\",\"@timestamp\":" + stepTime + + ",\"success\":true,\"state\":{\"phase\":\"phase\",\"action\":\"action\"," + + "\"step\":\"next_cluster_state_action_step\",\"step_time\":\"" + stepTime + "\"}}")); } public void testRunPeriodicPolicyWithFailureToReadPolicy() throws Exception { @@ -357,7 +374,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .build(); ClusterServiceUtils.setState(clusterService, state); long stepTime = randomLong(); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, + clusterService, threadPool, () -> stepTime); ClusterState before = clusterService.state(); if (asyncAction) { @@ -409,7 +427,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .localNodeId(node.getId())) .build(); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = 
clusterService.state(); // State changes should not run AsyncAction steps @@ -468,7 +486,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .build(); logger.info("--> state: {}", state); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -488,6 +506,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase { assertThat(nextStep.getExecuteCount(), equalTo(1L)); clusterService.close(); threadPool.shutdownNow(); + + ILMHistoryItem historyItem = historyStore.getItems().stream() + .findFirst() + .orElseThrow(() -> new AssertionError("failed to register ILM history")); + assertThat(historyItem.toString(), + containsString("{\"index\":\"test\",\"policy\":\"foo\",\"@timestamp\":0,\"success\":true," + + "\"state\":{\"phase\":\"phase\",\"action\":\"action\",\"step\":\"async_action_step\",\"step_time\":\"0\"}}")); } public void testRunPeriodicStep() throws Exception { @@ -535,7 +560,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .build(); logger.info("--> state: {}", state); ClusterServiceUtils.setState(clusterService, state); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); ClusterState before = clusterService.state(); CountDownLatch latch = new CountDownLatch(1); @@ -558,7 +583,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService 
clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -576,7 +601,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { step.setWillComplete(true); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -595,7 +620,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { step.setException(expectedException); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -613,7 +638,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { step.setException(expectedException); PolicyStepsRegistry stepRegistry = 
createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); @@ -627,7 +652,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { String policyName = "cluster_state_action_policy"; ClusterService clusterService = mock(ClusterService.class); IndexLifecycleRunner runner = new IndexLifecycleRunner(new PolicyStepsRegistry(NamedXContentRegistry.EMPTY, null), - clusterService, threadPool, () -> 0L); + historyStore, clusterService, threadPool, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); // verify that no exception is thrown @@ -712,7 +737,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase { stepMap, NamedXContentRegistry.EMPTY, null); ClusterService clusterService = mock(ClusterService.class); final AtomicLong now = new AtomicLong(5); - IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, clusterService, threadPool, now::get); + IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, historyStore, + clusterService, threadPool, now::get); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)) .numberOfReplicas(randomIntBetween(0, 5)) @@ -1086,4 +1112,23 @@ public class IndexLifecycleRunnerTests extends ESTestCase { when(client.settings()).thenReturn(Settings.EMPTY); return new 
MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client); } + + private class NoOpHistoryStore extends ILMHistoryStore { + + private final List<ILMHistoryItem> items = new ArrayList<>(); + + NoOpHistoryStore() { + super(Settings.EMPTY, noopClient, null, null); + } + + public List<ILMHistoryItem> getItems() { + return items; + } + + @Override + public void putAsync(ILMHistoryItem item) { + logger.info("--> adding ILM history item: [{}]", item); + items.add(item); + } + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java index a7f15419d37..2bf054d318f 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java @@ -108,7 +108,7 @@ public class IndexLifecycleServiceTests extends ESTestCase { threadPool = new TestThreadPool("test"); indexLifecycleService = new IndexLifecycleService(Settings.EMPTY, client, clusterService, threadPool, - clock, () -> now, null); + clock, () -> now, null, null); Mockito.verify(clusterService).addListener(indexLifecycleService); Mockito.verify(clusterService).addStateApplier(indexLifecycleService); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java index fa2a626b0f9..39b3fca46c1 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/MoveToErrorStepUpdateTaskTests.java @@ -75,7 +75,7 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase { setStateToKey(currentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () ->
now, - (idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey)); + (idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey), state -> {}); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey actualKey = LifecycleExecutionState.getCurrentStepKey(lifecycleState); @@ -101,7 +101,7 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase { Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE"); setStateToKey(notCurrentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); + (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {}); ClusterState newState = task.execute(clusterState); assertThat(newState, sameInstance(clusterState)); } @@ -113,7 +113,7 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase { setStateToKey(currentStepKey); setStatePolicy("not-" + policy); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); + (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {}); ClusterState newState = task.execute(clusterState); assertThat(newState, sameInstance(clusterState)); } @@ -126,7 +126,7 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase { setStateToKey(currentStepKey); MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, - (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); + (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {}); 
Exception expectedException = new RuntimeException(); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> task.onFailure(randomAlphaOfLength(10), expectedException)); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java new file mode 100644 index 00000000000..2e3f76c6370 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryItemTests.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; + +import java.io.IOException; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; + +public class ILMHistoryItemTests extends ESTestCase { + + public void testToXContent() throws IOException { + ILMHistoryItem success = ILMHistoryItem.success("index", "policy", 1234L, 100L, + LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("step") + .setPhaseTime(10L) + .setActionTime(20L) + .setStepTime(30L) + .setPhaseDefinition("{}") + .setStepInfo("{\"step_info\": \"foo\"") + .build()); + + ILMHistoryItem failure = ILMHistoryItem.failure("index", "policy", 1234L, 100L, + LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("ERROR") + .setFailedStep("step") + 
.setFailedStepRetryCount(7) + .setIsAutoRetryableError(true) + .setPhaseTime(10L) + .setActionTime(20L) + .setStepTime(30L) + .setPhaseDefinition("{\"phase_json\": \"eggplant\"}") + .setStepInfo("{\"step_info\": \"foo\"") + .build(), + new IllegalArgumentException("failure")); + + try (XContentBuilder builder = jsonBuilder()) { + success.toXContent(builder, ToXContent.EMPTY_PARAMS); + String json = Strings.toString(builder); + assertThat(json, equalTo("{\"index\":\"index\"," + + "\"policy\":\"policy\"," + + "\"@timestamp\":1234," + + "\"index_age\":100," + + "\"success\":true," + + "\"state\":{\"phase\":\"phase\"," + + "\"phase_definition\":\"{}\"," + + "\"action_time\":\"20\"," + + "\"phase_time\":\"10\"," + + "\"step_info\":\"{\\\"step_info\\\": \\\"foo\\\"\",\"action\":\"action\",\"step\":\"step\",\"step_time\":\"30\"}}" + )); + } + + try (XContentBuilder builder = jsonBuilder()) { + failure.toXContent(builder, ToXContent.EMPTY_PARAMS); + String json = Strings.toString(builder); + assertThat(json, startsWith("{\"index\":\"index\"," + + "\"policy\":\"policy\"," + + "\"@timestamp\":1234," + + "\"index_age\":100," + + "\"success\":false," + + "\"state\":{\"phase\":\"phase\"," + + "\"failed_step\":\"step\"," + + "\"phase_definition\":\"{\\\"phase_json\\\": \\\"eggplant\\\"}\"," + + "\"action_time\":\"20\"," + + "\"is_auto_retryable_error\":\"true\"," + + "\"failed_step_retry_count\":\"7\"," + + "\"phase_time\":\"10\"," + + "\"step_info\":\"{\\\"step_info\\\": \\\"foo\\\"\"," + + "\"action\":\"action\"," + + "\"step\":\"ERROR\"," + + "\"step_time\":\"30\"}," + + "\"error_details\":\"{\\\"type\\\":\\\"illegal_argument_exception\\\"," + + "\\\"reason\\\":\\\"failure\\\"," + + "\\\"stack_trace\\\":\\\"java.lang.IllegalArgumentException: failure")); + } + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java new file 
mode 100644 index 00000000000..948c017d9d0 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java @@ -0,0 +1,398 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ilm.history; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexAction; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.TriFunction; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import 
org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING; +import static org.elasticsearch.xpack.ilm.history.ILMHistoryStore.ILM_HISTORY_ALIAS; +import static org.elasticsearch.xpack.ilm.history.ILMHistoryStore.ILM_HISTORY_INDEX_PREFIX; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class ILMHistoryStoreTests extends ESTestCase { + + private ThreadPool threadPool; + private VerifyingClient client; + private ClusterService clusterService; + private ILMHistoryStore historyStore; + + @Before + public void setup() { + threadPool = new TestThreadPool(this.getClass().getName()); + client = new VerifyingClient(threadPool); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + historyStore = new ILMHistoryStore(Settings.EMPTY, client, clusterService, threadPool); + } + + @After + public void setdown() { + historyStore.close(); + clusterService.close(); + client.close(); + threadPool.shutdownNow(); + } + + public void testNoActionIfDisabled() throws Exception { + Settings settings = Settings.builder().put(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build(); + try (ILMHistoryStore disabledHistoryStore = new ILMHistoryStore(settings, client, null, threadPool)) { + String policyId = randomAlphaOfLength(5); + final long timestamp = randomNonNegativeLong(); + 
ILMHistoryItem record = ILMHistoryItem.success("index", policyId, timestamp, null, null); + + CountDownLatch latch = new CountDownLatch(1); + client.setVerifier((a, r, l) -> { + fail("the history store is disabled, no action should have been taken"); + latch.countDown(); + return null; + }); + disabledHistoryStore.putAsync(record); + latch.await(10, TimeUnit.SECONDS); + } + } + + @SuppressWarnings("unchecked") + public void testPut() throws Exception { + String policyId = randomAlphaOfLength(5); + final long timestamp = randomNonNegativeLong(); + { + ILMHistoryItem record = ILMHistoryItem.success("index", policyId, timestamp, 10L, + LifecycleExecutionState.builder() + .setPhase("phase") + .build()); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) { + return new CreateIndexResponse(true, true, ((CreateIndexRequest) request).index()); + } + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(BulkAction.class)); + assertThat(request, instanceOf(BulkRequest.class)); + BulkRequest bulkRequest = (BulkRequest) request; + bulkRequest.requests().forEach(dwr -> assertEquals(ILM_HISTORY_ALIAS, dwr.index())); + assertNotNull(listener); + + // The content of this BulkResponse doesn't matter, so just make it have the same number of responses + int responses = bulkRequest.numberOfActions(); + return new BulkResponse(IntStream.range(0, responses) + .mapToObj(i -> new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, + new IndexResponse(new ShardId("index", "uuid", 0), "_doc", randomAlphaOfLength(10), 1, 1, 1, true))) + .toArray(BulkItemResponse[]::new), + 1000L); + }); + + historyStore.putAsync(record); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + + { + final String cause = randomAlphaOfLength(9); + Exception failureException = new RuntimeException(cause); + ILMHistoryItem record = 
ILMHistoryItem.failure("index", policyId, timestamp, 10L, + LifecycleExecutionState.builder() + .setPhase("phase") + .build(), failureException); + + AtomicInteger calledTimes = new AtomicInteger(0); + client.setVerifier((action, request, listener) -> { + if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) { + return new CreateIndexResponse(true, true, ((CreateIndexRequest) request).index()); + } + calledTimes.incrementAndGet(); + assertThat(action, instanceOf(BulkAction.class)); + assertThat(request, instanceOf(BulkRequest.class)); + BulkRequest bulkRequest = (BulkRequest) request; + bulkRequest.requests().forEach(dwr -> { + assertEquals(ILM_HISTORY_ALIAS, dwr.index()); + assertThat(dwr, instanceOf(IndexRequest.class)); + IndexRequest ir = (IndexRequest) dwr; + String indexedDocument = ir.source().utf8ToString(); + assertThat(indexedDocument, Matchers.containsString("runtime_exception")); + assertThat(indexedDocument, Matchers.containsString(cause)); + }); + assertNotNull(listener); + + // The content of this BulkResponse doesn't matter, so just make it have the same number of responses with failures + int responses = bulkRequest.numberOfActions(); + return new BulkResponse(IntStream.range(0, responses) + .mapToObj(i -> new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", "_doc", i + "", failureException))) + .toArray(BulkItemResponse[]::new), + 1000L); + }); + + historyStore.putAsync(record); + assertBusy(() -> assertThat(calledTimes.get(), equalTo(1))); + } + } + + public void testHistoryIndexNeedsCreation() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder()) + .build(); + + client.setVerifier((a, r, l) -> { + assertThat(a, instanceOf(CreateIndexAction.class)); + assertThat(r, instanceOf(CreateIndexRequest.class)); + CreateIndexRequest request = (CreateIndexRequest) r; + 
assertThat(request.aliases(), Matchers.hasSize(1)); + request.aliases().forEach(alias -> { + assertThat(alias.name(), equalTo(ILM_HISTORY_ALIAS)); + assertTrue(alias.writeIndex()); + }); + return new CreateIndexResponse(true, true, request.index()); + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + Assert::assertTrue, + ex -> { + logger.error(ex); + fail("should have called onResponse, not onFailure"); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexProperlyExistsAlready() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ILM_HISTORY_INDEX_PREFIX + "000001") + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)) + .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS) + .writeIndex(true) + .build()))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + Assert::assertFalse, + ex -> { + logger.error(ex); + fail("should have called onResponse, not onFailure"); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexHasNoWriteIndex() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ILM_HISTORY_INDEX_PREFIX + "000001") + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) 
+ .numberOfReplicas(randomIntBetween(1,10)) + .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS) + .build())) + .put(IndexMetaData.builder(randomAlphaOfLength(5)) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)) + .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS) + .build()))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + indexCreated -> fail("should have called onFailure, not onResponse"), + ex -> { + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), Matchers.containsString("ILM history alias [" + ILM_HISTORY_ALIAS + + "] does not have a write index")); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexNotAlias() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(ILM_HISTORY_ALIAS) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + indexCreated -> fail("should have called onFailure, not onResponse"), + ex -> { + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), Matchers.containsString("ILM history alias [" + ILM_HISTORY_ALIAS + + "] already exists as concrete
index")); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryIndexCreatedConcurrently() throws InterruptedException { + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder()) + .build(); + + client.setVerifier((a, r, l) -> { + assertThat(a, instanceOf(CreateIndexAction.class)); + assertThat(r, instanceOf(CreateIndexRequest.class)); + CreateIndexRequest request = (CreateIndexRequest) r; + assertThat(request.aliases(), Matchers.hasSize(1)); + request.aliases().forEach(alias -> { + assertThat(alias.name(), equalTo(ILM_HISTORY_ALIAS)); + assertTrue(alias.writeIndex()); + }); + throw new ResourceAlreadyExistsException("that index already exists"); + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + Assert::assertFalse, + ex -> { + logger.error(ex); + fail("should have called onResponse, not onFailure"); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + public void testHistoryAliasDoesntExistButIndexDoes() throws InterruptedException { + final String initialIndex = ILM_HISTORY_INDEX_PREFIX + "000001"; + ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5))) + .metaData(MetaData.builder() + .put(IndexMetaData.builder(initialIndex) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(randomIntBetween(1,10)) + .numberOfReplicas(randomIntBetween(1,10)))) + .build(); + + client.setVerifier((a, r, l) -> { + fail("no client calls should have been made"); + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap( + response -> { + logger.error(response); + fail("should have called onFailure, not onResponse"); + }, + 
ex -> { + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), Matchers.containsString("ILM history index [" + initialIndex + + "] already exists but does not have alias [" + ILM_HISTORY_ALIAS + "]")); + }), latch)); + + ElasticsearchAssertions.awaitLatch(latch, 10, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + private void assertContainsMap(String indexedDocument, Map<String, Object> map) { + map.forEach((k, v) -> { + assertThat(indexedDocument, Matchers.containsString(k)); + if (v instanceof Map) { + assertContainsMap(indexedDocument, (Map<String, Object>) v); + } + if (v instanceof Iterable) { + ((Iterable<?>) v).forEach(elem -> { + assertThat(indexedDocument, Matchers.containsString(elem.toString())); + }); + } else { + assertThat(indexedDocument, Matchers.containsString(v.toString())); + } + }); + } + + /** + * A client that delegates to a verifying function for action/request/listener + */ + public static class VerifyingClient extends NoOpClient { + + private TriFunction<ActionType<?>, ActionRequest, ActionListener<?>, ActionResponse> verifier = (a, r, l) -> { + fail("verifier not set"); + return null; + }; + + VerifyingClient(ThreadPool threadPool) { + super(threadPool); + } + + @Override + @SuppressWarnings("unchecked") + protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action, + Request request, + ActionListener<Response> listener) { + try { + listener.onResponse((Response) verifier.apply(action, request, listener)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + VerifyingClient setVerifier(TriFunction<ActionType<?>, ActionRequest, ActionListener<?>, ActionResponse> verifier) { + this.verifier = verifier; + return this; + } + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java index 453aa2555c1..34d208337f1 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java +++
b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyItem; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; @@ -89,6 +90,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); + settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false); return settings.build(); } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java index d716e2e479f..f8aad5d1d1d 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MonitoringIntegTestCase.java @@ -58,6 +58,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase { // .put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false) // .put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false) // we do this by default in core, but for monitoring this isn't needed and only adds noise. 
+ .put("index.lifecycle.history_index_enabled", false) .put("index.store.mock.check_index_on_close", false); return builder.build(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 7060e9f4555..771cff9f5d0 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -118,6 +118,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase // watcher settings that should work despite randomization .put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100)) .put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100)) + .put("index.lifecycle.history_index_enabled", false) // SLM can cause timing issues during testsuite teardown: https://github.com/elastic/elasticsearch/issues/50302 // SLM is not required for tests extending from this base class and only add noise. .put("xpack.slm.enabled", false)