Adds PUT/GET/DELETE lifecycle APIs
The lifecycles are stored as custom metadata objects in the cluster state. This change also cleans up the parsing of the lifecycle state so that it can be parsed properly
This commit is contained in:
parent
1fad59b295
commit
3f3f7b9b47
|
@ -9,34 +9,50 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class DeleteAction extends Action {
|
||||
public class DeleteAction extends LifecycleAction {
|
||||
public static final String NAME = "delete";
|
||||
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(DeleteAction.class);
|
||||
private static final ObjectParser<DeleteAction, Void> PARSER = new ObjectParser<>(NAME, DeleteAction::new);
|
||||
|
||||
public static final ParseField INDEX_FIELD = new ParseField("index");
|
||||
public static DeleteAction parse(XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
private Index index;
|
||||
public DeleteAction() {
|
||||
}
|
||||
|
||||
public DeleteAction(Index index) {
|
||||
this.index = index;
|
||||
public DeleteAction(StreamInput in) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(INDEX_FIELD.getPreferredName(), index.getName());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void execute(Client client) {
|
||||
protected void execute(Client client, Index index) {
|
||||
client.admin().indices().prepareDelete(index.getName()).execute(new ActionListener<DeleteIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(DeleteIndexResponse deleteIndexResponse) {
|
||||
|
|
|
@ -9,12 +9,15 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.cluster.NamedDiff;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
|
@ -22,16 +25,19 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestHandler;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.FixedExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.XPackPlugin;
|
||||
import org.elasticsearch.xpack.XPackSettings;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.DeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.GetLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.PutLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestDeleteLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestGetLifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.action.RestPutLifecycleAction;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -53,9 +59,9 @@ public class IndexLifecycle extends Plugin {
|
|||
private boolean enabled;
|
||||
private boolean transportClientMode;
|
||||
|
||||
public static final Setting LIFECYCLE_TIMESERIES_NAME_SETTING = Setting.simpleString("index.lifecycle.name",
|
||||
public static final Setting<String> LIFECYCLE_TIMESERIES_NAME_SETTING = Setting.simpleString("index.lifecycle.name",
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting LIFECYCLE_TIMESERIES_PHASE_SETTING = Setting.simpleString("index.lifecycle.phase",
|
||||
public static final Setting<String> LIFECYCLE_TIMESERIES_PHASE_SETTING = Setting.simpleString("index.lifecycle.phase",
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
|
||||
public IndexLifecycle(Settings settings) {
|
||||
|
@ -81,8 +87,10 @@ public class IndexLifecycle extends Plugin {
|
|||
return Arrays.asList(LIFECYCLE_TIMESERIES_NAME_SETTING, LIFECYCLE_TIMESERIES_PHASE_SETTING);
|
||||
}
|
||||
|
||||
public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, Clock clock) {
|
||||
indexLifecycleInitialisationService.set(new IndexLifecycleInitialisationService(settings, internalClient, clusterService, clock));
|
||||
public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, Clock clock,
|
||||
ThreadPool threadPool) {
|
||||
indexLifecycleInitialisationService
|
||||
.set(new IndexLifecycleInitialisationService(settings, internalClient, clusterService, clock, threadPool));
|
||||
return Collections.singletonList(indexLifecycleInitialisationService.get());
|
||||
}
|
||||
|
||||
|
@ -91,4 +99,42 @@ public class IndexLifecycle extends Plugin {
|
|||
indexLifecycleInitialisationService.get().close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Entry> getNamedWriteables() {
|
||||
return Arrays.asList(
|
||||
// Custom metadata
|
||||
new NamedWriteableRegistry.Entry(MetaData.Custom.class, IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata::new),
|
||||
new NamedWriteableRegistry.Entry(NamedDiff.class, IndexLifecycleMetadata.TYPE,
|
||||
IndexLifecycleMetadata.IndexLifecycleMetadataDiff::new),
|
||||
|
||||
// Lifecycle actions
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<org.elasticsearch.common.xcontent.NamedXContentRegistry.Entry> getNamedXContent() {
|
||||
return Arrays.asList(
|
||||
// Custom metadata
|
||||
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(IndexLifecycleMetadata.TYPE),
|
||||
parser -> IndexLifecycleMetadata.PARSER.parse(parser, null)),
|
||||
// Lifecycle actions
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse));
|
||||
}
|
||||
|
||||
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
|
||||
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<DiscoveryNodes> nodesInCluster) {
|
||||
return Arrays.asList(
|
||||
new RestPutLifecycleAction(settings, restController),
|
||||
new RestGetLifecycleAction(settings, restController),
|
||||
new RestDeleteLifecycleAction(settings, restController));
|
||||
}
|
||||
|
||||
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
|
||||
return Arrays.asList(
|
||||
new ActionHandler<>(PutLifecycleAction.INSTANCE, PutLifecycleAction.TransportAction.class),
|
||||
new ActionHandler<>(GetLifecycleAction.INSTANCE, GetLifecycleAction.TransportAction.class),
|
||||
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, DeleteLifecycleAction.TransportAction.class));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -7,12 +7,17 @@ package org.elasticsearch.xpack.indexlifecycle;
|
|||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.cluster.LocalNodeMasterListener;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
|
@ -23,36 +28,37 @@ import java.time.Clock;
|
|||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.LIFECYCLE_TIMESERIES_NAME_SETTING;
|
||||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.NAME;
|
||||
|
||||
public class IndexLifecycleInitialisationService extends AbstractComponent implements LocalNodeMasterListener, SchedulerEngine.Listener, Closeable {
|
||||
public class IndexLifecycleInitialisationService extends AbstractComponent
|
||||
implements ClusterStateListener, SchedulerEngine.Listener, Closeable {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleInitialisationService.class);
|
||||
|
||||
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
|
||||
private final Clock clock;
|
||||
private InternalClient client;
|
||||
private ClusterService clusterService;
|
||||
private ThreadPool threadPool;
|
||||
|
||||
public IndexLifecycleInitialisationService(Settings settings, InternalClient client, ClusterService clusterService, Clock clock) {
|
||||
public IndexLifecycleInitialisationService(Settings settings, InternalClient client, ClusterService clusterService, Clock clock,
|
||||
ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.client = client;
|
||||
this.clusterService = clusterService;
|
||||
this.clock = clock;
|
||||
clusterService.addLocalNodeMasterListener(this);
|
||||
this.threadPool = threadPool;
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executorName() {
|
||||
return ThreadPool.Names.MANAGEMENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMaster() {
|
||||
scheduler.set(new SchedulerEngine(clock));
|
||||
scheduler.get().register(this);
|
||||
scheduler.get().add(new SchedulerEngine.Job(NAME, ((startTime, now) -> now + 1000)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void offMaster() {
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.localNodeMaster()) {
|
||||
MetaData metaData = event.state().metaData();
|
||||
installMlMetadata(metaData);
|
||||
if (scheduler.get() == null) {
|
||||
scheduler.set(new SchedulerEngine(clock));
|
||||
scheduler.get().register(this);
|
||||
scheduler.get().add(new SchedulerEngine.Job(NAME, ((startTime, now) -> now + 1000)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -66,6 +72,31 @@ public class IndexLifecycleInitialisationService extends AbstractComponent imple
|
|||
});
|
||||
}
|
||||
|
||||
private void installMlMetadata(MetaData metaData) {
|
||||
if (metaData.custom(MlMetadata.TYPE) == null) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC)
|
||||
.execute(() -> clusterService.submitStateUpdateTask("install-index-lifecycle-metadata", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
// If the metadata has been added already don't try to update
|
||||
if (currentState.metaData().custom(IndexLifecycleMetadata.TYPE) != null) {
|
||||
return currentState;
|
||||
}
|
||||
ClusterState.Builder builder = new ClusterState.Builder(currentState);
|
||||
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
|
||||
metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY_METADATA);
|
||||
builder.metaData(metadataBuilder.build());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error("unable to install index lifecycle metadata", e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
SchedulerEngine engine = scheduler.get();
|
||||
|
|
|
@ -0,0 +1,148 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.DiffableUtils;
|
||||
import org.elasticsearch.cluster.NamedDiff;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData.Custom;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class IndexLifecycleMetadata implements MetaData.Custom {
|
||||
public static final String TYPE = "index_lifecycle";
|
||||
public static final ParseField POLICIES_FIELD = new ParseField("policies");
|
||||
public static final ParseField POLL_INTERVAL_FIELD = new ParseField("poll_interval");
|
||||
|
||||
public static final IndexLifecycleMetadata EMPTY_METADATA = new IndexLifecycleMetadata(Collections.emptySortedMap(), 300000);
|
||||
@SuppressWarnings("unchecked")
|
||||
public static final ConstructingObjectParser<IndexLifecycleMetadata, NamedXContentRegistry> PARSER = new ConstructingObjectParser<>(
|
||||
TYPE, a -> new IndexLifecycleMetadata((SortedMap<String, LifecyclePolicy>) a[0], (long) a[1]));
|
||||
static {
|
||||
PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, n) -> LifecyclePolicy.parse(p, new Tuple<>(n, c)),
|
||||
v -> {
|
||||
throw new IllegalArgumentException("ordered " + POLICIES_FIELD.getPreferredName() + " are not supported");
|
||||
}, POLICIES_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), POLL_INTERVAL_FIELD);
|
||||
}
|
||||
|
||||
private final SortedMap<String, LifecyclePolicy> policies;
|
||||
private final long pollInterval;
|
||||
|
||||
public IndexLifecycleMetadata(SortedMap<String, LifecyclePolicy> policies, long pollInterval) {
|
||||
this.policies = Collections.unmodifiableSortedMap(policies);
|
||||
this.pollInterval = pollInterval;
|
||||
}
|
||||
|
||||
public IndexLifecycleMetadata(StreamInput in) throws IOException {
|
||||
int size = in.readVInt();
|
||||
TreeMap<String, LifecyclePolicy> policies = new TreeMap<>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
policies.put(in.readString(), new LifecyclePolicy(in));
|
||||
}
|
||||
this.policies = policies;
|
||||
this.pollInterval = in.readVLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(policies.size());
|
||||
for (Map.Entry<String, LifecyclePolicy> entry : policies.entrySet()) {
|
||||
out.writeString(entry.getKey());
|
||||
entry.getValue().writeTo(out);
|
||||
}
|
||||
out.writeVLong(pollInterval);
|
||||
}
|
||||
|
||||
public SortedMap<String, LifecyclePolicy> getPolicies() {
|
||||
return policies;
|
||||
}
|
||||
|
||||
public long getPollInterval() {
|
||||
return pollInterval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<Custom> diff(Custom previousState) {
|
||||
return new IndexLifecycleMetadataDiff((IndexLifecycleMetadata) previousState, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field(POLICIES_FIELD.getPreferredName(), policies);
|
||||
builder.field(POLL_INTERVAL_FIELD.getPreferredName(), pollInterval);
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Version getMinimalSupportedVersion() {
|
||||
return Version.V_7_0_0_alpha1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EnumSet<MetaData.XContentContext> context() {
|
||||
return MetaData.ALL_CONTEXTS;
|
||||
}
|
||||
|
||||
public static class IndexLifecycleMetadataDiff implements NamedDiff<MetaData.Custom> {
|
||||
|
||||
final Diff<Map<String, LifecyclePolicy>> policies;
|
||||
final Long pollInterval;
|
||||
|
||||
IndexLifecycleMetadataDiff(IndexLifecycleMetadata before, IndexLifecycleMetadata after) {
|
||||
this.policies = DiffableUtils.diff(before.policies, after.policies, DiffableUtils.getStringKeySerializer());
|
||||
this.pollInterval = after.pollInterval;
|
||||
}
|
||||
|
||||
public IndexLifecycleMetadataDiff(StreamInput in) throws IOException {
|
||||
this.policies = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), LifecyclePolicy::new,
|
||||
IndexLifecycleMetadataDiff::readLifecyclePolicyDiffFrom);
|
||||
this.pollInterval = in.readVLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaData.Custom apply(MetaData.Custom part) {
|
||||
TreeMap<String, LifecyclePolicy> newPolicies = new TreeMap<>(policies.apply(((IndexLifecycleMetadata) part).policies));
|
||||
long pollInterval = ((IndexLifecycleMetadata) part).pollInterval;
|
||||
return new IndexLifecycleMetadata(newPolicies, pollInterval);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
policies.writeTo(out);
|
||||
out.writeVLong(pollInterval);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
static Diff<LifecyclePolicy> readLifecyclePolicyDiffFrom(StreamInput in) throws IOException {
|
||||
return AbstractDiffable.readDiffFrom(LifecyclePolicy::new, in);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,10 +6,12 @@
|
|||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
public abstract class Action implements ToXContentObject {
|
||||
public abstract class LifecycleAction implements ToXContentObject, NamedWriteable {
|
||||
|
||||
protected abstract void execute(Client client);
|
||||
protected abstract void execute(Client client, Index index);
|
||||
|
||||
}
|
|
@ -5,27 +5,76 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class LifecyclePolicy implements ToXContentObject {
|
||||
public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implements ToXContentObject, Writeable {
|
||||
|
||||
public static final ParseField PHASES_FIELD = new ParseField("phases");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static ConstructingObjectParser<LifecyclePolicy, Tuple<String, NamedXContentRegistry>> PARSER = new ConstructingObjectParser<>(
|
||||
"lifecycle_policy", false, (a, c) -> new LifecyclePolicy(c.v1(), (List<Phase>) a[0]));
|
||||
|
||||
static {
|
||||
PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, n) -> Phase.parse(p, new Tuple<>(n, c.v2())),
|
||||
v -> {
|
||||
throw new IllegalArgumentException("ordered " + PHASES_FIELD.getPreferredName() + " are not supported");
|
||||
}, PHASES_FIELD);
|
||||
}
|
||||
|
||||
public static LifecyclePolicy parse(XContentParser parser, Tuple<String, NamedXContentRegistry> context) {
|
||||
return PARSER.apply(parser, context);
|
||||
}
|
||||
|
||||
private String name;
|
||||
private List<Phase> phases;
|
||||
|
||||
public LifecyclePolicy(List<Phase> phases) {
|
||||
public LifecyclePolicy(String name, List<Phase> phases) {
|
||||
this.name = name;
|
||||
this.phases = phases;
|
||||
}
|
||||
|
||||
public LifecyclePolicy(StreamInput in) throws IOException {
|
||||
name = in.readString();
|
||||
phases = in.readList(Phase::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
out.writeList(phases);
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public List<Phase> getPhases() {
|
||||
return phases;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.array(PHASES_FIELD.getPreferredName(), phases);
|
||||
builder.startObject(PHASES_FIELD.getPreferredName());
|
||||
for (Phase phase : phases) {
|
||||
builder.field(phase.getName(), phase);
|
||||
}
|
||||
builder.endObject();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
|
|
@ -7,74 +7,93 @@ package org.elasticsearch.xpack.indexlifecycle;
|
|||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine.Schedule;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class Phase extends SchedulerEngine.Job implements ToXContentObject {
|
||||
public class Phase implements ToXContentObject, Writeable {
|
||||
|
||||
public static final ParseField NAME_FIELD = new ParseField("name");
|
||||
public static final ParseField ID_FIELD = new ParseField("id");
|
||||
public static final ParseField ACTIONS_FIELD = new ParseField("actions");
|
||||
public static final ParseField AFTER_FIELD = new ParseField("after");
|
||||
public static final ParseField ACTIONS_FIELD = new ParseField("actions");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static final ConstructingObjectParser<Phase, Tuple<String, NamedXContentRegistry>> PARSER = new ConstructingObjectParser<>(
|
||||
"phase", false, (a, c) -> new Phase(c.v1(), (TimeValue) a[0], (List<LifecycleAction>) a[1]));
|
||||
static {
|
||||
PARSER.declareField(ConstructingObjectParser.constructorArg(),
|
||||
(p, c) -> TimeValue.parseTimeValue(p.text(), AFTER_FIELD.getPreferredName()), AFTER_FIELD, ValueType.VALUE);
|
||||
PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(),
|
||||
(p, c, n) -> c.v2().parseNamedObject(LifecycleAction.class, n, p, c.v2()), v -> {
|
||||
throw new IllegalArgumentException("ordered " + ACTIONS_FIELD.getPreferredName() + " are not supported");
|
||||
}, ACTIONS_FIELD);
|
||||
}
|
||||
|
||||
public static Phase parse(XContentParser parser, Tuple<String, NamedXContentRegistry> context) {
|
||||
return PARSER.apply(parser, context);
|
||||
}
|
||||
|
||||
private String name;
|
||||
private List<Action> actions;
|
||||
private Client client;
|
||||
private List<LifecycleAction> actions;
|
||||
private TimeValue after;
|
||||
|
||||
public Phase(String name, Index index, long creationDate, TimeValue after, List<Action> actions, Client client) {
|
||||
super(index.getName() + "-" + name, getSchedule(creationDate, after));
|
||||
public Phase(String name, TimeValue after, List<LifecycleAction> actions) {
|
||||
this.name = name;
|
||||
this.client = client;
|
||||
this.after = after;
|
||||
this.actions = actions;
|
||||
}
|
||||
|
||||
public Phase(StreamInput in) throws IOException {
|
||||
this.name = in.readString();
|
||||
this.after = new TimeValue(in);
|
||||
this.actions = in.readNamedWriteableList(LifecycleAction.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
after.writeTo(out);
|
||||
out.writeNamedWriteableList(actions);
|
||||
}
|
||||
|
||||
public TimeValue getAfter() {
|
||||
return after;
|
||||
}
|
||||
|
||||
private static Schedule getSchedule(long creationDate, TimeValue after) {
|
||||
SchedulerEngine.Schedule schedule = (startTime, now) -> {
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN")
|
||||
.error("calculating schedule with creationTime:" + creationDate + ", and now:" + now);
|
||||
if (startTime == now) {
|
||||
return creationDate + after.getMillis();
|
||||
} else {
|
||||
return -1; // do not schedule another delete after already deleted
|
||||
}
|
||||
};
|
||||
return schedule;
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public Phase(String name, List<Action> actions, Schedule schedule, Client client) {
|
||||
super(name, schedule);
|
||||
this.name = name;
|
||||
this.actions = actions;
|
||||
this.client = client;
|
||||
public List<LifecycleAction> getActions() {
|
||||
return actions;
|
||||
}
|
||||
|
||||
protected void performActions() {
|
||||
for (Action action : actions) {
|
||||
action.execute(client);
|
||||
protected void performActions(Client client, Index index) {
|
||||
for (LifecycleAction action : actions) {
|
||||
action.execute(client, index);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(NAME_FIELD.getPreferredName(), name);
|
||||
builder.field(ID_FIELD.getPreferredName(), name);
|
||||
builder.field(AFTER_FIELD.getPreferredName(), after);
|
||||
builder.array(ACTIONS_FIELD.getPreferredName(), actions);
|
||||
builder.startObject(ACTIONS_FIELD.getPreferredName());
|
||||
for (LifecycleAction action : actions) {
|
||||
builder.field(action.getWriteableName(), action);
|
||||
}
|
||||
builder.endObject();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.indexlifecycle.LifecycleAction;
|
||||
import org.elasticsearch.xpack.indexlifecycle.LifecyclePolicy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class DeleteLifecycleAction
|
||||
extends Action<DeleteLifecycleAction.Request, DeleteLifecycleAction.Response, DeleteLifecycleAction.RequestBuilder> {
|
||||
public static final DeleteLifecycleAction INSTANCE = new DeleteLifecycleAction();
|
||||
public static final String NAME = "cluster:admin/xpack/indexlifecycle/delete";
|
||||
|
||||
protected DeleteLifecycleAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
|
||||
return new RequestBuilder(client, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
|
||||
|
||||
protected RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
|
||||
super(client, action, new Request());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Response extends AcknowledgedResponse implements ToXContentObject {
|
||||
|
||||
public Response() {
|
||||
}
|
||||
|
||||
public Response(boolean acknowledged) {
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
addAcknowledgedField(builder);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<Request> {
|
||||
|
||||
public static final ParseField POLICY_FIELD = new ParseField("policy");
|
||||
|
||||
private String policyName;
|
||||
|
||||
public Request(String policyName) {
|
||||
this.policyName = policyName;
|
||||
}
|
||||
|
||||
Request() {
|
||||
}
|
||||
|
||||
public String getPolicyName() {
|
||||
return policyName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
policyName = in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(policyName);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, DeleteLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver, Request::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
|
||||
clusterService.submitStateUpdateTask("delete-lifecycle-" + request.getPolicyName(),
|
||||
new AckedClusterStateUpdateTask<Response>(request, listener) {
|
||||
@Override
|
||||
protected Response newResponse(boolean acknowledged) {
|
||||
return new Response(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
ClusterState.Builder newState = ClusterState.builder(currentState);
|
||||
IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
if (currentMetadata.getPolicies().containsKey(request.getPolicyName()) == false) {
|
||||
throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
|
||||
}
|
||||
SortedMap<String, LifecyclePolicy> newPolicies = new TreeMap<>(currentMetadata.getPolicies());
|
||||
newPolicies.remove(request.getPolicyName());
|
||||
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getPollInterval());
|
||||
newState.metaData(MetaData.builder(currentState.getMetaData())
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
|
||||
return newState.build();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,160 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.indexlifecycle.LifecyclePolicy;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class GetLifecycleAction
|
||||
extends Action<GetLifecycleAction.Request, GetLifecycleAction.Response, GetLifecycleAction.RequestBuilder> {
|
||||
public static final GetLifecycleAction INSTANCE = new GetLifecycleAction();
|
||||
public static final String NAME = "cluster:admin/xpack/indexlifecycle/get";
|
||||
|
||||
protected GetLifecycleAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
|
||||
return new RequestBuilder(client, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
|
||||
|
||||
protected RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
|
||||
super(client, action, new Request());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Response extends ActionResponse implements ToXContentObject {
|
||||
|
||||
private final LifecyclePolicy policy;
|
||||
|
||||
public Response() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
public Response(LifecyclePolicy policy) {
|
||||
this.policy = policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
policy.toXContent(builder, params);
|
||||
return builder;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<Request> {
|
||||
|
||||
public static final ParseField POLICY_FIELD = new ParseField("policy");
|
||||
|
||||
private String policyName;
|
||||
|
||||
public Request(String policyName) {
|
||||
this.policyName = policyName;
|
||||
}
|
||||
|
||||
Request() {
|
||||
}
|
||||
|
||||
public String getPolicyName() {
|
||||
return policyName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
policyName = in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(policyName);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, GetLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver, Request::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
|
||||
IndexLifecycleMetadata metadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
if (metadata == null) {
|
||||
listener.onFailure(new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName()));
|
||||
} else {
|
||||
LifecyclePolicy policy = metadata.getPolicies().get(request.getPolicyName());
|
||||
if (policy == null) {
|
||||
listener.onFailure(new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName()));
|
||||
} else {
|
||||
listener.onResponse(new Response(policy));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,196 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.indexlifecycle.LifecyclePolicy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class PutLifecycleAction extends Action<PutLifecycleAction.Request, PutLifecycleAction.Response, PutLifecycleAction.RequestBuilder> {
|
||||
public static final PutLifecycleAction INSTANCE = new PutLifecycleAction();
|
||||
public static final String NAME = "cluster:admin/xpack/indexlifecycle/put";
|
||||
|
||||
protected PutLifecycleAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
|
||||
return new RequestBuilder(client, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
|
||||
|
||||
protected RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
|
||||
super(client, action, new Request());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Response extends AcknowledgedResponse implements ToXContentObject {
|
||||
|
||||
public Response() {
|
||||
}
|
||||
|
||||
public Response(boolean acknowledged) {
|
||||
super(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
addAcknowledgedField(builder);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
|
||||
|
||||
public static final ParseField POLICY_FIELD = new ParseField("policy");
|
||||
private static final ConstructingObjectParser<Request, Tuple<String, NamedXContentRegistry>> PARSER = new ConstructingObjectParser<>(
|
||||
"put_lifecycle_request", a -> new Request((LifecyclePolicy) a[0]));
|
||||
static {
|
||||
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> LifecyclePolicy.parse(p, c), POLICY_FIELD);
|
||||
}
|
||||
|
||||
private LifecyclePolicy policy;
|
||||
|
||||
public Request(LifecyclePolicy policy) {
|
||||
this.policy = policy;
|
||||
}
|
||||
|
||||
Request() {
|
||||
}
|
||||
|
||||
public LifecyclePolicy getPolicy() {
|
||||
return policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public static Request parseRequest(String name, XContentParser parser, NamedXContentRegistry namedXContentRegistry) {
|
||||
return PARSER.apply(parser, new Tuple<String, NamedXContentRegistry>(name, namedXContentRegistry));
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(POLICY_FIELD.getPreferredName(), policy);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
policy = new LifecyclePolicy(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
policy.writeTo(out);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, PutLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
indexNameExpressionResolver,
|
||||
Request::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
|
||||
clusterService.submitStateUpdateTask("put-lifecycle-" + request.getPolicy().getName(),
|
||||
new AckedClusterStateUpdateTask<Response>(request, listener) {
|
||||
@Override
|
||||
protected Response newResponse(boolean acknowledged) {
|
||||
return new Response(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
ClusterState.Builder newState = ClusterState.builder(currentState);
|
||||
IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
if (currentMetadata.getPolicies().containsKey(request.getPolicy().getName())) {
|
||||
throw new ResourceAlreadyExistsException("Lifecycle policy already exists: {}",
|
||||
request.getPolicy().getName());
|
||||
}
|
||||
SortedMap<String, LifecyclePolicy> newPolicies = new TreeMap<>(currentMetadata.getPolicies());
|
||||
newPolicies.put(request.getPolicy().getName(), request.getPolicy());
|
||||
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getPollInterval());
|
||||
newState.metaData(MetaData.builder(currentState.getMetaData())
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
|
||||
return newState.build();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RestDeleteLifecycleAction extends BaseRestHandler {
|
||||
|
||||
public RestDeleteLifecycleAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.DELETE,
|
||||
IndexLifecycle.BASE_PATH + "{name}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "xpack_lifecycle_delete_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
|
||||
String lifecycleName = restRequest.param("name");
|
||||
DeleteLifecycleAction.Request deleteLifecycleRequest = new DeleteLifecycleAction.Request(lifecycleName);
|
||||
deleteLifecycleRequest.timeout(restRequest.paramAsTime("timeout", deleteLifecycleRequest.timeout()));
|
||||
deleteLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", deleteLifecycleRequest.masterNodeTimeout()));
|
||||
|
||||
return channel -> client.execute(DeleteLifecycleAction.INSTANCE, deleteLifecycleRequest, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RestGetLifecycleAction extends BaseRestHandler {
|
||||
|
||||
public RestGetLifecycleAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.GET,
|
||||
IndexLifecycle.BASE_PATH + "{name}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "xpack_lifecycle_get_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
|
||||
String lifecycleName = restRequest.param("name");
|
||||
GetLifecycleAction.Request getLifecycleRequest = new GetLifecycleAction.Request(lifecycleName);
|
||||
getLifecycleRequest.timeout(restRequest.paramAsTime("timeout", getLifecycleRequest.timeout()));
|
||||
getLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", getLifecycleRequest.masterNodeTimeout()));
|
||||
|
||||
return channel -> client.execute(GetLifecycleAction.INSTANCE, getLifecycleRequest, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle.action;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RestPutLifecycleAction extends BaseRestHandler {
|
||||
|
||||
public RestPutLifecycleAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(RestRequest.Method.PUT,
|
||||
IndexLifecycle.BASE_PATH + "{name}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "xpack_lifecycle_put_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
|
||||
String lifecycleName = restRequest.param("name");
|
||||
XContentParser parser = restRequest.contentParser();
|
||||
PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser, restRequest.getXContentRegistry());
|
||||
putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout()));
|
||||
putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout()));
|
||||
|
||||
return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue