From 34d6b612090447f143447cd9785bb1191df6750a Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 22 Jan 2018 09:27:33 -0800 Subject: [PATCH] add `index.lifecycle.date` setting (#3545) This PR adds a new setting called `index.lifecycle.date` that the ShrinkAction will be responsible for populating in the newly created index. This way, we can continue to know when we should be executing the next phase relative to the original index creation date, and not that of the shrunken index. --- .../xpack/indexlifecycle/ShrinkAction.java | 1 + .../xpack/indexlifecycle/IndexLifecycle.java | 8 +- .../indexlifecycle/IndexLifecycleService.java | 70 +++++++++++--- .../InternalIndexLifecycleContext.java | 4 +- .../IndexLifecycleServiceTests.java | 96 ++++++++++++++++++- .../InternalIndexLifecycleContextTests.java | 8 +- .../indexlifecycle/ShrinkActionTests.java | 5 +- 7 files changed, 167 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java index ab777524b3b..3f3ea32dc4d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java @@ -136,6 +136,7 @@ public class ShrinkAction implements LifecycleAction { resizeRequest.getTargetIndexRequest().settings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, indexMetaData.getNumberOfReplicas()) + .put("index.lifecycle.date", indexMetaData.getCreationDate()) .build()); indexMetaData.getAliases().values().spliterator().forEachRemaining(aliasMetaDataObjectCursor -> { resizeRequest.getTargetIndexRequest().alias(new Alias(aliasMetaDataObjectCursor.value.alias())); diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index d17f3333164..d48220182ca 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -56,7 +56,6 @@ import static java.util.Collections.emptyList; public class IndexLifecycle extends Plugin { public static final String NAME = "index_lifecycle"; public static final String BASE_PATH = "/_xpack/index_lifecycle/"; - public static final String THREAD_POOL_NAME = NAME; private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); private Settings settings; private boolean enabled; @@ -71,6 +70,8 @@ public class IndexLifecycle extends Plugin { Setting.Property.Dynamic, Setting.Property.IndexScope); public static final Setting LIFECYCLE_ACTION_SETTING = Setting.simpleString("index.lifecycle.action", Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting LIFECYCLE_INDEX_CREATION_DATE_SETTING = Setting.longSetting("index.lifecycle.date", + -1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope); public IndexLifecycle(Settings settings) { this.settings = settings; @@ -93,9 +94,10 @@ public class IndexLifecycle extends Plugin { @Override public List> getSettings() { return Arrays.asList( - LIFECYCLE_POLL_INTERVAL_SETTING, + LIFECYCLE_POLL_INTERVAL_SETTING, LIFECYCLE_NAME_SETTING, - LIFECYCLE_PHASE_SETTING, + LIFECYCLE_PHASE_SETTING, + LIFECYCLE_INDEX_CREATION_DATE_SETTING, LIFECYCLE_ACTION_SETTING); } diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 9e994f2f745..350210ea77e 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -8,11 +8,15 @@ package org.elasticsearch.xpack.indexlifecycle; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.FormattedMessage; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -20,15 +24,21 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.scheduler.SchedulerEngine; import java.io.Closeable; import java.io.IOException; import java.time.Clock; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.SortedMap; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; /** * A service which runs the {@link LifecyclePolicy}s associated with indexes. @@ -112,26 +122,35 @@ public class IndexLifecycleService extends AbstractComponent logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime()); IndexLifecycleMetadata indexLifecycleMetadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE); SortedMap policies = indexLifecycleMetadata.getPolicies(); + // loop through all indices in cluster state and filter for ones that are + // managed by the Index Lifecycle Service they have a index.lifecycle.name setting + // associated to a policy clusterService.state().metaData().indices().valuesIt().forEachRemaining((idxMeta) -> { String policyName = IndexLifecycle.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); if (Strings.isNullOrEmpty(policyName) == false) { - logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")"); - LifecyclePolicy policy = policies.get(policyName); - if (policy == null) { - logger.error("Unknown lifecycle policy [{}] for index [{}]", policyName, idxMeta.getIndex().getName()); - } else { - try { - policy.execute(new InternalIndexLifecycleContext(idxMeta.getIndex(), client, clusterService, nowSupplier)); - } catch (Exception e) { - logger.error(new FormattedMessage("Failed to execute lifecycle policy [{}] for index [{}]", policyName, - idxMeta.getIndex().getName()), e); - } - } + // ensure that all managed indices have `index.lifecycle.date` set + // and then execute their respective lifecycle policies. + putLifecycleDate(idxMeta).thenRun(() -> executePolicy(idxMeta, policies, policyName)); } }); } } + private void executePolicy(IndexMetaData idxMeta, SortedMap policies, String policyName) { + logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")"); + LifecyclePolicy policy = policies.get(policyName); + if (policy == null) { + logger.error("Unknown lifecycle policy [{}] for index [{}]", policyName, idxMeta.getIndex().getName()); + } else { + try { + policy.execute(new InternalIndexLifecycleContext(idxMeta.getIndex(), client, clusterService, nowSupplier)); + } catch (Exception e) { + logger.error(new FormattedMessage("Failed to execute lifecycle policy [{}] for index [{}]", policyName, + idxMeta.getIndex().getName()), e); + } + } + } + private void installMetadata(IndexLifecycleMetadata lifecycleMetadata) { threadPool.executor(ThreadPool.Names.GENERIC) .execute(() -> clusterService.submitStateUpdateTask("install-index-lifecycle-metadata", new ClusterStateUpdateTask() { @@ -151,8 +170,33 @@ public class IndexLifecycleService extends AbstractComponent })); } + private CompletableFuture putLifecycleDate(IndexMetaData idxMeta) { + CompletableFuture completableFuture = new CompletableFuture<>(); + if (idxMeta.getSettings().hasValue(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey())) { + completableFuture.complete(null); + } else { + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder() + .put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), idxMeta.getCreationDate()).build(), + idxMeta.getIndex().getName()); + client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener() { + @Override + public void onResponse(UpdateSettingsResponse updateSettingsResponse) { + completableFuture.complete(null); + } + + @Override + public void onFailure(Exception e) { + logger.error("unable to update index.lifecycle.date setting on indices", e); + completableFuture.completeExceptionally(e); + } + }); + } + + return completableFuture; + } + @Override - public void close() throws IOException { + public void close() { SchedulerEngine engine = scheduler.get(); if (engine != null) { engine.stop(); diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java index eb92af520c2..c3ca8a1b607 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java @@ -84,8 +84,8 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext { @Override public boolean canExecute(Phase phase) { long now = nowSupplier.getAsLong(); - long indexCreated = getIdxMetaData().getCreationDate(); - return (indexCreated + phase.getAfter().millis()) <= now; + long initialIndexCreated = IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.get(getIdxMetaData().getSettings()); + return (initialIndexCreated + phase.getAfter().millis()) <= now; } @Override diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java index 9cb83fca5f7..4f25d8c03da 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.indexlifecycle; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper; import org.elasticsearch.client.AdminClient; @@ -260,6 +261,14 @@ public class IndexLifecycleServiceTests extends ESTestCase { assertNull(indexLifecycleService.getScheduler()); } + /** + * Checks that a new index does the following successfully: + * + * 1. setting index.lifecycle.date + * 2. sets phase + * 3. sets action + * 4. executes action + */ @SuppressWarnings("unchecked") public void testTriggeredWithMatchingPolicy() { String policyName = randomAlphaOfLengthBetween(1, 20); @@ -289,18 +298,103 @@ public class IndexLifecycleServiceTests extends ESTestCase { when(clusterService.state()).thenReturn(currentState); + SetOnce dateUpdated = new SetOnce<>(); + SetOnce phaseUpdated = new SetOnce<>(); + SetOnce actionUpdated = new SetOnce<>(); doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder() + .put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), + indexMetadata.getCreationDate()).build(), index.getName()); + dateUpdated.set(true); + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + }).doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder() + .put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), "") + .put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), "phase").build(), index.getName()); + phaseUpdated.set(true); + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + }).doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder() + .put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), MockAction.NAME).build(), index.getName()); + actionUpdated.set(true); listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); return null; - }).when(indicesClient).updateSettings(any(), any()); indexLifecycleService.triggered(schedulerEvent); + assertThat(dateUpdated.get(), equalTo(true)); + assertThat(phaseUpdated.get(), equalTo(true)); + assertThat(actionUpdated.get(), equalTo(true)); assertThat(mockAction.getExecutedCount(), equalTo(1L)); } + /** + * Check that a policy is executed without first setting the `index.lifecycle.date` setting + */ + @SuppressWarnings("unchecked") + public void testTriggeredWithDateSettingAlreadyPresent() { + String policyName = randomAlphaOfLengthBetween(1, 20); + MockAction mockAction = new MockAction(); + Phase phase = new Phase("phase", TimeValue.ZERO, Collections.singletonMap("action", mockAction)); + LifecyclePolicy policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName, + Collections.singletonMap(phase.getName(), phase)); + SortedMap policyMap = new TreeMap<>(); + policyMap.put(policyName, policy); + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + long creationDate = randomNonNegativeLong(); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT) + .put(IndexLifecycle.LIFECYCLE_NAME_SETTING.getKey(), policyName) + .put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), creationDate)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).creationDate(creationDate).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder() + .fPut(index.getName(), indexMetadata); + MetaData metaData = MetaData.builder() + .putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap)) + .indices(indices.build()) + .persistentSettings(settings(Version.CURRENT).build()) + .build(); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build()) + .build(); + + SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event(IndexLifecycle.NAME, randomLong(), randomLong()); + + when(clusterService.state()).thenReturn(currentState); + + SetOnce dateUpdated = new SetOnce<>(); + doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + try { + UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder() + .put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), + indexMetadata.getCreationDate()).build(), index.getName()); + dateUpdated.set(true); + } catch (AssertionError e) { + // noop: here because we are either updating the phase or action prior to executing MockAction + } + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + }).when(indicesClient).updateSettings(any(), any()); + + indexLifecycleService.triggered(schedulerEvent); + + assertNull(dateUpdated.get()); + assertThat(mockAction.getExecutedCount(), equalTo(1L)); + + } + /** * Check that if an index has an unknown lifecycle policy set it does not * execute any policy but does process other indexes. diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java index 1af76100fc1..7c49c02fe65 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java @@ -40,8 +40,6 @@ public class InternalIndexLifecycleContextTests extends ESTestCase { MetaData metaData = MetaData.builder().indices(indices.build()) .persistentSettings(settings(Version.CURRENT).build()).build(); return ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build(); -// ClusterService clusterService = Mockito.mock(ClusterService.class); -// Mockito.when(clusterService.state()).thenReturn(clusterState); } public void testSetPhase() { @@ -495,7 +493,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase { long now = random().longs(creationDate, creationDate + after.millis()).iterator().nextLong(); IndexMetaData idxMeta = IndexMetaData.builder(TEST_INDEX.getName()) - .settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build()) + .settings(Settings.builder().put("index.version.created", 7000001L).put("index.lifecycle.date", creationDate).build()) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); ClusterState clusterState = getClusterState(idxMeta); ClusterService clusterService = Mockito.mock(ClusterService.class); @@ -514,7 +512,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase { long now = creationDate + after.millis(); IndexMetaData idxMeta = IndexMetaData.builder(TEST_INDEX.getName()) - .settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build()) + .settings(Settings.builder().put("index.version.created", 7000001L).put("index.lifecycle.date", creationDate).build()) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); ClusterState clusterState = getClusterState(idxMeta); ClusterService clusterService = Mockito.mock(ClusterService.class); @@ -533,7 +531,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase { long now = random().longs(creationDate + after.millis(), Long.MAX_VALUE).iterator().nextLong(); IndexMetaData idxMeta = IndexMetaData.builder(TEST_INDEX.getName()) - .settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build()) + .settings(Settings.builder().put("index.version.created", 7000001L).put("index.lifecycle.date", creationDate).build()) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); ClusterState clusterState = getClusterState(idxMeta); ClusterService clusterService = Mockito.mock(ClusterService.class); diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java index 2551a0019e2..456819c031e 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java @@ -226,10 +226,12 @@ public class ShrinkActionTests extends AbstractSerializingTestCase Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20)); int numberOfShards = randomIntBetween(1, 5); int numberOfReplicas = randomIntBetween(1, 5); + long creationDate = randomNonNegativeLong(); ClusterService clusterService = Mockito.mock(ClusterService.class); IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) .settings(settings(Version.CURRENT)) .putAlias(AliasMetaData.builder("my_alias")) + .creationDate(creationDate) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(numberOfReplicas).build(); ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), indexMetadata); @@ -266,7 +268,8 @@ public class ShrinkActionTests extends AbstractSerializingTestCase assertThat(request.getTargetIndexRequest().aliases(), equalTo(Collections.singleton(new Alias("my_alias")))); assertThat(request.getTargetIndexRequest().settings(), equalTo(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build())); + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put("index.lifecycle.date", creationDate).build())); assertThat(request.getTargetIndexRequest().index(), equalTo(targetIndex.getName())); ResizeResponse resizeResponse = ResizeAction.INSTANCE.newResponse(); resizeResponse.readFrom(StreamInput.wrap(new byte[] { 1, 1, 1, 1, 1 }));