[7.x] Add ILM history store index (#50287) (#50345)

* Add ILM history store index (#50287)

* Add ILM history store index

This commit adds an ILM history store that tracks the lifecycle
execution state as an index progresses through its ILM policy. ILM
history documents store output similar to what the ILM explain API
returns.

An example document with ALL fields (not all documents will have all
fields) would look like:

```json
{
  "@timestamp": 1203012389,
  "policy": "my-ilm-policy",
  "index": "index-2019.1.1-000023",
  "index_age":123120,
  "success": true,
  "state": {
    "phase": "warm",
    "action": "allocate",
    "step": "ERROR",
    "failed_step": "update-settings",
    "is_auto-retryable_error": true,
    "creation_date": 12389012039,
    "phase_time": 12908389120,
    "action_time": 1283901209,
    "step_time": 123904107140,
    "phase_definition": "{\"policy\":\"ilm-history-ilm-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":{\"max_size\":\"50gb\",\"max_age\":\"30d\"}}},\"version\":1,\"modified_date_in_millis\":1576517253463}",
    "step_info": "{... etc step info here as json ...}"
  },
  "error_details": "java.lang.RuntimeException: etc\n\tcaused by:etc etc etc full stacktrace"
}
```

These documents go into the `ilm-history-1-00000N` index to provide an
audit trail of the operations ILM has performed.

This history storage is enabled by default but can be disabled by setting
`index.lifecycle.history_index_enabled` to `false`.

Resolves #49180

* Make ILMHistoryStore.putAsync truly async (#50403)

This changes the `putAsync` method in `ILMHistoryStore` so that it never blocks.
Previously, due to the way that the `BulkProcessor` works, it was possible
for `BulkProcessor#add` to block executing a bulk request. This was bad
as we may be adding things to the history store in cluster state update
threads.

This also moves the index creation to be done prior to the bulk request
execution, rather than being checked every time an operation was added
to the queue. This lessens the chance of the index being created, then
deleted (by some external force), and then recreated via a bulk indexing
request.

Resolves #50353
This commit is contained in:
Lee Hinman 2019-12-20 12:33:36 -07:00 committed by GitHub
parent b81e072504
commit c3c9ccf61f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1459 additions and 75 deletions

View File

@ -135,6 +135,7 @@ testClusters.all {
setting 'xpack.security.authc.realms.pki.pki1.delegation.enabled', 'true' setting 'xpack.security.authc.realms.pki.pki1.delegation.enabled', 'true'
setting 'indices.lifecycle.poll_interval', '1000ms' setting 'indices.lifecycle.poll_interval', '1000ms'
setting 'index.lifecycle.history_index_enabled', 'false'
keystore 'xpack.security.transport.ssl.truststore.secure_password', 'testnode' keystore 'xpack.security.transport.ssl.truststore.secure_password', 'testnode'
extraConfigFile 'roles.yml', file('roles.yml') extraConfigFile 'roles.yml', file('roles.yml')
user username: System.getProperty('tests.rest.cluster.username', 'test_user'), user username: System.getProperty('tests.rest.cluster.username', 'test_user'),

View File

@ -59,6 +59,7 @@ testClusters.integTest {
extraConfigFile 'hunspell/en_US/en_US.dic', project(":server").file('src/test/resources/indices/analyze/conf_dir/hunspell/en_US/en_US.dic') extraConfigFile 'hunspell/en_US/en_US.dic', project(":server").file('src/test/resources/indices/analyze/conf_dir/hunspell/en_US/en_US.dic')
// Whitelist reindexing from the local node so we can test it. // Whitelist reindexing from the local node so we can test it.
setting 'reindex.remote.whitelist', '127.0.0.1:*' setting 'reindex.remote.whitelist', '127.0.0.1:*'
setting 'index.lifecycle.history_index_enabled', 'false'
// TODO: remove this once cname is prepended to transport.publish_address by default in 8.0 // TODO: remove this once cname is prepended to transport.publish_address by default in 8.0
systemProperty 'es.transport.cname_in_publish_address', 'true' systemProperty 'es.transport.cname_in_publish_address', 'true'

View File

@ -368,7 +368,7 @@ PUT my_index
} }
} }
GET _search <3> GET /my_index/_search <3>
{ {
"query": { "query": {
"match": { "match": {

View File

@ -431,7 +431,7 @@ PUT logs/_doc/2 <2>
////////////////////////// //////////////////////////
[source,console] [source,console]
-------------------------------------------------- --------------------------------------------------
GET _alias GET my_logs_index-000001,my_logs_index-000002/_alias
-------------------------------------------------- --------------------------------------------------
// TEST[continued] // TEST[continued]
////////////////////////// //////////////////////////

View File

@ -14,6 +14,11 @@ ILM REST API endpoints and functionality. Defaults to `true`.
(<<time-units, time units>>) How often {ilm} checks for indices that meet policy (<<time-units, time units>>) How often {ilm} checks for indices that meet policy
criteria. Defaults to `10m`. criteria. Defaults to `10m`.
`index.lifecycle.history_index_enabled`::
Whether ILM's history index is enabled. If enabled, ILM will record the
history of actions taken as part of ILM policies to the `ilm-history-*`
indices. Defaults to `true`.
==== Index level settings ==== Index level settings
These index-level {ilm-init} settings are typically configured through index These index-level {ilm-init} settings are typically configured through index
templates. For more information, see <<ilm-gs-create-policy>>. templates. For more information, see <<ilm-gs-create-policy>>.

View File

@ -75,6 +75,7 @@ import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -472,6 +473,13 @@ public abstract class ESRestTestCase extends ESTestCase {
return false; return false;
} }
/**
 * Returns the ids of the ILM policies that must not be removed when ILM policies
 * are wiped between runs. The returned set is passed as the exclusion list to the
 * policy cleanup, and contains only the {@code ilm-history-ilm-policy} policy.
 *
 * @return the set of policy ids to keep
 */
protected Set<String> preserveILMPolicyIds() {
    final Set<String> preservedPolicyIds = Collections.singleton("ilm-history-ilm-policy");
    return preservedPolicyIds;
}
/** /**
* Returns whether to preserve auto-follow patterns. Defaults to not * Returns whether to preserve auto-follow patterns. Defaults to not
* preserving them. Only runs at all if xpack is installed on the cluster * preserving them. Only runs at all if xpack is installed on the cluster
@ -560,7 +568,7 @@ public abstract class ESRestTestCase extends ESTestCase {
} }
if (hasXPack && false == preserveILMPoliciesUponCompletion()) { if (hasXPack && false == preserveILMPoliciesUponCompletion()) {
deleteAllILMPolicies(); deleteAllILMPolicies(preserveILMPolicyIds());
} }
if (hasXPack && false == preserveAutoFollowPatternsUponCompletion()) { if (hasXPack && false == preserveAutoFollowPatternsUponCompletion()) {
@ -686,7 +694,7 @@ public abstract class ESRestTestCase extends ESTestCase {
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false); waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false);
} }
private static void deleteAllILMPolicies() throws IOException { private static void deleteAllILMPolicies(Set<String> exclusions) throws IOException {
Map<String, Object> policies; Map<String, Object> policies;
try { try {
@ -704,9 +712,15 @@ public abstract class ESRestTestCase extends ESTestCase {
return; return;
} }
for (String policyName : policies.keySet()) { policies.keySet().stream()
adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName)); .filter(p -> exclusions.contains(p) == false)
} .forEach(policyName -> {
try {
adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName));
} catch (IOException e) {
throw new RuntimeException("failed to delete policy: " + policyName, e);
}
});
} }
private static void deleteAllSLMPolicies() throws IOException { private static void deleteAllSLMPolicies() throws IOException {

View File

@ -19,6 +19,7 @@ public class LifecycleSettings {
public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete"; public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete";
public static final String LIFECYCLE_ORIGINATION_DATE = "index.lifecycle.origination_date"; public static final String LIFECYCLE_ORIGINATION_DATE = "index.lifecycle.origination_date";
public static final String LIFECYCLE_PARSE_ORIGINATION_DATE = "index.lifecycle.parse_origination_date"; public static final String LIFECYCLE_PARSE_ORIGINATION_DATE = "index.lifecycle.parse_origination_date";
public static final String LIFECYCLE_HISTORY_INDEX_ENABLED = "index.lifecycle.history_index_enabled";
public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled"; public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled";
public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule"; public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule";
@ -35,6 +36,8 @@ public class LifecycleSettings {
Setting.longSetting(LIFECYCLE_ORIGINATION_DATE, -1, -1, Setting.Property.Dynamic, Setting.Property.IndexScope); Setting.longSetting(LIFECYCLE_ORIGINATION_DATE, -1, -1, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<Boolean> LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING = Setting.boolSetting(LIFECYCLE_PARSE_ORIGINATION_DATE, public static final Setting<Boolean> LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING = Setting.boolSetting(LIFECYCLE_PARSE_ORIGINATION_DATE,
false, Setting.Property.Dynamic, Setting.Property.IndexScope); false, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<Boolean> LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(LIFECYCLE_HISTORY_INDEX_ENABLED,
true, Setting.Property.NodeScope);
public static final Setting<Boolean> SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true, public static final Setting<Boolean> SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true,

View File

@ -0,0 +1,18 @@
{
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50GB",
"max_age": "30d"
}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}

View File

@ -0,0 +1,83 @@
{
"index_patterns": [
"ilm-history-${xpack.ilm_history.template.version}*"
],
"order": 2147483647,
"settings": {
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
"index.auto_expand_replicas": "0-1",
"index.lifecycle.name": "ilm-history-ilm-policy",
"index.lifecycle.rollover_alias": "ilm-history-${xpack.ilm_history.template.version}",
"index.format": 1
},
"mappings": {
"_doc": {
"dynamic": false,
"properties": {
"@timestamp": {
"type": "date",
"format": "epoch_millis"
},
"policy": {
"type": "keyword"
},
"index": {
"type": "keyword"
},
"index_age":{
"type": "long"
},
"success": {
"type": "boolean"
},
"state": {
"type": "object",
"dynamic": true,
"properties": {
"phase": {
"type": "keyword"
},
"action": {
"type": "keyword"
},
"step": {
"type": "keyword"
},
"failed_step": {
"type": "keyword"
},
"is_auto-retryable_error": {
"type": "keyword"
},
"creation_date": {
"type": "date",
"format": "epoch_millis"
},
"phase_time": {
"type": "date",
"format": "epoch_millis"
},
"action_time": {
"type": "date",
"format": "epoch_millis"
},
"step_time": {
"type": "date",
"format": "epoch_millis"
},
"phase_definition": {
"type": "text"
},
"step_info": {
"type": "text"
}
}
},
"error_details": {
"type": "text"
}
}
}
}
}

View File

@ -27,4 +27,6 @@ testClusters.integTest {
setting 'xpack.ml.enabled', 'false' setting 'xpack.ml.enabled', 'false'
setting 'xpack.license.self_generated.type', 'trial' setting 'xpack.license.self_generated.type', 'trial'
setting 'indices.lifecycle.poll_interval', '1000ms' setting 'indices.lifecycle.poll_interval', '1000ms'
setting 'logger.org.elasticsearch.xpack.core.ilm', 'TRACE'
setting 'logger.org.elasticsearch.xpack.ilm', 'TRACE'
} }

View File

@ -8,13 +8,14 @@ package org.elasticsearch.xpack.ilm;
import org.apache.http.entity.ContentType; import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity; import org.apache.http.entity.StringEntity;
import org.apache.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Request; import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response; import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -37,8 +38,10 @@ import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.SetPriorityAction; import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction; import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.ShrinkStep; import org.elasticsearch.xpack.core.ilm.ShrinkStep;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Before; import org.junit.Before;
@ -1010,6 +1013,171 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
assertBusy(() -> assertTrue(indexExists(thirdIndex))); assertBusy(() -> assertTrue(indexExists(thirdIndex)));
} }
/**
 * Creates a rollover policy and a template that applies it, indexes one document
 * into the first index and then waits for a successful history entry for each
 * expected step of that index, followed by the first step of the rolled-over index.
 */
public void testHistoryIsWrittenWithSuccess() throws Exception {
    final String index = "index";
    final String firstIndex = index + "-1";
    createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L));

    // Template attaching the policy and the rollover alias to every matching index
    final Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
    createIndexTemplate.setJsonEntity("{" +
        "\"index_patterns\": [\""+ index + "-*\"], \n" +
        " \"settings\": {\n" +
        " \"number_of_shards\": 1,\n" +
        " \"number_of_replicas\": 0,\n" +
        " \"index.lifecycle.name\": \"" + policy+ "\",\n" +
        " \"index.lifecycle.rollover_alias\": \"alias\"\n" +
        " }\n" +
        "}");
    client().performRequest(createIndexTemplate);

    final Settings.Builder firstIndexSettings = Settings.builder()
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0);
    createIndexWithSettings(firstIndex, firstIndexSettings, true);

    // Index a document and refresh the index
    index(client(), firstIndex, "1", "foo", "bar");
    client().performRequest(new Request("POST", "/" + firstIndex + "/_refresh"));

    // Steps whose successful history entries are awaited, one after the other, for the first index
    final String[] expectedSteps = {
        "wait-for-indexing-complete",
        "wait-for-follow-shard-tasks",
        "pause-follower-index",
        "close-follower-index",
        "unfollow-follower-index",
        "open-follower-index",
        "wait-for-yellow-step",
        "check-rollover-ready",
        "attempt-rollover",
        "update-rollover-lifecycle-date",
        "set-indexing-complete",
        "completed"
    };
    for (String expectedStep : expectedSteps) {
        assertBusy(() -> assertHistoryIsPresent(policy, firstIndex, true, expectedStep), 30, TimeUnit.SECONDS);
    }

    // The second index should have history for its first rollover check
    assertBusy(() -> assertHistoryIsPresent(policy, index + "-000002", true, "check-rollover-ready"), 30, TimeUnit.SECONDS);
}
/**
 * Creates a rollover policy and a template that applies it without configuring
 * a rollover alias, indexes one document and then waits for an unsuccessful
 * history entry whose step is {@code ERROR}.
 */
public void testHistoryIsWrittenWithFailure() throws Exception {
    final String index = "index";
    final String firstIndex = index + "-1";
    createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L));

    // Unlike the success test, this template sets no rollover alias
    final Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
    createIndexTemplate.setJsonEntity("{" +
        "\"index_patterns\": [\""+ index + "-*\"], \n" +
        " \"settings\": {\n" +
        " \"number_of_shards\": 1,\n" +
        " \"number_of_replicas\": 0,\n" +
        " \"index.lifecycle.name\": \"" + policy+ "\"\n" +
        " }\n" +
        "}");
    client().performRequest(createIndexTemplate);

    final Settings.Builder firstIndexSettings = Settings.builder()
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0);
    createIndexWithSettings(firstIndex, firstIndexSettings, false);

    // Index a document and refresh the index
    index(client(), firstIndex, "1", "foo", "bar");
    client().performRequest(new Request("POST", "/" + firstIndex + "/_refresh"));

    assertBusy(() -> assertHistoryIsPresent(policy, firstIndex, false, "ERROR"), 30, TimeUnit.SECONDS);
}
/**
 * Creates a policy with only a delete action and a template that applies it,
 * creates an index, waits until a HEAD request for that index returns 404 and
 * then waits for the history entries of the delete action.
 */
