make the lifecycle poll-interval configurable (#3319)

Removes the poll-interval from the IndexLifecycleMetadata and introduces it in 
the form of a cluster setting that is configurable. Changes to this poll interval setting 
will reflect in the Lifecycle Scheduler.
This commit is contained in:
Tal Levy 2017-12-20 09:39:39 -08:00 committed by GitHub
parent 586065fca7
commit fb59f54637
9 changed files with 344 additions and 113 deletions

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
import org.elasticsearch.plugins.Plugin;
@ -58,11 +59,14 @@ public class IndexLifecycle extends Plugin {
private boolean enabled;
private boolean transportClientMode;
public static final Setting<String> LIFECYCLE_TIMESERIES_NAME_SETTING = Setting.simpleString("index.lifecycle.name",
// NORELEASE: we should probably change the default to something other than three seconds for initial release
public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.lifecycle.poll_interval",
TimeValue.timeValueSeconds(3), Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> LIFECYCLE_NAME_SETTING = Setting.simpleString("index.lifecycle.name",
Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<String> LIFECYCLE_TIMESERIES_PHASE_SETTING = Setting.simpleString("index.lifecycle.phase",
public static final Setting<String> LIFECYCLE_PHASE_SETTING = Setting.simpleString("index.lifecycle.phase",
Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<String> LIFECYCLE_TIMESERIES_ACTION_SETTING = Setting.simpleString("index.lifecycle.action",
public static final Setting<String> LIFECYCLE_ACTION_SETTING = Setting.simpleString("index.lifecycle.action",
Setting.Property.Dynamic, Setting.Property.IndexScope);
public IndexLifecycle(Settings settings) {
@ -86,9 +90,10 @@ public class IndexLifecycle extends Plugin {
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
LIFECYCLE_TIMESERIES_NAME_SETTING,
LIFECYCLE_TIMESERIES_PHASE_SETTING,
LIFECYCLE_TIMESERIES_ACTION_SETTING);
LIFECYCLE_POLL_INTERVAL_SETTING,
LIFECYCLE_NAME_SETTING,
LIFECYCLE_PHASE_SETTING,
LIFECYCLE_ACTION_SETTING);
}
public Collection<Object> createComponents(Client client, ClusterService clusterService, Clock clock,
@ -97,7 +102,7 @@ public class IndexLifecycle extends Plugin {
return emptyList();
}
indexLifecycleInitialisationService
.set(new IndexLifecycleService(settings, client, clusterService, clock, threadPool));
.set(new IndexLifecycleService(settings, client, clusterService, clock, threadPool, System::currentTimeMillis));
return Collections.singletonList(indexLifecycleInitialisationService.get());
}

View File

@ -34,27 +34,22 @@ import java.util.TreeMap;
public class IndexLifecycleMetadata implements MetaData.Custom {
public static final String TYPE = "index_lifecycle";
public static final ParseField POLICIES_FIELD = new ParseField("policies");
public static final ParseField POLL_INTERVAL_FIELD = new ParseField("poll_interval");
public static final IndexLifecycleMetadata EMPTY_METADATA = new IndexLifecycleMetadata(Collections.emptySortedMap(), 3);
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<IndexLifecycleMetadata, NamedXContentRegistry> PARSER = new ConstructingObjectParser<>(
TYPE, a -> new IndexLifecycleMetadata(
ObjectParserUtils.convertListToMapValues(LifecyclePolicy::getName, (List<LifecyclePolicy>) a[0]), (long) a[1]));
ObjectParserUtils.convertListToMapValues(LifecyclePolicy::getName, (List<LifecyclePolicy>) a[0])));
static {
PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, n) -> LifecyclePolicy.parse(p, new Tuple<>(n, c)),
v -> {
throw new IllegalArgumentException("ordered " + POLICIES_FIELD.getPreferredName() + " are not supported");
}, POLICIES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), POLL_INTERVAL_FIELD);
}
private final SortedMap<String, LifecyclePolicy> policies;
private final long pollInterval;
public IndexLifecycleMetadata(SortedMap<String, LifecyclePolicy> policies, long pollInterval) {
public IndexLifecycleMetadata(SortedMap<String, LifecyclePolicy> policies) {
this.policies = Collections.unmodifiableSortedMap(policies);
this.pollInterval = pollInterval;
}
public IndexLifecycleMetadata(StreamInput in) throws IOException {
@ -64,7 +59,6 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
policies.put(in.readString(), in.readNamedWriteable(LifecyclePolicy.class));
}
this.policies = policies;
this.pollInterval = in.readVLong();
}
@Override
@ -74,17 +68,12 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
out.writeString(entry.getKey());
out.writeNamedWriteable(entry.getValue());
}
out.writeVLong(pollInterval);
}
public SortedMap<String, LifecyclePolicy> getPolicies() {
return policies;
}
public long getPollInterval() {
return pollInterval;
}
@Override
public Diff<Custom> diff(Custom previousState) {
return new IndexLifecycleMetadataDiff((IndexLifecycleMetadata) previousState, this);
@ -93,7 +82,6 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(POLICIES_FIELD.getPreferredName(), policies);
builder.field(POLL_INTERVAL_FIELD.getPreferredName(), pollInterval);
return builder;
}
@ -114,7 +102,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
@Override
public int hashCode() {
return Objects.hash(policies, pollInterval);
return Objects.hash(policies);
}
@Override
@ -126,8 +114,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
return false;
}
IndexLifecycleMetadata other = (IndexLifecycleMetadata) obj;
return Objects.equals(policies, other.policies) &&
Objects.equals(pollInterval, other.pollInterval);
return Objects.equals(policies, other.policies);
}
@Override
@ -138,7 +125,6 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
public static class IndexLifecycleMetadataDiff implements NamedDiff<MetaData.Custom> {
final Diff<Map<String, LifecyclePolicy>> policies;
final Long pollIntervalDiff;
IndexLifecycleMetadataDiff(IndexLifecycleMetadata before, IndexLifecycleMetadata after) {
this.policies = DiffableUtils.diff(before.policies, after.policies, DiffableUtils.getStringKeySerializer(),
@ -174,26 +160,22 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
}
});
this.pollIntervalDiff = after.pollInterval - before.pollInterval;
}
public IndexLifecycleMetadataDiff(StreamInput in) throws IOException {
this.policies = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(),
(i) -> i.readNamedWriteable(LifecyclePolicy.class), IndexLifecycleMetadataDiff::readLifecyclePolicyDiffFrom);
this.pollIntervalDiff = in.readZLong();
}
@Override
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, LifecyclePolicy> newPolicies = new TreeMap<>(policies.apply(((IndexLifecycleMetadata) part).policies));
long pollInterval = ((IndexLifecycleMetadata) part).pollInterval + pollIntervalDiff;
return new IndexLifecycleMetadata(newPolicies, pollInterval);
return new IndexLifecycleMetadata(newPolicies);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
policies.writeTo(out);
out.writeZLong(pollIntervalDiff);
}
@Override

