Started writing/reading phase and action settings

This commit is contained in:
Colin Goodheart-Smithe 2017-11-22 13:26:05 +00:00
parent a26a5969dd
commit 3f0cf05dcc
4 changed files with 79 additions and 9 deletions

View File

@ -52,7 +52,7 @@ public class DeleteAction extends LifecycleAction {
} }
@Override @Override
protected void execute(Client client, Index index) { protected void execute(Index index, Client client) {
client.admin().indices().prepareDelete(index.getName()).execute(new ActionListener<DeleteIndexResponse>() { client.admin().indices().prepareDelete(index.getName()).execute(new ActionListener<DeleteIndexResponse>() {
@Override @Override
public void onResponse(DeleteIndexResponse deleteIndexResponse) { public void onResponse(DeleteIndexResponse deleteIndexResponse) {

View File

@ -12,6 +12,6 @@ import org.elasticsearch.index.Index;
public abstract class LifecycleAction implements ToXContentObject, NamedWriteable { public abstract class LifecycleAction implements ToXContentObject, NamedWriteable {
protected abstract void execute(Client client, Index index); protected abstract void execute(Index index, Client client);
} }

View File

@ -5,13 +5,19 @@
*/ */
package org.elasticsearch.xpack.indexlifecycle; package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
@ -23,6 +29,7 @@ import java.io.IOException;
import java.util.List; import java.util.List;
public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implements ToXContentObject, Writeable { public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implements ToXContentObject, Writeable {
private static final Logger logger = ESLoggerFactory.getLogger(LifecyclePolicy.class);
public static final ParseField PHASES_FIELD = new ParseField("phases"); public static final ParseField PHASES_FIELD = new ParseField("phases");
@ -41,8 +48,8 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
return PARSER.apply(parser, context); return PARSER.apply(parser, context);
} }
private String name; private final String name;
private List<Phase> phases; private final List<Phase> phases;
public LifecyclePolicy(String name, List<Phase> phases) { public LifecyclePolicy(String name, List<Phase> phases) {
this.name = name; this.name = name;
@ -82,5 +89,31 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
} }
public void execute(IndexMetaData idxMeta, InternalClient client) { public void execute(IndexMetaData idxMeta, InternalClient client) {
String currentPhaseName = IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.get(idxMeta.getSettings());
String indexName = idxMeta.getIndex().getName();
if (Strings.isNullOrEmpty(currentPhaseName)) {
String firstPhaseName = phases.get(0).getName();
client.admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), firstPhaseName))
.execute(new ActionListener<UpdateSettingsResponse>() {
@Override
public void onResponse(UpdateSettingsResponse response) {
if (response.isAcknowledged() == false) {
logger.info("Successfully initialised phase [" + firstPhaseName + "] for index [" + indexName + "]");
}
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to initialised phase [" + firstPhaseName + "] for index [" + indexName + "]", e);
}
});
} else {
Phase currentPhase = phases.stream().filter(phase -> phase.getName().equals(currentPhaseName)).findAny()
.orElseThrow(() -> new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle ["
+ getName() + "] for index [" + indexName + "]"));
currentPhase.execute(idxMeta, client);
}
} }
} }

View File

@ -5,12 +5,18 @@
*/ */
package org.elasticsearch.xpack.indexlifecycle; package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.client.Client; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -18,12 +24,15 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index; import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
public class Phase implements ToXContentObject, Writeable { public class Phase implements ToXContentObject, Writeable {
private static final String PHASE_COMPLETED = "PHASE COMPLETED";
private static final Logger logger = ESLoggerFactory.getLogger(Phase.class);
public static final ParseField AFTER_FIELD = new ParseField("after"); public static final ParseField AFTER_FIELD = new ParseField("after");
public static final ParseField ACTIONS_FIELD = new ParseField("actions"); public static final ParseField ACTIONS_FIELD = new ParseField("actions");
@ -79,9 +88,37 @@ public class Phase implements ToXContentObject, Writeable {
return actions; return actions;
} }
protected void performActions(Client client, Index index) { protected void execute(IndexMetaData idxMeta, InternalClient client) {
for (LifecycleAction action : actions) { String currentActionName = IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.get(idxMeta.getSettings());
action.execute(client, index); String indexName = idxMeta.getIndex().getName();
if (Strings.isNullOrEmpty(currentActionName)) {
String firstPhaseName;
if (actions.isEmpty()) {
firstPhaseName = PHASE_COMPLETED;
} else {
firstPhaseName = actions.get(0).getWriteableName();
}
client.admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), firstPhaseName))
.execute(new ActionListener<UpdateSettingsResponse>() {
@Override
public void onResponse(UpdateSettingsResponse response) {
if (response.isAcknowledged() == false) {
logger.info("Successfully initialised phase [" + firstPhaseName + "] for index [" + indexName + "]");
}
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to initialised phase [" + firstPhaseName + "] for index [" + indexName + "]", e);
}
});
} else if (currentActionName.equals(PHASE_COMPLETED) == false) {
LifecycleAction currentAction = actions.stream().filter(action -> action.getWriteableName().equals(currentActionName)).findAny()
.orElseThrow(() -> new IllegalStateException("Current action [" + currentActionName + "] not found in phase ["
+ getName() + "] for index [" + indexName + "]"));
currentAction.execute(idxMeta.getIndex(), client);
} }
} }