public void testHistoryIsWrittenWithDeletion() throws Exception {
    final String index = "index";
    createNewSingletonPolicy("delete", new DeleteAction());

    final Request createIndexTemplate = new Request("PUT", "_template/delete_indexes");
    createIndexTemplate.setJsonEntity("{" +
        "\"index_patterns\": [\""+ index + "\"], \n" +
        " \"settings\": {\n" +
        " \"number_of_shards\": 1,\n" +
        " \"number_of_replicas\": 0,\n" +
        " \"index.lifecycle.name\": \"" + policy+ "\"\n" +
        " }\n" +
        "}");
    client().performRequest(createIndexTemplate);

    // Index should be created and then deleted by ILM
    createIndexWithSettings(index, Settings.builder(), false);

    assertBusy(() -> {
        logger.info("--> checking for index deletion...");
        final Response headResponse = client().performRequest(new Request("HEAD", "/" + index));
        final int statusCode = headResponse.getStatusLine().getStatusCode();
        assertThat(statusCode, equalTo(404));
    });

    // Both history entries are checked within the same retried block
    assertBusy(() -> {
        assertHistoryIsPresent(policy, index, true, "delete", "delete", "wait-for-shard-history-leases");
        assertHistoryIsPresent(policy, index, true, "delete", "delete", "complete");
    }, 30, TimeUnit.SECONDS);
}
/**
 * Checks for a history item matching the policy, index, success flag and step,
 * without restricting the phase or the action.
 * This method should be called inside an assertBusy, it has no retry logic of its own.
 */
private void assertHistoryIsPresent(String policyName, String indexName, boolean success, String stepName) throws IOException {
    // A null phase/action adds no phase/action clause to the history query
    final String anyPhase = null;
    final String anyAction = null;
    assertHistoryIsPresent(policyName, indexName, success, anyPhase, anyAction, stepName);
}
/**
 * Searches {@code ilm-history*} for at least one document matching the given
 * policy, index, success flag and step (and, when non-null, phase and action),
 * then checks the lifecycle step of the {@code ilm-history-1-000001} index.
 * This method should be called inside an assertBusy, it has no retry logic of its own.
 *
 * @param policyName value matched against the {@code policy} field
 * @param indexName  value matched against the {@code index} field
 * @param success    value matched against the {@code success} field
 * @param phase      value matched against {@code state.phase}, or null to skip that clause
 * @param action     value matched against {@code state.action}, or null to skip that clause
 * @param stepName   value matched against {@code state.step}
 */
private void assertHistoryIsPresent(String policyName, String indexName, boolean success,
                                    @Nullable String phase, @Nullable String action, String stepName) throws IOException {
    logger.info("--> checking for history item [{}], [{}], success: [{}], phase: [{}], action: [{}], step: [{}]",
        policyName, indexName, success, phase, action, stepName);

    // Optional term clauses, left empty when the corresponding argument is null
    final String phaseClause = phase == null ? "" : ",{\"term\": {\"state.phase\": \"" + phase + "\"}}";
    final String actionClause = action == null ? "" : ",{\"term\": {\"state.action\": \"" + action + "\"}}";

    final Request historySearchRequest = new Request("GET", "ilm-history*/_search");
    historySearchRequest.setJsonEntity("{\n" +
        " \"query\": {\n" +
        " \"bool\": {\n" +
        " \"must\": [\n" +
        " {\n" +
        " \"term\": {\n" +
        " \"policy\": \"" + policyName + "\"\n" +
        " }\n" +
        " },\n" +
        " {\n" +
        " \"term\": {\n" +
        " \"success\": " + success + "\n" +
        " }\n" +
        " },\n" +
        " {\n" +
        " \"term\": {\n" +
        " \"index\": \"" + indexName + "\"\n" +
        " }\n" +
        " },\n" +
        " {\n" +
        " \"term\": {\n" +
        " \"state.step\": \"" + stepName + "\"\n" +
        " }\n" +
        " }\n" +
        phaseClause +
        actionClause +
        " ]\n" +
        " }\n" +
        " }\n" +
        "}");

    try {
        final Response historyResponse = client().performRequest(historySearchRequest);
        final Map<String, Object> historyResponseMap;
        try (InputStream is = historyResponse.getEntity().getContent()) {
            historyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
        }
        logger.info("--> history response: {}", historyResponseMap);

        // Walk hits -> total -> value to read the number of matching documents
        final Map<String, Object> hits = (Map<String, Object>) historyResponseMap.get("hits");
        final Map<String, Object> total = (Map<String, Object>) hits.get("total");
        assertThat((int) total.get("value"), greaterThanOrEqualTo(1));
    } catch (ResponseException e) {
        // Throw AssertionError instead of an exception if the search fails so that assertBusy works as expected
        logger.error(e);
        fail("failed to perform search:" + e.getMessage());
    }

    // Finally, check that the history index is in a good state
    final Step.StepKey historyIndexStepKey = getStepKeyForIndex("ilm-history-1-000001");
    assertEquals("hot", historyIndexStepKey.getPhase());
    assertEquals(RolloverAction.NAME, historyIndexStepKey.getAction());
    assertEquals(WaitForRolloverReadyStep.NAME, historyIndexStepKey.getName());
}
private void createFullPolicy(TimeValue hotTime) throws IOException { private void createFullPolicy(TimeValue hotTime) throws IOException {
Map<String, LifecycleAction> hotActions = new HashMap<>(); Map<String, LifecycleAction> hotActions = new HashMap<>();
hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100)); hotActions.put(SetPriorityAction.NAME, new SetPriorityAction(100));

View File

