moar changes from Tal

This commit is contained in:
Tal Levy 2018-03-28 17:10:50 -07:00
parent 179074d3b0
commit 74eaba2078
22 changed files with 153 additions and 94 deletions

View File

@ -182,7 +182,7 @@ public class AllocateAction implements LifecycleAction {
// }
@Override
public List<Step> toSteps(String phase) {
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
// ClusterStateUpdateStep updateAllocationSettings = new ClusterStateUpdateStep(
// "update_allocation", NAME, phase, index.getName(), (clusterState) -> {
// IndexMetaData idxMeta = clusterState.metaData().index(index);

View File

@ -12,8 +12,8 @@ public abstract class AsyncActionStep extends Step {
private Client client;
public AsyncActionStep(String name, String action, String phase, StepKey nextStepKey, Client client) {
super(name, action, phase, nextStepKey);
public AsyncActionStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey);
this.client = client;
}

View File

@ -12,8 +12,8 @@ public abstract class AsyncWaitStep extends Step {
private Client client;
public AsyncWaitStep(String name, String action, String phase, StepKey nextStepKey, Client client) {
super(name, action, phase, nextStepKey);
public AsyncWaitStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey);
this.client = client;
}

View File

@ -10,8 +10,8 @@ import org.elasticsearch.index.Index;
public abstract class ClusterStateActionStep extends Step {
public ClusterStateActionStep(String name, String action, String phase, StepKey nextStepKey) {
super(name, action, phase, nextStepKey);
public ClusterStateActionStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}
public abstract ClusterState performAction(Index index, ClusterState clusterState);

View File

@ -10,8 +10,8 @@ import org.elasticsearch.index.Index;
public abstract class ClusterStateWaitStep extends Step {
public ClusterStateWaitStep(String name, String action, String phase, StepKey nextStepKey) {
super(name, action, phase, nextStepKey);
public ClusterStateWaitStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}
public abstract boolean isConditionMet(Index index, ClusterState clusterState);

View File

@ -68,12 +68,9 @@ public class DeleteAction implements LifecycleAction {
}
@Override
public List<Step> toSteps(String phase) {
// String indexName = index.getName();
// return Collections.singletonList(new ClientStep<DeleteIndexRequestBuilder, DeleteIndexResponse>( "delete",
// NAME, phase, indexName, client.admin().indices().prepareDelete(indexName),
// clusterState -> clusterState.metaData().hasIndex(indexName), response -> true));
return Arrays.asList();
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
Step.StepKey deleteStepKey = new Step.StepKey(phase, NAME, "delete-step");
return Collections.singletonList(new DeleteAsyncActionStep(deleteStepKey, nextStepKey, client));
}
@Override

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.Index;
public class DeleteAsyncActionStep extends AsyncActionStep {
public DeleteAsyncActionStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey, client);
}
public void performAction(Index index, Listener listener) {
getClient().admin().indices().prepareDelete(index.getName())
.execute(ActionListener.wrap(response -> listener.onResponse(true) , listener::onFailure));
}
}

View File

@ -97,7 +97,7 @@ public class ForceMergeAction implements LifecycleAction {
}
@Override
public List<Step> toSteps(String phase) {
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
// ClusterStateUpdateStep readOnlyStep = new ClusterStateUpdateStep(
// "read_only", NAME, phase, index.getName(), (currentState) -> {
// Settings readOnlySettings = Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true).build();

View File

@ -6,21 +6,28 @@
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.function.LongSupplier;
/**
* Executes an action on an index related to its lifecycle.
*/
public interface LifecycleAction extends ToXContentObject, NamedWriteable {
List<Step> toSteps(String phase);
/**
* converts the {@link LifecycleAction}'s execution plan into a series of
* {@link Step}s that reference each other to preserve order of operations.
* @param client the client that will be used by {@link AsyncActionStep} and {@link AsyncWaitStep} steps
* @param phase the name of the phase this action is being executed within
* @param nextStepKey the next step to execute after this action's steps. If null, then there are no further
* steps to run. It is the responsibility of each {@link LifecycleAction} to implement this
* correctly and not forget to link to this final step so that the policy can continue.
* @return an ordered list of steps that represent the execution plan of the action
*/
List<Step> toSteps(Client client, String phase, @Nullable Step.StepKey nextStepKey);
default boolean indexSurvives() {
return true;

View File

@ -24,11 +24,13 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -144,14 +146,36 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
return builder;
}
public List<Step> toSteps() {
public List<Step> toSteps(Client client, LongSupplier nowSupplier) {
List<Step> steps = new ArrayList<>();
for (Phase phase : type.getOrderedPhases(phases)) {
for (LifecycleAction action : type.getOrderedActions(phase)) {
// TODO(talevy): correctly set `nextStep` between actions and phases
steps.addAll(action.toSteps(phase.getName()));
}
List<Phase> orderedPhases = type.getOrderedPhases(phases);
logger.error("checking phases[" + orderedPhases.size() + "]");
for (Phase t : orderedPhases) {
logger.error(t);
}
ListIterator<Phase> phaseIterator = orderedPhases.listIterator(orderedPhases.size());
Step.StepKey lastStepKey = null;
// add steps for each phase, in reverse
while (phaseIterator.hasPrevious()) {
Phase phase = phaseIterator.previous();
List<LifecycleAction> orderedActions = type.getOrderedActions(phase);
ListIterator<LifecycleAction> actionIterator = orderedActions.listIterator(orderedActions.size());
// add steps for each action, in reverse
while (actionIterator.hasPrevious()) {
LifecycleAction action = actionIterator.previous();
List<Step> actionSteps = action.toSteps(client, phase.getName(), lastStepKey);
ListIterator<Step> actionStepsIterator = actionSteps.listIterator(actionSteps.size());
while (actionStepsIterator.hasPrevious()) {
Step step = actionStepsIterator.previous();
steps.add(step);
lastStepKey = step.getKey();
}
}
Step.StepKey afterStepKey = new Step.StepKey(phase.getName(), null, "after");
steps.add(new PhaseAfterStep(nowSupplier, phase.getAfter(), afterStepKey, lastStepKey));
}
Collections.reverse(steps);
return steps;
}

View File

@ -5,13 +5,26 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
public class PhaseAfterStep extends Step {
import java.util.function.LongSupplier;
public class PhaseAfterStep extends ClusterStateWaitStep {
private final TimeValue after;
private final LongSupplier nowSupplier;
public PhaseAfterStep(String phase, String action, String name, TimeValue after, StepKey nextStepKey) {
super(name, action, phase, nextStepKey);
PhaseAfterStep(LongSupplier nowSupplier, TimeValue after, StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
this.nowSupplier = nowSupplier;
this.after = after;
}
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
long lifecycleDate = clusterState.metaData().settings()
.getAsLong(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, -1L);
return nowSupplier.getAsLong() >= lifecycleDate + after.getMillis();
}
}

View File

@ -83,7 +83,7 @@ public class ReplicasAction implements LifecycleAction {
}
@Override
public List<Step> toSteps(String phase) {
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
// ClusterStateUpdateStep updateAllocationSettings = new ClusterStateUpdateStep(
// "update_replica_count", NAME, phase, index.getName(), (currentState) ->
// ClusterState.builder(currentState).metaData(MetaData.builder(currentState.metaData())

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -145,7 +146,7 @@ public class RolloverAction implements LifecycleAction {
}
@Override
public List<Step> toSteps(String phase) {
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
return Collections.emptyList();
// ConditionalWaitStep wait = new ConditionalWaitStep(clusterService, "wait_for_rollover", index.getName(), phase, action, (clusterState) -> {
// // TODO(talevy): actually, needs to RolloverRequest with dryrun to get the appropriate data; clusterState is not enough...

View File

@ -104,7 +104,7 @@ public class ShrinkAction implements LifecycleAction {
}
@Override
public List<Step> toSteps(String phase) {
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
// String shrunkenIndexName = SHRUNK_INDEX_NAME_PREFIX + index.getName();
// // TODO(talevy): magical node.name to allocate to
// String nodeName = "MAGIC";

View File

@ -11,29 +11,18 @@ import java.util.Objects;
* A {@link LifecycleAction} which deletes the index.
*/
public abstract class Step {
private final String name;
private final String action;
private final String phase;
private final StepKey key;
private final StepKey nextStepKey;
public Step(String name, String action, String phase, StepKey nextStepKey) {
this.name = name;
this.action = action;
this.phase = phase;
public Step(StepKey key, StepKey nextStepKey) {
this.key = key;
this.nextStepKey = nextStepKey;
}
public String getName() {
return name;
public StepKey getKey() {
return key;
}
public String getAction() {
return action;
}
public String getPhase() {
return phase;
}
public StepKey getNextStepKey() {
return nextStepKey;
@ -83,5 +72,10 @@ public abstract class Step {
StepKey other = (StepKey) obj;
return Objects.equals(phase, other.phase) && Objects.equals(action, other.action) && Objects.equals(name, other.name);
}
@Override
public String toString() {
return String.format("[%s][%s][%s]", phase, action, name);
}
}
}

View File

@ -59,7 +59,8 @@ public class TimeseriesLifecycleType implements LifecycleType {
}
public List<Phase> getOrderedPhases(Map<String, Phase> phases) {
return VALID_PHASES.stream().map(p -> phases.getOrDefault(p, null)).collect(Collectors.toList());
return VALID_PHASES.stream().map(p -> phases.getOrDefault(p, null))
.filter(Objects::nonNull).collect(Collectors.toList());
}
public List<LifecycleAction> getOrderedActions(Phase phase) {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.indexlifecycle;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -84,7 +85,7 @@ public class MockAction implements LifecycleAction {
}
@Override
public List<Step> toSteps(String phase) {
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
return steps;
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -69,11 +70,11 @@ public class TestLifecycleType implements LifecycleType {
@Override
public List<Phase> getOrderedPhases(Map<String, Phase> phases) {
return null;
return new ArrayList<>(phases.values());
}
@Override
public List<LifecycleAction> getOrderedActions(Phase phase) {
return null;
return new ArrayList<>(phase.getActions().values());
}
}

View File

@ -44,8 +44,7 @@ public class IndexLifecycleRunner {
@Override
public void onResponse(boolean conditionMet) {
if (conditionMet) {
moveToStep(index, policy, new StepKey(currentStep.getPhase(), currentStep.getAction(), currentStep.getName()),
currentStep.getNextStepKey(), Cause.CALLBACK);
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK);
}
}
@ -63,8 +62,7 @@ public class IndexLifecycleRunner {
@Override
public void onResponse(boolean complete) {
if (complete) {
moveToStep(index, policy, new StepKey(currentStep.getPhase(), currentStep.getAction(), currentStep.getName()),
currentStep.getNextStepKey(), Cause.CALLBACK);
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK);
}
}
@ -76,7 +74,7 @@ public class IndexLifecycleRunner {
}
} else {
throw new IllegalStateException(
"Step with key [" + currentStep.getName() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]");
"Step with key [" + currentStep.getKey() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]");
}
}

View File

@ -82,7 +82,7 @@ public class IndexLifecycleService extends AbstractComponent
if (lifecycleMetadata != null && event.changedCustomMetaDataSet().contains(IndexLifecycleMetadata.TYPE)) {
// update policy steps registry
policyRegistry.update(event.state());
policyRegistry.update(event.state(), client, nowSupplier);
}
if (lifecycleMetadata == null) { // no lifecycle metadata, install initial empty metadata state

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
@ -18,6 +19,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.LongSupplier;
public class PolicyStepsRegistry {
@ -36,19 +38,20 @@ public class PolicyStepsRegistry {
}
@SuppressWarnings("unchecked")
public void update(ClusterState currentState) {
public void update(ClusterState currentState, Client client, LongSupplier nowSupplier) {
IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
Diff<Map<String, LifecyclePolicy>> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicies(), DiffableUtils.getStringKeySerializer());
DiffableUtils.MapDiff<String, LifecyclePolicy, DiffableUtils.KeySerializer<String>> mapDiff = (DiffableUtils.MapDiff) diff;
if (mapDiff.getUpserts().isEmpty() == false) {
for (LifecyclePolicy policy : mapDiff.getUpserts().values()) {
lifecyclePolicyMap.put(policy.getName(), policy);
List<Step> policyAsSteps = policy.toSteps();
List<Step> policyAsSteps = policy.toSteps(client, nowSupplier);
if (policyAsSteps.isEmpty() == false) {
firstStepMap.put(policy.getName(), policyAsSteps.get(0));
Map<Step.StepKey, Step> stepMapForPolicy = stepMap.put(policy.getName(), new HashMap<>());
stepMap.put(policy.getName(), new HashMap<>());
Map<Step.StepKey, Step> stepMapForPolicy = stepMap.get(policy.getName());
for (Step step : policyAsSteps) {
stepMapForPolicy.put(new Step.StepKey(step.getPhase(), step.getAction(), step.getName()), step);
stepMapForPolicy.put(step.getKey(), step);
}
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -14,11 +15,13 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction;
import org.elasticsearch.xpack.core.indexlifecycle.DeleteAsyncActionStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.MockAction;
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.TestLifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
@ -30,6 +33,10 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecyclePolicy> {
@ -110,36 +117,26 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertSame(TimeseriesLifecycleType.INSTANCE, policy.getType());
}
public void testSteps() throws Exception {
// ThreadPool threadPool = new TestThreadPool("test");
// LongSupplier nowSupplier = () -> 0L;
// Client client = mock(Client.class);
// Step phaseAfterStep = new PhaseAfterStep(threadPool, 0L, nowSupplier,
// TimeValue.timeValueSeconds(0L), "name-0", "index", "phase", "mock_action");
// Step updateStep = new ClusterStateUpdateStep("name-1", "index", "phase", "mock_action",
// (state) -> state);
// Step waitStep = new ConditionalWaitStep("name-2", "index", "phase", "mock_action",
// (state) -> true);
// indexName = randomAlphaOfLengthBetween(1, 20);
// lifecycleName = randomAlphaOfLengthBetween(1, 20);
// Map<String, Phase> phases = new LinkedHashMap<>();
// firstAction = new MockAction(Arrays.asList(phaseAfterStep, updateStep, waitStep));
// Map<String, LifecycleAction> actions = Collections.singletonMap(MockAction.NAME, firstAction);
// firstPhase = new Phase("phase", null, actions);
// phases.put(firstPhase.getName(), firstPhase);
// policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, lifecycleName, null);
//
// List<Step> phaseSteps = firstPhase.toSteps(new Index(indexName, indexName), 0L,
// client, threadPool, nowSupplier);
//
// ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
// .metaData(MetaData.builder().put(
// IndexMetaData.builder("index")
// .settings(settings(Version.CURRENT))
// .numberOfShards(1).numberOfReplicas(1))).build();
//
// StepResult result = policy.execute(phaseSteps, clusterState, clusterState.metaData().index("index"), client, nowSupplier);
//
// threadPool.shutdown();
public void testToSteps() throws Exception {
Client client = mock(Client.class);
LongSupplier nowSupplier = () -> 0L;
Step deleteStep = new DeleteAsyncActionStep(
new Step.StepKey("delete", "DELETE", "delete"), null, client);
indexName = randomAlphaOfLengthBetween(1, 20);
lifecycleName = randomAlphaOfLengthBetween(1, 20);
Map<String, Phase> phases = new LinkedHashMap<>();
firstAction = new MockAction(Arrays.asList(deleteStep));
Map<String, LifecycleAction> actions = Collections.singletonMap(MockAction.NAME, firstAction);
firstPhase = new Phase("delete", TimeValue.ZERO, actions);
phases.put(firstPhase.getName(), firstPhase);
policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, lifecycleName, phases);
List<Step> steps = policy.toSteps(client, nowSupplier);
assertThat(steps.size(), equalTo(2));
assertThat(steps.get(0).getKey(), equalTo(new Step.StepKey("delete", null, "after")));
assertThat(steps.get(0).getNextStepKey(), equalTo(deleteStep.getKey()));
assertThat(steps.get(1), equalTo(deleteStep));
assertNull(steps.get(1).getNextStepKey());
}
}