Adds JavaDocs and comments and small refactoring

This commit is contained in:
Colin Goodheart-Smithe 2017-11-27 12:22:10 +00:00
parent 9ab2121fcf
commit 87766cd4ef
8 changed files with 243 additions and 43 deletions

View File

@ -21,6 +21,9 @@ import org.elasticsearch.index.Index;
import java.io.IOException;
/**
* A {@link LifecycleAction} which deletes the index.
*/
public class DeleteAction implements LifecycleAction {
public static final String NAME = "delete";

View File

@ -53,7 +53,7 @@ public class IndexLifecycle extends Plugin {
public static final String NAME = "index_lifecycle";
public static final String BASE_PATH = "/_xpack/index_lifecycle/";
public static final String THREAD_POOL_NAME = NAME;
private final SetOnce<IndexLifecycleInitialisationService> indexLifecycleInitialisationService = new SetOnce<>();
private final SetOnce<IndexLifecycleService> indexLifecycleInitialisationService = new SetOnce<>();
private Settings settings;
private boolean enabled;
private boolean transportClientMode;
@ -97,7 +97,7 @@ public class IndexLifecycle extends Plugin {
return emptyList();
}
indexLifecycleInitialisationService
.set(new IndexLifecycleInitialisationService(settings, client, clusterService, clock, threadPool));
.set(new IndexLifecycleService(settings, client, clusterService, clock, threadPool));
return Collections.singletonList(indexLifecycleInitialisationService.get());
}

View File

@ -5,26 +5,94 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.Index;
/**
* Provides the context to a {@link LifecyclePolicy} for a particular target.
* This context provides the state of the lifecycle target (hereafter referred
* to as the target) as well as allow operations to be performed on the target.
*/
public interface IndexLifecycleContext {
/**
* Sets the phase for the target and calls the provided callback. Note that
* this will also set the action name to an empty {@link String}.
*
* @param phase
* the name of the phase to be set.
* @param listener
* a {@link Listener} to call after the operation.
*/
void setPhase(String phase, Listener listener);
/**
* Sets the action for the target and calls the provided callback.
*
* @param action
* the name of the action to be set.
* @param listener
* a {@link Listener} to call after the operation.
*/
void setAction(String action, Listener listener);
/**
* @return the current {@link LifecycleAction} name for the target.
*/
String getAction();
/**
* @return the current {@link Phase} name for the target.
*/
String getPhase();
/**
* @return the name of the target.
*/
String getLifecycleTarget();
/**
* Determines whether the target is able to move to the provided
* {@link Phase}
*
* @param phase
* the {@link Phase} to test
* @return <code>true</code> iff the target is ready to move to the
* {@link Phase}.
*/
boolean canExecute(Phase phase);
/**
* Executes the provided {@link LifecycleAction} passing the relevant target
* state to it.
*
* @param action
* the {@link LifecycleAction} to execute.
* @param listener
* a {@link LifecycleAction.Listener} to pass to the
* {@link LifecycleAction}.
*/
public void executeAction(LifecycleAction action, LifecycleAction.Listener listener);
/**
* A callback for use when setting phase or action names.
*/
public static interface Listener {
/**
* Called if the call to
* {@link LifecycleAction#execute(Index, Client, Listener)} was
* successful
*/
void onSuccess();
/**
* Called if there was an exception when setting the action or phase
* name.
*
* @param e
* the exception that caused the failure
*/
void onFailure(Exception e);
}
}

View File

@ -33,9 +33,9 @@ import java.util.function.LongSupplier;
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.NAME;
public class IndexLifecycleInitialisationService extends AbstractComponent
public class IndexLifecycleService extends AbstractComponent
implements ClusterStateListener, SchedulerEngine.Listener, Closeable {
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleInitialisationService.class);
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleService.class);
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
private final Clock clock;
@ -44,7 +44,7 @@ public class IndexLifecycleInitialisationService extends AbstractComponent
private ThreadPool threadPool;
private LongSupplier nowSupplier;
public IndexLifecycleInitialisationService(Settings settings, Client client, ClusterService clusterService, Clock clock,
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock,
ThreadPool threadPool) {
super(settings);
this.client = client;

View File

@ -14,12 +14,25 @@ import org.elasticsearch.common.settings.Settings;
import java.util.function.LongSupplier;
/**
* An Implementation of {@link IndexLifecycleContext} which writes lifecycle
* state to index settings.
*/
public class InternalIndexLifecycleContext implements IndexLifecycleContext {
private Client client;
private IndexMetaData idxMeta;
private LongSupplier nowSupplier;
/**
* @param idxMeta
* the {@link IndexMetaData} for the index.
* @param client
* the {@link Client} to use when modifying the index settings.
* @param nowSupplier
* a {@link LongSupplier} to provide the current timestamp when
* required.
*/
public InternalIndexLifecycleContext(IndexMetaData idxMeta, Client client, LongSupplier nowSupplier) {
this.idxMeta = idxMeta;
this.client = client;
@ -54,12 +67,14 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext {
return idxMeta.getIndex().getName();
}
@Override
public boolean canExecute(Phase phase) {
long now = nowSupplier.getAsLong();
long indexCreated = idxMeta.getCreationDate();
return (indexCreated + phase.getAfter().millis()) <= now;
}
@Override
public void executeAction(LifecycleAction action, LifecycleAction.Listener listener) {
action.execute(idxMeta.getIndex(), client, listener);
}

View File

@ -10,14 +10,55 @@ import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;
/**
* Executes an action on an index related to its lifecycle.
*/
public interface LifecycleAction extends ToXContentObject, NamedWriteable {
/**
* Checks the current state of the {@link LifecycleAction} and progresses
* the action towards completion . Note that a {@link LifecycleAction} may
* require multiple calls to this method before it is complete. Equally this
* method may do nothing if it needs to wait for something to complete
* before proceeding.
*
* @param index
* the {@link Index} on which to perform the action.
* @param client
* the {@link Client} to use for making changes to the index.
* @param listener
* a {@link Listener} to call when this call completes.
*/
void execute(Index index, Client client, Listener listener);
/**
* A callback for when a {@link LifecycleAction} finishes executing
*/
public static interface Listener {
/**
* Called if the call to
* {@link LifecycleAction#execute(Index, Client, Listener)} was
* successful
*
* @param completed
* <code>true</code> iff the {@link LifecycleAction} is now
* complete and requires no more calls to
* {@link LifecycleAction#execute(Index, Client, Listener)
* execute(Index, Client, Listener)}.
*/
void onSuccess(boolean completed);
/**
* Called if there was an exception during
* {@link LifecycleAction#execute(Index, Client, Listener)}. Note that
* even the call to
* {@link LifecycleAction#execute(Index, Client, Listener)} may be
* retried even after this method is called.
*
* @param e
* the exception that caused the failure
*/
void onFailure(Exception e);
}
}

View File

@ -25,6 +25,14 @@ import java.io.IOException;
import java.util.List;
import java.util.Objects;
/**
* Represents the lifecycle of an index from creation to deletion. A
* {@link LifecyclePolicy} is made up of a set of {@link Phase}s which it will
* move through. Soon we will constrain the phases using some kinda of lifecycle
* type which will allow only particular {@link Phase}s to be defined, will
* dictate the order in which the {@link Phase}s are executed and will define
* which {@link LifecycleAction}s are allowed in each phase.
*/
public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implements ToXContentObject, Writeable {
private static final Logger logger = ESLoggerFactory.getLogger(LifecyclePolicy.class);
@ -48,11 +56,22 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
private final String name;
private final List<Phase> phases;
/**
* @param name
* the name of this {@link LifecyclePolicy}
* @param phases
* a {@link List} of {@link Phase}s which make up this
* {@link LifecyclePolicy}. These {@link Phase}s are executed in
* the order of the {@link List}.
*/
public LifecyclePolicy(String name, List<Phase> phases) {
this.name = name;
this.phases = phases;
}
/**
* For Serialization
*/
public LifecyclePolicy(StreamInput in) throws IOException {
name = in.readString();
phases = in.readList(Phase::new);
@ -64,10 +83,17 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
out.writeList(phases);
}
/**
* @return the name of this {@link LifecyclePolicy}
*/
public String getName() {
return name;
}
/**
* @return the {@link Phase}s for this {@link LifecyclePolicy} in the order
* in which they will be executed.
*/
public List<Phase> getPhases() {
return phases;
}
@ -84,27 +110,40 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
return builder;
}
/**
* Checks the current state and executes the appropriate {@link Phase}.
*
* @param context
* the {@link IndexLifecycleContext} to use to execute the
* {@link LifecyclePolicy}.
*/
public void execute(IndexLifecycleContext context) {
String currentPhaseName = context.getPhase();
boolean currentPhaseActionsComplete = context.getAction().equals(Phase.PHASE_COMPLETED);
String indexName = context.getLifecycleTarget();
if (Strings.isNullOrEmpty(currentPhaseName) || currentPhaseActionsComplete) {
// Either this is the first time we have seen this index or the current phase is complete, in both cases we need to move to the next phase
int currentPhaseIndex = -1;
// First find the current phase (will not find it if this is the first time we've seen this index)
for (int i = 0; i < phases.size(); i++) {
if (phases.get(i).getName().equals(currentPhaseName)) {
currentPhaseIndex = i;
break;
}
}
// If we have reached the last phase then we don't need to do anything (maybe the last phase doesn't have a delete action?)
if (currentPhaseIndex < phases.size() - 1) {
Phase nextPhase = phases.get(currentPhaseIndex + 1);
// We only want to execute the phase if the conditions for executing are met (e.g. the index is old enough)
if (context.canExecute(nextPhase)) {
String nextPhaseName = nextPhase.getName();
// Set the phase on the context to this phase so we know where we are next time we execute
context.setPhase(nextPhaseName, new Listener() {
@Override
public void onSuccess() {
logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]");
// We might as well execute the phase now rather than waiting for execute to be called again
nextPhase.execute(context);
}
@ -116,6 +155,7 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
}
}
} else {
// If we have already seen this index and the action is not PHASE_COMPLETED then we just need to execute the current phase again
Phase currentPhase = phases.stream().filter(phase -> phase.getName().equals(currentPhaseName)).findAny()
.orElseThrow(() -> new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle ["
+ getName() + "] for index [" + indexName + "]"));

View File

@ -26,6 +26,10 @@ import java.io.IOException;
import java.util.List;
import java.util.Objects;
/**
* Represents set of {@link LifecycleAction}s which should be executed at a
* particular point in the lifecycle of an index.
*/
public class Phase implements ToXContentObject, Writeable {
public static final String PHASE_COMPLETED = "ACTIONS COMPLETED";
@ -54,12 +58,26 @@ public class Phase implements ToXContentObject, Writeable {
private List<LifecycleAction> actions;
private TimeValue after;
/**
* @param name
* the name of this {@link Phase}.
* @param after
* the age of the index when the index should move to this
* {@link Phase}.
* @param actions
* a {@link List} of the {@link LifecycleAction}s to run when
* during his {@link Phase}. The order of this list defines the
* order in which the {@link LifecycleAction}s will be run.
*/
public Phase(String name, TimeValue after, List<LifecycleAction> actions) {
this.name = name;
this.after = after;
this.actions = actions;
}
/**
* For Serialization
*/
public Phase(StreamInput in) throws IOException {
this.name = in.readString();
this.after = new TimeValue(in);
@ -73,66 +91,75 @@ public class Phase implements ToXContentObject, Writeable {
out.writeNamedWriteableList(actions);
}
/**
* @return the age of the index when the index should move to this
* {@link Phase}.
*/
public TimeValue getAfter() {
return after;
}
/**
* @return the name of this {@link Phase}
*/
public String getName() {
return name;
}
/**
* @return a {@link List} of the {@link LifecycleAction}s to run when during
* his {@link Phase}. The order of this list defines the order in
* which the {@link LifecycleAction}s will be run.
*/
public List<LifecycleAction> getActions() {
return actions;
}
/**
* Checks the current state and executes the appropriate
* {@link LifecycleAction}.
*
* @param context
* the {@link IndexLifecycleContext} to use to execute the
* {@link Phase}.
*/
protected void execute(IndexLifecycleContext context) {
String currentActionName = context.getAction();
String indexName = context.getLifecycleTarget();
if (Strings.isNullOrEmpty(currentActionName)) {
// This is is the first time this phase has been called so get the first action and execute it.
String firstActionName;
LifecycleAction firstAction;
if (actions.isEmpty()) {
// There are no actions in this phase so use the PHASE_COMPLETE action name.
firstAction = null;
firstActionName = PHASE_COMPLETED;
} else {
firstAction = actions.get(0);
firstActionName = firstAction.getWriteableName();
}
// Set the action on the context to this first action so we know where we are next time we execute
context.setAction(firstActionName, new Listener() {
@Override
public void onSuccess() {
logger.info("Successfully initialised action [" + firstActionName + "] for index [" + indexName + "]");
// Now execute the action unless its PHASE_COMPLETED
if (firstActionName.equals(PHASE_COMPLETED) == false) {
context.executeAction(firstAction, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
if (completed) {
logger.info("Action [" + firstActionName + "] for index [" + indexName
+ "] complete, moving to next action");
moveToAction(context, indexName, 1);
} else {
logger.info("Action [" + firstActionName + "] for index [" + indexName
+ "] executed sucessfully but is not yet complete");
}
}
@Override
public void onFailure(Exception e) {
logger.info("Action [" + firstActionName + "] for index [" + indexName + "] failed", e);
}
});
executeAction(context, indexName, 0, firstAction);
}
}
@Override
public void onFailure(Exception e) {
// Something went wrong so log the error and hopfully it will succeed next time execute is called. NOCOMMIT can we do better here?
logger.error("Failed to initialised action [" + firstActionName + "] for index [" + indexName + "]", e);
}
});
} else if (currentActionName.equals(PHASE_COMPLETED) == false) {
// We have an action name and its not PHASE COMPLETED so we need to execute the action
int currentActionIndex = -1;
// First find the action in the actions list.
for (int i = 0; i < actions.size(); i++) {
if (actions.get(i).getWriteableName().equals(currentActionName)) {
currentActionIndex = i;
@ -144,27 +171,31 @@ public class Phase implements ToXContentObject, Writeable {
+ getName() + "] for index [" + indexName + "]");
}
LifecycleAction currentAction = actions.get(currentActionIndex);
final int nextActionIndex = currentActionIndex + 1;
context.executeAction(currentAction, new LifecycleAction.Listener() {
executeAction(context, indexName, currentActionIndex, currentAction);
}
}
private void executeAction(IndexLifecycleContext context, String indexName, int actionIndex, LifecycleAction action) {
String actionName = action.getWriteableName();
context.executeAction(action, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
if (completed) {
logger.info("Action [" + currentActionName + "] for index [" + indexName + "] complete, moving to next action");
moveToAction(context, indexName, nextActionIndex);
logger.info("Action [" + actionName + "] for index [" + indexName + "] complete, moving to next action");
// Since we completed the current action move to the next action
moveToAction(context, indexName, actionIndex + 1);
} else {
logger.info("Action [" + currentActionName + "] for index [" + indexName
+ "] executed sucessfully but is not yet complete");
logger.info("Action [" + actionName + "] for index [" + indexName + "] executed sucessfully but is not yet complete");
}
}
@Override
public void onFailure(Exception e) {
logger.info("Action [" + currentActionName + "] for index [" + indexName + "] failed", e);
logger.info("Action [" + actionName + "] for index [" + indexName + "] failed", e);
}
});
}
}
private void moveToAction(IndexLifecycleContext context, String indexName, final int nextActionIndex) {
if (nextActionIndex < actions.size()) {
@ -175,6 +206,7 @@ public class Phase implements ToXContentObject, Writeable {
public void onSuccess() {
logger.info("Successfully initialised action [" + nextAction.getWriteableName() + "] in phase [" + getName()
+ "] for index [" + indexName + "]");
// We might as well execute the new action now rather than waiting for execute to be called again
execute(context);
}
@ -185,6 +217,7 @@ public class Phase implements ToXContentObject, Writeable {
}
});
} else {
// There is no next action so set the action to PHASE_COMPLETED
context.setAction(Phase.PHASE_COMPLETED, new Listener() {
@Override