@ -12,10 +12,13 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ilm.ClusterStateActionStep; import org.elasticsearch.xpack.core.ilm.ClusterStateActionStep;
import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep; import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.ilm.ErrorStep;
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
@ -29,8 +32,9 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
private final Step startStep; private final Step startStep;
private final PolicyStepsRegistry policyStepsRegistry; private final PolicyStepsRegistry policyStepsRegistry;
private final IndexLifecycleRunner lifecycleRunner; private final IndexLifecycleRunner lifecycleRunner;
private LongSupplier nowSupplier; private final LongSupplier nowSupplier;
private Step.StepKey nextStepKey = null; private Step.StepKey nextStepKey = null;
private Exception failure = null;
public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry, public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry,
IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) { IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) {
@ -115,7 +119,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
// wait for the next trigger to evaluate the // wait for the next trigger to evaluate the
// condition again // condition again
logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]", logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]",
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), nextStepKey);
ClusterStateWaitStep.Result result; ClusterStateWaitStep.Result result;
try { try {
result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state); result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
@ -124,22 +128,25 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
} }
if (result.isComplete()) { if (result.isComplete()) {
logger.trace("[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}", logger.trace("[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}",
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), nextStepKey);
if (currentStep.getNextStepKey() == null) { if (nextStepKey == null) {
return state; return state;
} else { } else {
state = IndexLifecycleTransition.moveClusterStateToStep(index, state, state = IndexLifecycleTransition.moveClusterStateToStep(index, state,
currentStep.getNextStepKey(), nowSupplier, policyStepsRegistry,false); nextStepKey, nowSupplier, policyStepsRegistry,false);
} }
} else { } else {
logger.trace("[{}] condition not met ({}) [{}], returning existing state", final ToXContentObject stepInfo = result.getInfomationContext();
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey()); if (logger.isTraceEnabled()) {
logger.trace("[{}] condition not met ({}) [{}], returning existing state (info: {})",
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(),
Strings.toString(stepInfo));
}
// We may have executed a step and set "nextStepKey" to // We may have executed a step and set "nextStepKey" to
// a value, but in this case, since the condition was // a value, but in this case, since the condition was
// not met, we can't advance any way, so don't attempt // not met, we can't advance any way, so don't attempt
// to run the current step // to run the current step
nextStepKey = null; nextStepKey = null;
ToXContentObject stepInfo = result.getInfomationContext();
if (stepInfo == null) { if (stepInfo == null) {
return state; return state;
} else { } else {
@ -169,13 +176,23 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState.equals(newState) == false) { if (oldState.equals(newState) == false) {
IndexMetaData indexMetaData = newState.metaData().index(index); IndexMetaData indexMetaData = newState.metaData().index(index);
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { if (indexMetaData != null) {
logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action",
index.getName(), startStep.getKey(), nextStepKey); LifecycleExecutionState exState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
// After the cluster state has been processed and we have moved if (ErrorStep.NAME.equals(exState.getStep()) && this.failure != null) {
// to a new step, we need to conditionally execute the step iff lifecycleRunner.registerFailedOperation(indexMetaData, failure);
// it is an `AsyncAction` so that it is executed exactly once. } else {
lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey); lifecycleRunner.registerSuccessfulOperation(indexMetaData);
}
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) {
logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action",
index.getName(), startStep.getKey(), nextStepKey);
// After the cluster state has been processed and we have moved
// to a new step, we need to conditionally execute the step iff
// it is an `AsyncAction` so that it is executed exactly once.
lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey);
}
} }
} }
} }
@ -187,10 +204,9 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
} }
private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) throws IOException { private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) throws IOException {
this.failure = cause;
logger.error("policy [{}] for index [{}] failed on cluster state step [{}]. Moving to ERROR step", policy, index.getName(), logger.error("policy [{}] for index [{}] failed on cluster state step [{}]. Moving to ERROR step", policy, index.getName(),
currentStepKey); currentStepKey);
MoveToErrorStepUpdateTask moveToErrorStepUpdateTask = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, return IndexLifecycleTransition.moveClusterStateToErrorStep(index, state, cause, nowSupplier, policyStepsRegistry::getStep);
nowSupplier, policyStepsRegistry::getStep);
return moveToErrorStepUpdateTask.execute(state);
} }
} }

View File

@ -94,6 +94,8 @@ import org.elasticsearch.xpack.ilm.action.TransportRemoveIndexLifecyclePolicyAct
import org.elasticsearch.xpack.ilm.action.TransportRetryAction; import org.elasticsearch.xpack.ilm.action.TransportRetryAction;
import org.elasticsearch.xpack.ilm.action.TransportStartILMAction; import org.elasticsearch.xpack.ilm.action.TransportStartILMAction;
import org.elasticsearch.xpack.ilm.action.TransportStopILMAction; import org.elasticsearch.xpack.ilm.action.TransportStopILMAction;
import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;
import org.elasticsearch.xpack.ilm.history.ILMHistoryTemplateRegistry;
import org.elasticsearch.xpack.slm.SLMFeatureSet; import org.elasticsearch.xpack.slm.SLMFeatureSet;
import org.elasticsearch.xpack.slm.SnapshotLifecycleService; import org.elasticsearch.xpack.slm.SnapshotLifecycleService;
import org.elasticsearch.xpack.slm.SnapshotLifecycleTask; import org.elasticsearch.xpack.slm.SnapshotLifecycleTask;
@ -131,6 +133,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN;
public class IndexLifecycle extends Plugin implements ActionPlugin { public class IndexLifecycle extends Plugin implements ActionPlugin {
private final SetOnce<IndexLifecycleService> indexLifecycleInitialisationService = new SetOnce<>(); private final SetOnce<IndexLifecycleService> indexLifecycleInitialisationService = new SetOnce<>();
private final SetOnce<ILMHistoryStore> ilmHistoryStore = new SetOnce<>();
private final SetOnce<SnapshotLifecycleService> snapshotLifecycleService = new SetOnce<>(); private final SetOnce<SnapshotLifecycleService> snapshotLifecycleService = new SetOnce<>();
private final SetOnce<SnapshotRetentionService> snapshotRetentionService = new SetOnce<>(); private final SetOnce<SnapshotRetentionService> snapshotRetentionService = new SetOnce<>();
private final SetOnce<SnapshotHistoryStore> snapshotHistoryStore = new SetOnce<>(); private final SetOnce<SnapshotHistoryStore> snapshotHistoryStore = new SetOnce<>();
@ -172,6 +175,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
LifecycleSettings.LIFECYCLE_ORIGINATION_DATE_SETTING, LifecycleSettings.LIFECYCLE_ORIGINATION_DATE_SETTING,
LifecycleSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING, LifecycleSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING,
LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING, LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING,
LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING,
RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING, RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING,
LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING, LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING,
LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING, LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
@ -188,8 +192,14 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
} }
final List<Object> components = new ArrayList<>(); final List<Object> components = new ArrayList<>();
if (ilmEnabled) { if (ilmEnabled) {
// This registers a cluster state listener, so appears unused but is not.
@SuppressWarnings("unused")
ILMHistoryTemplateRegistry ilmTemplateRegistry =
new ILMHistoryTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry);
ilmHistoryStore.set(new ILMHistoryStore(settings, new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN),
clusterService, threadPool));
indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool,
getClock(), System::currentTimeMillis, xContentRegistry)); getClock(), System::currentTimeMillis, xContentRegistry, ilmHistoryStore.get()));
components.add(indexLifecycleInitialisationService.get()); components.add(indexLifecycleInitialisationService.get());
} }
if (slmEnabled) { if (slmEnabled) {
@ -317,7 +327,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
@Override @Override
public void close() { public void close() {
try { try {
IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotRetentionService.get()); IOUtils.close(indexLifecycleInitialisationService.get(), ilmHistoryStore.get(),
snapshotLifecycleService.get(), snapshotRetentionService.get());
} catch (IOException e) { } catch (IOException e) {
throw new ElasticsearchException("unable to close index lifecycle services", e); throw new ElasticsearchException("unable to close index lifecycle services", e);
} }

View File

@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
@ -23,10 +24,13 @@ import org.elasticsearch.xpack.core.ilm.ClusterStateActionStep;
import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep; import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.ilm.ErrorStep; import org.elasticsearch.xpack.core.ilm.ErrorStep;
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState; import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.Step; import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
import org.elasticsearch.xpack.ilm.history.ILMHistoryItem;
import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
@ -35,13 +39,15 @@ import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_ORIGI
class IndexLifecycleRunner { class IndexLifecycleRunner {
private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class); private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class);
private final ThreadPool threadPool; private final ThreadPool threadPool;
private PolicyStepsRegistry stepRegistry; private final ClusterService clusterService;
private ClusterService clusterService; private final PolicyStepsRegistry stepRegistry;
private LongSupplier nowSupplier; private final ILMHistoryStore ilmHistoryStore;
private final LongSupplier nowSupplier;
IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService, IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ILMHistoryStore ilmHistoryStore, ClusterService clusterService,
ThreadPool threadPool, LongSupplier nowSupplier) { ThreadPool threadPool, LongSupplier nowSupplier) {
this.stepRegistry = stepRegistry; this.stepRegistry = stepRegistry;
this.ilmHistoryStore = ilmHistoryStore;
this.clusterService = clusterService; this.clusterService = clusterService;
this.nowSupplier = nowSupplier; this.nowSupplier = nowSupplier;
this.threadPool = threadPool; this.threadPool = threadPool;
@ -62,17 +68,29 @@ class IndexLifecycleRunner {
} }
/** /**
* Return true or false depending on whether the index is ready to be in {@code phase} * Calculate the index's origination time (in milliseconds) based on its
* metadata. Returns null if there is no lifecycle date and the origination
* date is not set.
*/ */
boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetaData indexMetaData, final String phase) { @Nullable
private static Long calculateOriginationMillis(final IndexMetaData indexMetaData) {
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
Long originationDate = indexMetaData.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, -1L); Long originationDate = indexMetaData.getSettings().getAsLong(LIFECYCLE_ORIGINATION_DATE, -1L);
if (lifecycleState.getLifecycleDate() == null && originationDate == -1L) { if (lifecycleState.getLifecycleDate() == null && originationDate == -1L) {
logger.trace("no index creation or origination date has been set yet"); return null;
}
return originationDate == -1L ? lifecycleState.getLifecycleDate() : originationDate;
}
/**
* Return true or false depending on whether the index is ready to be in {@code phase}
*/
boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetaData indexMetaData, final String phase) {
final Long lifecycleDate = calculateOriginationMillis(indexMetaData);
if (lifecycleDate == null) {
logger.trace("[{}] no index creation or origination date has been set yet", indexMetaData.getIndex().getName());
return true; return true;
} }
final Long lifecycleDate = originationDate != -1L ? originationDate : lifecycleState.getLifecycleDate();
assert lifecycleDate != null && lifecycleDate >= 0 : "expected index to have a lifecycle date but it did not";
final TimeValue after = stepRegistry.getIndexAgeForPhase(policy, phase); final TimeValue after = stepRegistry.getIndexAgeForPhase(policy, phase);
final long now = nowSupplier.getAsLong(); final long now = nowSupplier.getAsLong();
final TimeValue age = new TimeValue(now - lifecycleDate); final TimeValue age = new TimeValue(now - lifecycleDate);
@ -221,19 +239,26 @@ class IndexLifecycleRunner {
((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState,
new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()), new AsyncActionStep.Listener() { new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()), new AsyncActionStep.Listener() {
@Override @Override
public void onResponse(boolean complete) { public void onResponse(boolean complete) {
logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey()); logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey());
if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { if (complete) {
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); if (((AsyncActionStep) currentStep).indexSurvives()) {
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
} else {
// Delete needs special handling, because after this step we
// will no longer have access to any information about the
// index since it will be... deleted.
registerDeleteOperation(indexMetaData);
}
}
} }
}
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e);
} }
}); });
} else { } else {
logger.trace("[{}] ignoring non async action step execution from step transition [{}]", index, currentStep.getKey()); logger.trace("[{}] ignoring non async action step execution from step transition [{}]", index, currentStep.getKey());
} }
@ -298,6 +323,7 @@ class IndexLifecycleRunner {
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState -> new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState ->
{ {
IndexMetaData indexMetaData = clusterState.metaData().index(index); IndexMetaData indexMetaData = clusterState.metaData().index(index);
registerSuccessfulOperation(indexMetaData);
if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetaData != null) {
maybeRunAsyncAction(clusterState, indexMetaData, policy, newStepKey); maybeRunAsyncAction(clusterState, indexMetaData, policy, newStepKey);
} }
@ -311,7 +337,10 @@ class IndexLifecycleRunner {
logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. Moving to ERROR step", logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. Moving to ERROR step",
policy, index.getName(), currentStepKey), e); policy, index.getName(), currentStepKey), e);
clusterService.submitStateUpdateTask("ilm-move-to-error-step", clusterService.submitStateUpdateTask("ilm-move-to-error-step",
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep)); new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> {
IndexMetaData indexMetaData = clusterState.metaData().index(index);
registerFailedOperation(indexMetaData, e);
}));
} }
/** /**
@ -343,4 +372,61 @@ class IndexLifecycleRunner {
setStepInfo(index, policyName, LifecycleExecutionState.getCurrentStepKey(executionState), setStepInfo(index, policyName, LifecycleExecutionState.getCurrentStepKey(executionState),
new SetStepInfoUpdateTask.ExceptionWrapper(e)); new SetStepInfoUpdateTask.ExceptionWrapper(e));
} }
/**
 * For the given index metadata, register (index a document) that the index has transitioned
 * successfully into this new state using the {@link ILMHistoryStore}.
 *
 * @param indexMetaData metadata of the index after the transition; when {@code null} nothing is recorded
 */