View File

@ -19,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
@ -28,11 +29,10 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule.Interva
import java.io.Closeable;
import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.SortedMap;
import java.util.function.LongSupplier;
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.NAME;
/**
* A service which runs the {@link LifecyclePolicy}s associated with indexes.
*/
@ -46,34 +46,65 @@ public class IndexLifecycleService extends AbstractComponent
private ClusterService clusterService;
private ThreadPool threadPool;
private LongSupplier nowSupplier;
private SchedulerEngine.Job scheduledJob;
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock,
ThreadPool threadPool) {
ThreadPool threadPool, LongSupplier nowSupplier) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.clock = clock;
this.threadPool = threadPool;
this.nowSupplier = () -> System.currentTimeMillis();
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
clusterService.addListener(this);
}
SchedulerEngine getScheduler() {
return scheduler.get();
}
SchedulerEngine.Job getScheduledJob() {
return scheduledJob;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) {
MetaData metaData = event.state().metaData();
installMetadata(metaData);
if (event.localNodeMaster()) { // only act if we are master, otherwise keep idle until elected
IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE);
TimeValue pollInterval = IndexLifecycle.LIFECYCLE_POLL_INTERVAL_SETTING
.get(event.state().getMetaData().settings());
TimeValue previousPollInterval = IndexLifecycle.LIFECYCLE_POLL_INTERVAL_SETTING
.get(event.previousState().getMetaData().settings());
boolean pollIntervalSettingChanged = !pollInterval.equals(previousPollInterval);
if (lifecycleMetadata == null) { // no lifecycle metadata, install initial empty metadata state
lifecycleMetadata = new IndexLifecycleMetadata(Collections.emptySortedMap());
installMetadata(lifecycleMetadata);
} else if (scheduler.get() == null) { // metadata installed and scheduler should be kicked off. start your engines.
scheduler.set(new SchedulerEngine(clock));
scheduler.get().register(this);
scheduledJob = new SchedulerEngine.Job(IndexLifecycle.NAME,
new IntervalSchedule(new Interval(pollInterval.seconds(), Unit.SECONDS)));
scheduler.get().add(scheduledJob);
} else if (pollIntervalSettingChanged) { // all engines are running, just need to update with latest interval
scheduledJob = new SchedulerEngine.Job(IndexLifecycle.NAME,
new IntervalSchedule(new Interval(pollInterval.seconds(), Unit.SECONDS)));
scheduler.get().add(scheduledJob);
}
}
}
@Override
public void triggered(SchedulerEngine.Event event) {
if (event.getJobName().equals(NAME)) {
if (event.getJobName().equals(IndexLifecycle.NAME)) {
logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
IndexLifecycleMetadata indexLifecycleMetadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE);
SortedMap<String, LifecyclePolicy> policies = indexLifecycleMetadata.getPolicies();
clusterService.state().getMetaData().getIndices().valuesIt().forEachRemaining((idxMeta) -> {
String policyName = IndexLifecycle.LIFECYCLE_TIMESERIES_NAME_SETTING.get(idxMeta.getSettings());
clusterService.state().metaData().indices().valuesIt().forEachRemaining((idxMeta) -> {
String policyName = IndexLifecycle.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
if (Strings.isNullOrEmpty(policyName) == false) {
logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")");
LifecyclePolicy policy = policies.get(policyName);
@ -83,37 +114,23 @@ public class IndexLifecycleService extends AbstractComponent
}
}
private void installMetadata(MetaData metaData) {
IndexLifecycleMetadata indexLifecycleMetadata = metaData.custom(IndexLifecycleMetadata.TYPE);
if (indexLifecycleMetadata == null) {
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(() -> clusterService.submitStateUpdateTask("install-index-lifecycle-metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// If the metadata has been added already don't try to update
if (currentState.metaData().custom(IndexLifecycleMetadata.TYPE) != null) {
return currentState;
}
ClusterState.Builder builder = new ClusterState.Builder(currentState);
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY_METADATA);
builder.metaData(metadataBuilder.build());
return builder.build();
}
private void installMetadata(IndexLifecycleMetadata lifecycleMetadata) {
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(() -> clusterService.submitStateUpdateTask("install-index-lifecycle-metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState.Builder builder = new ClusterState.Builder(currentState);
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata);
builder.metaData(metadataBuilder.build());
return builder.build();
}
@Override
public void onFailure(String source, Exception e) {
logger.error("unable to install index lifecycle metadata", e);
}
}));
} else {
if (scheduler.get() == null) {
scheduler.set(new SchedulerEngine(clock));
scheduler.get().register(this);
scheduler.get().add(new SchedulerEngine.Job(NAME,
new IntervalSchedule(new Interval(indexLifecycleMetadata.getPollInterval(), Unit.SECONDS))));
}
}
@Override
public void onFailure(String source, Exception e) {
logger.error("unable to install index lifecycle metadata", e);
}
}));
}
@Override

