general indexlifecycle cleanup, rest tests, reintroduce integ-test (#3243)
* cleanup, rest tests, reintroduce integ-test - cleaned up some long lines - re-introduced one of the integration tests - added rest-spec tests
This commit is contained in:
parent
d847355b58
commit
ae7e4882e4
|
@ -134,7 +134,8 @@ public class IndexLifecycle extends Plugin {
|
|||
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(IndexLifecycleMetadata.TYPE),
|
||||
parser -> IndexLifecycleMetadata.PARSER.parse(parser, null)),
|
||||
// Lifecycle Policy
|
||||
new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(TimeseriesLifecyclePolicy.TYPE), (p, c) -> TimeseriesLifecyclePolicy.parse(p, c)),
|
||||
new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(TimeseriesLifecyclePolicy.TYPE),
|
||||
(p, c) -> TimeseriesLifecyclePolicy.parse(p, c)),
|
||||
// Lifecycle actions
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
|
||||
|
|
|
@ -69,12 +69,12 @@ public interface IndexLifecycleContext {
|
|||
* a {@link LifecycleAction.Listener} to pass to the
|
||||
* {@link LifecycleAction}.
|
||||
*/
|
||||
public void executeAction(LifecycleAction action, LifecycleAction.Listener listener);
|
||||
void executeAction(LifecycleAction action, LifecycleAction.Listener listener);
|
||||
|
||||
/**
|
||||
* A callback for use when setting phase or action names.
|
||||
*/
|
||||
public static interface Listener {
|
||||
interface Listener {
|
||||
|
||||
/**
|
||||
* Called if the call to set the action/phase name was successful.
|
||||
|
|
|
@ -147,38 +147,39 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
|
|||
final Long pollIntervalDiff;
|
||||
|
||||
IndexLifecycleMetadataDiff(IndexLifecycleMetadata before, IndexLifecycleMetadata after) {
|
||||
this.policies = DiffableUtils.diff(before.policies, after.policies, DiffableUtils.getStringKeySerializer(), new DiffableUtils.ValueSerializer<String, LifecyclePolicy>() {
|
||||
@Override
|
||||
public void write(LifecyclePolicy value, StreamOutput out) throws IOException {
|
||||
out.writeNamedWriteable(value);
|
||||
}
|
||||
this.policies = DiffableUtils.diff(before.policies, after.policies, DiffableUtils.getStringKeySerializer(),
|
||||
new DiffableUtils.ValueSerializer<String, LifecyclePolicy>() {
|
||||
@Override
|
||||
public void write(LifecyclePolicy value, StreamOutput out) throws IOException {
|
||||
out.writeNamedWriteable(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LifecyclePolicy read(StreamInput in, String key) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
@Override
|
||||
public LifecyclePolicy read(StreamInput in, String key) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<LifecyclePolicy> readDiff(StreamInput in, String key) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
@Override
|
||||
public Diff<LifecyclePolicy> readDiff(StreamInput in, String key) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsDiffableValues() {
|
||||
return true;
|
||||
}
|
||||
@Override
|
||||
public boolean supportsDiffableValues() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Diff<LifecyclePolicy> diff(LifecyclePolicy value, LifecyclePolicy beforePart) {
|
||||
return value.diff(beforePart);
|
||||
}
|
||||
@Override
|
||||
public Diff<LifecyclePolicy> diff(LifecyclePolicy value, LifecyclePolicy beforePart) {
|
||||
return value.diff(beforePart);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDiff(Diff<LifecyclePolicy> value, StreamOutput out) throws IOException {
|
||||
value.writeTo(out);
|
||||
}
|
||||
@Override
|
||||
public void writeDiff(Diff<LifecyclePolicy> value, StreamOutput out) throws IOException {
|
||||
value.writeTo(out);
|
||||
}
|
||||
|
||||
});
|
||||
});
|
||||
this.pollIntervalDiff = after.pollInterval - before.pollInterval;
|
||||
}
|
||||
|
||||
|
|
|
@ -69,13 +69,13 @@ public class IndexLifecycleService extends AbstractComponent
|
|||
@Override
|
||||
public void triggered(SchedulerEngine.Event event) {
|
||||
if (event.getJobName().equals(NAME)) {
|
||||
logger.error("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
|
||||
logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
|
||||
IndexLifecycleMetadata indexLifecycleMetadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
SortedMap<String, LifecyclePolicy> policies = indexLifecycleMetadata.getPolicies();
|
||||
clusterService.state().getMetaData().getIndices().valuesIt().forEachRemaining((idxMeta) -> {
|
||||
String policyName = IndexLifecycle.LIFECYCLE_TIMESERIES_NAME_SETTING.get(idxMeta.getSettings());
|
||||
if (Strings.isNullOrEmpty(policyName) == false) {
|
||||
logger.error("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")");
|
||||
logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")");
|
||||
LifecyclePolicy policy = policies.get(policyName);
|
||||
policy.execute(new InternalIndexLifecycleContext(idxMeta, client, nowSupplier));
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ public interface LifecycleAction extends ToXContentObject, NamedWriteable {
|
|||
/**
|
||||
* A callback for when a {@link LifecycleAction} finishes executing
|
||||
*/
|
||||
public static interface Listener {
|
||||
interface Listener {
|
||||
|
||||
/**
|
||||
* Called if the call to
|
||||
|
|
|
@ -188,14 +188,14 @@ public abstract class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
|
|||
|
||||
/**
|
||||
* @param currentPhase the current phase that is or was just executed
|
||||
* @return the next phase after {@param currentPhase} to be execute. If it is `null`, the first
|
||||
* @return the next phase after <code>currentPhase</code> to be execute. If it is `null`, the first
|
||||
* phase to be executed is returned. If it is the last phase, then no next phase is to be
|
||||
* executed and `null` is returned.
|
||||
*/
|
||||
protected abstract Phase nextPhase(@Nullable Phase currentPhase);
|
||||
|
||||
/**
|
||||
* validates whether the specified {@param phases} are valid for this policy instance.
|
||||
* validates whether the specified <code>phases</code> are valid for this policy instance.
|
||||
* @param phases the phases to verify validity against
|
||||
* @throws IllegalArgumentException if a specific phase or lack of a specific phase is invalid.
|
||||
*/
|
||||
|
@ -217,8 +217,8 @@ public abstract class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
|
|||
private final String name;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static ConstructingObjectParser<LifecyclePolicy, Tuple<String, NamedXContentRegistry>> PARSER = new ConstructingObjectParser<>(
|
||||
"lifecycle_policy", false, (a, c) -> {
|
||||
public static ConstructingObjectParser<LifecyclePolicy, Tuple<String, NamedXContentRegistry>> PARSER =
|
||||
new ConstructingObjectParser<>("lifecycle_policy", false, (a, c) -> {
|
||||
String lifecycleType = (String) a[0];
|
||||
List<Phase> phases = (List<Phase>) a[1];
|
||||
Map<String, Phase> phaseMap = phases.stream().collect(Collectors.toMap(Phase::getName, Function.identity()));
|
||||
|
|
|
@ -152,7 +152,8 @@ public class Phase implements ToXContentObject, Writeable {
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// Something went wrong so log the error and hopfully it will succeed next time execute is called. NOCOMMIT can we do better here?
|
||||
// Something went wrong so log the error and hopefully it will succeed next time execute
|
||||
// is called. NOCOMMIT can we do better here?
|
||||
logger.error("Failed to initialised action [" + firstActionName + "] for index [" + indexName + "]", e);
|
||||
}
|
||||
});
|
||||
|
|
|
@ -43,7 +43,7 @@ import java.util.TreeMap;
|
|||
public class DeleteLifecycleAction
|
||||
extends Action<DeleteLifecycleAction.Request, DeleteLifecycleAction.Response, DeleteLifecycleAction.RequestBuilder> {
|
||||
public static final DeleteLifecycleAction INSTANCE = new DeleteLifecycleAction();
|
||||
public static final String NAME = "cluster:admin/xpack/indexlifecycle/delete";
|
||||
public static final String NAME = "cluster:admin/xpack/index_lifecycle/delete";
|
||||
|
||||
protected DeleteLifecycleAction() {
|
||||
super(NAME);
|
||||
|
|
|
@ -39,7 +39,7 @@ import java.util.Objects;
|
|||
public class GetLifecycleAction
|
||||
extends Action<GetLifecycleAction.Request, GetLifecycleAction.Response, GetLifecycleAction.RequestBuilder> {
|
||||
public static final GetLifecycleAction INSTANCE = new GetLifecycleAction();
|
||||
public static final String NAME = "cluster:admin/xpack/indexlifecycle/get";
|
||||
public static final String NAME = "cluster:admin/xpack/index_lifecycle/get";
|
||||
|
||||
protected GetLifecycleAction() {
|
||||
super(NAME);
|
||||
|
|
|
@ -46,7 +46,7 @@ import java.util.TreeMap;
|
|||
|
||||
public class PutLifecycleAction extends Action<PutLifecycleAction.Request, PutLifecycleAction.Response, PutLifecycleAction.RequestBuilder> {
|
||||
public static final PutLifecycleAction INSTANCE = new PutLifecycleAction();
|
||||
public static final String NAME = "cluster:admin/xpack/indexlifecycle/put";
|
||||
public static final String NAME = "cluster:admin/xpack/index_lifecycle/put";
|
||||
|
||||
protected PutLifecycleAction() {
|
||||
super(NAME);
|
||||
|
@ -124,8 +124,8 @@ public class PutLifecycleAction extends Action<PutLifecycleAction.Request, PutLi
|
|||
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
|
||||
|
||||
public static final ParseField POLICY_FIELD = new ParseField("policy");
|
||||
private static final ConstructingObjectParser<Request, Tuple<String, NamedXContentRegistry>> PARSER = new ConstructingObjectParser<>(
|
||||
"put_lifecycle_request", a -> new Request((LifecyclePolicy) a[0]));
|
||||
private static final ConstructingObjectParser<Request, Tuple<String, NamedXContentRegistry>> PARSER =
|
||||
new ConstructingObjectParser<>("put_lifecycle_request", a -> new Request((LifecyclePolicy) a[0]));
|
||||
static {
|
||||
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> LifecyclePolicy.parse(p, c), POLICY_FIELD);
|
||||
}
|
||||
|
|
|
@ -33,7 +33,8 @@ public class RestPutLifecycleAction extends BaseRestHandler {
|
|||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
|
||||
String lifecycleName = restRequest.param("name");
|
||||
XContentParser parser = restRequest.contentParser();
|
||||
PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser, restRequest.getXContentRegistry());
|
||||
PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser,
|
||||
restRequest.getXContentRegistry());
|
||||
putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout()));
|
||||
putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout()));
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
|
@ -29,6 +28,7 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.client.Requests.clusterHealthRequest;
|
||||
import static org.elasticsearch.client.Requests.createIndexRequest;
|
||||
|
@ -40,7 +40,6 @@ import static org.hamcrest.CoreMatchers.not;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.core.IsNull.nullValue;
|
||||
|
||||
@AwaitsFix(bugUrl = "THIS NEEDS FIXING") // NOCOMMIT Fix this integration test
|
||||
@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0)
|
||||
public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
||||
private Settings settings;
|
||||
|
@ -84,22 +83,20 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
|||
|
||||
@Before
|
||||
public void init() {
|
||||
settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put("index.lifecycle.name", "test_lifecycle").build();
|
||||
List<Phase> phases = new ArrayList<>();
|
||||
phases.add(new Phase("new", TimeValue.timeValueSeconds(0), Collections.emptyList()));
|
||||
settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0).put("index.lifecycle.name", "test").build();
|
||||
List<LifecycleAction> deletePhaseActions = Collections.singletonList(new DeleteAction());
|
||||
phases.add(new Phase("delete", TimeValue.timeValueSeconds(3), deletePhaseActions));
|
||||
lifecyclePolicy = new TestLifecyclePolicy("test", phases);
|
||||
Map<String, Phase> phases = Collections.singletonMap("delete", new Phase("delete",
|
||||
TimeValue.timeValueSeconds(3), deletePhaseActions));
|
||||
lifecyclePolicy = new TimeseriesLifecyclePolicy("test", phases);
|
||||
}
|
||||
|
||||
public void testSingleNodeCluster() throws Exception {
|
||||
// start master node
|
||||
logger.info("Starting sever1");
|
||||
logger.info("Starting server1");
|
||||
final String server_1 = internalCluster().startNode();
|
||||
final String node1 = getLocalNodeId(server_1);
|
||||
logger.info("Creating lifecycle [test_lifecycle]");
|
||||
Thread.sleep(10000);
|
||||
PutLifecycleAction.Request putLifecycleRequest = new PutLifecycleAction.Request(lifecyclePolicy);
|
||||
PutLifecycleAction.Response putLifecycleResponse = client().execute(PutLifecycleAction.INSTANCE, putLifecycleRequest).get();
|
||||
assertAcked(putLifecycleResponse);
|
||||
|
@ -115,6 +112,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "THIS NEEDS FIXING") // NOCOMMIT Fix this integration test
|
||||
public void testMasterDedicatedDataDedicated() throws Exception {
|
||||
// start master node
|
||||
logger.info("Starting sever1");
|
||||
|
@ -139,6 +137,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "THIS NEEDS FIXING") // NOCOMMIT Fix this integration test
|
||||
public void testMasterFailover() throws Exception {
|
||||
// start one server
|
||||
logger.info("Starting sever1");
|
||||
|
|
|
@ -35,8 +35,10 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase
|
|||
@Before
|
||||
public void setup() {
|
||||
List<NamedXContentRegistry.Entry> entries = Arrays
|
||||
.asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(TestLifecyclePolicy.TYPE), TestLifecyclePolicy::parse));
|
||||
.asList(new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(DeleteAction.NAME), DeleteAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecyclePolicy.class,
|
||||
new ParseField(TestLifecyclePolicy.TYPE), TestLifecyclePolicy::parse));
|
||||
registry = new NamedXContentRegistry(entries);
|
||||
lifecycleName = randomAlphaOfLength(20); // NOCOMMIT we need to randomise the lifecycle name rather
|
||||
// than use the same name for all instances
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
{
|
||||
"xpack.index_lifecycle.delete_lifecycle": {
|
||||
"documentation": "http://www.elastic.co/guide/en/index_lifecycle/current/index_lifecycle.html",
|
||||
"methods": [ "DELETE" ],
|
||||
"url": {
|
||||
"path": "/_xpack/index_lifecycle/{lifecycle}",
|
||||
"paths": ["/_xpack/index_lifecycle/{lifecycle}"],
|
||||
"parts": {
|
||||
"lifecycle": {
|
||||
"type" : "string",
|
||||
"description" : "The name of the index lifecycle"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
}
|
||||
},
|
||||
"body": null
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
{
|
||||
"xpack.index_lifecycle.get_lifecycle": {
|
||||
"documentation": "http://www.elastic.co/guide/en/index_lifecycle/current/index_lifecycle.html",
|
||||
"methods": [ "GET" ],
|
||||
"url": {
|
||||
"path": "/_xpack/index_lifecycle/{lifecycle}",
|
||||
"paths": ["/_xpack/index_lifecycle/{lifecycle}"],
|
||||
"parts": {
|
||||
"lifecycle": {
|
||||
"type" : "string",
|
||||
"description" : "The name of the index lifecycle"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
}
|
||||
},
|
||||
"body": null
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
{
|
||||
"xpack.index_lifecycle.put_lifecycle": {
|
||||
"documentation": "http://www.elastic.co/guide/en/index_lifecycle/current/index_lifecycle.html",
|
||||
"methods": [ "PUT" ],
|
||||
"url": {
|
||||
"path": "/_xpack/index_lifecycle/{lifecycle}",
|
||||
"paths": ["/_xpack/index_lifecycle/{lifecycle}"],
|
||||
"parts": {
|
||||
"lifecycle": {
|
||||
"type" : "string",
|
||||
"description" : "The name of the index lifecycle"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
}
|
||||
},
|
||||
"body": {
|
||||
"description": "The lifecycle policy definition to register"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
---
|
||||
setup:
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: yellow
|
||||
|
||||
---
|
||||
"Test Basic Policy CRUD":
|
||||
- do:
|
||||
acknowlege: true
|
||||
xpack.index_lifecycle.put_lifecycle:
|
||||
lifecycle: "my_timeseries_lifecycle"
|
||||
body: |
|
||||
{
|
||||
"policy": {
|
||||
"type": "timeseries",
|
||||
"phases": {
|
||||
"warm": {
|
||||
"after": "10s",
|
||||
"actions": {
|
||||
"forcemerge": {}
|
||||
}
|
||||
},
|
||||
"delete": {
|
||||
"after": "30s",
|
||||
"actions": {
|
||||
"delete": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_timeseries_lifecycle"
|
||||
- match: { type: "timeseries" }
|
||||
- match: { phases.warm.after: "10s" }
|
||||
- match: { phases.delete.after: "30s" }
|
||||
|
||||
- do:
|
||||
acknowledge: true
|
||||
xpack.index_lifecycle.delete_lifecycle:
|
||||
lifecycle: "my_timeseries_lifecycle"
|
||||
|
||||
- do:
|
||||
catch: missing
|
||||
xpack.index_lifecycle.get_lifecycle:
|
||||
lifecycle: "my_timeseries_lifecycle"
|
Loading…
Reference in New Issue