void registerSuccessfulOperation(IndexMetaData indexMetaData) {
    if (indexMetaData == null) {
        // This index may have been deleted and has no metadata, so ignore it
        return;
    }
    final Long origination = calculateOriginationMillis(indexMetaData);
    // Read the clock exactly once so the recorded timestamp and the derived index age
    // refer to the same instant.
    final long now = nowSupplier.getAsLong();
    ilmHistoryStore.putAsync(
        ILMHistoryItem.success(indexMetaData.getIndex().getName(),
            LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings()),
            now,
            // The age is unknown when no origination/lifecycle date is available
            origination == null ? null : (now - origination),
            LifecycleExecutionState.fromIndexMetadata(indexMetaData)));
}
/**
 * For the given index metadata, register (index a document) that the index
 * has been deleted by ILM using the {@link ILMHistoryStore}.
 *
 * @param metadataBeforeDeletion metadata of the index as it was before being deleted
 * @throws IllegalStateException if {@code metadataBeforeDeletion} is {@code null}
 */
void registerDeleteOperation(IndexMetaData metadataBeforeDeletion) {
    if (metadataBeforeDeletion == null) {
        throw new IllegalStateException("cannot register deletion of an index that did not previously exist");
    }
    final Long origination = calculateOriginationMillis(metadataBeforeDeletion);
    // Read the clock exactly once so the recorded timestamp and the derived index age
    // refer to the same instant.
    final long now = nowSupplier.getAsLong();
    ilmHistoryStore.putAsync(
        ILMHistoryItem.success(metadataBeforeDeletion.getIndex().getName(),
            LifecycleSettings.LIFECYCLE_NAME_SETTING.get(metadataBeforeDeletion.getSettings()),
            now,
            // The age is unknown when no origination/lifecycle date is available
            origination == null ? null : (now - origination),
            LifecycleExecutionState.builder(LifecycleExecutionState.fromIndexMetadata(metadataBeforeDeletion))
                // Register that the delete phase is now "complete"
                .setStep(PhaseCompleteStep.NAME)
                .build()));
}
/**
 * For the given index metadata, register (index a document) that the index has transitioned
 * into the ERROR state using the {@link ILMHistoryStore}.
 *
 * @param indexMetaData metadata of the index after moving to the error step; when {@code null} nothing is recorded
 * @param failure       the exception that caused the move to the error step
 */
void registerFailedOperation(IndexMetaData indexMetaData, Exception failure) {
    if (indexMetaData == null) {
        // This index may have been deleted and has no metadata, so ignore it
        return;
    }
    final Long origination = calculateOriginationMillis(indexMetaData);
    // Read the clock exactly once so the recorded timestamp and the derived index age
    // refer to the same instant.
    final long now = nowSupplier.getAsLong();
    ilmHistoryStore.putAsync(
        ILMHistoryItem.failure(indexMetaData.getIndex().getName(),
            LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings()),
            now,
            // The age is unknown when no origination/lifecycle date is available
            origination == null ? null : (now - origination),
            LifecycleExecutionState.fromIndexMetadata(indexMetaData),
            failure));
}
} }

View File

@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.ilm.ShrinkStep; import org.elasticsearch.xpack.core.ilm.ShrinkStep;
import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;
import java.io.Closeable; import java.io.Closeable;
import java.time.Clock; import java.time.Clock;
@ -59,21 +60,24 @@ public class IndexLifecycleService
private final Clock clock; private final Clock clock;
private final PolicyStepsRegistry policyRegistry; private final PolicyStepsRegistry policyRegistry;
private final IndexLifecycleRunner lifecycleRunner; private final IndexLifecycleRunner lifecycleRunner;
private final ILMHistoryStore ilmHistoryStore;
private final Settings settings; private final Settings settings;
private ClusterService clusterService; private ClusterService clusterService;
private LongSupplier nowSupplier; private LongSupplier nowSupplier;
private SchedulerEngine.Job scheduledJob; private SchedulerEngine.Job scheduledJob;
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock, public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock,
LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) { LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry,
ILMHistoryStore ilmHistoryStore) {
super(); super();
this.settings = settings; this.settings = settings;
this.clusterService = clusterService; this.clusterService = clusterService;
this.clock = clock; this.clock = clock;
this.nowSupplier = nowSupplier; this.nowSupplier = nowSupplier;
this.scheduledJob = null; this.scheduledJob = null;
this.ilmHistoryStore = ilmHistoryStore;
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client); this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client);
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier); this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, ilmHistoryStore, clusterService, threadPool, nowSupplier);
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings); this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
clusterService.addStateApplier(this); clusterService.addStateApplier(this);
clusterService.addListener(this); clusterService.addListener(this);

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ilm.Step;
import java.io.IOException; import java.io.IOException;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask { public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
@ -24,17 +25,20 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
private final String policy; private final String policy;
private final Step.StepKey currentStepKey; private final Step.StepKey currentStepKey;
private final BiFunction<IndexMetaData, Step.StepKey, Step> stepLookupFunction; private final BiFunction<IndexMetaData, Step.StepKey, Step> stepLookupFunction;
private final Consumer<ClusterState> stateChangeConsumer;
private LongSupplier nowSupplier; private LongSupplier nowSupplier;
private Exception cause; private Exception cause;
public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Exception cause, LongSupplier nowSupplier, public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Exception cause, LongSupplier nowSupplier,
BiFunction<IndexMetaData, Step.StepKey, Step> stepLookupFunction) { BiFunction<IndexMetaData, Step.StepKey, Step> stepLookupFunction,
Consumer<ClusterState> stateChangeConsumer) {
this.index = index; this.index = index;
this.policy = policy; this.policy = policy;
this.currentStepKey = currentStepKey; this.currentStepKey = currentStepKey;
this.cause = cause; this.cause = cause;
this.nowSupplier = nowSupplier; this.nowSupplier = nowSupplier;
this.stepLookupFunction = stepLookupFunction; this.stepLookupFunction = stepLookupFunction;
this.stateChangeConsumer = stateChangeConsumer;
} }
Index getIndex() { Index getIndex() {
@ -73,6 +77,13 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
} }
} }
/**
 * Hands the new cluster state to the registered consumer, unless the new state
 * compares equal to the old one, in which case nothing is done.
 */
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
    final boolean unchanged = newState.equals(oldState);
    if (unchanged) {
        return;
    }
    stateChangeConsumer.accept(newState);
}
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName()

View File

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ilm.history;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE;
/**
 * An {@link ILMHistoryItem} captures the ILM-related state of a single index at one moment in time
 * and is meant to be created once an index has moved into a new step. Instances cannot be built
 * directly; use the {@link #success(String, String, long, Long, LifecycleExecutionState)} and
 * {@link #failure(String, String, long, Long, LifecycleExecutionState, Exception)} factories.
 */
public class ILMHistoryItem implements ToXContentObject {
    // Names of the fields emitted by toXContent
    private static final ParseField INDEX = new ParseField("index");
    private static final ParseField POLICY = new ParseField("policy");
    private static final ParseField TIMESTAMP = new ParseField("@timestamp");
    private static final ParseField INDEX_AGE = new ParseField("index_age");
    private static final ParseField SUCCESS = new ParseField("success");
    private static final ParseField EXECUTION_STATE = new ParseField("state");
    private static final ParseField ERROR = new ParseField("error_details");

    private final String index;
    private final String policyId;
    private final long timestamp;
    @Nullable
    private final Long indexAge;
    private final boolean success;
    @Nullable
    private final LifecycleExecutionState executionState;
    @Nullable
    private final String errorDetails;

    private ILMHistoryItem(String index, String policyId, long timestamp, @Nullable Long indexAge, boolean success,
                           @Nullable LifecycleExecutionState executionState, @Nullable String errorDetails) {
        this.index = index;
        this.policyId = policyId;
        this.timestamp = timestamp;
        this.indexAge = indexAge;
        this.success = success;
        this.executionState = executionState;
        this.errorDetails = errorDetails;
    }

    /**
     * Creates an item describing a successful operation; such an item carries no error details.
     */
    public static ILMHistoryItem success(String index, String policyId, long timestamp, @Nullable Long indexAge,
                                         @Nullable LifecycleExecutionState executionState) {
        final boolean succeeded = true;
        return new ILMHistoryItem(index, policyId, timestamp, indexAge, succeeded, executionState, null);
    }

    /**
     * Creates an item describing a failed operation. The given exception is mandatory and is
     * rendered to a string that becomes the item's error details.
     */
    public static ILMHistoryItem failure(String index, String policyId, long timestamp, @Nullable Long indexAge,
                                         @Nullable LifecycleExecutionState executionState, Exception error) {
        final Exception cause = Objects.requireNonNull(error, "ILM failures require an attached exception");
        final String renderedCause = exceptionToString(cause);
        return new ILMHistoryItem(index, policyId, timestamp, indexAge, false, executionState, renderedCause);
    }

    /**
     * Writes this item as a single object. The index age, execution state and error details
     * are only written when they are non-null.
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject()
            .field(INDEX.getPreferredName(), index)
            .field(POLICY.getPreferredName(), policyId)
            .field(TIMESTAMP.getPreferredName(), timestamp);
        if (indexAge != null) {
            builder.field(INDEX_AGE.getPreferredName(), indexAge);
        }
        builder.field(SUCCESS.getPreferredName(), success);
        if (executionState != null) {
            builder.field(EXECUTION_STATE.getPreferredName(), executionState.asMap());
        }
        if (errorDetails != null) {
            builder.field(ERROR.getPreferredName(), errorDetails);
        }
        return builder.endObject();
    }

    /**
     * Renders the exception as a JSON string, asking for the stack trace not to be skipped.
     * If rendering itself fails, a plain-text message combining both error messages is returned.
     */
    private static String exceptionToString(Exception exception) {
        final Params withStackTrace = new MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false"));
        try (XContentBuilder json = JsonXContent.contentBuilder()) {
            json.startObject();
            ElasticsearchException.generateThrowableXContent(json, withStackTrace, exception);
            json.endObject();
            return BytesReference.bytes(json).utf8ToString();
        } catch (IOException renderFailure) {
            // Rendering should rarely fail; when it does, fall back to reporting
            // at least the messages of both the rendering error and the ILM error
            return "unable to generate the ILM error details due to: " + renderFailure.getMessage() +
                "; the ILM error was: " + exception.getMessage();
        }
    }

    @Override
    public String toString() {
        return Strings.toString(this);
    }
}

View File

@ -0,0 +1,226 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ilm.history;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN;
import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING;
import static org.elasticsearch.xpack.ilm.history.ILMHistoryTemplateRegistry.INDEX_TEMPLATE_VERSION;
/**
* The {@link ILMHistoryStore} handles indexing {@link ILMHistoryItem} documents into the
* appropriate index. It sets up a {@link BulkProcessor} for indexing in bulk, and handles creation
* of the index/alias as needed for ILM policies.
*/
public class ILMHistoryStore implements Closeable {
private static final Logger logger = LogManager.getLogger(ILMHistoryStore.class);
public static final String ILM_HISTORY_INDEX_PREFIX = "ilm-history-" + INDEX_TEMPLATE_VERSION + "-";
public static final String ILM_HISTORY_ALIAS = "ilm-history-" + INDEX_TEMPLATE_VERSION;
private final boolean ilmHistoryEnabled;
private final BulkProcessor processor;
private final ThreadPool threadPool;
public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService, ThreadPool threadPool) {
this.ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings);
this.threadPool = threadPool;
this.processor = BulkProcessor.builder(
new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN)::bulk,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// Prior to actually performing the bulk, we should ensure the index exists, and
// if we were unable to create it or it was in a bad state, we should not
// attempt to index documents.
try {
final CompletableFuture<Boolean> indexCreated = new CompletableFuture<>();
ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(indexCreated::complete,
ex -> {
logger.warn("failed to create ILM history store index prior to issuing bulk request", ex);
indexCreated.completeExceptionally(ex);
}));
indexCreated.get(2, TimeUnit.MINUTES);
} catch (Exception e) {
logger.warn(new ParameterizedMessage("unable to index the following ILM history items:\n{}",
request.requests().stream()
.filter(dwr -> (dwr instanceof IndexRequest))
.map(dwr -> ((IndexRequest) dwr))
.map(IndexRequest::sourceAsMap)
.map(Object::toString)
.collect(Collectors.joining("\n"))), e);
throw new ElasticsearchException(e);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
long items = request.numberOfActions();
if (logger.isTraceEnabled()) {
logger.trace("indexed [{}] items into ILM history index [{}]", items,
Arrays.stream(response.getItems())
.map(BulkItemResponse::getIndex)
.distinct()
.collect(Collectors.joining(",")));
}
if (response.hasFailures()) {
Map<String, String> failures = Arrays.stream(response.getItems())
.filter(BulkItemResponse::isFailed)
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
logger.error("failures: [{}]", failures);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
long items = request.numberOfActions();
logger.error(new ParameterizedMessage("failed to index {} items into ILM history index", items), failure);
}
})
.setBulkActions(100)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(1)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
.build();
}
/**
* Attempts to asynchronously index an ILM history entry
*/
public void putAsync(ILMHistoryItem item) {
if (ilmHistoryEnabled == false) {
logger.trace("not recording ILM history item because [{}] is [false]: [{}]",
LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), item);
return;
}
logger.trace("queueing ILM history item for indexing [{}]: [{}]", ILM_HISTORY_ALIAS, item);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
item.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder);
// TODO: remove the threadpool wrapping when the .add call is non-blocking
// (it can currently execute the bulk request occasionally)
// see: https://github.com/elastic/elasticsearch/issues/50440
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
processor.add(request);
} catch (Exception e) {
logger.error(new ParameterizedMessage("failed add ILM history item to queue for index [{}]: [{}]",
ILM_HISTORY_ALIAS, item), e);
}
});
} catch (IOException exception) {
logger.error(new ParameterizedMessage("failed to queue ILM history item in index [{}]: [{}]",
ILM_HISTORY_ALIAS, item), exception);
}
}
/**
 * Checks if the ILM history index exists, and if not, creates it.
 *
 * The outcome is always reported through {@code listener}:
 * <ul>
 *   <li>neither the alias nor the initial index exists: the initial index is created with the alias
 *       as its write index, and {@code onResponse(true)} is called on success</li>
 *   <li>the alias exists and has a write index: {@code onResponse(false)}</li>
 *   <li>the create call fails with {@link ResourceAlreadyExistsException}: treated as a concurrent
 *       creation, {@code onResponse(false)}</li>
 *   <li>any other situation (initial index without alias, alias without write index, the alias name
 *       taken by a concrete index, other creation failures): {@code onFailure}</li>
 * </ul>
 *
 * @param client   The client to use to create the index if needed
 * @param state    The current cluster state, to determine if the alias exists
 * @param listener Called after the index has been created. `onResponse` called with `true` if the index was created,
 *                 `false` if it already existed.
 */
static void ensureHistoryIndex(Client client, ClusterState state, ActionListener<Boolean> listener) {
    final String initialHistoryIndexName = ILM_HISTORY_INDEX_PREFIX + "000001";
    final AliasOrIndex ilmHistory = state.metaData().getAliasAndIndexLookup().get(ILM_HISTORY_ALIAS);
    final AliasOrIndex initialHistoryIndex = state.metaData().getAliasAndIndexLookup().get(initialHistoryIndexName);

    if (ilmHistory == null && initialHistoryIndex == null) {
        // No alias or index exists with the expected names, so create the index with appropriate alias
        logger.debug("creating ILM history index [{}]", initialHistoryIndexName);
        client.admin().indices().prepareCreate(initialHistoryIndexName)
            .setWaitForActiveShards(1)
            .addAlias(new Alias(ILM_HISTORY_ALIAS)
                .writeIndex(true))
            .execute(new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse response) {
                    listener.onResponse(true);
                }

                @Override
                public void onFailure(Exception e) {
                    if (e instanceof ResourceAlreadyExistsException) {
                        // The index didn't exist before we made the call, there was probably a race - just ignore this
                        logger.debug("index [{}] was created after checking for its existence, likely due to a concurrent call",
                            initialHistoryIndexName);
                        listener.onResponse(false);
                    } else {
                        listener.onFailure(e);
                    }
                }
            });
    } else if (ilmHistory == null) {
        // alias does not exist but initial index does, something is broken
        listener.onFailure(new IllegalStateException("ILM history index [" + initialHistoryIndexName +
            "] already exists but does not have alias [" + ILM_HISTORY_ALIAS + "]"));
    } else if (ilmHistory.isAlias() && ilmHistory instanceof AliasOrIndex.Alias) {
        if (((AliasOrIndex.Alias) ilmHistory).getWriteIndex() != null) {
            // The alias exists and has a write index, so we're good
            listener.onResponse(false);
        } else {
            // The alias does not have a write index, so we can't index into it
            // NOTE(review): the message below lacks the closing "] " after the alias name; it is left
            // unchanged here in case callers or tests match on the exact text - confirm before fixing.
            listener.onFailure(new IllegalStateException("ILM history alias [" + ILM_HISTORY_ALIAS + "does not have a write index"));
        }
    } else if (ilmHistory.isAlias() == false) {
        // This is not an alias, error out
        listener.onFailure(new IllegalStateException("ILM history alias [" + ILM_HISTORY_ALIAS +
            "] already exists as concrete index"));
    } else {
        logger.error("unexpected IndexOrAlias for [{}]: [{}]", ILM_HISTORY_ALIAS, ilmHistory);
        assert false : ILM_HISTORY_ALIAS + " cannot be both an alias and not an alias simultaneously";
        // When assertions are disabled the assert above is a no-op; still complete the listener so the
        // caller is not left waiting forever for a response.
        listener.onFailure(new IllegalStateException("unexpected IndexOrAlias for [" + ILM_HISTORY_ALIAS +
            "]: [" + ilmHistory + "]"));
    }
}
/**
 * Closes the bulk processor, waiting up to 10 seconds for it to finish.
 * If the calling thread is interrupted while waiting, a warning is logged and the
 * thread's interrupt status is restored so that callers can still observe it.
 */
@Override
public void close() {
    try {
        processor.awaitClose(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        logger.warn("failed to shut down ILM history bulk processor after 10 seconds", e);
        // Catching InterruptedException clears the interrupt flag; set it again rather than swallowing it
        Thread.currentThread().interrupt();
    }
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ilm.history;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
import org.elasticsearch.xpack.core.template.IndexTemplateRegistry;
import org.elasticsearch.xpack.core.template.LifecyclePolicyConfig;
import java.util.Collections;
import java.util.List;
/**
 * Registers the index template and the ILM policy used by the ILM history
 * indices (ilm-history-N-00000M). When the history index is disabled through the
 * node settings, neither the template nor the policy is reported for installation.
 */
public class ILMHistoryTemplateRegistry extends IndexTemplateRegistry {
    // history (please add a comment why you increased the version here)
    // version 1: initial
    public static final String INDEX_TEMPLATE_VERSION = "1";

    public static final String ILM_TEMPLATE_VERSION_VARIABLE = "xpack.ilm_history.template.version";
    public static final String ILM_TEMPLATE_NAME = "ilm-history";

    public static final String ILM_POLICY_NAME = "ilm-history-ilm-policy";

    /** Template definition loaded from the {@code /ilm-history.json} resource. */
    public static final IndexTemplateConfig TEMPLATE_ILM_HISTORY = new IndexTemplateConfig(
        ILM_TEMPLATE_NAME,
        "/ilm-history.json",
        INDEX_TEMPLATE_VERSION,
        ILM_TEMPLATE_VERSION_VARIABLE
    );

    /** Lifecycle policy definition loaded from the {@code /ilm-history-ilm-policy.json} resource. */
    public static final LifecyclePolicyConfig ILM_HISTORY_POLICY = new LifecyclePolicyConfig(
        ILM_POLICY_NAME,
        "/ilm-history-ilm-policy.json"
    );

    /** Value of the history-index-enabled setting, read once from the node settings at construction. */
    private final boolean ilmHistoryEnabled;

    public ILMHistoryTemplateRegistry(Settings nodeSettings, ClusterService clusterService,
                                      ThreadPool threadPool, Client client,
                                      NamedXContentRegistry xContentRegistry) {
        super(nodeSettings, clusterService, threadPool, client, xContentRegistry);
        this.ilmHistoryEnabled = LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings);
    }

    /**
     * @return the single ILM history template, or an empty list when history is disabled
     */
    @Override
    protected List<IndexTemplateConfig> getTemplateConfigs() {
        if (this.ilmHistoryEnabled == false) {
            return Collections.emptyList();
        }
        return Collections.singletonList(TEMPLATE_ILM_HISTORY);
    }

    /**
     * @return the single ILM history lifecycle policy, or an empty list when history is disabled
     */
    @Override
    protected List<LifecyclePolicyConfig> getPolicyConfigs() {
        if (this.ilmHistoryEnabled == false) {
            return Collections.emptyList();
        }
        return Collections.singletonList(ILM_HISTORY_POLICY);
    }

    /**
     * @return the index lifecycle origin constant from {@link ClientHelper}
     */
    @Override
    protected String getOrigin() {
        return ClientHelper.INDEX_LIFECYCLE_ORIGIN;
    }
}

View File

@ -107,7 +107,8 @@ public class IndexLifecycleInitialisationTests extends ESIntegTestCase {
settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
settings.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s"); settings.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s");
// This is necessary to prevent SLM installing a lifecycle policy, these tests assume a blank slate // This is necessary to prevent ILM and SLM installing a lifecycle policy, these tests assume a blank slate
settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false);
settings.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false); settings.put(LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false);
return settings.build(); return settings.build();
} }

View File

@ -55,6 +55,8 @@ import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
import org.elasticsearch.xpack.ilm.history.ILMHistoryItem;
import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
@ -91,6 +93,8 @@ import static org.mockito.Mockito.when;
public class IndexLifecycleRunnerTests extends ESTestCase { public class IndexLifecycleRunnerTests extends ESTestCase {
private static final NamedXContentRegistry REGISTRY; private static final NamedXContentRegistry REGISTRY;
private ThreadPool threadPool; private ThreadPool threadPool;
private Client noopClient;
private NoOpHistoryStore historyStore;
static { static {
try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) { try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) {
@ -100,12 +104,16 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
} }
@Before @Before
public void prepareThreadPool() { public void prepare() {
threadPool = new TestThreadPool("test"); threadPool = new TestThreadPool("test");
noopClient = new NoOpClient(threadPool);
historyStore = new NoOpHistoryStore();
} }
@After @After
public void shutdown() { public void shutdown() {
historyStore.close();
noopClient.close();
threadPool.shutdownNow(); threadPool.shutdownNow();
} }
@ -114,7 +122,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
TerminalPolicyStep step = TerminalPolicyStep.INSTANCE; TerminalPolicyStep step = TerminalPolicyStep.INSTANCE;
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
@ -136,7 +144,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder();
newState.setFailedStep(stepKey.getName()); newState.setFailedStep(stepKey.getName());
newState.setIsAutoRetryableError(false); newState.setIsAutoRetryableError(false);
@ -176,7 +184,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, waitForRolloverStep); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, waitForRolloverStep);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder();
newState.setFailedStep(stepKey.getName()); newState.setFailedStep(stepKey.getName());
newState.setIsAutoRetryableError(true); newState.setIsAutoRetryableError(true);
@ -221,7 +229,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
.localNodeId(node.getId())) .localNodeId(node.getId()))
.build(); .build();
ClusterServiceUtils.setState(clusterService, state); ClusterServiceUtils.setState(clusterService, state);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
ClusterState before = clusterService.state(); ClusterState before = clusterService.state();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
@ -282,7 +290,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
.build(); .build();
ClusterServiceUtils.setState(clusterService, state); ClusterServiceUtils.setState(clusterService, state);
long stepTime = randomLong(); long stepTime = randomLong();
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore,
clusterService, threadPool, () -> stepTime);
ClusterState before = clusterService.state(); ClusterState before = clusterService.state();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
@ -303,6 +312,14 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
assertThat(nextStep.getExecuteCount(), equalTo(1L)); assertThat(nextStep.getExecuteCount(), equalTo(1L));
clusterService.close(); clusterService.close();
threadPool.shutdownNow(); threadPool.shutdownNow();
ILMHistoryItem historyItem = historyStore.getItems().stream()
.findFirst()
.orElseThrow(() -> new AssertionError("failed to register ILM history"));
assertThat(historyItem.toString(),
containsString("{\"index\":\"test\",\"policy\":\"foo\",\"@timestamp\":" + stepTime +
",\"success\":true,\"state\":{\"phase\":\"phase\",\"action\":\"action\"," +
"\"step\":\"next_cluster_state_action_step\",\"step_time\":\"" + stepTime + "\"}}"));
} }
public void testRunPeriodicPolicyWithFailureToReadPolicy() throws Exception { public void testRunPeriodicPolicyWithFailureToReadPolicy() throws Exception {
@ -357,7 +374,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
.build(); .build();
ClusterServiceUtils.setState(clusterService, state); ClusterServiceUtils.setState(clusterService, state);
long stepTime = randomLong(); long stepTime = randomLong();
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore,
clusterService, threadPool, () -> stepTime);
ClusterState before = clusterService.state(); ClusterState before = clusterService.state();
if (asyncAction) { if (asyncAction) {
@ -409,7 +427,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
.localNodeId(node.getId())) .localNodeId(node.getId()))
.build(); .build();
ClusterServiceUtils.setState(clusterService, state); ClusterServiceUtils.setState(clusterService, state);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
ClusterState before = clusterService.state(); ClusterState before = clusterService.state();
// State changes should not run AsyncAction steps // State changes should not run AsyncAction steps
@ -468,7 +486,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
.build(); .build();
logger.info("--> state: {}", state); logger.info("--> state: {}", state);
ClusterServiceUtils.setState(clusterService, state); ClusterServiceUtils.setState(clusterService, state);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
ClusterState before = clusterService.state(); ClusterState before = clusterService.state();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
@ -488,6 +506,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
assertThat(nextStep.getExecuteCount(), equalTo(1L)); assertThat(nextStep.getExecuteCount(), equalTo(1L));
clusterService.close(); clusterService.close();
threadPool.shutdownNow(); threadPool.shutdownNow();
ILMHistoryItem historyItem = historyStore.getItems().stream()
.findFirst()
.orElseThrow(() -> new AssertionError("failed to register ILM history"));
assertThat(historyItem.toString(),
containsString("{\"index\":\"test\",\"policy\":\"foo\",\"@timestamp\":0,\"success\":true," +
"\"state\":{\"phase\":\"phase\",\"action\":\"action\",\"step\":\"async_action_step\",\"step_time\":\"0\"}}"));
} }
public void testRunPeriodicStep() throws Exception { public void testRunPeriodicStep() throws Exception {
@ -535,7 +560,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
.build(); .build();
logger.info("--> state: {}", state); logger.info("--> state: {}", state);
ClusterServiceUtils.setState(clusterService, state); ClusterServiceUtils.setState(clusterService, state);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
ClusterState before = clusterService.state(); ClusterState before = clusterService.state();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
@ -558,7 +583,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null); MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
@ -576,7 +601,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
step.setWillComplete(true); step.setWillComplete(true);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
@ -595,7 +620,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
step.setException(expectedException); step.setException(expectedException);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
@ -613,7 +638,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
step.setException(expectedException); step.setException(expectedException);
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
@ -627,7 +652,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
String policyName = "cluster_state_action_policy"; String policyName = "cluster_state_action_policy";
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
IndexLifecycleRunner runner = new IndexLifecycleRunner(new PolicyStepsRegistry(NamedXContentRegistry.EMPTY, null), IndexLifecycleRunner runner = new IndexLifecycleRunner(new PolicyStepsRegistry(NamedXContentRegistry.EMPTY, null),
clusterService, threadPool, () -> 0L); historyStore, clusterService, threadPool, () -> 0L);
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
// verify that no exception is thrown // verify that no exception is thrown
@ -712,7 +737,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
stepMap, NamedXContentRegistry.EMPTY, null); stepMap, NamedXContentRegistry.EMPTY, null);
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
final AtomicLong now = new AtomicLong(5); final AtomicLong now = new AtomicLong(5);
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, clusterService, threadPool, now::get); IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, historyStore,
clusterService, threadPool, now::get);
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)) .numberOfShards(randomIntBetween(1, 5))
.numberOfReplicas(randomIntBetween(0, 5)) .numberOfReplicas(randomIntBetween(0, 5))
@ -1086,4 +1112,23 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
when(client.settings()).thenReturn(Settings.EMPTY); when(client.settings()).thenReturn(Settings.EMPTY);
return new MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client); return new MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client);
} }
/**
 * History store for these tests that records every item handed to {@link #putAsync} in an
 * in-memory list instead of indexing it, so tests can inspect what was registered.
 */
private class NoOpHistoryStore extends ILMHistoryStore {

    // Items in the order they were passed to putAsync
    private final List<ILMHistoryItem> items = new ArrayList<>();

    NoOpHistoryStore() {
        // Built on the enclosing test's no-op client; no cluster service or thread pool is supplied
        super(Settings.EMPTY, noopClient, null, null);
    }

    /**
     * Logs the item and appends it to the recorded list.
     */
    @Override
    public void putAsync(ILMHistoryItem item) {
        logger.info("--> adding ILM history item: [{}]", item);
        this.items.add(item);
    }

    /**
     * @return the live list of recorded items (not a copy)
     */
    public List<ILMHistoryItem> getItems() {
        return this.items;
    }
}
} }

View File

@ -108,7 +108,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
threadPool = new TestThreadPool("test"); threadPool = new TestThreadPool("test");
indexLifecycleService = new IndexLifecycleService(Settings.EMPTY, client, clusterService, threadPool, indexLifecycleService = new IndexLifecycleService(Settings.EMPTY, client, clusterService, threadPool,
clock, () -> now, null); clock, () -> now, null, null);
Mockito.verify(clusterService).addListener(indexLifecycleService); Mockito.verify(clusterService).addListener(indexLifecycleService);
Mockito.verify(clusterService).addStateApplier(indexLifecycleService); Mockito.verify(clusterService).addStateApplier(indexLifecycleService);
} }

View File

@ -75,7 +75,7 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
setStateToKey(currentStepKey); setStateToKey(currentStepKey);
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now,
(idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey)); (idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey), state -> {});
ClusterState newState = task.execute(clusterState); ClusterState newState = task.execute(clusterState);
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index));
StepKey actualKey = LifecycleExecutionState.getCurrentStepKey(lifecycleState); StepKey actualKey = LifecycleExecutionState.getCurrentStepKey(lifecycleState);
@ -101,7 +101,7 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE"); Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
setStateToKey(notCurrentStepKey); setStateToKey(notCurrentStepKey);
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now,
(idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {});
ClusterState newState = task.execute(clusterState); ClusterState newState = task.execute(clusterState);
assertThat(newState, sameInstance(clusterState)); assertThat(newState, sameInstance(clusterState));
} }
@ -113,7 +113,7 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
setStateToKey(currentStepKey); setStateToKey(currentStepKey);
setStatePolicy("not-" + policy); setStatePolicy("not-" + policy);
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now,
(idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {});
ClusterState newState = task.execute(clusterState); ClusterState newState = task.execute(clusterState);
assertThat(newState, sameInstance(clusterState)); assertThat(newState, sameInstance(clusterState));
} }
@ -126,7 +126,7 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
setStateToKey(currentStepKey); setStateToKey(currentStepKey);
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now, MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now,
(idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step"))); (idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")), state -> {});
Exception expectedException = new RuntimeException(); Exception expectedException = new RuntimeException();
ElasticsearchException exception = expectThrows(ElasticsearchException.class, ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> task.onFailure(randomAlphaOfLength(10), expectedException)); () -> task.onFailure(randomAlphaOfLength(10), expectedException));

View File

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ilm.history;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
import java.io.IOException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
/**
 * Tests the JSON rendering of {@link ILMHistoryItem} for both successful and failed items.
 */
public class ILMHistoryItemTests extends ESTestCase {

