Refactors steps and adds some execution logic
This is far from complete but its necessary to push so the new step classes can be shared and used.
This commit is contained in:
parent
e6ee5b49d1
commit
179074d3b0
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
public abstract class AsyncActionStep extends Step {
|
||||
|
||||
private Client client;
|
||||
|
||||
public AsyncActionStep(String name, String action, String phase, StepKey nextStepKey, Client client) {
|
||||
super(name, action, phase, nextStepKey);
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
protected Client getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
public abstract void performAction(Index index, Listener listener);
|
||||
|
||||
public static interface Listener {
|
||||
|
||||
void onResponse(boolean complete);
|
||||
|
||||
void onFailure(Exception e);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
public abstract class AsyncWaitStep extends Step {
|
||||
|
||||
private Client client;
|
||||
|
||||
public AsyncWaitStep(String name, String action, String phase, StepKey nextStepKey, Client client) {
|
||||
super(name, action, phase, nextStepKey);
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
protected Client getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
public abstract void evaluateCondition(Index index, Listener listener);
|
||||
|
||||
public static interface Listener {
|
||||
|
||||
void onResponse(boolean conditionMet);
|
||||
|
||||
void onFailure(Exception e);
|
||||
}
|
||||
}
|
|
@ -1,35 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.StepResult;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class ClientStep<RequestBuilder extends ActionRequestBuilder, Response extends ActionResponse> extends Step {
|
||||
|
||||
private final RequestBuilder requestBuilder;
|
||||
private final Function<ClusterState, Boolean> checkComplete;
|
||||
private final Function<Response, Boolean> checkSuccess;
|
||||
private Exception returnedException;
|
||||
private boolean returnedSuccess;
|
||||
|
||||
public ClientStep(String name, String action, String phase, String index, StepKey nextStepKey, RequestBuilder requestBuilder,
|
||||
Function<ClusterState, Boolean> checkComplete, Function<Response, Boolean> checkSuccess) {
|
||||
super(name, action, phase, nextStepKey);
|
||||
this.requestBuilder = requestBuilder;
|
||||
this.checkComplete = checkComplete;
|
||||
this.checkSuccess = checkSuccess;
|
||||
this.returnedException = null;
|
||||
this.returnedSuccess = false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
public abstract class ClusterStateActionStep extends Step {
|
||||
|
||||
public ClusterStateActionStep(String name, String action, String phase, StepKey nextStepKey) {
|
||||
super(name, action, phase, nextStepKey);
|
||||
}
|
||||
|
||||
public abstract ClusterState performAction(Index index, ClusterState clusterState);
|
||||
|
||||
}
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.StepResult;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class ClusterStateUpdateStep extends Step {
|
||||
private final Function<ClusterState, ClusterState> updateTask;
|
||||
|
||||
public ClusterStateUpdateStep(String name, String index, String phase, String action, StepKey nextStepKey, Function<ClusterState, ClusterState> updateTask) {
|
||||
super(name, action, phase, nextStepKey);
|
||||
this.updateTask = updateTask;
|
||||
}
|
||||
|
||||
public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) {
|
||||
ClusterState updated = null;
|
||||
try {
|
||||
updated = updateTask.apply(currentState);
|
||||
updated = updateStateWithNextStep(updated, nowSupplier, index);
|
||||
return new StepResult("done!", null, updated, true, true);
|
||||
} catch (Exception e) {
|
||||
return new StepResult("something went wrong", e, updated, true, true);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
public abstract class ClusterStateWaitStep extends Step {
|
||||
|
||||
public ClusterStateWaitStep(String name, String action, String phase, StepKey nextStepKey) {
|
||||
super(name, action, phase, nextStepKey);
|
||||
}
|
||||
|
||||
public abstract boolean isConditionMet(Index index, ClusterState clusterState);
|
||||
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.StepResult;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class ConditionalWaitStep extends Step {
|
||||
private final Function<ClusterState, Boolean> condition;
|
||||
|
||||
public ConditionalWaitStep(String name, String phase, String action, StepKey nextStepKey, Function<ClusterState, Boolean> condition) {
|
||||
super(name, action, phase, nextStepKey);
|
||||
this.condition = condition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) {
|
||||
boolean isComplete = condition.apply(currentState);
|
||||
return new StepResult(String.valueOf(isComplete), null, currentState, true, isComplete);
|
||||
}
|
||||
}
|
|
@ -31,7 +31,7 @@ public class LifecycleSettings {
|
|||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<String> LIFECYCLE_ACTION_SETTING = Setting.simpleString(LIFECYCLE_ACTION,
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<String> LIFECYCLE_STEP_SETTING = Setting.simpleString(LIFECYCLE_ACTION,
|
||||
public static final Setting<String> LIFECYCLE_STEP_SETTING = Setting.simpleString(LIFECYCLE_STEP,
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<Long> LIFECYCLE_INDEX_CREATION_DATE_SETTING = Setting.longSetting(LIFECYCLE_INDEX_CREATION_DATE,
|
||||
-1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
|
@ -39,6 +39,6 @@ public class LifecycleSettings {
|
|||
-1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<Long> LIFECYCLE_ACTION_TIME_SETTING = Setting.longSetting(LIFECYCLE_ACTION_TIME,
|
||||
-1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<Long> LIFECYCLE_STEP_TIME_SETTING = Setting.longSetting(LIFECYCLE_ACTION_TIME,
|
||||
public static final Setting<Long> LIFECYCLE_STEP_TIME_SETTING = Setting.longSetting(LIFECYCLE_STEP_TIME,
|
||||
-1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
}
|
||||
|
|
|
@ -5,21 +5,12 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
/**
|
||||
* A {@link LifecycleAction} which deletes the index.
|
||||
*/
|
||||
public class Step {
|
||||
public abstract class Step {
|
||||
private final String name;
|
||||
private final String action;
|
||||
private final String phase;
|
||||
|
@ -52,63 +43,6 @@ public class Step {
|
|||
return nextStepKey != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes this step and updates the cluster state with the next step to run
|
||||
*
|
||||
* @param currentState
|
||||
* @param client
|
||||
* @param nowSupplier
|
||||
* @return
|
||||
*/
|
||||
public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) {
|
||||
// Example: Delete
|
||||
//
|
||||
// client.admin().indices().prepareDelete(index.getName()).execute(new ActionListener<DeleteIndexResponse>() {
|
||||
// @Override
|
||||
// public void onResponse(DeleteIndexResponse deleteIndexResponse) {
|
||||
// if (deleteIndexResponse.isAcknowledged()) {
|
||||
// submitUpdateNextStepTask(clusterService, nowSupplier, index);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void onFailure(Exception e) {
|
||||
//
|
||||
// }
|
||||
// });
|
||||
throw new UnsupportedOperationException("implement me");
|
||||
}
|
||||
|
||||
protected void submitUpdateNextStepTask(ClusterService clusterService, LongSupplier nowSupplier, Index index) {
|
||||
clusterService.submitStateUpdateTask("update-next-step", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return updateStateWithNextStep(currentState, nowSupplier, index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected ClusterState updateStateWithNextStep(ClusterState currentState, LongSupplier nowSupplier, Index index) {
|
||||
long now = nowSupplier.getAsLong();
|
||||
// fetch details about next step to run and update the cluster state with this information
|
||||
Settings newLifecyclePhaseSettings = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, nextStepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, now)
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION_TIME, now)
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_TIME, now)
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, nextStepKey.getName())
|
||||
.build();
|
||||
return ClusterState.builder(currentState)
|
||||
.metaData(MetaData.builder(currentState.metaData())
|
||||
.updateSettings(newLifecyclePhaseSettings, index.getName())).build();
|
||||
}
|
||||
|
||||
public static class StepKey {
|
||||
private final String phase;
|
||||
|
||||
|
|
|
@ -0,0 +1,216 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.Settings.Builder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
|
||||
public class IndexLifecycleRunner {
|
||||
|
||||
private PolicyStepsRegistry stepRegistry;
|
||||
private ClusterService clusterService;
|
||||
|
||||
public IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService) {
|
||||
this.stepRegistry = stepRegistry;
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
public void runPolicy(String policy, Index index, Settings indexSettings, Cause cause) {
|
||||
Step currentStep = getCurrentStep(policy, indexSettings);
|
||||
if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
|
||||
if (cause != Cause.SCHEDULE_TRIGGER) {
|
||||
executeClusterStateSteps(index, policy, currentStep);
|
||||
}
|
||||
} else if (currentStep instanceof AsyncWaitStep) {
|
||||
if (cause != Cause.CLUSTER_STATE_CHANGE) {
|
||||
((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet) {
|
||||
if (conditionMet) {
|
||||
moveToStep(index, policy, new StepKey(currentStep.getPhase(), currentStep.getAction(), currentStep.getName()),
|
||||
currentStep.getNextStepKey(), Cause.CALLBACK);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
} else if (currentStep instanceof AsyncActionStep) {
|
||||
if (cause != Cause.CLUSTER_STATE_CHANGE) {
|
||||
((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
if (complete) {
|
||||
moveToStep(index, policy, new StepKey(currentStep.getPhase(), currentStep.getAction(), currentStep.getName()),
|
||||
currentStep.getNextStepKey(), Cause.CALLBACK);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException(
|
||||
"Step with key [" + currentStep.getName() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
private void runPolicy(Index index, ClusterState clusterState, Cause cause) {
|
||||
IndexMetaData indexMetaData = clusterState.getMetaData().index(index);
|
||||
Settings indexSettings = indexMetaData.getSettings();
|
||||
String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
|
||||
runPolicy(policy, index, indexSettings, cause);
|
||||
}
|
||||
|
||||
private void executeClusterStateSteps(Index index, String policy, Step step) {
|
||||
assert step instanceof ClusterStateActionStep || step instanceof ClusterStateWaitStep;
|
||||
clusterService.submitStateUpdateTask("ILM", new ClusterStateUpdateTask() {
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
Step currentStep = step;
|
||||
if (currentStep.equals(getCurrentStep(policy, currentState.getMetaData().index(index).getSettings()))) {
|
||||
// We can do cluster state steps all together until we
|
||||
// either get to a step that isn't a cluster state step or a
|
||||
// cluster state wait step returns not completed
|
||||
while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
|
||||
if (currentStep instanceof ClusterStateActionStep) {
|
||||
// cluster state action step so do the action and
|
||||
// move
|
||||
// the cluster state to the next step
|
||||
currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState);
|
||||
currentState = moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey());
|
||||
} else {
|
||||
// cluster state wait step so evaluate the
|
||||
// condition, if the condition is met move to the
|
||||
// next step, if its not met return the current
|
||||
// cluster state so it can be applied and we will
|
||||
// wait for the next trigger to evaluate the
|
||||
// condition again
|
||||
boolean complete = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState);
|
||||
if (complete) {
|
||||
currentState = moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey());
|
||||
} else {
|
||||
return currentState;
|
||||
}
|
||||
}
|
||||
currentStep = stepRegistry.getStep(policy, currentStep.getNextStepKey());
|
||||
}
|
||||
return currentState;
|
||||
} else {
|
||||
// either we are no longer the master or the step is now
|
||||
// not the same as when we submitted the update task. In
|
||||
// either case we don't want to do anything now
|
||||
return currentState;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private StepKey getCurrentStepKey(Settings indexSettings) {
|
||||
String currentPhase = LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(indexSettings);
|
||||
String currentAction = LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(indexSettings);
|
||||
String currentStep = LifecycleSettings.LIFECYCLE_STEP_SETTING.get(indexSettings);
|
||||
if (currentStep == null) {
|
||||
assert currentPhase == null : "Current phase is not null: " + currentPhase;
|
||||
assert currentAction == null : "Current action is not null: " + currentAction;
|
||||
return null;
|
||||
} else {
|
||||
assert currentPhase != null;
|
||||
assert currentAction != null;
|
||||
return new StepKey(currentPhase, currentAction, currentStep);
|
||||
}
|
||||
}
|
||||
|
||||
private Step getCurrentStep(String policy, Settings indexSettings) {
|
||||
StepKey currentStepKey = getCurrentStepKey(indexSettings);
|
||||
if (currentStepKey == null) {
|
||||
return stepRegistry.getFirstStep(policy);
|
||||
} else {
|
||||
return stepRegistry.getStep(policy, currentStepKey);
|
||||
}
|
||||
}
|
||||
|
||||
private ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey nextStep) {
|
||||
ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState);
|
||||
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
|
||||
Builder indexSettings = Settings.builder().put(idxMeta.getSettings()).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName());
|
||||
if (indexSettings.keys().contains(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE) == false) {
|
||||
indexSettings.put(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), idxMeta.getCreationDate());
|
||||
}
|
||||
newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()).put(IndexMetaData
|
||||
.builder(clusterState.getMetaData().index(index))
|
||||
.settings(indexSettings)));
|
||||
return newClusterStateBuilder.build();
|
||||
}
|
||||
|
||||
private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey, Cause cause) {
|
||||
clusterService.submitStateUpdateTask("ILM", new ClusterStateUpdateTask() {
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
Settings indexSettings = currentState.getMetaData().index(index).getSettings();
|
||||
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
|
||||
&& currentStepKey.equals(getCurrentStepKey(indexSettings))) {
|
||||
return moveClusterStateToNextStep(index, currentState, nextStepKey);
|
||||
} else {
|
||||
// either the policy has changed or the step is now
|
||||
// not the same as when we submitted the update task. In
|
||||
// either case we don't want to do anything now
|
||||
return currentState;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
// if the new cluster state is different from the old one then
|
||||
// we moved to the new step in the execute method so we should
|
||||
// execute the next step
|
||||
if (oldState != newState) {
|
||||
runPolicy(index, newState, cause);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new RuntimeException(e); // NORELEASE implement error handling
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static enum Cause {
|
||||
CLUSTER_STATE_CHANGE, SCHEDULE_TRIGGER, CALLBACK;
|
||||
}
|
||||
}
|
|
@ -12,7 +12,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -20,13 +19,11 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.StepResult;
|
||||
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunner.Cause;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.time.Clock;
|
||||
|
@ -48,6 +45,7 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
private ThreadPool threadPool;
|
||||
private LongSupplier nowSupplier;
|
||||
private SchedulerEngine.Job scheduledJob;
|
||||
private IndexLifecycleRunner lifecycleRunner;
|
||||
|
||||
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock,
|
||||
ThreadPool threadPool, LongSupplier nowSupplier) {
|
||||
|
@ -59,6 +57,7 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
this.nowSupplier = nowSupplier;
|
||||
this.scheduledJob = null;
|
||||
this.policyRegistry = new PolicyStepsRegistry();
|
||||
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService);
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
|
@ -71,7 +70,6 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.localNodeMaster()) { // only act if we are master, otherwise keep idle until elected
|
||||
IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
|
@ -99,6 +97,8 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
} else if (pollIntervalSettingChanged) { // all engines are running, just need to update with latest interval
|
||||
scheduleJob(pollInterval);
|
||||
}
|
||||
|
||||
triggerPolicies(event.state(), Cause.CLUSTER_STATE_CHANGE);
|
||||
} else {
|
||||
cancelJob();
|
||||
}
|
||||
|
@ -116,67 +116,11 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
scheduler.get().add(scheduledJob);
|
||||
}
|
||||
|
||||
public void triggerPolicies() {
|
||||
// loop through all indices in cluster state and filter for ones that are
|
||||
// managed by the Index Lifecycle Service they have a index.lifecycle.name setting
|
||||
// associated to a policy
|
||||
ClusterState clusterState = clusterService.state();
|
||||
clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> {
|
||||
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
|
||||
if (Strings.isNullOrEmpty(policyName) == false) {
|
||||
clusterService.submitStateUpdateTask("index-lifecycle-" + policyName, new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
// ensure date is set
|
||||
currentState = putLifecycleDate(currentState, idxMeta);
|
||||
long lifecycleDate = currentState.metaData().settings()
|
||||
.getAsLong(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, -1L);
|
||||
// get current phase, action, step
|
||||
String phase = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_PHASE);
|
||||
String action = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_ACTION);
|
||||
String stepName = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_STEP);
|
||||
// returns current step to execute. If settings are null, then the first step to be executed in
|
||||
// this policy is returned.
|
||||
Step currentStep = policyRegistry.getStep(policyName, new Step.StepKey(phase, action, stepName));
|
||||
return executeStepUntilAsync(policyName, currentStep, clusterState, client, nowSupplier, idxMeta.getIndex());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* executes the given step, and then all proceeding steps, until it is necessary to exit the
|
||||
* cluster-state thread and let any wait-condition or asynchronous action progress externally
|
||||
*
|
||||
* TODO(colin): should steps execute themselves and execute `nextStep` internally?
|
||||
*
|
||||
* @param startStep The current step that has either not been executed, or not completed before
|
||||
* @return the new ClusterState
|
||||
*/
|
||||
private ClusterState executeStepUntilAsync(String policyName, Step startStep, ClusterState currentState, Client client, LongSupplier nowSupplier, Index index) {
|
||||
StepResult result = startStep.execute(clusterService, currentState, index, client, nowSupplier);
|
||||
while (result.isComplete() && result.indexSurvived() && startStep.hasNextStep()) {
|
||||
currentState = result.getClusterState();
|
||||
startStep = policyRegistry.getStep(policyName, startStep.getNextStepKey());
|
||||
result = startStep.execute(clusterService, currentState, index, client, nowSupplier);
|
||||
}
|
||||
if (result.isComplete()) {
|
||||
currentState = result.getClusterState();
|
||||
}
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggered(SchedulerEngine.Event event) {
|
||||
if (event.getJobName().equals(IndexLifecycle.NAME)) {
|
||||
logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
|
||||
triggerPolicies();
|
||||
triggerPolicies(clusterService.state(), Cause.SCHEDULE_TRIGGER);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -198,18 +142,17 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private ClusterState putLifecycleDate(ClusterState clusterState, IndexMetaData idxMeta) {
|
||||
if (idxMeta.getSettings().hasValue(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE)) {
|
||||
return clusterState;
|
||||
} else {
|
||||
ClusterState.Builder builder = new ClusterState.Builder(clusterState);
|
||||
MetaData.Builder metadataBuilder = MetaData.builder(clusterState.metaData());
|
||||
Settings settings = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), idxMeta.getCreationDate()).build();
|
||||
metadataBuilder.updateSettings(settings, idxMeta.getIndex().getName());
|
||||
return builder.metaData(metadataBuilder.build()).build();
|
||||
}
|
||||
|
||||
public void triggerPolicies(ClusterState clusterState, Cause cause) {
|
||||
// loop through all indices in cluster state and filter for ones that are
|
||||
// managed by the Index Lifecycle Service they have a index.lifecycle.name setting
|
||||
// associated to a policy
|
||||
clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> {
|
||||
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
|
||||
if (Strings.isNullOrEmpty(policyName) == false) {
|
||||
lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings(), cause);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -78,6 +78,9 @@ public class PolicyStepsRegistry {
|
|||
return step;
|
||||
}
|
||||
|
||||
public Step getFirstStep(String policy) {
|
||||
return firstStepMap.get(policy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
|
|
|
@ -5,35 +5,22 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateUpdateStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ConditionalWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.MockAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.PhaseAfterStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.StepResult;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TestLifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -43,9 +30,6 @@ import java.util.HashMap;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecyclePolicy> {
|
||||
|
||||
|
|
Loading…
Reference in New Issue