moar refactor for steps
This commit is contained in:
parent
d63cd8c9c3
commit
57821cd55a
|
@ -23,36 +23,13 @@ public class ClientStep<RequestBuilder extends ActionRequestBuilder, Response ex
|
|||
private Exception returnedException;
|
||||
private boolean returnedSuccess;
|
||||
|
||||
public ClientStep(String name, String action, String phase, String index, RequestBuilder requestBuilder,
|
||||
public ClientStep(String name, String action, String phase, String index, Step nextStep, RequestBuilder requestBuilder,
|
||||
Function<ClusterState, Boolean> checkComplete, Function<Response, Boolean> checkSuccess) {
|
||||
super(name, action, phase, index);
|
||||
super(name, action, phase, nextStep);
|
||||
this.requestBuilder = requestBuilder;
|
||||
this.checkComplete = checkComplete;
|
||||
this.checkSuccess = checkSuccess;
|
||||
this.returnedException = null;
|
||||
this.returnedSuccess = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StepResult execute(ClusterState currentState) {
|
||||
if (checkComplete.apply(currentState)) {
|
||||
return new StepResult("client-complete", null, currentState, true, true);
|
||||
} else {
|
||||
requestBuilder.execute(new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response r) {
|
||||
if (checkSuccess.apply(r)) {
|
||||
returnedSuccess = true;
|
||||
}
|
||||
// IndexLifecycleService.triggerPolicies()
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
returnedException = e;
|
||||
}
|
||||
});
|
||||
return new StepResult("client-in-progress", null, currentState, true, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,32 +5,29 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.StepResult;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class ClusterStateUpdateStep extends Step {
|
||||
private final String name;
|
||||
private final String index;
|
||||
private final String phase;
|
||||
private final String action;
|
||||
private final Function<ClusterState, ClusterState> updateTask;
|
||||
|
||||
public ClusterStateUpdateStep(String name, String index, String phase, String action, Function<ClusterState, ClusterState> updateTask) {
|
||||
super(name, action, phase, index);
|
||||
this.name = name;
|
||||
this.index = index;
|
||||
this.phase = phase;
|
||||
this.action = action;
|
||||
public ClusterStateUpdateStep(String name, String index, String phase, String action, Step nextStep, Function<ClusterState, ClusterState> updateTask) {
|
||||
super(name, action, phase, nextStep);
|
||||
this.updateTask = updateTask;
|
||||
}
|
||||
|
||||
public StepResult execute(ClusterState clusterState) {
|
||||
public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) {
|
||||
ClusterState updated = null;
|
||||
try {
|
||||
updated = updateTask.apply(clusterState);
|
||||
updated = updateTask.apply(currentState);
|
||||
updated = updateStateWithNextStep(updated, nowSupplier, index);
|
||||
return new StepResult("done!", null, updated, true, true);
|
||||
} catch (Exception e) {
|
||||
return new StepResult("something went wrong", e, updated, true, true);
|
||||
|
|
|
@ -144,49 +144,9 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
|
|||
return builder;
|
||||
}
|
||||
|
||||
public StepResult execute(List<Step> steps, ClusterState currentState, IndexMetaData indexMetaData, Client client, LongSupplier nowSupplier) {
|
||||
StepResult lastStepResult = null;
|
||||
ClusterState updatedState = currentState;
|
||||
for (int i = getNextStepIdx(steps, indexMetaData); i < steps.size(); i++) {
|
||||
lastStepResult = steps.get(i).execute(updatedState);
|
||||
if (lastStepResult.isComplete() && lastStepResult.indexSurvived()) {
|
||||
if (i < steps.size() - 1) {
|
||||
Step nextStep = steps.get(i + 1);
|
||||
long now = nowSupplier.getAsLong();
|
||||
// fetch details about next step to run and update the cluster state with this information
|
||||
Settings newLifecyclePhaseSettings = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, now)
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION_TIME, now)
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_TIME, now)
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName())
|
||||
.build();
|
||||
updatedState = ClusterState.builder(lastStepResult.getClusterState())
|
||||
.metaData(MetaData.builder(lastStepResult.getClusterState().metaData())
|
||||
.updateSettings(newLifecyclePhaseSettings)).build();
|
||||
lastStepResult = new StepResult(lastStepResult, updatedState);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return lastStepResult;
|
||||
}
|
||||
|
||||
private int getNextStepIdx(List<Step> steps, IndexMetaData indexMetaData) {
|
||||
String step = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_STEP);
|
||||
if (step == null) {
|
||||
return 0;
|
||||
}
|
||||
for (int i = 0; i < steps.size(); i++) {
|
||||
if (steps.get(i).getName().equals(step)) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
return steps.size();
|
||||
public List<Step> toSteps() {
|
||||
// TODO(talevy): make real with types
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -229,7 +189,5 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
|
|||
* @return the action following {@code current} to execute
|
||||
*/
|
||||
LifecycleAction next(LifecycleAction current);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -138,8 +138,8 @@ public class Phase implements ToXContentObject, Writeable {
|
|||
|
||||
public List<Step> toSteps(Index index, long indexLifecycleCreationDate, Client client, ThreadPool threadPool, LongSupplier nowSupplier) {
|
||||
// TODO(talevy) phase needs to know indexLifecycleStartTime
|
||||
PhaseAfterStep phaseAfterStep = new PhaseAfterStep(threadPool, indexLifecycleCreationDate, nowSupplier, after,
|
||||
"phase_start", index.getName(), getName(), null);
|
||||
PhaseAfterStep phaseAfterStep = new PhaseAfterStep(
|
||||
"phase_start", index.getName(), getName(),after, null);
|
||||
return Stream.concat(Stream.of(phaseAfterStep), actions.values().stream()
|
||||
.flatMap(a -> a.toSteps(name, index, client, threadPool, nowSupplier).stream())).collect(Collectors.toList());
|
||||
}
|
||||
|
|
|
@ -5,45 +5,13 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.StepResult;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.BooleanSupplier;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class PhaseAfterStep extends Step {
|
||||
private final CompletableFuture<StepResult> timeUp;
|
||||
private final ThreadPool threadPool;
|
||||
private final long indexCreationDate;
|
||||
private final LongSupplier nowSupplier;
|
||||
private final TimeValue after;
|
||||
|
||||
public PhaseAfterStep(ThreadPool threadPool, long indexCreationDate, LongSupplier nowSupplier, TimeValue after, String name,
|
||||
String index, String phase, String action) {
|
||||
super(name, action, phase, index);
|
||||
this.threadPool = threadPool;
|
||||
this.indexCreationDate = indexCreationDate;
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.timeUp = new CompletableFuture<>();
|
||||
public PhaseAfterStep(String phase, String action, String name, TimeValue after, Step nextStep) {
|
||||
super(name, action, phase, nextStep);
|
||||
this.after = after;
|
||||
}
|
||||
|
||||
public StepResult execute(ClusterState currentState) {
|
||||
LongSupplier elapsed = () -> nowSupplier.getAsLong() - indexCreationDate;
|
||||
BooleanSupplier isReady = () -> after.getSeconds() <= elapsed.getAsLong();
|
||||
if (isReady.getAsBoolean()) {
|
||||
return new StepResult("phase starting", null, currentState, true, true);
|
||||
} else {
|
||||
threadPool.schedule(TimeValue.timeValueSeconds(elapsed.getAsLong()), ThreadPool.Names.GENERIC, () -> {
|
||||
if (after.getSeconds() <= elapsed.getAsLong()) {
|
||||
// IndexLifecycleService.triggerPolicies()
|
||||
}
|
||||
});
|
||||
return new StepResult("phase-check-rescheduled", null, currentState, true, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -153,7 +153,7 @@ public class ShrinkAction implements LifecycleAction {
|
|||
currentState -> {
|
||||
// check that shrunken index was already created, if so, no need to both client
|
||||
IndexMetaData shrunkMetaData = currentState.metaData().index(shrunkenIndexName);
|
||||
boolean isSuccessful = shrunkMetaData != null && shrunkenIndexName.equals(IndexMetaData.INDEX_SHRINK_SOURCE_NAME
|
||||
return shrunkMetaData != null && shrunkenIndexName.equals(IndexMetaData.INDEX_SHRINK_SOURCE_NAME
|
||||
.get(shrunkMetaData.getSettings()));
|
||||
|
||||
}, ResizeResponse::isAcknowledged);
|
||||
|
|
|
@ -5,22 +5,30 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
/**
|
||||
* A {@link LifecycleAction} which deletes the index.
|
||||
*/
|
||||
public abstract class Step {
|
||||
public class Step {
|
||||
private final String name;
|
||||
private final String action;
|
||||
private final String phase;
|
||||
private final String index;
|
||||
private final Step nextStep;
|
||||
|
||||
public Step(String name, String action, String phase, String index) {
|
||||
public Step(String name, String action, String phase, Step nextStep) {
|
||||
this.name = name;
|
||||
this.action = action;
|
||||
this.phase = phase;
|
||||
this.index = index;
|
||||
this.nextStep = nextStep;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
|
@ -35,9 +43,68 @@ public abstract class Step {
|
|||
return phase;
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
public Step getNextStep() {
|
||||
return nextStep;
|
||||
}
|
||||
|
||||
public abstract StepResult execute(ClusterState currentState);
|
||||
public boolean hasNextStep() {
|
||||
return nextStep != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes this step and updates the cluster state with the next step to run
|
||||
*
|
||||
* @param currentState
|
||||
* @param client
|
||||
* @param nowSupplier
|
||||
* @return
|
||||
*/
|
||||
public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) {
|
||||
// Example: Delete
|
||||
//
|
||||
// client.admin().indices().prepareDelete(index.getName()).execute(new ActionListener<DeleteIndexResponse>() {
|
||||
// @Override
|
||||
// public void onResponse(DeleteIndexResponse deleteIndexResponse) {
|
||||
// if (deleteIndexResponse.isAcknowledged()) {
|
||||
// submitUpdateNextStepTask(clusterService, nowSupplier, index);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void onFailure(Exception e) {
|
||||
//
|
||||
// }
|
||||
// });
|
||||
throw new UnsupportedOperationException("implement me");
|
||||
}
|
||||
|
||||
protected void submitUpdateNextStepTask(ClusterService clusterService, LongSupplier nowSupplier, Index index) {
|
||||
clusterService.submitStateUpdateTask("update-next-step", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return updateStateWithNextStep(currentState, nowSupplier, index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected ClusterState updateStateWithNextStep(ClusterState currentState, LongSupplier nowSupplier, Index index) {
|
||||
long now = nowSupplier.getAsLong();
|
||||
// fetch details about next step to run and update the cluster state with this information
|
||||
Settings newLifecyclePhaseSettings = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, now)
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION_TIME, now)
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_TIME, now)
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName())
|
||||
.build();
|
||||
return ClusterState.builder(currentState)
|
||||
.metaData(MetaData.builder(currentState.metaData())
|
||||
.updateSettings(newLifecyclePhaseSettings, index.getName())).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.DiffableUtils;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
|
@ -24,6 +25,7 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
|
@ -34,7 +36,10 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
|||
import java.io.Closeable;
|
||||
import java.time.Clock;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.LongSupplier;
|
||||
|
@ -49,6 +54,7 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
|
||||
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
|
||||
private final Clock clock;
|
||||
private final PolicyStepsRegistry policyRegistry;
|
||||
private Client client;
|
||||
private ClusterService clusterService;
|
||||
private ThreadPool threadPool;
|
||||
|
@ -64,6 +70,7 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
this.threadPool = threadPool;
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.scheduledJob = null;
|
||||
this.policyRegistry = new PolicyStepsRegistry();
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
|
@ -76,10 +83,10 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.localNodeMaster()) { // only act if we are master, otherwise keep idle until elected
|
||||
IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
|
||||
TimeValue pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING
|
||||
.get(event.state().getMetaData().settings());
|
||||
TimeValue previousPollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING
|
||||
|
@ -87,6 +94,11 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
|
||||
boolean pollIntervalSettingChanged = !pollInterval.equals(previousPollInterval);
|
||||
|
||||
if (lifecycleMetadata != null) {
|
||||
// update policy steps registry
|
||||
policyRegistry.update(event.state());
|
||||
}
|
||||
|
||||
if (lifecycleMetadata == null) { // no lifecycle metadata, install initial empty metadata state
|
||||
lifecycleMetadata = new IndexLifecycleMetadata(Collections.emptySortedMap());
|
||||
installMetadata(lifecycleMetadata);
|
||||
|
@ -116,9 +128,7 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
scheduler.get().add(scheduledJob);
|
||||
}
|
||||
|
||||
public synchronized void triggerPolicies() {
|
||||
IndexLifecycleMetadata indexLifecycleMetadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
SortedMap<String, LifecyclePolicy> policies = indexLifecycleMetadata.getPolicies();
|
||||
public void triggerPolicies() {
|
||||
// loop through all indices in cluster state and filter for ones that are
|
||||
// managed by the Index Lifecycle Service they have a index.lifecycle.name setting
|
||||
// associated to a policy
|
||||
|
@ -126,10 +136,6 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> {
|
||||
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
|
||||
if (Strings.isNullOrEmpty(policyName) == false) {
|
||||
LifecyclePolicy policy = policies.get(policyName);
|
||||
// fetch step
|
||||
// check whether complete
|
||||
// if complete, launch next task
|
||||
clusterService.submitStateUpdateTask("index-lifecycle-" + policyName, new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
|
@ -137,12 +143,15 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
currentState = putLifecycleDate(currentState, idxMeta);
|
||||
long lifecycleDate = currentState.metaData().settings()
|
||||
.getAsLong(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, -1L);
|
||||
// TODO(talevy): make real
|
||||
List<Step> steps = policy.getPhases().values().stream()
|
||||
.flatMap(p -> p.toSteps(idxMeta.getIndex(), lifecycleDate, client, threadPool, nowSupplier).stream())
|
||||
.collect(Collectors.toList());
|
||||
StepResult result = policy.execute(steps, currentState, idxMeta, client, nowSupplier);
|
||||
return result.getClusterState();
|
||||
// get current phase, action, step
|
||||
String phase = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_PHASE);
|
||||
String action = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_ACTION);
|
||||
String stepName = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_STEP);
|
||||
// returns current step to execute. If settings are null, then the first step to be executed in
|
||||
// this policy is returned.
|
||||
Step currentStep = policyRegistry.getStep(policyName, phase, action, stepName);
|
||||
ClusterState newClusterState = executeStepUntilAsync(currentStep, clusterState, client, nowSupplier, idxMeta.getIndex());
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -154,6 +163,28 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* executes the given step, and then all proceeding steps, until it is necessary to exit the
|
||||
* cluster-state thread and let any wait-condition or asynchronous action progress externally
|
||||
*
|
||||
* TODO(colin): should steps execute themselves and execute `nextStep` internally?
|
||||
*
|
||||
* @param startStep The current step that has either not been executed, or not completed before
|
||||
* @return the new ClusterState
|
||||
*/
|
||||
private ClusterState executeStepUntilAsync(Step startStep, ClusterState currentState, Client client, LongSupplier nowSupplier, Index index) {
|
||||
StepResult result = startStep.execute(clusterService, currentState, index, client, nowSupplier);
|
||||
while (result.isComplete() && result.indexSurvived() && startStep.hasNextStep()) {
|
||||
currentState = result.getClusterState();
|
||||
startStep = startStep.getNextStep();
|
||||
result = startStep.execute(clusterService, currentState, index, client, nowSupplier);
|
||||
}
|
||||
if (result.isComplete()) {
|
||||
currentState = result.getClusterState();
|
||||
}
|
||||
return currentState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggered(SchedulerEngine.Event event) {
|
||||
if (event.getJobName().equals(IndexLifecycle.NAME)) {
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.DiffableUtils;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class PolicyStepsRegistry {
|
||||
|
||||
|
||||
// keeps track of existing policies in the cluster state
|
||||
SortedMap<String, LifecyclePolicy> lifecyclePolicyMap;
|
||||
// keeps track of what the first step in a policy is
|
||||
Map<String, Step> firstStepMap;
|
||||
// keeps track of a mapping from step-name to respective Step
|
||||
Map<StepKey, Step> stepMap;
|
||||
|
||||
public PolicyStepsRegistry() {
|
||||
this.lifecyclePolicyMap = new TreeMap<>();
|
||||
this.firstStepMap = new HashMap<>();
|
||||
this.stepMap = new HashMap<>();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void update(ClusterState currentState) {
|
||||
IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
Diff<Map<String, LifecyclePolicy>> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicies(), DiffableUtils.getStringKeySerializer());
|
||||
DiffableUtils.MapDiff<String, LifecyclePolicy, DiffableUtils.KeySerializer<String>> mapDiff = (DiffableUtils.MapDiff) diff;
|
||||
if (mapDiff.getUpserts().isEmpty() == false) {
|
||||
for (LifecyclePolicy policy : mapDiff.getUpserts().values()) {
|
||||
lifecyclePolicyMap.put(policy.getName(), policy);
|
||||
List<Step> policyAsSteps = policy.toSteps();
|
||||
if (policyAsSteps.isEmpty() == false) {
|
||||
firstStepMap.put(policy.getName(), policyAsSteps.get(0));
|
||||
for (Step step : policyAsSteps) {
|
||||
stepMap.put(new StepKey(step.getPhase(), step.getAction(), step.getName()), step);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (String deletedPolicyName : mapDiff.getDeletes()) {
|
||||
LifecyclePolicy policy = lifecyclePolicyMap.remove(deletedPolicyName);
|
||||
Step next = firstStepMap.remove(deletedPolicyName);
|
||||
while (next.hasNextStep()) {
|
||||
next = stepMap.remove(next.getNextStep());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Lifecycle
|
||||
|
||||
/**
|
||||
* returns the {@link Step} that matches the name and
|
||||
* policy specified. This is used by {@link ClusterState}
|
||||
* readers that know the current policy and step by name
|
||||
* as String values in the cluster state.
|
||||
* @param policy the policy from which to fetch the associated steps from
|
||||
* @param phase the phase the requested step is run in
|
||||
* @param action the action the requested step is run in
|
||||
* @param name the name of the requested step
|
||||
* @return
|
||||
*/
|
||||
public Step getStep(String policy, @Nullable String phase, @Nullable String action, @Nullable String name) {
|
||||
Step step = stepMap.get(new StepKey(phase, action, name));
|
||||
if (step == null) {
|
||||
step = firstStepMap.get(policy);
|
||||
}
|
||||
return step;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(lifecyclePolicyMap, firstStepMap, stepMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
PolicyStepsRegistry other = (PolicyStepsRegistry) obj;
|
||||
return Objects.equals(lifecyclePolicyMap, other.lifecyclePolicyMap)
|
||||
&& Objects.equals(firstStepMap, other.firstStepMap) && Objects.equals(stepMap, other.stepMap);
|
||||
}
|
||||
|
||||
public class StepKey {
|
||||
private final String phase;
|
||||
|
||||
private final String action;
|
||||
private final String name;
|
||||
|
||||
public StepKey(String phase, String action, String name) {
|
||||
this.phase = phase;
|
||||
this.action = action;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getPhase() {
|
||||
return phase;
|
||||
}
|
||||
|
||||
public String getAction() {
|
||||
return action;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(phase, action, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
StepKey other = (StepKey) obj;
|
||||
return Objects.equals(phase, other.phase) && Objects.equals(action, other.action) && Objects.equals(name, other.name);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -127,35 +127,35 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
}
|
||||
|
||||
public void testSteps() throws Exception {
|
||||
ThreadPool threadPool = new TestThreadPool("test");
|
||||
LongSupplier nowSupplier = () -> 0L;
|
||||
Client client = mock(Client.class);
|
||||
Step phaseAfterStep = new PhaseAfterStep(threadPool, 0L, nowSupplier,
|
||||
TimeValue.timeValueSeconds(0L), "name-0", "index", "phase", "mock_action");
|
||||
Step updateStep = new ClusterStateUpdateStep("name-1", "index", "phase", "mock_action",
|
||||
(state) -> state);
|
||||
Step waitStep = new ConditionalWaitStep("name-2", "index", "phase", "mock_action",
|
||||
(state) -> true);
|
||||
indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
Map<String, Phase> phases = new LinkedHashMap<>();
|
||||
firstAction = new MockAction(Arrays.asList(phaseAfterStep, updateStep, waitStep));
|
||||
Map<String, LifecycleAction> actions = Collections.singletonMap(MockAction.NAME, firstAction);
|
||||
firstPhase = new Phase("phase", null, actions);
|
||||
phases.put(firstPhase.getName(), firstPhase);
|
||||
policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, lifecycleName, null);
|
||||
|
||||
List<Step> phaseSteps = firstPhase.toSteps(new Index(indexName, indexName), 0L,
|
||||
client, threadPool, nowSupplier);
|
||||
|
||||
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
|
||||
.metaData(MetaData.builder().put(
|
||||
IndexMetaData.builder("index")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1).numberOfReplicas(1))).build();
|
||||
|
||||
StepResult result = policy.execute(phaseSteps, clusterState, clusterState.metaData().index("index"), client, nowSupplier);
|
||||
|
||||
threadPool.shutdown();
|
||||
// ThreadPool threadPool = new TestThreadPool("test");
|
||||
// LongSupplier nowSupplier = () -> 0L;
|
||||
// Client client = mock(Client.class);
|
||||
// Step phaseAfterStep = new PhaseAfterStep(threadPool, 0L, nowSupplier,
|
||||
// TimeValue.timeValueSeconds(0L), "name-0", "index", "phase", "mock_action");
|
||||
// Step updateStep = new ClusterStateUpdateStep("name-1", "index", "phase", "mock_action",
|
||||
// (state) -> state);
|
||||
// Step waitStep = new ConditionalWaitStep("name-2", "index", "phase", "mock_action",
|
||||
// (state) -> true);
|
||||
// indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
// lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
// Map<String, Phase> phases = new LinkedHashMap<>();
|
||||
// firstAction = new MockAction(Arrays.asList(phaseAfterStep, updateStep, waitStep));
|
||||
// Map<String, LifecycleAction> actions = Collections.singletonMap(MockAction.NAME, firstAction);
|
||||
// firstPhase = new Phase("phase", null, actions);
|
||||
// phases.put(firstPhase.getName(), firstPhase);
|
||||
// policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, lifecycleName, null);
|
||||
//
|
||||
// List<Step> phaseSteps = firstPhase.toSteps(new Index(indexName, indexName), 0L,
|
||||
// client, threadPool, nowSupplier);
|
||||
//
|
||||
// ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
|
||||
// .metaData(MetaData.builder().put(
|
||||
// IndexMetaData.builder("index")
|
||||
// .settings(settings(Version.CURRENT))
|
||||
// .numberOfShards(1).numberOfReplicas(1))).build();
|
||||
//
|
||||
// StepResult result = policy.execute(phaseSteps, clusterState, clusterState.metaData().index("index"), client, nowSupplier);
|
||||
//
|
||||
// threadPool.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue