diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/DeleteAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/DeleteAction.java index 575edef05a4..d9121b9243b 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/DeleteAction.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/DeleteAction.java @@ -21,6 +21,9 @@ import org.elasticsearch.index.Index; import java.io.IOException; +/** + * A {@link LifecycleAction} which deletes the index. + */ public class DeleteAction implements LifecycleAction { public static final String NAME = "delete"; diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index d0e07eb38c1..0bc1dc6a24a 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -53,7 +53,7 @@ public class IndexLifecycle extends Plugin { public static final String NAME = "index_lifecycle"; public static final String BASE_PATH = "/_xpack/index_lifecycle/"; public static final String THREAD_POOL_NAME = NAME; - private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); + private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); private Settings settings; private boolean enabled; private boolean transportClientMode; @@ -97,7 +97,7 @@ public class IndexLifecycle extends Plugin { return emptyList(); } indexLifecycleInitialisationService - .set(new IndexLifecycleInitialisationService(settings, client, clusterService, clock, threadPool)); + .set(new IndexLifecycleService(settings, client, clusterService, clock, threadPool)); return Collections.singletonList(indexLifecycleInitialisationService.get()); } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleContext.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleContext.java index 501faf93080..dfe4a7f8b92 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleContext.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleContext.java @@ -5,26 +5,94 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.Index; + +/** + * Provides the context to a {@link LifecyclePolicy} for a particular target. + * This context provides the state of the lifecycle target (hereafter referred + * to as the target) as well as allow operations to be performed on the target. + */ public interface IndexLifecycleContext { + /** + * Sets the phase for the target and calls the provided callback. Note that + * this will also set the action name to an empty {@link String}. + * + * @param phase + * the name of the phase to be set. + * @param listener + * a {@link Listener} to call after the operation. + */ void setPhase(String phase, Listener listener); + /** + * Sets the action for the target and calls the provided callback. + * + * @param action + * the name of the action to be set. + * @param listener + * a {@link Listener} to call after the operation. + */ void setAction(String action, Listener listener); + /** + * @return the current {@link LifecycleAction} name for the target. + */ String getAction(); + /** + * @return the current {@link Phase} name for the target. + */ String getPhase(); + /** + * @return the name of the target. + */ String getLifecycleTarget(); + /** + * Determines whether the target is able to move to the provided + * {@link Phase} + * + * @param phase + * the {@link Phase} to test + * @return true iff the target is ready to move to the + * {@link Phase}. + */ boolean canExecute(Phase phase); + /** + * Executes the provided {@link LifecycleAction} passing the relevant target + * state to it. + * + * @param action + * the {@link LifecycleAction} to execute. + * @param listener + * a {@link LifecycleAction.Listener} to pass to the + * {@link LifecycleAction}. + */ public void executeAction(LifecycleAction action, LifecycleAction.Listener listener); + /** + * A callback for use when setting phase or action names. + */ public static interface Listener { + /** + * Called if the call to + * {@link LifecycleAction#execute(Index, Client, Listener)} was + * successful + */ void onSuccess(); + /** + * Called if there was an exception when setting the action or phase + * name. + * + * @param e + * the exception that caused the failure + */ void onFailure(Exception e); } } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java similarity index 95% rename from x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java rename to x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 52ab4e25900..8955e416a5d 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -33,9 +33,9 @@ import java.util.function.LongSupplier; import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.NAME; -public class IndexLifecycleInitialisationService extends AbstractComponent +public class IndexLifecycleService extends AbstractComponent implements ClusterStateListener, SchedulerEngine.Listener, Closeable { - private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleInitialisationService.class); + private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleService.class); private final SetOnce scheduler = new SetOnce<>(); private final Clock clock; @@ -44,7 +44,7 @@ public class IndexLifecycleInitialisationService extends AbstractComponent private ThreadPool threadPool; private LongSupplier nowSupplier; - public IndexLifecycleInitialisationService(Settings settings, Client client, ClusterService clusterService, Clock clock, + public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, ThreadPool threadPool) { super(settings); this.client = client; diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java index c23b1f87f86..d3ddbfecf1c 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContext.java @@ -14,12 +14,25 @@ import org.elasticsearch.common.settings.Settings; import java.util.function.LongSupplier; +/** + * An Implementation of {@link IndexLifecycleContext} which writes lifecycle + * state to index settings. + */ public class InternalIndexLifecycleContext implements IndexLifecycleContext { private Client client; private IndexMetaData idxMeta; private LongSupplier nowSupplier; + /** + * @param idxMeta + * the {@link IndexMetaData} for the index. + * @param client + * the {@link Client} to use when modifying the index settings. + * @param nowSupplier + * a {@link LongSupplier} to provide the current timestamp when + * required. + */ public InternalIndexLifecycleContext(IndexMetaData idxMeta, Client client, LongSupplier nowSupplier) { this.idxMeta = idxMeta; this.client = client; @@ -54,12 +67,14 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext { return idxMeta.getIndex().getName(); } + @Override public boolean canExecute(Phase phase) { long now = nowSupplier.getAsLong(); long indexCreated = idxMeta.getCreationDate(); return (indexCreated + phase.getAfter().millis()) <= now; } + @Override public void executeAction(LifecycleAction action, LifecycleAction.Listener listener) { action.execute(idxMeta.getIndex(), client, listener); } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecycleAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecycleAction.java index 339f9140e41..727127577d9 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecycleAction.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecycleAction.java @@ -10,14 +10,55 @@ import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; +/** + * Executes an action on an index related to its lifecycle. + */ public interface LifecycleAction extends ToXContentObject, NamedWriteable { + /** + * Checks the current state of the {@link LifecycleAction} and progresses + * the action towards completion . Note that a {@link LifecycleAction} may + * require multiple calls to this method before it is complete. Equally this + * method may do nothing if it needs to wait for something to complete + * before proceeding. + * + * @param index + * the {@link Index} on which to perform the action. + * @param client + * the {@link Client} to use for making changes to the index. + * @param listener + * a {@link Listener} to call when this call completes. + */ void execute(Index index, Client client, Listener listener); + /** + * A callback for when a {@link LifecycleAction} finishes executing + */ public static interface Listener { + /** + * Called if the call to + * {@link LifecycleAction#execute(Index, Client, Listener)} was + * successful + * + * @param completed + * true iff the {@link LifecycleAction} is now + * complete and requires no more calls to + * {@link LifecycleAction#execute(Index, Client, Listener) + * execute(Index, Client, Listener)}. + */ void onSuccess(boolean completed); + /** + * Called if there was an exception during + * {@link LifecycleAction#execute(Index, Client, Listener)}. Note that + * even the call to + * {@link LifecycleAction#execute(Index, Client, Listener)} may be + * retried even after this method is called. + * + * @param e + * the exception that caused the failure + */ void onFailure(Exception e); } } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicy.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicy.java index a1100361f33..fc2584c450c 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicy.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicy.java @@ -25,6 +25,14 @@ import java.io.IOException; import java.util.List; import java.util.Objects; +/** + * Represents the lifecycle of an index from creation to deletion. A + * {@link LifecyclePolicy} is made up of a set of {@link Phase}s which it will + * move through. Soon we will constrain the phases using some kinda of lifecycle + * type which will allow only particular {@link Phase}s to be defined, will + * dictate the order in which the {@link Phase}s are executed and will define + * which {@link LifecycleAction}s are allowed in each phase. + */ public class LifecyclePolicy extends AbstractDiffable implements ToXContentObject, Writeable { private static final Logger logger = ESLoggerFactory.getLogger(LifecyclePolicy.class); @@ -48,11 +56,22 @@ public class LifecyclePolicy extends AbstractDiffable implement private final String name; private final List phases; + /** + * @param name + * the name of this {@link LifecyclePolicy} + * @param phases + * a {@link List} of {@link Phase}s which make up this + * {@link LifecyclePolicy}. These {@link Phase}s are executed in + * the order of the {@link List}. + */ public LifecyclePolicy(String name, List phases) { this.name = name; this.phases = phases; } + /** + * For Serialization + */ public LifecyclePolicy(StreamInput in) throws IOException { name = in.readString(); phases = in.readList(Phase::new); @@ -64,10 +83,17 @@ public class LifecyclePolicy extends AbstractDiffable implement out.writeList(phases); } + /** + * @return the name of this {@link LifecyclePolicy} + */ public String getName() { return name; } + /** + * @return the {@link Phase}s for this {@link LifecyclePolicy} in the order + * in which they will be executed. + */ public List getPhases() { return phases; } @@ -84,27 +110,40 @@ public class LifecyclePolicy extends AbstractDiffable implement return builder; } + /** + * Checks the current state and executes the appropriate {@link Phase}. + * + * @param context + * the {@link IndexLifecycleContext} to use to execute the + * {@link LifecyclePolicy}. + */ public void execute(IndexLifecycleContext context) { String currentPhaseName = context.getPhase(); boolean currentPhaseActionsComplete = context.getAction().equals(Phase.PHASE_COMPLETED); String indexName = context.getLifecycleTarget(); if (Strings.isNullOrEmpty(currentPhaseName) || currentPhaseActionsComplete) { + // Either this is the first time we have seen this index or the current phase is complete, in both cases we need to move to the next phase int currentPhaseIndex = -1; + // First find the current phase (will not find it if this is the first time we've seen this index) for (int i = 0; i < phases.size(); i++) { if (phases.get(i).getName().equals(currentPhaseName)) { currentPhaseIndex = i; break; } } + // If we have reached the last phase then we don't need to do anything (maybe the last phase doesn't have a delete action?) if (currentPhaseIndex < phases.size() - 1) { Phase nextPhase = phases.get(currentPhaseIndex + 1); + // We only want to execute the phase if the conditions for executing are met (e.g. the index is old enough) if (context.canExecute(nextPhase)) { String nextPhaseName = nextPhase.getName(); + // Set the phase on the context to this phase so we know where we are next time we execute context.setPhase(nextPhaseName, new Listener() { @Override public void onSuccess() { logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]"); + // We might as well execute the phase now rather than waiting for execute to be called again nextPhase.execute(context); } @@ -116,6 +155,7 @@ public class LifecyclePolicy extends AbstractDiffable implement } } } else { + // If we have already seen this index and the action is not PHASE_COMPLETED then we just need to execute the current phase again Phase currentPhase = phases.stream().filter(phase -> phase.getName().equals(currentPhaseName)).findAny() .orElseThrow(() -> new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle [" + getName() + "] for index [" + indexName + "]")); diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java index 3b429f69df9..04534b9ec57 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java @@ -26,6 +26,10 @@ import java.io.IOException; import java.util.List; import java.util.Objects; +/** + * Represents set of {@link LifecycleAction}s which should be executed at a + * particular point in the lifecycle of an index. + */ public class Phase implements ToXContentObject, Writeable { public static final String PHASE_COMPLETED = "ACTIONS COMPLETED"; @@ -54,12 +58,26 @@ public class Phase implements ToXContentObject, Writeable { private List actions; private TimeValue after; + /** + * @param name + * the name of this {@link Phase}. + * @param after + * the age of the index when the index should move to this + * {@link Phase}. + * @param actions + * a {@link List} of the {@link LifecycleAction}s to run when + * during his {@link Phase}. The order of this list defines the + * order in which the {@link LifecycleAction}s will be run. + */ public Phase(String name, TimeValue after, List actions) { this.name = name; this.after = after; this.actions = actions; } + /** + * For Serialization + */ public Phase(StreamInput in) throws IOException { this.name = in.readString(); this.after = new TimeValue(in); @@ -73,66 +91,75 @@ public class Phase implements ToXContentObject, Writeable { out.writeNamedWriteableList(actions); } + /** + * @return the age of the index when the index should move to this + * {@link Phase}. + */ public TimeValue getAfter() { return after; } + /** + * @return the name of this {@link Phase} + */ public String getName() { return name; } + /** + * @return a {@link List} of the {@link LifecycleAction}s to run when during + * his {@link Phase}. The order of this list defines the order in + * which the {@link LifecycleAction}s will be run. + */ public List getActions() { return actions; } + /** + * Checks the current state and executes the appropriate + * {@link LifecycleAction}. + * + * @param context + * the {@link IndexLifecycleContext} to use to execute the + * {@link Phase}. + */ protected void execute(IndexLifecycleContext context) { String currentActionName = context.getAction(); String indexName = context.getLifecycleTarget(); if (Strings.isNullOrEmpty(currentActionName)) { + // This is is the first time this phase has been called so get the first action and execute it. String firstActionName; LifecycleAction firstAction; if (actions.isEmpty()) { + // There are no actions in this phase so use the PHASE_COMPLETE action name. firstAction = null; firstActionName = PHASE_COMPLETED; } else { firstAction = actions.get(0); firstActionName = firstAction.getWriteableName(); } + // Set the action on the context to this first action so we know where we are next time we execute context.setAction(firstActionName, new Listener() { @Override public void onSuccess() { logger.info("Successfully initialised action [" + firstActionName + "] for index [" + indexName + "]"); + // Now execute the action unless its PHASE_COMPLETED if (firstActionName.equals(PHASE_COMPLETED) == false) { - context.executeAction(firstAction, new LifecycleAction.Listener() { - - @Override - public void onSuccess(boolean completed) { - if (completed) { - logger.info("Action [" + firstActionName + "] for index [" + indexName - + "] complete, moving to next action"); - moveToAction(context, indexName, 1); - } else { - logger.info("Action [" + firstActionName + "] for index [" + indexName - + "] executed sucessfully but is not yet complete"); - } - } - - @Override - public void onFailure(Exception e) { - logger.info("Action [" + firstActionName + "] for index [" + indexName + "] failed", e); - } - }); + executeAction(context, indexName, 0, firstAction); } } @Override public void onFailure(Exception e) { + // Something went wrong so log the error and hopfully it will succeed next time execute is called. NOCOMMIT can we do better here? logger.error("Failed to initialised action [" + firstActionName + "] for index [" + indexName + "]", e); } }); } else if (currentActionName.equals(PHASE_COMPLETED) == false) { + // We have an action name and its not PHASE COMPLETED so we need to execute the action int currentActionIndex = -1; + // First find the action in the actions list. for (int i = 0; i < actions.size(); i++) { if (actions.get(i).getWriteableName().equals(currentActionName)) { currentActionIndex = i; @@ -144,28 +171,32 @@ public class Phase implements ToXContentObject, Writeable { + getName() + "] for index [" + indexName + "]"); } LifecycleAction currentAction = actions.get(currentActionIndex); - final int nextActionIndex = currentActionIndex + 1; - context.executeAction(currentAction, new LifecycleAction.Listener() { - - @Override - public void onSuccess(boolean completed) { - if (completed) { - logger.info("Action [" + currentActionName + "] for index [" + indexName + "] complete, moving to next action"); - moveToAction(context, indexName, nextActionIndex); - } else { - logger.info("Action [" + currentActionName + "] for index [" + indexName - + "] executed sucessfully but is not yet complete"); - } - } - - @Override - public void onFailure(Exception e) { - logger.info("Action [" + currentActionName + "] for index [" + indexName + "] failed", e); - } - }); + executeAction(context, indexName, currentActionIndex, currentAction); } } + private void executeAction(IndexLifecycleContext context, String indexName, int actionIndex, LifecycleAction action) { + String actionName = action.getWriteableName(); + context.executeAction(action, new LifecycleAction.Listener() { + + @Override + public void onSuccess(boolean completed) { + if (completed) { + logger.info("Action [" + actionName + "] for index [" + indexName + "] complete, moving to next action"); + // Since we completed the current action move to the next action + moveToAction(context, indexName, actionIndex + 1); + } else { + logger.info("Action [" + actionName + "] for index [" + indexName + "] executed sucessfully but is not yet complete"); + } + } + + @Override + public void onFailure(Exception e) { + logger.info("Action [" + actionName + "] for index [" + indexName + "] failed", e); + } + }); + } + private void moveToAction(IndexLifecycleContext context, String indexName, final int nextActionIndex) { if (nextActionIndex < actions.size()) { LifecycleAction nextAction = actions.get(nextActionIndex); @@ -175,6 +206,7 @@ public class Phase implements ToXContentObject, Writeable { public void onSuccess() { logger.info("Successfully initialised action [" + nextAction.getWriteableName() + "] in phase [" + getName() + "] for index [" + indexName + "]"); + // We might as well execute the new action now rather than waiting for execute to be called again execute(context); } @@ -185,6 +217,7 @@ public class Phase implements ToXContentObject, Writeable { } }); } else { + // There is no next action so set the action to PHASE_COMPLETED context.setAction(Phase.PHASE_COMPLETED, new Listener() { @Override