    /**
     * Renders one success item and one failure item and compares the output with the expected JSON.
     * The success item is compared in full; the failure item is compared only up to the start of the
     * stack trace inside {@code error_details}. The step info supplied to both items has an unbalanced
     * brace, and the expected output shows it being emitted as an escaped string.
     */
    public void testToXContent() throws IOException {
        final LifecycleExecutionState successState = LifecycleExecutionState.builder()
            .setPhase("phase")
            .setAction("action")
            .setStep("step")
            .setPhaseTime(10L)
            .setActionTime(20L)
            .setStepTime(30L)
            .setPhaseDefinition("{}")
            .setStepInfo("{\"step_info\": \"foo\"")
            .build();
        final ILMHistoryItem successItem = ILMHistoryItem.success("index", "policy", 1234L, 100L, successState);

        final LifecycleExecutionState failureState = LifecycleExecutionState.builder()
            .setPhase("phase")
            .setAction("action")
            .setStep("ERROR")
            .setFailedStep("step")
            .setFailedStepRetryCount(7)
            .setIsAutoRetryableError(true)
            .setPhaseTime(10L)
            .setActionTime(20L)
            .setStepTime(30L)
            .setPhaseDefinition("{\"phase_json\": \"eggplant\"}")
            .setStepInfo("{\"step_info\": \"foo\"")
            .build();
        final ILMHistoryItem failureItem = ILMHistoryItem.failure("index", "policy", 1234L, 100L, failureState,
            new IllegalArgumentException("failure"));

        final String expectedSuccessJson = "{\"index\":\"index\"," +
            "\"policy\":\"policy\"," +
            "\"@timestamp\":1234," +
            "\"index_age\":100," +
            "\"success\":true," +
            "\"state\":{\"phase\":\"phase\"," +
            "\"phase_definition\":\"{}\"," +
            "\"action_time\":\"20\"," +
            "\"phase_time\":\"10\"," +
            "\"step_info\":\"{\\\"step_info\\\": \\\"foo\\\"\",\"action\":\"action\",\"step\":\"step\",\"step_time\":\"30\"}}";
        assertThat(renderJson(successItem), equalTo(expectedSuccessJson));

        // Only a prefix is checked: the remainder of error_details is the stack trace
        final String expectedFailurePrefix = "{\"index\":\"index\"," +
            "\"policy\":\"policy\"," +
            "\"@timestamp\":1234," +
            "\"index_age\":100," +
            "\"success\":false," +
            "\"state\":{\"phase\":\"phase\"," +
            "\"failed_step\":\"step\"," +
            "\"phase_definition\":\"{\\\"phase_json\\\": \\\"eggplant\\\"}\"," +
            "\"action_time\":\"20\"," +
            "\"is_auto_retryable_error\":\"true\"," +
            "\"failed_step_retry_count\":\"7\"," +
            "\"phase_time\":\"10\"," +
            "\"step_info\":\"{\\\"step_info\\\": \\\"foo\\\"\"," +
            "\"action\":\"action\"," +
            "\"step\":\"ERROR\"," +
            "\"step_time\":\"30\"}," +
            "\"error_details\":\"{\\\"type\\\":\\\"illegal_argument_exception\\\"," +
            "\\\"reason\\\":\\\"failure\\\"," +
            "\\\"stack_trace\\\":\\\"java.lang.IllegalArgumentException: failure";
        assertThat(renderJson(failureItem), startsWith(expectedFailurePrefix));
    }

    /**
     * Serializes the given item with a fresh JSON builder and returns the resulting string.
     */
    private static String renderJson(ILMHistoryItem item) throws IOException {
        try (XContentBuilder builder = jsonBuilder()) {
            item.toXContent(builder, ToXContent.EMPTY_PARAMS);
            return Strings.toString(builder);
        }
    }
}

View File

@ -0,0 +1,398 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ilm.history;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING;
import static org.elasticsearch.xpack.ilm.history.ILMHistoryStore.ILM_HISTORY_ALIAS;
import static org.elasticsearch.xpack.ilm.history.ILMHistoryStore.ILM_HISTORY_INDEX_PREFIX;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class ILMHistoryStoreTests extends ESTestCase {
// Per-test fixtures: all four are (re)created in setup() and released in setdown().
// Thread pool handed to the client, the cluster service and the history store.
private ThreadPool threadPool;
// Client whose replies are produced by the verifier function installed by each test.
private VerifyingClient client;
// Cluster service created on top of the test thread pool.
private ClusterService clusterService;
// Store under test, built in setup() with empty settings.
private ILMHistoryStore historyStore;
/**
 * Builds the fixtures used by every test: a thread pool named after this test class,
 * a verifying client and a cluster service on top of it, and a history store created
 * with empty settings.
 */
@Before
public void setup() {
    final String poolName = getClass().getName();
    this.threadPool = new TestThreadPool(poolName);
    this.client = new VerifyingClient(this.threadPool);
    this.clusterService = ClusterServiceUtils.createClusterService(this.threadPool);
    this.historyStore = new ILMHistoryStore(Settings.EMPTY, this.client, this.clusterService, this.threadPool);
}
/**
 * Releases the fixtures created in {@link #setup()}: the history store first, then the
 * cluster service and the client, and finally the thread pool they were all built on.
 */
@After
public void setdown() {
    this.historyStore.close();
    this.clusterService.close();
    this.client.close();
    // The pool is stopped last because every other fixture was constructed with it.
    this.threadPool.shutdownNow();
}
/**
 * Verifies that a history store built with the history-index setting switched off never
 * calls the client when an item is handed to {@code putAsync}.
 *
 * @throws Exception if waiting for a (forbidden) client call is interrupted
 */
public void testNoActionIfDisabled() throws Exception {
    Settings settings = Settings.builder().put(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build();
    try (ILMHistoryStore disabledHistoryStore = new ILMHistoryStore(settings, client, null, threadPool)) {
        String policyId = randomAlphaOfLength(5);
        final long timestamp = randomNonNegativeLong();
        ILMHistoryItem record = ILMHistoryItem.success("index", policyId, timestamp, null, null);

        CountDownLatch clientCalled = new CountDownLatch(1);
        client.setVerifier((a, r, l) -> {
            // Count down BEFORE failing: fail() throws, so nothing placed after it would ever run,
            // and the error is raised on whichever thread invoked the client, not on the test thread.
            clientCalled.countDown();
            fail("the history store is disabled, no action should have been taken");
            return null;
        });

        disabledHistoryStore.putAsync(record);

        // await() returns true only if the verifier was invoked, so checking its result here makes
        // the violation fail the test on the test thread instead of being silently ignored.
        assertFalse("the history store is disabled, no action should have been taken",
            clientCalled.await(10, TimeUnit.SECONDS));
    }
}
/**
 * Sends one successful and one failed history item through {@code putAsync} and checks, via the
 * client verifier, that each results in exactly one bulk request addressed to the history alias.
 * Create-index requests are answered positively and are not counted.
 */
@SuppressWarnings("unchecked")
public void testPut() throws Exception {
    final String policyId = randomAlphaOfLength(5);
    final long timestamp = randomNonNegativeLong();

    // Scenario 1: a success item; the bulk request is answered with successful item responses.
    {
        final LifecycleExecutionState executionState = LifecycleExecutionState.builder()
            .setPhase("phase")
            .build();
        final ILMHistoryItem successItem = ILMHistoryItem.success("index", policyId, timestamp, 10L, executionState);
        final AtomicInteger bulkCalls = new AtomicInteger(0);

        client.setVerifier((action, request, listener) -> {
            if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) {
                final CreateIndexRequest createRequest = (CreateIndexRequest) request;
                return new CreateIndexResponse(true, true, createRequest.index());
            }
            bulkCalls.incrementAndGet();
            assertThat(action, instanceOf(BulkAction.class));
            assertThat(request, instanceOf(BulkRequest.class));
            final BulkRequest bulk = (BulkRequest) request;
            for (DocWriteRequest<?> item : bulk.requests()) {
                assertEquals(ILM_HISTORY_ALIAS, item.index());
            }
            assertNotNull(listener);

            // Only the number of item responses matters here, not their content.
            final int itemCount = bulk.numberOfActions();
            final BulkItemResponse[] items = IntStream.range(0, itemCount)
                .mapToObj(i -> {
                    final IndexResponse indexed =
                        new IndexResponse(new ShardId("index", "uuid", 0), "_doc", randomAlphaOfLength(10), 1, 1, 1, true);
                    return new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, indexed);
                })
                .toArray(BulkItemResponse[]::new);
            return new BulkResponse(items, 1000L);
        });

        historyStore.putAsync(successItem);
        assertBusy(() -> assertThat(bulkCalls.get(), equalTo(1)));
    }

    // Scenario 2: a failure item; the indexed document must mention the exception, and the bulk
    // request is answered with failed item responses.
    {
        final String cause = randomAlphaOfLength(9);
        final Exception failureException = new RuntimeException(cause);
        final LifecycleExecutionState executionState = LifecycleExecutionState.builder()
            .setPhase("phase")
            .build();
        final ILMHistoryItem failureItem =
            ILMHistoryItem.failure("index", policyId, timestamp, 10L, executionState, failureException);
        final AtomicInteger bulkCalls = new AtomicInteger(0);

        client.setVerifier((action, request, listener) -> {
            if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) {
                final CreateIndexRequest createRequest = (CreateIndexRequest) request;
                return new CreateIndexResponse(true, true, createRequest.index());
            }
            bulkCalls.incrementAndGet();
            assertThat(action, instanceOf(BulkAction.class));
            assertThat(request, instanceOf(BulkRequest.class));
            final BulkRequest bulk = (BulkRequest) request;
            for (DocWriteRequest<?> item : bulk.requests()) {
                assertEquals(ILM_HISTORY_ALIAS, item.index());
                assertThat(item, instanceOf(IndexRequest.class));
                final IndexRequest indexRequest = (IndexRequest) item;
                final String indexedDocument = indexRequest.source().utf8ToString();
                assertThat(indexedDocument, Matchers.containsString("runtime_exception"));
                assertThat(indexedDocument, Matchers.containsString(cause));
            }
            assertNotNull(listener);

            // Again only the count matters; every item response carries a failure this time.
            final int itemCount = bulk.numberOfActions();
            final BulkItemResponse[] items = IntStream.range(0, itemCount)
                .mapToObj(i -> {
                    final BulkItemResponse.Failure failure =
                        new BulkItemResponse.Failure("index", "_doc", Integer.toString(i), failureException);
                    return new BulkItemResponse(i, DocWriteRequest.OpType.INDEX, failure);
                })
                .toArray(BulkItemResponse[]::new);
            return new BulkResponse(items, 1000L);
        });

        historyStore.putAsync(failureItem);
        assertBusy(() -> assertThat(bulkCalls.get(), equalTo(1)));
    }
}
/**
 * Starting from cluster metadata without any index, expects {@code ensureHistoryIndex} to send a
 * create-index request whose single alias is the history alias flagged as write index, and to
 * answer the listener with {@code true}.
 */
public void testHistoryIndexNeedsCreation() throws InterruptedException {
    final ClusterName clusterName = new ClusterName(randomAlphaOfLength(5));
    final ClusterState emptyState = ClusterState.builder(clusterName)
        .metaData(MetaData.builder())
        .build();

    client.setVerifier((action, rawRequest, listener) -> {
        assertThat(action, instanceOf(CreateIndexAction.class));
        assertThat(rawRequest, instanceOf(CreateIndexRequest.class));
        final CreateIndexRequest createRequest = (CreateIndexRequest) rawRequest;
        assertThat(createRequest.aliases(), Matchers.hasSize(1));
        createRequest.aliases().forEach(historyAlias -> {
            assertThat(historyAlias.name(), equalTo(ILM_HISTORY_ALIAS));
            assertTrue(historyAlias.writeIndex());
        });
        return new CreateIndexResponse(true, true, createRequest.index());
    });

    final CountDownLatch done = new CountDownLatch(1);
    final ActionListener<Boolean> expectCreated = ActionListener.wrap(
        created -> Assert.assertTrue(created),
        e -> {
            logger.error(e);
            fail("should have called onResponse, not onFailure");
        });
    ILMHistoryStore.ensureHistoryIndex(client, emptyState, new LatchedActionListener<>(expectCreated, done));
    ElasticsearchAssertions.awaitLatch(done, 10, TimeUnit.SECONDS);
}
/**
 * When the first history index already exists and carries the history alias as write index,
 * expects {@code ensureHistoryIndex} to make no client call and to answer the listener with
 * {@code false}.
 */
