add `index.lifecycle.date` setting (#3545)
This PR adds a new setting called `index.lifecycle.date` that the ShrinkAction will be responsible for populating in the newly created index. This way, we can continue to know when we should be executing the next phase relative to the original index creation date, and not that of the shrunken index.
This commit is contained in:
parent
9032d61516
commit
34d6b61209
|
@ -136,6 +136,7 @@ public class ShrinkAction implements LifecycleAction {
|
|||
resizeRequest.getTargetIndexRequest().settings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, indexMetaData.getNumberOfReplicas())
|
||||
.put("index.lifecycle.date", indexMetaData.getCreationDate())
|
||||
.build());
|
||||
indexMetaData.getAliases().values().spliterator().forEachRemaining(aliasMetaDataObjectCursor -> {
|
||||
resizeRequest.getTargetIndexRequest().alias(new Alias(aliasMetaDataObjectCursor.value.alias()));
|
||||
|
|
|
@ -56,7 +56,6 @@ import static java.util.Collections.emptyList;
|
|||
public class IndexLifecycle extends Plugin {
|
||||
public static final String NAME = "index_lifecycle";
|
||||
public static final String BASE_PATH = "/_xpack/index_lifecycle/";
|
||||
public static final String THREAD_POOL_NAME = NAME;
|
||||
private final SetOnce<IndexLifecycleService> indexLifecycleInitialisationService = new SetOnce<>();
|
||||
private Settings settings;
|
||||
private boolean enabled;
|
||||
|
@ -71,6 +70,8 @@ public class IndexLifecycle extends Plugin {
|
|||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<String> LIFECYCLE_ACTION_SETTING = Setting.simpleString("index.lifecycle.action",
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<Long> LIFECYCLE_INDEX_CREATION_DATE_SETTING = Setting.longSetting("index.lifecycle.date",
|
||||
-1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
|
||||
public IndexLifecycle(Settings settings) {
|
||||
this.settings = settings;
|
||||
|
@ -96,6 +97,7 @@ public class IndexLifecycle extends Plugin {
|
|||
LIFECYCLE_POLL_INTERVAL_SETTING,
|
||||
LIFECYCLE_NAME_SETTING,
|
||||
LIFECYCLE_PHASE_SETTING,
|
||||
LIFECYCLE_INDEX_CREATION_DATE_SETTING,
|
||||
LIFECYCLE_ACTION_SETTING);
|
||||
}
|
||||
|
||||
|
|
|
@ -8,11 +8,15 @@ package org.elasticsearch.xpack.indexlifecycle;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.FormattedMessage;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -20,15 +24,21 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.time.Clock;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.SortedMap;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A service which runs the {@link LifecyclePolicy}s associated with indexes.
|
||||
|
@ -112,9 +122,21 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
|
||||
IndexLifecycleMetadata indexLifecycleMetadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
SortedMap<String, LifecyclePolicy> policies = indexLifecycleMetadata.getPolicies();
|
||||
// loop through all indices in cluster state and filter for ones that are
|
||||
// managed by the Index Lifecycle Service they have a index.lifecycle.name setting
|
||||
// associated to a policy
|
||||
clusterService.state().metaData().indices().valuesIt().forEachRemaining((idxMeta) -> {
|
||||
String policyName = IndexLifecycle.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
|
||||
if (Strings.isNullOrEmpty(policyName) == false) {
|
||||
// ensure that all managed indices have `index.lifecycle.date` set
|
||||
// and then execute their respective lifecycle policies.
|
||||
putLifecycleDate(idxMeta).thenRun(() -> executePolicy(idxMeta, policies, policyName));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void executePolicy(IndexMetaData idxMeta, SortedMap<String, LifecyclePolicy> policies, String policyName) {
|
||||
logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")");
|
||||
LifecyclePolicy policy = policies.get(policyName);
|
||||
if (policy == null) {
|
||||
|
@ -128,9 +150,6 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void installMetadata(IndexLifecycleMetadata lifecycleMetadata) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC)
|
||||
|
@ -151,8 +170,33 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
}));
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> putLifecycleDate(IndexMetaData idxMeta) {
|
||||
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
|
||||
if (idxMeta.getSettings().hasValue(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey())) {
|
||||
completableFuture.complete(null);
|
||||
} else {
|
||||
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), idxMeta.getCreationDate()).build(),
|
||||
idxMeta.getIndex().getName());
|
||||
client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener<UpdateSettingsResponse>() {
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
public void onResponse(UpdateSettingsResponse updateSettingsResponse) {
|
||||
completableFuture.complete(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error("unable to update index.lifecycle.date setting on indices", e);
|
||||
completableFuture.completeExceptionally(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return completableFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
SchedulerEngine engine = scheduler.get();
|
||||
if (engine != null) {
|
||||
engine.stop();
|
||||
|
|
|
@ -84,8 +84,8 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext {
|
|||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
long now = nowSupplier.getAsLong();
|
||||
long indexCreated = getIdxMetaData().getCreationDate();
|
||||
return (indexCreated + phase.getAfter().millis()) <= now;
|
||||
long initialIndexCreated = IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.get(getIdxMetaData().getSettings());
|
||||
return (initialIndexCreated + phase.getAfter().millis()) <= now;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.indexlifecycle;
|
|||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
|
@ -260,6 +261,14 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
assertNull(indexLifecycleService.getScheduler());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that a new index does the following successfully:
|
||||
*
|
||||
* 1. setting index.lifecycle.date
|
||||
* 2. sets phase
|
||||
* 3. sets action
|
||||
* 4. executes action
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testTriggeredWithMatchingPolicy() {
|
||||
String policyName = randomAlphaOfLengthBetween(1, 20);
|
||||
|
@ -289,18 +298,103 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
|||
|
||||
when(clusterService.state()).thenReturn(currentState);
|
||||
|
||||
SetOnce<Boolean> dateUpdated = new SetOnce<>();
|
||||
SetOnce<Boolean> phaseUpdated = new SetOnce<>();
|
||||
SetOnce<Boolean> actionUpdated = new SetOnce<>();
|
||||
doAnswer(invocationOnMock -> {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0];
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocationOnMock.getArguments()[1];
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(),
|
||||
indexMetadata.getCreationDate()).build(), index.getName());
|
||||
dateUpdated.set(true);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
}).doAnswer(invocationOnMock -> {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0];
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocationOnMock.getArguments()[1];
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), "")
|
||||
.put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), "phase").build(), index.getName());
|
||||
phaseUpdated.set(true);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
}).doAnswer(invocationOnMock -> {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0];
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocationOnMock.getArguments()[1];
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), MockAction.NAME).build(), index.getName());
|
||||
actionUpdated.set(true);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
|
||||
}).when(indicesClient).updateSettings(any(), any());
|
||||
|
||||
indexLifecycleService.triggered(schedulerEvent);
|
||||
|
||||
assertThat(dateUpdated.get(), equalTo(true));
|
||||
assertThat(phaseUpdated.get(), equalTo(true));
|
||||
assertThat(actionUpdated.get(), equalTo(true));
|
||||
assertThat(mockAction.getExecutedCount(), equalTo(1L));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that a policy is executed without first setting the `index.lifecycle.date` setting
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testTriggeredWithDateSettingAlreadyPresent() {
|
||||
String policyName = randomAlphaOfLengthBetween(1, 20);
|
||||
MockAction mockAction = new MockAction();
|
||||
Phase phase = new Phase("phase", TimeValue.ZERO, Collections.singletonMap("action", mockAction));
|
||||
LifecyclePolicy policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName,
|
||||
Collections.singletonMap(phase.getName(), phase));
|
||||
SortedMap<String, LifecyclePolicy> policyMap = new TreeMap<>();
|
||||
policyMap.put(policyName, policy);
|
||||
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||
long creationDate = randomNonNegativeLong();
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(IndexLifecycle.LIFECYCLE_NAME_SETTING.getKey(), policyName)
|
||||
.put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), creationDate))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).creationDate(creationDate).build();
|
||||
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder()
|
||||
.fPut(index.getName(), indexMetadata);
|
||||
MetaData metaData = MetaData.builder()
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap))
|
||||
.indices(indices.build())
|
||||
.persistentSettings(settings(Version.CURRENT).build())
|
||||
.build();
|
||||
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(metaData)
|
||||
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
|
||||
.build();
|
||||
|
||||
SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event(IndexLifecycle.NAME, randomLong(), randomLong());
|
||||
|
||||
when(clusterService.state()).thenReturn(currentState);
|
||||
|
||||
SetOnce<Boolean> dateUpdated = new SetOnce<>();
|
||||
doAnswer(invocationOnMock -> {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0];
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocationOnMock.getArguments()[1];
|
||||
try {
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(),
|
||||
indexMetadata.getCreationDate()).build(), index.getName());
|
||||
dateUpdated.set(true);
|
||||
} catch (AssertionError e) {
|
||||
// noop: here because we are either updating the phase or action prior to executing MockAction
|
||||
}
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
}).when(indicesClient).updateSettings(any(), any());
|
||||
|
||||
indexLifecycleService.triggered(schedulerEvent);
|
||||
|
||||
assertNull(dateUpdated.get());
|
||||
assertThat(mockAction.getExecutedCount(), equalTo(1L));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that if an index has an unknown lifecycle policy set it does not
|
||||
* execute any policy but does process other indexes.
|
||||
|
|
|
@ -40,8 +40,6 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
|
|||
MetaData metaData = MetaData.builder().indices(indices.build())
|
||||
.persistentSettings(settings(Version.CURRENT).build()).build();
|
||||
return ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
|
||||
// ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
// Mockito.when(clusterService.state()).thenReturn(clusterState);
|
||||
}
|
||||
|
||||
public void testSetPhase() {
|
||||
|
@ -495,7 +493,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
|
|||
long now = random().longs(creationDate, creationDate + after.millis()).iterator().nextLong();
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(TEST_INDEX.getName())
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.lifecycle.date", creationDate).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
ClusterState clusterState = getClusterState(idxMeta);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
|
@ -514,7 +512,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
|
|||
long now = creationDate + after.millis();
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(TEST_INDEX.getName())
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.lifecycle.date", creationDate).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
ClusterState clusterState = getClusterState(idxMeta);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
|
@ -533,7 +531,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
|
|||
long now = random().longs(creationDate + after.millis(), Long.MAX_VALUE).iterator().nextLong();
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(TEST_INDEX.getName())
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.lifecycle.date", creationDate).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
ClusterState clusterState = getClusterState(idxMeta);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
|
|
|
@ -226,10 +226,12 @@ public class ShrinkActionTests extends AbstractSerializingTestCase<ShrinkAction>
|
|||
Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20));
|
||||
int numberOfShards = randomIntBetween(1, 5);
|
||||
int numberOfReplicas = randomIntBetween(1, 5);
|
||||
long creationDate = randomNonNegativeLong();
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
|
||||
.settings(settings(Version.CURRENT))
|
||||
.putAlias(AliasMetaData.builder("my_alias"))
|
||||
.creationDate(creationDate)
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(numberOfReplicas).build();
|
||||
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
|
||||
indexMetadata);
|
||||
|
@ -266,7 +268,8 @@ public class ShrinkActionTests extends AbstractSerializingTestCase<ShrinkAction>
|
|||
assertThat(request.getTargetIndexRequest().aliases(), equalTo(Collections.singleton(new Alias("my_alias"))));
|
||||
assertThat(request.getTargetIndexRequest().settings(), equalTo(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build()));
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
|
||||
.put("index.lifecycle.date", creationDate).build()));
|
||||
assertThat(request.getTargetIndexRequest().index(), equalTo(targetIndex.getName()));
|
||||
ResizeResponse resizeResponse = ResizeAction.INSTANCE.newResponse();
|
||||
resizeResponse.readFrom(StreamInput.wrap(new byte[] { 1, 1, 1, 1, 1 }));
|
||||
|
|
Loading…
Reference in New Issue