View File

@ -45,24 +45,24 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext {
@Override
public void setPhase(String phase, Listener listener) {
writeSettings(idxMeta.getIndex().getName(),
Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), phase)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build(), listener);
Settings.builder().put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), phase)
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), "").build(), listener);
}
@Override
public String getPhase() {
return IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.get(idxMeta.getSettings());
return IndexLifecycle.LIFECYCLE_PHASE_SETTING.get(idxMeta.getSettings());
}
@Override
public void setAction(String action, Listener listener) {
writeSettings(idxMeta.getIndex().getName(),
Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), action).build(), listener);
Settings.builder().put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), action).build(), listener);
}
@Override
public String getAction() {
return IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.get(idxMeta.getSettings());
return IndexLifecycle.LIFECYCLE_ACTION_SETTING.get(idxMeta.getSettings());
}
@Override

View File

@ -208,7 +208,7 @@ public class DeleteLifecycleAction
}
SortedMap<String, LifecyclePolicy> newPolicies = new TreeMap<>(currentMetadata.getPolicies());
newPolicies.remove(request.getPolicyName());
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getPollInterval());
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies);
newState.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
return newState.build();

View File

@ -235,7 +235,7 @@ public class PutLifecycleAction extends Action<PutLifecycleAction.Request, PutLi
}
SortedMap<String, LifecyclePolicy> newPolicies = new TreeMap<>(currentMetadata.getPolicies());
newPolicies.put(request.getPolicy().getName(), request.getPolicy());
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getPollInterval());
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies);
newState.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
return newState.build();

View File

