Gets the scheduler to use poll_interval
This commit is contained in:
parent
3f3f7b9b47
commit
eba5afefb7
|
@ -63,6 +63,8 @@ public class IndexLifecycle extends Plugin {
|
|||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<String> LIFECYCLE_TIMESERIES_PHASE_SETTING = Setting.simpleString("index.lifecycle.phase",
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<String> LIFECYCLE_TIMESERIES_ACTION_SETTING = Setting.simpleString("index.lifecycle.action",
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
|
||||
public IndexLifecycle(Settings settings) {
|
||||
this.settings = settings;
|
||||
|
@ -84,7 +86,10 @@ public class IndexLifecycle extends Plugin {
|
|||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Arrays.asList(LIFECYCLE_TIMESERIES_NAME_SETTING, LIFECYCLE_TIMESERIES_PHASE_SETTING);
|
||||
return Arrays.asList(
|
||||
LIFECYCLE_TIMESERIES_NAME_SETTING,
|
||||
LIFECYCLE_TIMESERIES_PHASE_SETTING,
|
||||
LIFECYCLE_TIMESERIES_ACTION_SETTING);
|
||||
}
|
||||
|
||||
public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, Clock clock,
|
||||
|
|
|
@ -17,15 +17,16 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule.Interval;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule.Interval.Unit;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.time.Clock;
|
||||
|
||||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.LIFECYCLE_TIMESERIES_NAME_SETTING;
|
||||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.NAME;
|
||||
|
||||
public class IndexLifecycleInitialisationService extends AbstractComponent
|
||||
|
@ -53,27 +54,27 @@ public class IndexLifecycleInitialisationService extends AbstractComponent
|
|||
if (event.localNodeMaster()) {
|
||||
MetaData metaData = event.state().metaData();
|
||||
installMlMetadata(metaData);
|
||||
if (scheduler.get() == null) {
|
||||
scheduler.set(new SchedulerEngine(clock));
|
||||
scheduler.get().register(this);
|
||||
scheduler.get().add(new SchedulerEngine.Job(NAME, ((startTime, now) -> now + 1000)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggered(SchedulerEngine.Event event) {
|
||||
clusterService.state().getMetaData().getIndices().valuesIt()
|
||||
.forEachRemaining((idxMeta) -> {
|
||||
if (LIFECYCLE_TIMESERIES_NAME_SETTING.get(idxMeta.getSettings()) != null) {
|
||||
// get policy by name
|
||||
// idxMeta.getIndex(), idxMeta.getCreationDate(),client
|
||||
}
|
||||
});
|
||||
if (event.getJobName().equals(NAME)) {
|
||||
logger.error("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
|
||||
// clusterService.state().getMetaData().getIndices().valuesIt()
|
||||
// .forEachRemaining((idxMeta) -> {
|
||||
// if (LIFECYCLE_TIMESERIES_NAME_SETTING.get(idxMeta.getSettings())
|
||||
// != null) {
|
||||
// // get policy by name
|
||||
// // idxMeta.getIndex(), idxMeta.getCreationDate(),client
|
||||
// }
|
||||
// });
|
||||
}
|
||||
}
|
||||
|
||||
private void installMlMetadata(MetaData metaData) {
|
||||
if (metaData.custom(MlMetadata.TYPE) == null) {
|
||||
IndexLifecycleMetadata indexLifecycleMetadata = metaData.custom(IndexLifecycleMetadata.TYPE);
|
||||
if (indexLifecycleMetadata == null) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC)
|
||||
.execute(() -> clusterService.submitStateUpdateTask("install-index-lifecycle-metadata", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
|
@ -94,6 +95,13 @@ public class IndexLifecycleInitialisationService extends AbstractComponent
|
|||
logger.error("unable to install index lifecycle metadata", e);
|
||||
}
|
||||
}));
|
||||
} else {
|
||||
if (scheduler.get() == null) {
|
||||
scheduler.set(new SchedulerEngine(clock));
|
||||
scheduler.get().register(this);
|
||||
scheduler.get().add(new SchedulerEngine.Job(NAME,
|
||||
new IntervalSchedule(new Interval(indexLifecycleMetadata.getPollInterval(), Unit.SECONDS))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
|
|||
public static final ParseField POLICIES_FIELD = new ParseField("policies");
|
||||
public static final ParseField POLL_INTERVAL_FIELD = new ParseField("poll_interval");
|
||||
|
||||
public static final IndexLifecycleMetadata EMPTY_METADATA = new IndexLifecycleMetadata(Collections.emptySortedMap(), 300000);
|
||||
public static final IndexLifecycleMetadata EMPTY_METADATA = new IndexLifecycleMetadata(Collections.emptySortedMap(), 3);
|
||||
@SuppressWarnings("unchecked")
|
||||
public static final ConstructingObjectParser<IndexLifecycleMetadata, NamedXContentRegistry> PARSER = new ConstructingObjectParser<>(
|
||||
TYPE, a -> new IndexLifecycleMetadata((SortedMap<String, LifecyclePolicy>) a[0], (long) a[1]));
|
||||
|
|
Loading…
Reference in New Issue