tal-temp migration to not use settings for the lifecycle policy
This commit is contained in:
parent
dd6d192e25
commit
1fad59b295
|
@ -52,22 +52,16 @@ public class IndexLifecycle extends Plugin {
|
|||
private Settings settings;
|
||||
private boolean enabled;
|
||||
private boolean transportClientMode;
|
||||
private boolean tribeNode;
|
||||
private boolean tribeNodeClient;
|
||||
|
||||
public static final Setting LIFECYCLE_TIMESERIES_SETTING = Setting.groupSetting("index.lifecycle.timeseries.", (settings) -> {
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("validating setting internally: " + settings);
|
||||
if (settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
}, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting LIFECYCLE_TIMESERIES_NAME_SETTING = Setting.simpleString("index.lifecycle.name",
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting LIFECYCLE_TIMESERIES_PHASE_SETTING = Setting.simpleString("index.lifecycle.phase",
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
|
||||
public IndexLifecycle(Settings settings) {
|
||||
this.settings = settings;
|
||||
this.enabled = XPackSettings.INDEX_LIFECYCLE_ENABLED.get(settings);
|
||||
this.transportClientMode = XPackPlugin.transportClientMode(settings);
|
||||
this.tribeNode = XPackPlugin.isTribeNode(settings);
|
||||
this.tribeNodeClient = XPackPlugin.isTribeClientNode(settings);
|
||||
}
|
||||
|
||||
public Collection<Module> nodeModules() {
|
||||
|
@ -82,19 +76,9 @@ public class IndexLifecycle extends Plugin {
|
|||
return modules;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onIndexModule(IndexModule indexModule) {
|
||||
Index index = indexModule.getIndex();
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("onIndexModule: " + index.getName());
|
||||
long creationDate = settings.getAsLong("index.creation_date", -1L);
|
||||
indexModule.addSettingsUpdateConsumer(LIFECYCLE_TIMESERIES_SETTING,
|
||||
(Settings s) -> indexLifecycleInitialisationService.get().setLifecycleSettings(index, creationDate, s));
|
||||
indexModule.addIndexEventListener(indexLifecycleInitialisationService.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Arrays.asList(LIFECYCLE_TIMESERIES_SETTING);
|
||||
return Arrays.asList(LIFECYCLE_TIMESERIES_NAME_SETTING, LIFECYCLE_TIMESERIES_PHASE_SETTING);
|
||||
}
|
||||
|
||||
public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, Clock clock) {
|
||||
|
|
|
@ -12,11 +12,7 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.XPackPlugin;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
|
@ -24,16 +20,16 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.time.Clock;
|
||||
|
||||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.LIFECYCLE_TIMESERIES_SETTING;
|
||||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.LIFECYCLE_TIMESERIES_NAME_SETTING;
|
||||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.NAME;
|
||||
|
||||
public class IndexLifecycleInitialisationService extends AbstractComponent implements LocalNodeMasterListener, IndexEventListener, SchedulerEngine.Listener, Closeable {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(XPackPlugin.class);
|
||||
public class IndexLifecycleInitialisationService extends AbstractComponent implements LocalNodeMasterListener, SchedulerEngine.Listener, Closeable {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleInitialisationService.class);
|
||||
|
||||
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
|
||||
private final Clock clock;
|
||||
private InternalClient client;
|
||||
private ClusterService clusterService;
|
||||
private boolean isMaster;
|
||||
|
||||
public IndexLifecycleInitialisationService(Settings settings, InternalClient client, ClusterService clusterService, Clock clock) {
|
||||
super(settings);
|
||||
|
@ -43,67 +39,31 @@ public class IndexLifecycleInitialisationService extends AbstractComponent imple
|
|||
clusterService.addLocalNodeMasterListener(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* This should kick-off some stuff in the scheduler
|
||||
* This is triggered every update to settings in the index
|
||||
* @param settings the settings to read the lifecycle details from
|
||||
*/
|
||||
public synchronized void setLifecycleSettings(Index index, long creationDate, Settings settings) {
|
||||
if (isMaster) {
|
||||
IndexLifecycleSettings lifecycleSettings = new IndexLifecycleSettings(index, creationDate, settings, client);
|
||||
lifecycleSettings.schedulePhases(scheduler.get());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index gets created. Note that this is also called
|
||||
* when the index is created on data nodes
|
||||
* @param index The index whose settings are to be validated
|
||||
* @param indexSettings The settings for the specified index
|
||||
*/
|
||||
@Override
|
||||
public void beforeIndexCreated(Index index, Settings indexSettings) {
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("validate setting before index is created");
|
||||
LIFECYCLE_TIMESERIES_SETTING.get(indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterIndexCreated(IndexService indexService) {
|
||||
Settings indexSettings = indexService.getIndexSettings().getSettings();
|
||||
Settings lifecycleSettings = (Settings) LIFECYCLE_TIMESERIES_SETTING.get(indexSettings);
|
||||
if (isMaster && lifecycleSettings.size() > 0) {
|
||||
setLifecycleSettings(indexService.index(), indexService.getMetaData().getCreationDate(),
|
||||
indexSettings.getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggered(SchedulerEngine.Event event) {
|
||||
throw new UnsupportedOperationException();
|
||||
public String executorName() {
|
||||
return ThreadPool.Names.MANAGEMENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMaster() {
|
||||
isMaster = true;
|
||||
scheduler.set(new SchedulerEngine(clock));
|
||||
clusterService.state().getMetaData().getIndices().valuesIt()
|
||||
.forEachRemaining((idxMeta) -> {
|
||||
if (idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()).size() > 0) {
|
||||
setLifecycleSettings(idxMeta.getIndex(), idxMeta.getCreationDate(),
|
||||
idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()));
|
||||
}
|
||||
});
|
||||
scheduler.get().register(this);
|
||||
scheduler.get().add(new SchedulerEngine.Job(NAME, ((startTime, now) -> now + 1000)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void offMaster() {
|
||||
isMaster = false;
|
||||
// when is this called?
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executorName() {
|
||||
return ThreadPool.Names.MANAGEMENT;
|
||||
public void triggered(SchedulerEngine.Event event) {
|
||||
clusterService.state().getMetaData().getIndices().valuesIt()
|
||||
.forEachRemaining((idxMeta) -> {
|
||||
if (LIFECYCLE_TIMESERIES_NAME_SETTING.get(idxMeta.getSettings()) != null) {
|
||||
// get policy by name
|
||||
// idxMeta.getIndex(), idxMeta.getCreationDate(),client
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,74 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class IndexLifecycleSettings implements ToXContentObject {
|
||||
|
||||
public static final ParseField PHASES_FIELD = new ParseField("phases");
|
||||
|
||||
private List<Phase> phases;
|
||||
|
||||
public IndexLifecycleSettings(Index index, long indexCreationDate, Settings settings, Client client) {
|
||||
phases = new ArrayList<>();
|
||||
for (Map.Entry<String, Settings> e : settings.getAsGroups().entrySet()) {
|
||||
Phase phase = new Phase(e.getKey(), index, indexCreationDate, e.getValue(), client);
|
||||
phases.add(phase);
|
||||
}
|
||||
}
|
||||
|
||||
public IndexLifecycleSettings(List<Phase> phases) {
|
||||
this.phases = phases;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.array(PHASES_FIELD.getPreferredName(), phases);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public void schedulePhases(SchedulerEngine scheduler) {
|
||||
for (Phase phase : phases) {
|
||||
scheduler.register(phase);
|
||||
scheduler.add(phase);
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN")
|
||||
.error("kicked off lifecycle job to be triggered in " + phase.getAfter() + " seconds");
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
Settings.Builder settingsBuilder = Settings.builder();
|
||||
settingsBuilder.put("new.after", "1m");
|
||||
settingsBuilder.put("delete.after", "3s");
|
||||
settingsBuilder.put("delete.actions.delete.what", "me");
|
||||
Settings settings = settingsBuilder.build();
|
||||
System.out.println(settings);
|
||||
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
System.out.println(currentTimeMillis);
|
||||
|
||||
IndexLifecycleSettings lifecycleSettings = new IndexLifecycleSettings(new Index("test_index", "1234567890"), currentTimeMillis,
|
||||
settings, null);
|
||||
System.out.println(Strings.toString(lifecycleSettings));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class LifecyclePolicy implements ToXContentObject {
|
||||
|
||||
public static final ParseField PHASES_FIELD = new ParseField("phases");
|
||||
|
||||
private List<Phase> phases;
|
||||
|
||||
public LifecyclePolicy(List<Phase> phases) {
|
||||
this.phases = phases;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.array(PHASES_FIELD.getPreferredName(), phases);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
|
@ -8,64 +8,41 @@ package org.elasticsearch.xpack.indexlifecycle;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine.Event;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine.Schedule;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class Phase extends SchedulerEngine.Job implements ToXContentObject, SchedulerEngine.Listener {
|
||||
public class Phase extends SchedulerEngine.Job implements ToXContentObject {
|
||||
|
||||
public static final ParseField NAME_FIELD = new ParseField("name");
|
||||
public static final ParseField ID_FIELD = new ParseField("id");
|
||||
public static final ParseField ACTIONS_FIELD = new ParseField("actions");
|
||||
public static final ParseField AFTER_FIELD = new ParseField("after");
|
||||
public static final Setting<TimeValue> AFTER_SETTING = Setting.positiveTimeSetting(AFTER_FIELD.getPreferredName(),
|
||||
TimeValue.timeValueSeconds(60), Property.IndexScope, Property.Dynamic);
|
||||
public static final Setting<Settings> ACTIONS_SETTING = Setting.groupSetting(ACTIONS_FIELD.getPreferredName() + ".", (settings) -> {
|
||||
if (settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
// NOCOMMIT add validation here
|
||||
}, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
|
||||
private String name;
|
||||
private List<Action> actions;
|
||||
private Client client;
|
||||
private TimeValue after;
|
||||
|
||||
public Phase(String name, Index index, long creationDate, Settings settings, Client client) {
|
||||
super(index.getName() + "-" + name, getSchedule(creationDate, settings));
|
||||
public Phase(String name, Index index, long creationDate, TimeValue after, List<Action> actions, Client client) {
|
||||
super(index.getName() + "-" + name, getSchedule(creationDate, after));
|
||||
this.name = name;
|
||||
this.client = client;
|
||||
this.after = AFTER_SETTING.get(settings);
|
||||
this.actions = new ArrayList<>();
|
||||
Settings actionsSettings = ACTIONS_SETTING.get(settings);
|
||||
for (Map.Entry<String, Settings> e : actionsSettings.getAsGroups().entrySet()) {
|
||||
if (e.getKey().equals("delete")) {
|
||||
Action action = new DeleteAction(index);
|
||||
actions.add(action);
|
||||
}
|
||||
}
|
||||
this.after = after;
|
||||
this.actions = actions;
|
||||
}
|
||||
|
||||
public TimeValue getAfter() {
|
||||
return after;
|
||||
}
|
||||
|
||||
private static Schedule getSchedule(long creationDate, Settings settings) {
|
||||
System.out.println(settings);
|
||||
TimeValue after = AFTER_SETTING.get(settings);
|
||||
private static Schedule getSchedule(long creationDate, TimeValue after) {
|
||||
SchedulerEngine.Schedule schedule = (startTime, now) -> {
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN")
|
||||
.error("calculating schedule with creationTime:" + creationDate + ", and now:" + now);
|
||||
|
@ -91,13 +68,6 @@ public class Phase extends SchedulerEngine.Job implements ToXContentObject, Sche
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggered(Event event) {
|
||||
if (event.getJobName().equals(getId())) {
|
||||
performActions();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
|
|
Loading…
Reference in New Issue