@ -58,8 +58,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
String policyName = randomAlphaOfLength(10);
policies.put(policyName, new TestLifecyclePolicy(policyName, phases));
}
long pollInterval = randomNonNegativeLong();
return new IndexLifecycleMetadata(policies, pollInterval);
return new IndexLifecycleMetadata(policies);
}
@Override
@ -84,20 +83,10 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
protected MetaData.Custom mutateInstance(MetaData.Custom instance) {
IndexLifecycleMetadata metadata = (IndexLifecycleMetadata) instance;
SortedMap<String, LifecyclePolicy> policies = metadata.getPolicies();
long pollInterval = metadata.getPollInterval();
switch (between(0, 1)) {
case 0:
pollInterval = pollInterval + randomIntBetween(1, 1000);
break;
case 1:
policies = new TreeMap<>(policies);
String policyName = randomAlphaOfLength(10);
policies.put(policyName, new TestLifecyclePolicy(policyName, Collections.emptyList()));
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new IndexLifecycleMetadata(policies, pollInterval);
policies = new TreeMap<>(policies);
String policyName = randomAlphaOfLength(10);
policies.put(policyName, new TestLifecyclePolicy(policyName, Collections.emptyList()));
return new IndexLifecycleMetadata(policies);
}
@Override

View File

@ -0,0 +1,238 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.node.Node.NODE_MASTER_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class IndexLifecycleServiceTests extends ESTestCase {
private ClusterService clusterService;
private IndexLifecycleService indexLifecycleService;
private String nodeId;
private DiscoveryNode masterNode;
private IndicesAdminClient indicesClient;
@Before
public void prepareServices() {
nodeId = randomAlphaOfLength(10);
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
clusterService = mock(ClusterService.class);
masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT)
.put(NODE_MASTER_SETTING.getKey(), true).build(),
new TransportAddress(TransportAddress.META_ADDRESS, 9300), nodeId);
long randomMilli = randomNonNegativeLong();
Clock clock = Clock.fixed(Instant.ofEpochMilli(randomMilli), ZoneId.of(randomFrom(ZoneId.getAvailableZoneIds())));
doAnswer(invocationOnMock -> null).when(clusterService).addListener(any());
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
doAnswer(invocationOnMock -> {
Runnable runnable = (Runnable) invocationOnMock.getArguments()[0];
runnable.run();
return null;
}).when(executorService).execute(any());
Client client = mock(Client.class);
AdminClient adminClient = mock(AdminClient.class);
indicesClient = mock(IndicesAdminClient.class);
when(client.admin()).thenReturn(adminClient);
when(adminClient.indices()).thenReturn(indicesClient);
indexLifecycleService = new IndexLifecycleService(Settings.EMPTY, client, clusterService, clock,
threadPool, () -> randomMilli);
}
public void testOnlyChangesStateOnMaster() throws Exception {
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT)
.put(IndexLifecycle.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(3)).build())
.build();
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId + "not").masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", state, state);
indexLifecycleService.clusterChanged(event);
verify(clusterService, only()).addListener(any());
assertNull(indexLifecycleService.getScheduler());
}
public void testServiceSetupOnFirstClusterChange() {
TimeValue pollInterval = TimeValue.timeValueSeconds(randomIntBetween(1, 59));
MetaData metaData = MetaData.builder() .persistentSettings(settings(Version.CURRENT)
.put(IndexLifecycle.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), pollInterval).build())
.build();
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", state, state);
final SetOnce<ClusterChangedEvent> installedEvent = new SetOnce<>();
doAnswer(invocationOnMock -> {
ClusterStateUpdateTask updateTask = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1];
ClusterState newState = updateTask.execute(state);
IndexLifecycleMetadata indexLifecycleMetadata = newState.metaData().custom(IndexLifecycleMetadata.TYPE);
assertThat(indexLifecycleMetadata.getPolicies(), equalTo(Collections.emptySortedMap()));
installedEvent.set(new ClusterChangedEvent(event.source(), newState, state));
return null;
}).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
indexLifecycleService.clusterChanged(event);
verify(clusterService).addListener(any());
verify(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
assertNull(indexLifecycleService.getScheduler());
}
@After
public void cleanup() throws IOException {
indexLifecycleService.close();
}
public void testSchedulerInitializationAndUpdate() {
TimeValue pollInterval = TimeValue.timeValueSeconds(randomIntBetween(1, 59));
MetaData metaData = MetaData.builder()
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(Collections.emptySortedMap()))
.persistentSettings(settings(Version.CURRENT).build())
.build();
MetaData updatedPollMetaData = MetaData.builder(metaData).persistentSettings(settings(Version.CURRENT)
.put(IndexLifecycle.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), pollInterval).build())
.build();
ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterState currentState = ClusterState.builder(previousState)
.metaData(updatedPollMetaData)
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", currentState, previousState);
indexLifecycleService.clusterChanged(new ClusterChangedEvent("_source", previousState, previousState));
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
assertThat(((IntervalSchedule)indexLifecycleService.getScheduledJob().getSchedule()).interval(),
equalTo(new IntervalSchedule.Interval(3, IntervalSchedule.Interval.Unit.SECONDS)));
indexLifecycleService.clusterChanged(event);
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
assertThat(((IntervalSchedule)indexLifecycleService.getScheduledJob().getSchedule()).interval(),
equalTo(new IntervalSchedule.Interval(pollInterval.seconds(), IntervalSchedule.Interval.Unit.SECONDS)));
verify(clusterService, only()).addListener(any());
verify(clusterService, never()).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
}
public void testInstallMetadataFail() {
TimeValue pollInterval = TimeValue.timeValueSeconds(randomIntBetween(1, 59));
MetaData metaData = MetaData.builder() .persistentSettings(settings(Version.CURRENT)
.put(IndexLifecycle.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), pollInterval).build())
.build();
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", state, state);
doThrow(new RuntimeException("error")).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
Exception exception = expectThrows(RuntimeException.class, () -> indexLifecycleService.clusterChanged(event));
assertThat(exception.getMessage(), equalTo("error"));
verify(clusterService).addListener(any());
verify(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
assertNull(indexLifecycleService.getScheduler());
}
@SuppressWarnings("unchecked")
public void testTriggeredWithMatchingPolicy() {
String policyName = randomAlphaOfLengthBetween(1, 20);
MockAction mockAction = new MockAction();
Phase phase = new Phase("phase", TimeValue.ZERO, Collections.singletonMap("action", mockAction));
LifecyclePolicy policy = new TestLifecyclePolicy(policyName, Collections.singletonList(phase));
SortedMap<String, LifecyclePolicy> policyMap = new TreeMap<>();
policyMap.put(policyName, policy);
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(settings(Version.CURRENT).put(IndexLifecycle.LIFECYCLE_NAME_SETTING.getKey(), policyName))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder()
.fPut(index.getName(), indexMetadata);
MetaData metaData = MetaData.builder()
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap))
.indices(indices.build())
.persistentSettings(settings(Version.CURRENT).build())
.build();
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event(IndexLifecycle.NAME, randomLong(), randomLong());
when(clusterService.state()).thenReturn(currentState);
doAnswer(invocationOnMock -> {
ActionListener<UpdateSettingsResponse> listener = (ActionListener) invocationOnMock.getArguments()[1];
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
return null;
}).when(indicesClient).updateSettings(any(), any());
indexLifecycleService.triggered(schedulerEvent);
assertThat(mockAction.getExecutedCount(), equalTo(1L));
}
}