public void testHistoryIndexProperlyExistsAlready() throws InterruptedException {
    final ClusterName clusterName = new ClusterName(randomAlphaOfLength(5));
    final IndexMetaData.Builder historyIndex = IndexMetaData.builder(ILM_HISTORY_INDEX_PREFIX + "000001")
        .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
        .numberOfShards(randomIntBetween(1, 10))
        .numberOfReplicas(randomIntBetween(1, 10))
        .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS).writeIndex(true).build());
    final ClusterState state = ClusterState.builder(clusterName)
        .metaData(MetaData.builder().put(historyIndex))
        .build();

    client.setVerifier((action, request, listener) -> {
        fail("no client calls should have been made");
        return null;
    });

    final CountDownLatch done = new CountDownLatch(1);
    final ActionListener<Boolean> expectNotCreated = ActionListener.wrap(
        created -> Assert.assertFalse(created),
        e -> {
            logger.error(e);
            fail("should have called onResponse, not onFailure");
        });
    ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(expectNotCreated, done));
    ElasticsearchAssertions.awaitLatch(done, 10, TimeUnit.SECONDS);
}
/**
 * When two indices carry the history alias but neither is flagged as write index, expects
 * {@code ensureHistoryIndex} to make no client call and to fail the listener with an
 * {@link IllegalStateException} about the missing write index.
 */
public void testHistoryIndexHasNoWriteIndex() throws InterruptedException {
    final ClusterName clusterName = new ClusterName(randomAlphaOfLength(5));
    final IndexMetaData.Builder firstIndex = IndexMetaData.builder(ILM_HISTORY_INDEX_PREFIX + "000001")
        .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
        .numberOfShards(randomIntBetween(1, 10))
        .numberOfReplicas(randomIntBetween(1, 10))
        .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS).build());
    final IndexMetaData.Builder secondIndex = IndexMetaData.builder(randomAlphaOfLength(5))
        .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
        .numberOfShards(randomIntBetween(1, 10))
        .numberOfReplicas(randomIntBetween(1, 10))
        .putAlias(AliasMetaData.builder(ILM_HISTORY_ALIAS).build());
    final ClusterState state = ClusterState.builder(clusterName)
        .metaData(MetaData.builder().put(firstIndex).put(secondIndex))
        .build();

    client.setVerifier((action, request, listener) -> {
        fail("no client calls should have been made");
        return null;
    });

    // NOTE(review): there is no closing ']' after the alias name; presumably this mirrors the
    // message produced by ILMHistoryStore - confirm before "fixing" either side.
    final String expectedMessage = "ILM history alias [" + ILM_HISTORY_ALIAS +
        "does not have a write index";

    final CountDownLatch done = new CountDownLatch(1);
    final ActionListener<Boolean> expectFailure = ActionListener.wrap(
        indexCreated -> fail("should have called onFailure, not onResponse"),
        e -> {
            assertThat(e, instanceOf(IllegalStateException.class));
            assertThat(e.getMessage(), Matchers.containsString(expectedMessage));
        });
    ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(expectFailure, done));
    ElasticsearchAssertions.awaitLatch(done, 10, TimeUnit.SECONDS);
}
/**
 * When a concrete index exists under the name of the history alias, expects
 * {@code ensureHistoryIndex} to make no client call and to fail the listener with an
 * {@link IllegalStateException} saying the alias already exists as a concrete index.
 */
public void testHistoryIndexNotAlias() throws InterruptedException {
    final ClusterName clusterName = new ClusterName(randomAlphaOfLength(5));
    final IndexMetaData.Builder indexNamedLikeAlias = IndexMetaData.builder(ILM_HISTORY_ALIAS)
        .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
        .numberOfShards(randomIntBetween(1, 10))
        .numberOfReplicas(randomIntBetween(1, 10));
    final ClusterState state = ClusterState.builder(clusterName)
        .metaData(MetaData.builder().put(indexNamedLikeAlias))
        .build();

    client.setVerifier((action, request, listener) -> {
        fail("no client calls should have been made");
        return null;
    });

    final String expectedMessage = "ILM history alias [" + ILM_HISTORY_ALIAS +
        "] already exists as concrete index";

    final CountDownLatch done = new CountDownLatch(1);
    final ActionListener<Boolean> expectFailure = ActionListener.wrap(
        indexCreated -> fail("should have called onFailure, not onResponse"),
        e -> {
            assertThat(e, instanceOf(IllegalStateException.class));
            assertThat(e.getMessage(), Matchers.containsString(expectedMessage));
        });
    ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(expectFailure, done));
    ElasticsearchAssertions.awaitLatch(done, 10, TimeUnit.SECONDS);
}
/**
 * Simulates another party creating the history index first: the create-index request is
 * answered with a {@link ResourceAlreadyExistsException}, and {@code ensureHistoryIndex} is
 * expected to report {@code false} through onResponse rather than a failure.
 */
public void testHistoryIndexCreatedConcurrently() throws InterruptedException {
    final ClusterName clusterName = new ClusterName(randomAlphaOfLength(5));
    final ClusterState emptyState = ClusterState.builder(clusterName)
        .metaData(MetaData.builder())
        .build();

    client.setVerifier((action, rawRequest, listener) -> {
        assertThat(action, instanceOf(CreateIndexAction.class));
        assertThat(rawRequest, instanceOf(CreateIndexRequest.class));
        final CreateIndexRequest createRequest = (CreateIndexRequest) rawRequest;
        assertThat(createRequest.aliases(), Matchers.hasSize(1));
        createRequest.aliases().forEach(historyAlias -> {
            assertThat(historyAlias.name(), equalTo(ILM_HISTORY_ALIAS));
            assertTrue(historyAlias.writeIndex());
        });
        // The request itself was well-formed; now pretend the index appeared in the meantime.
        throw new ResourceAlreadyExistsException("that index already exists");
    });

    final CountDownLatch done = new CountDownLatch(1);
    final ActionListener<Boolean> expectNotCreated = ActionListener.wrap(
        created -> Assert.assertFalse(created),
        e -> {
            logger.error(e);
            fail("should have called onResponse, not onFailure");
        });
    ILMHistoryStore.ensureHistoryIndex(client, emptyState, new LatchedActionListener<>(expectNotCreated, done));
    ElasticsearchAssertions.awaitLatch(done, 10, TimeUnit.SECONDS);
}
/**
 * When the first history index exists but does not carry the history alias, expects
 * {@code ensureHistoryIndex} to make no client call and to fail the listener with an
 * {@link IllegalStateException} naming both the index and the missing alias.
 */
public void testHistoryAliasDoesntExistButIndexDoes() throws InterruptedException {
    final String initialIndex = ILM_HISTORY_INDEX_PREFIX + "000001";
    final ClusterName clusterName = new ClusterName(randomAlphaOfLength(5));
    final IndexMetaData.Builder indexWithoutAlias = IndexMetaData.builder(initialIndex)
        .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
        .numberOfShards(randomIntBetween(1, 10))
        .numberOfReplicas(randomIntBetween(1, 10));
    final ClusterState state = ClusterState.builder(clusterName)
        .metaData(MetaData.builder().put(indexWithoutAlias))
        .build();

    client.setVerifier((action, request, listener) -> {
        fail("no client calls should have been made");
        return null;
    });

    final String expectedMessage = "ILM history index [" + initialIndex +
        "] already exists but does not have alias [" + ILM_HISTORY_ALIAS + "]";

    final CountDownLatch done = new CountDownLatch(1);
    final ActionListener<Boolean> expectFailure = ActionListener.wrap(
        response -> {
            logger.error(response);
            fail("should have called onFailure, not onResponse");
        },
        e -> {
            assertThat(e, instanceOf(IllegalStateException.class));
            assertThat(e.getMessage(), Matchers.containsString(expectedMessage));
        });
    ILMHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(expectFailure, done));
    ElasticsearchAssertions.awaitLatch(done, 10, TimeUnit.SECONDS);
}
/**
 * Asserts that the textual form of an indexed document mentions every key of {@code map} and
 * every leaf value reachable from it.
 * <ul>
 *   <li>a nested {@link Map} value is checked recursively, key by key;</li>
 *   <li>an {@link Iterable} value is checked element by element via {@code toString()};</li>
 *   <li>any other value is checked via its own {@code toString()}.</li>
 * </ul>
 * The three cases are mutually exclusive: a nested map is never additionally matched through
 * {@code Map.toString()}, whose {@code {k=v}} rendering does not match a serialized document.
 *
 * @param indexedDocument the document source as a string
 * @param map             the keys and values expected to appear in the document; values must be non-null
 */
@SuppressWarnings("unchecked")
private void assertContainsMap(String indexedDocument, Map<String, Object> map) {
    map.forEach((k, v) -> {
        assertThat(indexedDocument, Matchers.containsString(k));
        if (v instanceof Map) {
            // Unchecked: nested maps are assumed to be keyed by String like the outer one.
            assertContainsMap(indexedDocument, (Map<String, Object>) v);
        } else if (v instanceof Iterable) {
            for (Object elem : (Iterable<?>) v) {
                assertThat(indexedDocument, Matchers.containsString(elem.toString()));
            }
        } else {
            assertThat(indexedDocument, Matchers.containsString(v.toString()));
        }
    });
}
/**
* A client that delegates to a verifying function for action/request/listener
*/
public static class VerifyingClient extends NoOpClient {

    // Until a test installs its own verifier, any request made through this client fails the test.
    private TriFunction<ActionType<?>, ActionRequest, ActionListener<?>, ActionResponse> verifier =
        (action, request, listener) -> {
            fail("verifier not set");
            return null;
        };

    VerifyingClient(ThreadPool threadPool) {
        super(threadPool);
    }

    /**
     * Installs the function that inspects each action/request/listener triple and produces the
     * response handed back to the listener.
     *
     * @param verifier the function to delegate every execution to
     * @return this client, to allow chaining
     */
    VerifyingClient setVerifier(TriFunction<ActionType<?>, ActionRequest, ActionListener<?>, ActionResponse> verifier) {
        this.verifier = verifier;
        return this;
    }

    /**
     * Runs the current verifier and passes its result to {@code listener.onResponse}; any
     * {@link Exception} raised along the way is routed to {@code listener.onFailure} instead.
     */
    @Override
    @SuppressWarnings("unchecked")
    protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,
                                                                                              Request request,
                                                                                              ActionListener<Response> listener) {
        try {
            final Response response = (Response) verifier.apply(action, request, listener);
            listener.onResponse(response);
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyItem; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyItem;
import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration;
@ -89,6 +90,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
settings.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false);
return settings.build(); return settings.build();
} }

View File

@ -58,6 +58,7 @@ public abstract class MonitoringIntegTestCase extends ESIntegTestCase {
// .put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false) // .put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false)
// .put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false) // .put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false)
// we do this by default in core, but for monitoring this isn't needed and only adds noise. // we do this by default in core, but for monitoring this isn't needed and only adds noise.
.put("index.lifecycle.history_index_enabled", false)
.put("index.store.mock.check_index_on_close", false); .put("index.store.mock.check_index_on_close", false);
return builder.build(); return builder.build();

View File

@ -118,6 +118,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
// watcher settings that should work despite randomization // watcher settings that should work despite randomization
.put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100)) .put("xpack.watcher.execution.scroll.size", randomIntBetween(1, 100))
.put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100)) .put("xpack.watcher.watch.scroll.size", randomIntBetween(1, 100))
.put("index.lifecycle.history_index_enabled", false)
// SLM can cause timing issues during testsuite teardown: https://github.com/elastic/elasticsearch/issues/50302 // SLM can cause timing issues during testsuite teardown: https://github.com/elastic/elasticsearch/issues/50302
// SLM is not required for tests extending from this base class and only add noise. // SLM is not required for tests extending from this base class and only add noise.
.put("xpack.slm.enabled", false) .put("xpack.slm.enabled", false)