Adds rollover action to index lifecycle plugin (#3266)

This action will rollover an index when executed if the provided conditions are met.
Users may specify the maximum age, maximum index size in bytes or maximum index size in number of documents as conditions for rollover.

When the action executes it firsts checks the local cluster state to find out if the alias exists on the index. If the alias does not exist then the index was either rolled over by a previous run or something else has rolled over the index so the action can be marked as completed. If the index still has the alias set the action will make a rollover index request using the Client. When that request returns and the listener is called the action will only be marked as complete if the response indicates the index was rolled over. If the index was not rolled over (because the conditions are not yet met) the action is not marked as complete and will be re-evaluated on the next call to execute.
This commit is contained in:
Colin Goodheart-Smithe 2017-12-15 09:36:39 +00:00 committed by GitHub
parent 8d6e53356b
commit 3da42f5603
16 changed files with 501 additions and 55 deletions

View File

@ -6,10 +6,8 @@
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -57,7 +55,7 @@ public class AllocateAction implements LifecycleAction {
}
@Override
public void execute(Index index, Client client, Listener listener) {
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
// nocommit: stub
listener.onSuccess(true);
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -57,7 +58,7 @@ public class DeleteAction implements LifecycleAction {
}
@Override
public void execute(Index index, Client client, Listener listener) {
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
client.admin().indices().delete(new DeleteIndexRequest(index.getName()), new ActionListener<DeleteIndexResponse>() {
@Override
public void onResponse(DeleteIndexResponse deleteIndexResponse) {

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -54,7 +55,7 @@ public class ForceMergeAction implements LifecycleAction {
}
@Override
public void execute(Index index, Client client, Listener listener) {
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
// nocommit: stub
listener.onSuccess(true);
}

View File

@ -77,7 +77,7 @@ public class IndexLifecycleService extends AbstractComponent
if (Strings.isNullOrEmpty(policyName) == false) {
logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")");
LifecyclePolicy policy = policies.get(policyName);
policy.execute(new InternalIndexLifecycleContext(idxMeta, client, nowSupplier));
policy.execute(new InternalIndexLifecycleContext(idxMeta, client, clusterService, nowSupplier));
}
});
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import java.util.function.LongSupplier;
@ -23,6 +24,7 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext {
private Client client;
private IndexMetaData idxMeta;
private LongSupplier nowSupplier;
private ClusterService clusterService;
/**
* @param idxMeta
@ -33,9 +35,10 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext {
* a {@link LongSupplier} to provide the current timestamp when
* required.
*/
public InternalIndexLifecycleContext(IndexMetaData idxMeta, Client client, LongSupplier nowSupplier) {
public InternalIndexLifecycleContext(IndexMetaData idxMeta, Client client, ClusterService clusterService, LongSupplier nowSupplier) {
this.idxMeta = idxMeta;
this.client = client;
this.clusterService = clusterService;
this.nowSupplier = nowSupplier;
}
@ -81,7 +84,7 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext {
@Override
public void executeAction(LifecycleAction action, LifecycleAction.Listener listener) {
action.execute(idxMeta.getIndex(), client, listener);
action.execute(idxMeta.getIndex(), client, clusterService, listener);
}
private void writeSettings(String index, Settings settings, Listener listener) {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;
@ -29,7 +30,7 @@ public interface LifecycleAction extends ToXContentObject, NamedWriteable {
* @param listener
* a {@link Listener} to call when this call completes.
*/
void execute(Index index, Client client, Listener listener);
void execute(Index index, Client client, ClusterService clusterService, Listener listener);
/**
* A callback for when a {@link LifecycleAction} finishes executing
@ -38,23 +39,23 @@ public interface LifecycleAction extends ToXContentObject, NamedWriteable {
/**
* Called if the call to
* {@link LifecycleAction#execute(Index, Client, Listener)} was
* successful
* {@link LifecycleAction#execute(Index, Client, ClusterService, Listener)}
* was successful
*
* @param completed
* <code>true</code> iff the {@link LifecycleAction} is now
* complete and requires no more calls to
* {@link LifecycleAction#execute(Index, Client, Listener)
* {@link LifecycleAction#execute(Index, Client, ClusterService, Listener)
* execute(Index, Client, Listener)}.
*/
void onSuccess(boolean completed);
/**
* Called if there was an exception during
* {@link LifecycleAction#execute(Index, Client, Listener)}. Note that
* even the call to
* {@link LifecycleAction#execute(Index, Client, Listener)} may be
* retried even after this method is called.
* {@link LifecycleAction#execute(Index, Client, ClusterService, Listener)}.
* Note that even the call to
* {@link LifecycleAction#execute(Index, Client, ClusterService, Listener)}
* may be retried even after this method is called.
*
* @param e
* the exception that caused the failure

View File

@ -6,10 +6,8 @@
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -74,7 +72,7 @@ public class ReplicasAction implements LifecycleAction {
}
@Override
public void execute(Index index, Client client, Listener listener) {
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
// nocommit: stub
listener.onSuccess(true);
}

View File

@ -7,41 +7,107 @@ package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.Objects;
/**
* A {@link LifecycleAction} which rolls over the index.
* A {@link LifecycleAction} which deletes the index.
*/
public class RolloverAction implements LifecycleAction {
public static final String NAME = "rollover";
public static final ParseField ALIAS_FIELD = new ParseField("alias");
public static final ParseField MAX_SIZE_FIELD = new ParseField("max_size");
public static final ParseField MAX_DOCS_FIELD = new ParseField("max_docs");
public static final ParseField MAX_AGE_FIELD = new ParseField("max_age");
private static final Logger logger = ESLoggerFactory.getLogger(RolloverAction.class);
private static final ObjectParser<RolloverAction, Void> PARSER = new ObjectParser<>(NAME, RolloverAction::new);
private static final ConstructingObjectParser<RolloverAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new RolloverAction((String) a[0], (ByteSizeValue) a[1], (TimeValue) a[2], (Long) a[3]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ALIAS_FIELD);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_SIZE_FIELD.getPreferredName()), MAX_SIZE_FIELD, ValueType.VALUE);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_AGE_FIELD.getPreferredName()), MAX_AGE_FIELD, ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), MAX_DOCS_FIELD);
}
private final String alias;
private final ByteSizeValue maxSize;
private final Long maxDocs;
private final TimeValue maxAge;
public static RolloverAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
}
public RolloverAction() {
public RolloverAction(String alias, ByteSizeValue maxSize, TimeValue maxAge, Long maxDocs) {
if (alias == null) {
throw new IllegalArgumentException(ALIAS_FIELD.getPreferredName() + " must be not be null");
}
if (maxSize == null && maxAge == null && maxDocs == null) {
throw new IllegalArgumentException("At least one rollover condition must be set.");
}
this.alias = alias;
this.maxSize = maxSize;
this.maxAge = maxAge;
this.maxDocs = maxDocs;
}
public RolloverAction(StreamInput in) throws IOException {
alias = in.readString();
if (in.readBoolean()) {
maxSize = new ByteSizeValue(in);
} else {
maxSize = null;
}
if (in.readBoolean()) {
maxAge = new TimeValue(in);
} else {
maxAge = null;
}
if (in.readBoolean()) {
maxDocs = in.readVLong();
} else {
maxDocs = null;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(alias);
boolean hasMaxSize = maxSize != null;
out.writeBoolean(hasMaxSize);
if (hasMaxSize) {
maxSize.writeTo(out);
}
boolean hasMaxAge = maxAge != null;
out.writeBoolean(hasMaxAge);
if (hasMaxAge) {
maxAge.writeTo(out);
}
boolean hasMaxDocs = maxDocs != null;
out.writeBoolean(hasMaxDocs);
if (hasMaxDocs) {
out.writeVLong(maxDocs);
}
}
@Override
@ -49,22 +115,73 @@ public class RolloverAction implements LifecycleAction {
return NAME;
}
public String getAlias() {
return alias;
}
public ByteSizeValue getMaxSize() {
return maxSize;
}
public TimeValue getMaxAge() {
return maxAge;
}
public Long getMaxDocs() {
return maxDocs;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ALIAS_FIELD.getPreferredName(), alias);
if (maxSize != null) {
builder.field(MAX_SIZE_FIELD.getPreferredName(), maxSize.getStringRep());
}
if (maxAge != null) {
builder.field(MAX_AGE_FIELD.getPreferredName(), maxAge.getStringRep());
}
if (maxDocs != null) {
builder.field(MAX_DOCS_FIELD.getPreferredName(), maxDocs);
}
builder.endObject();
return builder;
}
@Override
public void execute(Index index, Client client, Listener listener) {
// nocommit: stub
listener.onSuccess(true);
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
if (clusterService.state().getMetaData().index(index.getName()).getAliases().containsKey(alias)) {
RolloverRequest rolloverRequest = new RolloverRequest(alias, null);
if (maxAge != null) {
rolloverRequest.addMaxIndexAgeCondition(maxAge);
}
if (maxSize != null) {
rolloverRequest.addMaxIndexSizeCondition(maxSize);
}
if (maxDocs != null) {
rolloverRequest.addMaxIndexDocsCondition(maxDocs);
}
client.admin().indices().rolloverIndex(rolloverRequest, new ActionListener<RolloverResponse>() {
@Override
public void onResponse(RolloverResponse rolloverResponse) {
logger.error(rolloverResponse);
listener.onSuccess(rolloverResponse.isRolledOver());
}
@Override
public void onFailure(Exception e) {
logger.error(e);
listener.onFailure(e);
}
});
} else {
listener.onSuccess(true);
}
}
@Override
public int hashCode() {
return 1;
return Objects.hash(alias, maxSize, maxAge, maxDocs);
}
@Override
@ -75,7 +192,11 @@ public class RolloverAction implements LifecycleAction {
if (obj.getClass() != getClass()) {
return false;
}
return true;
RolloverAction other = (RolloverAction) obj;
return Objects.equals(alias, other.alias) &&
Objects.equals(maxSize, other.maxSize) &&
Objects.equals(maxAge, other.maxAge) &&
Objects.equals(maxDocs, other.maxDocs);
}
@Override

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -57,7 +58,7 @@ public class ShrinkAction implements LifecycleAction {
}
@Override
public void execute(Index index, Client client, Listener listener) {
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
client.admin().indices().delete(new DeleteIndexRequest(index.getName()), new ActionListener<DeleteIndexResponse>() {
@Override
public void onResponse(DeleteIndexResponse deleteIndexResponse) {

View File

@ -67,7 +67,7 @@ public class DeleteActionTests extends AbstractSerializingTestCase<DeleteAction>
SetOnce<Boolean> actionCompleted = new SetOnce<>();
DeleteAction action = new DeleteAction();
action.execute(index, client, new Listener() {
action.execute(index, client, null, new Listener() {
@Override
public void onSuccess(boolean completed) {
@ -116,7 +116,7 @@ public class DeleteActionTests extends AbstractSerializingTestCase<DeleteAction>
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
DeleteAction action = new DeleteAction();
action.execute(index, client, new Listener() {
action.execute(index, client, null, new Listener() {
@Override
public void onSuccess(boolean completed) {

View File

@ -57,7 +57,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});
@ -115,7 +115,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});
@ -176,7 +176,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});
@ -212,7 +212,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), phase).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});
@ -249,7 +249,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});
@ -306,7 +306,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});
@ -366,7 +366,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
}
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});
@ -402,7 +402,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), action).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});
@ -416,7 +416,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});
@ -432,7 +432,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now);
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, null, () -> now);
Phase phase = new Phase("test_phase", after, Collections.emptyMap());
@ -448,7 +448,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now);
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, null, () -> now);
Phase phase = new Phase("test_phase", after, Collections.emptyMap());
@ -464,7 +464,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now);
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, null, () -> now);
Phase phase = new Phase("test_phase", after, Collections.emptyMap());
@ -476,7 +476,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", 0L).build())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, null, () -> {
throw new AssertionError("nowSupplier should not be called");
});

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.indexlifecycle;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -99,7 +100,7 @@ public class MockAction implements LifecycleAction {
}
@Override
public void execute(Index index, Client client, Listener listener) {
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
executedCount.incrementAndGet();
if (exceptionToThrow == null) {
if (completeOnExecute) {

View File

@ -56,7 +56,7 @@ public class MockActionTests extends AbstractSerializingTestCase<MockAction> {
SetOnce<Boolean> listenerCalled = new SetOnce<>();
action.execute(null, null, new LifecycleAction.Listener() {
action.execute(null, null, null, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
@ -84,7 +84,7 @@ public class MockActionTests extends AbstractSerializingTestCase<MockAction> {
SetOnce<Boolean> listenerCalled = new SetOnce<>();
action.execute(null, null, new LifecycleAction.Listener() {
action.execute(null, null, null, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {
@ -114,7 +114,7 @@ public class MockActionTests extends AbstractSerializingTestCase<MockAction> {
SetOnce<Boolean> listenerCalled = new SetOnce<>();
action.execute(null, null, new LifecycleAction.Listener() {
action.execute(null, null, null, new LifecycleAction.Listener() {
@Override
public void onSuccess(boolean completed) {

View File

@ -72,7 +72,7 @@ public abstract class MockIndexLifecycleContext implements IndexLifecycleContext
@Override
public void executeAction(LifecycleAction action, LifecycleAction.Listener listener) {
action.execute(null, null, listener);
action.execute(null, null, null, listener);
}
}

View File

@ -5,11 +5,41 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxSizeCondition;
import org.elasticsearch.action.admin.indices.rollover.RolloverIndexTestHelper;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.indexlifecycle.LifecycleAction.Listener;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
public class RolloverActionTests extends AbstractSerializingTestCase<RolloverAction> {
@ -20,11 +50,301 @@ public class RolloverActionTests extends AbstractSerializingTestCase<RolloverAct
@Override
protected RolloverAction createTestInstance() {
return new RolloverAction();
String alias = randomAlphaOfLengthBetween(1, 20);
ByteSizeUnit maxSizeUnit = randomFrom(ByteSizeUnit.values());
ByteSizeValue maxSize = randomBoolean() ? null : new ByteSizeValue(randomNonNegativeLong() / maxSizeUnit.toBytes(1), maxSizeUnit);
Long maxDocs = randomBoolean() ? null : randomNonNegativeLong();
TimeValue maxAge = (maxDocs == null && maxSize == null || randomBoolean())
? TimeValue.parseTimeValue(randomPositiveTimeValue(), "rollover_action_test")
: null;
return new RolloverAction(alias, maxSize, maxAge, maxDocs);
}
@Override
protected Reader<RolloverAction> instanceReader() {
return RolloverAction::new;
}
@Override
protected RolloverAction mutateInstance(RolloverAction instance) throws IOException {
String alias = instance.getAlias();
ByteSizeValue maxSize = instance.getMaxSize();
TimeValue maxAge = instance.getMaxAge();
Long maxDocs = instance.getMaxDocs();
switch (between(0, 3)) {
case 0:
alias = alias + randomAlphaOfLengthBetween(1, 5);
break;
case 1:
maxSize = randomValueOtherThan(maxSize, () -> {
ByteSizeUnit maxSizeUnit = randomFrom(ByteSizeUnit.values());
return new ByteSizeValue(randomNonNegativeLong() / maxSizeUnit.toBytes(1), maxSizeUnit);
});
break;
case 2:
maxAge = TimeValue.parseTimeValue(randomPositiveTimeValue(), "rollover_action_test");
break;
case 3:
maxDocs = randomNonNegativeLong();
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new RolloverAction(alias, maxSize, maxAge, maxDocs);
}
public void testNoConditions() {
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> new RolloverAction(randomAlphaOfLengthBetween(1, 20), null, null, null));
assertEquals("At least one rollover condition must be set.", exception.getMessage());
}
public void testNoAlias() {
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new RolloverAction(null, null, null, 1L));
assertEquals(RolloverAction.ALIAS_FIELD.getPreferredName() + " must be not be null", exception.getMessage());
}
public void testExecute() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
RolloverAction action = createTestInstance();
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(Settings.builder().put("index.version.created", Version.CURRENT.id))
.putAlias(AliasMetaData.builder(action.getAlias()))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterstate = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.build();
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
ClusterService clusterService = Mockito.mock(ClusterService.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RolloverRequest request = (RolloverRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArguments()[1];
Set<Condition<?>> expectedConditions = new HashSet<>();
if (action.getMaxAge() != null) {
expectedConditions.add(new MaxAgeCondition(action.getMaxAge()));
}
if (action.getMaxSize() != null) {
expectedConditions.add(new MaxSizeCondition(action.getMaxSize()));
}
if (action.getMaxDocs() != null) {
expectedConditions.add(new MaxDocsCondition(action.getMaxDocs()));
}
RolloverIndexTestHelper.assertRolloverIndexRequest(request, action.getAlias(), expectedConditions);
listener.onResponse(RolloverIndexTestHelper.createMockResponse(request, true));
return null;
}
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
Mockito.when(clusterService.state()).thenReturn(clusterstate);
SetOnce<Boolean> actionCompleted = new SetOnce<>();
action.execute(index, client, clusterService, new Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertEquals(true, actionCompleted.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
Mockito.verify(clusterService, Mockito.only()).state();
}
public void testExecuteNotComplete() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
RolloverAction action = createTestInstance();
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(Settings.builder().put("index.version.created", Version.CURRENT.id))
.putAlias(AliasMetaData.builder(action.getAlias())).numberOfShards(randomIntBetween(1, 5))
.numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterstate = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.build();
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
ClusterService clusterService = Mockito.mock(ClusterService.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RolloverRequest request = (RolloverRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArguments()[1];
Set<Condition<?>> expectedConditions = new HashSet<>();
if (action.getMaxAge() != null) {
expectedConditions.add(new MaxAgeCondition(action.getMaxAge()));
}
if (action.getMaxSize() != null) {
expectedConditions.add(new MaxSizeCondition(action.getMaxSize()));
}
if (action.getMaxDocs() != null) {
expectedConditions.add(new MaxDocsCondition(action.getMaxDocs()));
}
RolloverIndexTestHelper.assertRolloverIndexRequest(request, action.getAlias(), expectedConditions);
listener.onResponse(RolloverIndexTestHelper.createMockResponse(request, false));
return null;
}
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
Mockito.when(clusterService.state()).thenReturn(clusterstate);
SetOnce<Boolean> actionCompleted = new SetOnce<>();
action.execute(index, client, clusterService, new Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertEquals(false, actionCompleted.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
Mockito.verify(clusterService, Mockito.only()).state();
}
public void testExecuteAlreadyCompleted() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
RolloverAction action = createTestInstance();
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(Settings.builder().put("index.version.created", Version.CURRENT.id)).numberOfShards(randomIntBetween(1, 5))
.numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterstate = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.build();
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
ClusterService clusterService = Mockito.mock(ClusterService.class);
Mockito.when(clusterService.state()).thenReturn(clusterstate);
SetOnce<Boolean> actionCompleted = new SetOnce<>();
action.execute(index, client, clusterService, new Listener() {
@Override
public void onSuccess(boolean completed) {
actionCompleted.set(completed);
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});
assertEquals(true, actionCompleted.get());
Mockito.verify(clusterService, Mockito.only()).state();
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
}
public void testExecuteFailure() throws Exception {
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
Exception exception = new RuntimeException();
RolloverAction action = createTestInstance();
IndexMetaData indexMetadata = IndexMetaData.builder(index.getName())
.settings(Settings.builder().put("index.version.created", Version.CURRENT.id))
.putAlias(AliasMetaData.builder(action.getAlias())).numberOfShards(randomIntBetween(1, 5))
.numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetadata);
ClusterState clusterstate = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.build();
Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
ClusterService clusterService = Mockito.mock(ClusterService.class);
Mockito.when(client.admin()).thenReturn(adminClient);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
RolloverRequest request = (RolloverRequest) invocation.getArguments()[0];
@SuppressWarnings("unchecked")
ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArguments()[1];
Set<Condition<?>> expectedConditions = new HashSet<>();
if (action.getMaxAge() != null) {
expectedConditions.add(new MaxAgeCondition(action.getMaxAge()));
}
if (action.getMaxSize() != null) {
expectedConditions.add(new MaxSizeCondition(action.getMaxSize()));
}
if (action.getMaxDocs() != null) {
expectedConditions.add(new MaxDocsCondition(action.getMaxDocs()));
}
RolloverIndexTestHelper.assertRolloverIndexRequest(request, action.getAlias(), expectedConditions);
listener.onFailure(exception);
return null;
}
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
Mockito.when(clusterService.state()).thenReturn(clusterstate);
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
action.execute(index, client, clusterService, new Listener() {
@Override
public void onSuccess(boolean completed) {
throw new AssertionError("Unexpected method call");
}
@Override
public void onFailure(Exception e) {
assertEquals(exception, e);
exceptionThrown.set(true);
}
});
assertEquals(true, exceptionThrown.get());
Mockito.verify(client, Mockito.only()).admin();
Mockito.verify(adminClient, Mockito.only()).indices();
Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
@ -41,7 +42,7 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction();
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction();
private static final ReplicasAction TEST_REPLICAS_ACTION = new ReplicasAction(1);
private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction();
private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction("", new ByteSizeValue(1), null, null);
private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction();
@Before