View File

@ -32,8 +32,8 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
String indexName = "test";
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20))
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20))
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
Client client = Mockito.mock(Client.class);
@ -49,8 +49,8 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), newPhase)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build();
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), newPhase)
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), "").build();
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
return null;
@ -90,8 +90,8 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
String indexName = "test";
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20))
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20))
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
Client client = Mockito.mock(Client.class);
@ -107,8 +107,8 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), newPhase)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build();
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), newPhase)
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), "").build();
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(false));
return null;
@ -149,8 +149,8 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
String indexName = "test";
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20))
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20))
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
Exception exception = new RuntimeException();
@ -168,8 +168,8 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), newPhase)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build();
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), newPhase)
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), "").build();
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
listener.onFailure(exception);
return null;
@ -209,7 +209,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
String phase = randomAlphaOfLengthBetween(1, 20);
IndexMetaData idxMeta = IndexMetaData.builder("test")
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), phase).build())
.put(IndexLifecycle.LIFECYCLE_PHASE_SETTING.getKey(), phase).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, null, () -> {
@ -237,7 +237,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
String indexName = "test";
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
Client client = Mockito.mock(Client.class);
@ -253,7 +253,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), newAction)
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), newAction)
.build();
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
@ -294,7 +294,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
String indexName = "test";
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
Client client = Mockito.mock(Client.class);
@ -310,7 +310,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), newAction)
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), newAction)
.build();
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(false));
@ -352,7 +352,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
String indexName = "test";
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
Exception exception = new RuntimeException();
@ -370,7 +370,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), newAction)
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), newAction)
.build();
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
listener.onFailure(exception);
@ -411,7 +411,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
String action = randomAlphaOfLengthBetween(1, 20);
IndexMetaData idxMeta = IndexMetaData.builder("test")
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), action).build())
.put(IndexLifecycle.LIFECYCLE_ACTION_SETTING.getKey(), action).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, null, () -> {