refactors code to allow better testing
The client and index metadata have now been abstracted away from the Lifecycle classes behind IndexLifecycleContext. This allow us to test the state machine without having to worry about how the state is persisted and read. It also makes the classes much easier to read and reason about.
This commit is contained in:
parent
e113189347
commit
9e891de777
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
public interface IndexLifecycleContext {
|
||||
|
||||
void setPhase(String phase, Listener listener);
|
||||
|
||||
void setAction(String action, Listener listener);
|
||||
|
||||
String getAction();
|
||||
|
||||
String getPhase();
|
||||
|
||||
String getLifecycleTarget();
|
||||
|
||||
boolean canExecute(Phase phase);
|
||||
|
||||
public void executeAction(LifecycleAction action);
|
||||
|
||||
public static interface Listener {
|
||||
|
||||
void onSuccess();
|
||||
|
||||
void onFailure(Exception e);
|
||||
}
|
||||
}
|
|
@ -74,7 +74,7 @@ public class IndexLifecycleInitialisationService extends AbstractComponent
|
|||
if (Strings.isNullOrEmpty(policyName) == false) {
|
||||
logger.error("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")");
|
||||
LifecyclePolicy policy = policies.get(policyName);
|
||||
policy.execute(idxMeta, client, nowSupplier);
|
||||
policy.execute(new InternalIndexLifecycleContext(idxMeta, client, nowSupplier));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class InternalIndexLifecycleContext implements IndexLifecycleContext {
|
||||
|
||||
private Client client;
|
||||
private IndexMetaData idxMeta;
|
||||
private LongSupplier nowSupplier;
|
||||
|
||||
public InternalIndexLifecycleContext(IndexMetaData idxMeta, Client client, LongSupplier nowSupplier) {
|
||||
this.idxMeta = idxMeta;
|
||||
this.client = client;
|
||||
this.nowSupplier = nowSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPhase(String phase, Listener listener) {
|
||||
writeSettings(idxMeta.getIndex().getName(),
|
||||
Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), phase)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build(), listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPhase() {
|
||||
return IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.get(idxMeta.getSettings());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAction(String action, Listener listener) {
|
||||
writeSettings(idxMeta.getIndex().getName(),
|
||||
Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), action).build(), listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAction() {
|
||||
return IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.get(idxMeta.getSettings());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLifecycleTarget() {
|
||||
return idxMeta.getIndex().getName();
|
||||
}
|
||||
|
||||
public boolean canExecute(Phase phase) {
|
||||
long now = nowSupplier.getAsLong();
|
||||
long indexCreated = idxMeta.getCreationDate();
|
||||
return (indexCreated + phase.getAfter().millis()) <= now;
|
||||
}
|
||||
|
||||
public void executeAction(LifecycleAction action) {
|
||||
action.execute(idxMeta.getIndex(), client);
|
||||
}
|
||||
|
||||
private void writeSettings(String index, Settings settings, Listener listener) {
|
||||
client.admin().indices().updateSettings(new UpdateSettingsRequest(settings, index), new ActionListener<UpdateSettingsResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(UpdateSettingsResponse response) {
|
||||
if (response.isAcknowledged()) {
|
||||
listener.onSuccess();
|
||||
} else {
|
||||
listener.onFailure(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -6,12 +6,7 @@
|
|||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
@ -19,17 +14,16 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleContext.Listener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implements ToXContentObject, Writeable {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(LifecyclePolicy.class);
|
||||
|
@ -90,34 +84,11 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
|
|||
return builder;
|
||||
}
|
||||
|
||||
public void execute(IndexMetaData idxMeta, Client client, LongSupplier nowSupplier) {
|
||||
String currentPhaseName = IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.get(idxMeta.getSettings());
|
||||
boolean currentPhaseActionsComplete = IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.get(idxMeta.getSettings())
|
||||
.equals(Phase.PHASE_COMPLETED);
|
||||
String indexName = idxMeta.getIndex().getName();
|
||||
if (Strings.isNullOrEmpty(currentPhaseName)) {
|
||||
String firstPhaseName = phases.get(0).getName();
|
||||
client.admin().indices().updateSettings(new UpdateSettingsRequest(
|
||||
Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), firstPhaseName)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build(),
|
||||
indexName),
|
||||
new ActionListener<UpdateSettingsResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(UpdateSettingsResponse response) {
|
||||
if (response.isAcknowledged()) {
|
||||
logger.info("Successfully initialised phase [" + firstPhaseName + "] for index [" + indexName + "]");
|
||||
} else {
|
||||
logger.error("Failed to initialised phase [" + firstPhaseName + "] for index [" + indexName + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error("Failed to initialised phase [" + firstPhaseName + "] for index [" + indexName + "]", e);
|
||||
}
|
||||
});
|
||||
} else if (currentPhaseActionsComplete) {
|
||||
public void execute(IndexLifecycleContext context) {
|
||||
String currentPhaseName = context.getPhase();
|
||||
boolean currentPhaseActionsComplete = context.getAction().equals(Phase.PHASE_COMPLETED);
|
||||
String indexName = context.getLifecycleTarget();
|
||||
if (Strings.isNullOrEmpty(currentPhaseName) || currentPhaseActionsComplete) {
|
||||
int currentPhaseIndex = -1;
|
||||
for (int i = 0; i < phases.size(); i++) {
|
||||
if (phases.get(i).getName().equals(currentPhaseName)) {
|
||||
|
@ -127,36 +98,27 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
|
|||
}
|
||||
if (currentPhaseIndex < phases.size() - 1) {
|
||||
Phase nextPhase = phases.get(currentPhaseIndex + 1);
|
||||
if (nextPhase.canExecute(idxMeta, nowSupplier)) {
|
||||
if (context.canExecute(nextPhase)) {
|
||||
String nextPhaseName = nextPhase.getName();
|
||||
client.admin().indices().updateSettings(
|
||||
new UpdateSettingsRequest(
|
||||
Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), nextPhaseName)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build(),
|
||||
indexName),
|
||||
new ActionListener<UpdateSettingsResponse>() {
|
||||
context.setPhase(nextPhaseName, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(UpdateSettingsResponse response) {
|
||||
if (response.isAcknowledged()) {
|
||||
logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]");
|
||||
} else {
|
||||
logger.error("Failed to initialised phase [" + nextPhaseName + "] for index [" + indexName + "]");
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error("Failed to initialised phase [" + nextPhaseName + "] for index [" + indexName + "]", e);
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error("Failed to initialised phase [" + nextPhaseName + "] for index [" + indexName + "]", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Phase currentPhase = phases.stream().filter(phase -> phase.getName().equals(currentPhaseName)).findAny()
|
||||
.orElseThrow(() -> new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle ["
|
||||
+ getName() + "] for index [" + indexName + "]"));
|
||||
currentPhase.execute(idxMeta, client);
|
||||
currentPhase.execute(context);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,11 +6,6 @@
|
|||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
@ -18,7 +13,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
|
@ -26,11 +20,11 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
|
|||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleContext.Listener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class Phase implements ToXContentObject, Writeable {
|
||||
public static final String PHASE_COMPLETED = "ACTIONS COMPLETED";
|
||||
|
@ -91,15 +85,9 @@ public class Phase implements ToXContentObject, Writeable {
|
|||
return actions;
|
||||
}
|
||||
|
||||
protected boolean canExecute(IndexMetaData idxMeta, LongSupplier nowSupplier) {
|
||||
long now = nowSupplier.getAsLong();
|
||||
long indexCreated = idxMeta.getCreationDate();
|
||||
return (indexCreated + after.millis()) <= now;
|
||||
}
|
||||
|
||||
protected void execute(IndexMetaData idxMeta, Client client) {
|
||||
String currentActionName = IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.get(idxMeta.getSettings());
|
||||
String indexName = idxMeta.getIndex().getName();
|
||||
protected void execute(IndexLifecycleContext context) {
|
||||
String currentActionName = context.getAction();
|
||||
String indexName = context.getLifecycleTarget();
|
||||
if (Strings.isNullOrEmpty(currentActionName)) {
|
||||
String firstActionName;
|
||||
if (actions.isEmpty()) {
|
||||
|
@ -107,33 +95,23 @@ public class Phase implements ToXContentObject, Writeable {
|
|||
} else {
|
||||
firstActionName = actions.get(0).getWriteableName();
|
||||
}
|
||||
client.admin().indices()
|
||||
.updateSettings(
|
||||
new UpdateSettingsRequest(Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), firstActionName).build(), indexName),
|
||||
new ActionListener<UpdateSettingsResponse>() {
|
||||
context.setAction(firstActionName, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(UpdateSettingsResponse response) {
|
||||
if (response.isAcknowledged()) {
|
||||
logger.info(
|
||||
"Successfully initialised action [" + firstActionName + "] for index [" + indexName + "]");
|
||||
} else {
|
||||
logger.error(
|
||||
"Failed to initialised action [" + firstActionName + "] for index [" + indexName + "]");
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
logger.info("Successfully initialised action [" + firstActionName + "] for index [" + indexName + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error("Failed to initialised action [" + firstActionName + "] for index [" + indexName + "]", e);
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error("Failed to initialised action [" + firstActionName + "] for index [" + indexName + "]", e);
|
||||
}
|
||||
});
|
||||
} else if (currentActionName.equals(PHASE_COMPLETED) == false) {
|
||||
LifecycleAction currentAction = actions.stream().filter(action -> action.getWriteableName().equals(currentActionName)).findAny()
|
||||
.orElseThrow(() -> new IllegalStateException("Current action [" + currentActionName + "] not found in phase ["
|
||||
+ getName() + "] for index [" + indexName + "]"));
|
||||
currentAction.execute(idxMeta.getIndex(), client);
|
||||
context.executeAction(currentAction);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData.Custom;
|
||||
|
@ -109,4 +110,12 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
|
|||
return IndexLifecycleMetadataDiff::new;
|
||||
}
|
||||
|
||||
public void testMinimumSupportedVersion() {
|
||||
assertEquals(Version.V_7_0_0_alpha1, createTestInstance().getMinimalSupportedVersion());
|
||||
}
|
||||
|
||||
public void testcontext() {
|
||||
assertEquals(MetaData.ALL_CONTEXTS, createTestInstance().context());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,491 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleContext.Listener;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
public class InternalIndexLifecycleContextTests extends ESTestCase {
|
||||
|
||||
public void testSetPhase() {
|
||||
long creationDate = randomNonNegativeLong();
|
||||
String newPhase = randomAlphaOfLengthBetween(1, 20);
|
||||
String indexName = "test";
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20))
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), newPhase)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
}
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
// Use setOnce so it throws an error if we call the listener multiple
|
||||
// times
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
context.setPhase(newPhase, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected Error", e);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, listenerCalled.get());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testSetPhaseNotAcknowledged() {
|
||||
long creationDate = randomNonNegativeLong();
|
||||
String newPhase = randomAlphaOfLengthBetween(1, 20);
|
||||
String indexName = "test";
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20))
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), newPhase)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(false));
|
||||
return null;
|
||||
}
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
// Use setOnce so it throws an error if we call the listener multiple
|
||||
// times
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
context.setPhase(newPhase, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
throw new AssertionError("Unexpected Error");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertNull(e);
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, listenerCalled.get());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testSetPhaseFailure() {
|
||||
long creationDate = randomNonNegativeLong();
|
||||
String newPhase = randomAlphaOfLengthBetween(1, 20);
|
||||
String indexName = "test";
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20))
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Exception exception = new RuntimeException();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), newPhase)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onFailure(exception);
|
||||
return null;
|
||||
}
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
// Use setOnce so it throws an error if we call the listener multiple
|
||||
// times
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
context.setPhase(newPhase, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
throw new AssertionError("Unexpected Error");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertSame(exception, e);
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, listenerCalled.get());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testGetPhase() {
|
||||
long creationDate = randomNonNegativeLong();
|
||||
String phase = randomAlphaOfLengthBetween(1, 20);
|
||||
IndexMetaData idxMeta = IndexMetaData.builder("test")
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), phase).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
assertEquals(phase, context.getPhase());
|
||||
}
|
||||
|
||||
public void testSetAction() {
|
||||
long creationDate = randomNonNegativeLong();
|
||||
String newAction = randomAlphaOfLengthBetween(1, 20);
|
||||
String indexName = "test";
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), newAction)
|
||||
.build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
}
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
// Use setOnce so it throws an error if we call the listener multiple
|
||||
// times
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
context.setAction(newAction, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected Error", e);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, listenerCalled.get());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testSetActionNotAcknoledged() {
|
||||
long creationDate = randomNonNegativeLong();
|
||||
String newAction = randomAlphaOfLengthBetween(1, 20);
|
||||
String indexName = "test";
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), newAction)
|
||||
.build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(false));
|
||||
return null;
|
||||
}
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
// Use setOnce so it throws an error if we call the listener multiple
|
||||
// times
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
context.setAction(newAction, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
throw new AssertionError("Unexpected Error");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertNull(e);
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, listenerCalled.get());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testSetActionFailure() {
|
||||
long creationDate = randomNonNegativeLong();
|
||||
String newAction = randomAlphaOfLengthBetween(1, 20);
|
||||
String indexName = "test";
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), randomAlphaOfLengthBetween(1, 20)).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Exception exception = new RuntimeException();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder().put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), newAction)
|
||||
.build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onFailure(exception);
|
||||
return null;
|
||||
}
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, client, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
// Use setOnce so it throws an error if we call the listener multiple
|
||||
// times
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
context.setAction(newAction, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
throw new AssertionError("Unexpected Error");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertSame(exception, e);
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, listenerCalled.get());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testGetAction() {
|
||||
long creationDate = randomNonNegativeLong();
|
||||
String action = randomAlphaOfLengthBetween(1, 20);
|
||||
IndexMetaData idxMeta = IndexMetaData.builder("test")
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), action).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
assertEquals(action, context.getAction());
|
||||
}
|
||||
|
||||
public void testGetLifecycleTarget() {
|
||||
long creationDate = randomNonNegativeLong();
|
||||
String index = randomAlphaOfLengthBetween(1, 20);
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(index)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
assertEquals(index, context.getLifecycleTarget());
|
||||
}
|
||||
|
||||
public void testCanExecuteBeforeTrigger() throws Exception {
|
||||
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(0, 100000));
|
||||
long creationDate = randomNonNegativeLong();
|
||||
long now = random().longs(creationDate, creationDate + after.millis()).iterator().nextLong();
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder("test")
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now);
|
||||
|
||||
Phase phase = new Phase("test_phase", after, Collections.emptyList());
|
||||
|
||||
assertFalse(context.canExecute(phase));
|
||||
}
|
||||
|
||||
public void testCanExecuteOnTrigger() throws Exception {
|
||||
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(0, 100000));
|
||||
long creationDate = randomNonNegativeLong();
|
||||
long now = creationDate + after.millis();
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder("test")
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now);
|
||||
|
||||
Phase phase = new Phase("test_phase", after, Collections.emptyList());
|
||||
|
||||
assertTrue(context.canExecute(phase));
|
||||
}
|
||||
|
||||
public void testCanExecuteAfterTrigger() throws Exception {
|
||||
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(0, 100000));
|
||||
long creationDate = randomNonNegativeLong();
|
||||
long now = random().longs(creationDate + after.millis(), Long.MAX_VALUE).iterator().nextLong();
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder("test")
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now);
|
||||
|
||||
Phase phase = new Phase("test_phase", after, Collections.emptyList());
|
||||
|
||||
assertTrue(context.canExecute(phase));
|
||||
}
|
||||
|
||||
public void testExecuteAction() {
|
||||
IndexMetaData idxMeta = IndexMetaData.builder("test")
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", 0L).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> {
|
||||
throw new AssertionError("nowSupplier should not be called");
|
||||
});
|
||||
|
||||
MockAction action = new MockAction();
|
||||
|
||||
assertFalse(action.wasExecuted());
|
||||
|
||||
context.executeAction(action);
|
||||
|
||||
assertTrue(action.wasExecuted());
|
||||
}
|
||||
}
|
|
@ -5,27 +5,15 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.junit.Before;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -101,7 +89,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
return new LifecyclePolicy(name, phases);
|
||||
}
|
||||
|
||||
public void testExecuteNewIndex() throws Exception {
|
||||
public void testExecuteNewIndexBeforeTrigger() throws Exception {
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
List<Phase> phases = new ArrayList<>();
|
||||
|
@ -125,41 +113,125 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName).settings(Settings.builder().put("index.version.created", 7000001L).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "") {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), "first_phase")
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
public boolean canExecute(Phase phase) {
|
||||
if (phase == firstPhase) {
|
||||
return false;
|
||||
} else {
|
||||
throw new AssertionError("canExecute should not have been called on this phase: " + phase.getName());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
policy.execute(context);
|
||||
|
||||
policy.execute(idxMeta, client, () -> 0L);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals("", context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
}
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
public void testExecuteNewIndexAfterTrigger() throws Exception {
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
List<Phase> phases = new ArrayList<>();
|
||||
List<LifecycleAction> actions = new ArrayList<>();
|
||||
MockAction firstAction = new MockAction();
|
||||
actions.add(firstAction);
|
||||
TimeValue after = TimeValue.timeValueSeconds(0);
|
||||
Phase firstPhase = new Phase("first_phase", after, actions);
|
||||
phases.add(firstPhase);
|
||||
actions = new ArrayList<>();
|
||||
MockAction secondAction = new MockAction();
|
||||
actions.add(secondAction);
|
||||
after = TimeValue.timeValueSeconds(10);
|
||||
Phase secondPhase = new Phase("second_phase", after, actions);
|
||||
phases.add(secondPhase);
|
||||
actions = new ArrayList<>();
|
||||
MockAction thirdAction = new MockAction();
|
||||
actions.add(thirdAction);
|
||||
after = TimeValue.timeValueSeconds(20);
|
||||
Phase thirdPhase = new Phase("third_phase", after, actions);
|
||||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "") {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
if (phase == firstPhase) {
|
||||
return true;
|
||||
} else {
|
||||
throw new AssertionError("canExecute should not have been called on this phase: " + phase.getName());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
policy.execute(context);
|
||||
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(firstPhase.getName(), context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
}
|
||||
|
||||
public void testExecuteNewIndexAfterTriggerFailure() throws Exception {
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
List<Phase> phases = new ArrayList<>();
|
||||
List<LifecycleAction> actions = new ArrayList<>();
|
||||
MockAction firstAction = new MockAction();
|
||||
actions.add(firstAction);
|
||||
TimeValue after = TimeValue.timeValueSeconds(0);
|
||||
Phase firstPhase = new Phase("first_phase", after, actions);
|
||||
phases.add(firstPhase);
|
||||
actions = new ArrayList<>();
|
||||
MockAction secondAction = new MockAction();
|
||||
actions.add(secondAction);
|
||||
after = TimeValue.timeValueSeconds(10);
|
||||
Phase secondPhase = new Phase("second_phase", after, actions);
|
||||
phases.add(secondPhase);
|
||||
actions = new ArrayList<>();
|
||||
MockAction thirdAction = new MockAction();
|
||||
actions.add(thirdAction);
|
||||
after = TimeValue.timeValueSeconds(20);
|
||||
Phase thirdPhase = new Phase("third_phase", after, actions);
|
||||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "") {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
if (phase == firstPhase) {
|
||||
return true;
|
||||
} else {
|
||||
throw new AssertionError("canExecute should not have been called on this phase: " + phase.getName());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
RuntimeException exception = new RuntimeException();
|
||||
|
||||
context.failOnSetters(exception);
|
||||
|
||||
policy.execute(context);
|
||||
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals("", context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
}
|
||||
|
||||
public void testExecuteFirstPhase() throws Exception {
|
||||
|
@ -186,43 +258,23 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), firstPhase.getName())
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), "") {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), MockAction.NAME).build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
policy.execute(context);
|
||||
|
||||
policy.execute(idxMeta, client, () -> 0L);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(firstPhase.getName(), context.getPhase());
|
||||
assertEquals(MockAction.NAME, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testExecuteSecondPhase() throws Exception {
|
||||
|
@ -249,43 +301,23 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), secondPhase.getName())
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), "") {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), MockAction.NAME).build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
policy.execute(context);
|
||||
|
||||
policy.execute(idxMeta, client, () -> 0L);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(secondPhase.getName(), context.getPhase());
|
||||
assertEquals(MockAction.NAME, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testExecuteThirdPhase() throws Exception {
|
||||
|
@ -312,43 +344,23 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), thirdPhase.getName())
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, thirdPhase.getName(), "") {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), MockAction.NAME).build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
policy.execute(context);
|
||||
|
||||
policy.execute(idxMeta, client, () -> 0L);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(thirdPhase.getName(), context.getPhase());
|
||||
assertEquals(MockAction.NAME, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testExecuteMissingPhase() throws Exception {
|
||||
|
@ -375,34 +387,29 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), "does_not_exist")
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "does_not_exist", "") {
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> policy.execute(idxMeta, client, () -> 0L));
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> policy.execute(context));
|
||||
assertEquals(
|
||||
"Current phase [" + "does_not_exist" + "] not found in lifecycle [" + lifecycleName + "] for index [" + indexName + "]",
|
||||
exception.getMessage());
|
||||
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals("does_not_exist", context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
|
||||
}
|
||||
|
||||
public void testExecuteFirstPhaseCompletedBeforeTrigger() throws Exception {
|
||||
long creationDate = 0L;
|
||||
long now = randomIntBetween(0, 9999);
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
List<Phase> phases = new ArrayList<>();
|
||||
|
@ -425,32 +432,31 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
Phase thirdPhase = new Phase("third_phase", after, actions);
|
||||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), Phase.PHASE_COMPLETED) {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
if (phase == secondPhase) {
|
||||
return false;
|
||||
} else {
|
||||
throw new AssertionError("canExecute should not have been called on this phase: " + phase.getName());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), firstPhase.getName())
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), Phase.PHASE_COMPLETED).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
policy.execute(context);
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
policy.execute(idxMeta, client, () -> now);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(firstPhase.getName(), context.getPhase());
|
||||
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
|
||||
}
|
||||
|
||||
public void testExecuteFirstPhaseCompletedAfterTrigger() throws Exception {
|
||||
long creationDate = 0L;
|
||||
long now = randomIntBetween(10000, 1000000);
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
List<Phase> phases = new ArrayList<>();
|
||||
|
@ -474,49 +480,30 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), firstPhase.getName())
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), Phase.PHASE_COMPLETED).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), Phase.PHASE_COMPLETED) {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), "second_phase")
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
public boolean canExecute(Phase phase) {
|
||||
if (phase == secondPhase) {
|
||||
return true;
|
||||
} else {
|
||||
throw new AssertionError("canExecute should not have been called on this phase: " + phase.getName());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
policy.execute(context);
|
||||
|
||||
policy.execute(idxMeta, client, () -> now);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(secondPhase.getName(), context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testExecuteSecondPhaseCompletedBeforeTrigger() throws Exception {
|
||||
long creationDate = 0L;
|
||||
long now = randomIntBetween(0, 19999);
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
List<Phase> phases = new ArrayList<>();
|
||||
|
@ -540,31 +527,30 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), secondPhase.getName())
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), Phase.PHASE_COMPLETED).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), Phase.PHASE_COMPLETED) {
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
if (phase == thirdPhase) {
|
||||
return false;
|
||||
} else {
|
||||
throw new AssertionError("canExecute should not have been called on this phase: " + phase.getName());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
policy.execute(context);
|
||||
|
||||
policy.execute(idxMeta, client, () -> now);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(secondPhase.getName(), context.getPhase());
|
||||
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
|
||||
}
|
||||
|
||||
public void testExecuteSecondPhaseCompletedAfterTrigger() throws Exception {
|
||||
long creationDate = 0L;
|
||||
long now = randomIntBetween(20000, 1000000);
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
List<Phase> phases = new ArrayList<>();
|
||||
|
@ -588,49 +574,30 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), secondPhase.getName())
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), Phase.PHASE_COMPLETED).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), Phase.PHASE_COMPLETED) {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), "third_phase")
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "").build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
public boolean canExecute(Phase phase) {
|
||||
if (phase == thirdPhase) {
|
||||
return true;
|
||||
} else {
|
||||
throw new AssertionError("canExecute should not have been called on this phase: " + phase.getName());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
policy.execute(context);
|
||||
|
||||
policy.execute(idxMeta, client, () -> now);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(thirdPhase.getName(), context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testExecuteThirdPhaseCompleted() throws Exception {
|
||||
long creationDate = 0L;
|
||||
long now = randomIntBetween(20000, 1000000);
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
|
||||
List<Phase> phases = new ArrayList<>();
|
||||
|
@ -654,26 +621,23 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
phases.add(thirdPhase);
|
||||
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_PHASE_SETTING.getKey(), thirdPhase.getName())
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), Phase.PHASE_COMPLETED).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, thirdPhase.getName(), Phase.PHASE_COMPLETED) {
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called");
|
||||
}
|
||||
};
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
policy.execute(context);
|
||||
|
||||
policy.execute(idxMeta, client, () -> now);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(thirdPhase.getName(), context.getPhase());
|
||||
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
public abstract class MockIndexLifecycleContext implements IndexLifecycleContext {
|
||||
|
||||
private final String targetName;
|
||||
private String phase;
|
||||
private String action;
|
||||
private RuntimeException exceptionToThrow;
|
||||
|
||||
public MockIndexLifecycleContext(String targetName, String initialPhase, String initialAction) {
|
||||
this.targetName = targetName;
|
||||
this.phase = initialPhase;
|
||||
this.action = initialAction;
|
||||
}
|
||||
|
||||
public void failOnSetters(RuntimeException exceptionToThrow) {
|
||||
this.exceptionToThrow = exceptionToThrow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPhase(String phase, Listener listener) {
|
||||
if (exceptionToThrow != null) {
|
||||
listener.onFailure(exceptionToThrow);
|
||||
return;
|
||||
}
|
||||
this.phase = phase;
|
||||
this.action = "";
|
||||
listener.onSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAction(String action, Listener listener) {
|
||||
if (exceptionToThrow != null) {
|
||||
listener.onFailure(exceptionToThrow);
|
||||
return;
|
||||
}
|
||||
this.action = action;
|
||||
listener.onSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAction() {
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPhase() {
|
||||
return phase;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLifecycleTarget() {
|
||||
return targetName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeAction(LifecycleAction action) {
|
||||
action.execute(null, null);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,147 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleContext.Listener;
|
||||
|
||||
public class MockIndexLifecycleContextTests extends ESTestCase {
|
||||
|
||||
public void testSetPhase() {
|
||||
String targetName = randomAlphaOfLengthBetween(1, 20);
|
||||
String newPhase = randomAlphaOfLengthBetween(1, 20);
|
||||
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(targetName,
|
||||
randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)) {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
// Use setOnce so it throws an error if we call the listener multiple
|
||||
// times
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
context.setPhase(newPhase, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected Error", e);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, listenerCalled.get());
|
||||
assertEquals(newPhase, context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
assertEquals(targetName, context.getLifecycleTarget());
|
||||
}
|
||||
|
||||
public void testGetPhase() {
|
||||
String phase = randomAlphaOfLengthBetween(1, 20);
|
||||
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(randomAlphaOfLengthBetween(1, 20), phase,
|
||||
randomAlphaOfLengthBetween(1, 20)) {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
assertEquals(phase, context.getPhase());
|
||||
}
|
||||
|
||||
public void testSetAction() {
|
||||
String targetName = randomAlphaOfLengthBetween(1, 20);
|
||||
String phase = randomAlphaOfLengthBetween(1, 20);
|
||||
String newAction = randomAlphaOfLengthBetween(1, 20);
|
||||
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(targetName, phase,
|
||||
randomAlphaOfLengthBetween(1, 20)) {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
// Use setOnce so it throws an error if we call the listener multiple
|
||||
// times
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
context.setAction(newAction, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected Error", e);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, listenerCalled.get());
|
||||
assertEquals(newAction, context.getAction());
|
||||
assertEquals(phase, context.getPhase());
|
||||
assertEquals(targetName, context.getLifecycleTarget());
|
||||
}
|
||||
|
||||
public void testGetAction() {
|
||||
String action = randomAlphaOfLengthBetween(1, 20);
|
||||
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(randomAlphaOfLengthBetween(1, 20),
|
||||
randomAlphaOfLengthBetween(1, 20), action) {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
assertEquals(action, context.getAction());
|
||||
}
|
||||
|
||||
public void testGetLifecycleTarget() {
|
||||
String target = randomAlphaOfLengthBetween(1, 20);
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(target, randomAlphaOfLengthBetween(1, 20),
|
||||
randomAlphaOfLengthBetween(1, 20)) {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
assertEquals(target, context.getLifecycleTarget());
|
||||
}
|
||||
|
||||
public void testExecuteAction() {
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(randomAlphaOfLengthBetween(1, 20),
|
||||
randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)) {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called.");
|
||||
}
|
||||
};
|
||||
|
||||
MockAction action = new MockAction();
|
||||
|
||||
assertFalse(action.wasExecuted());
|
||||
|
||||
context.executeAction(action);
|
||||
|
||||
assertTrue(action.wasExecuted());
|
||||
}
|
||||
}
|
|
@ -5,27 +5,15 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.junit.Before;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -95,51 +83,6 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
return new Phase(name, after, actions);
|
||||
}
|
||||
|
||||
public void testCanExecuteBeforeTrigger() throws Exception {
|
||||
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(0, 100000));
|
||||
long creationDate = randomNonNegativeLong();
|
||||
long now = random().longs(creationDate, creationDate + after.millis()).iterator().nextLong();
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder("test")
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Phase phase = new Phase("test_phase", after, Collections.emptyList());
|
||||
|
||||
assertFalse(phase.canExecute(idxMeta, () -> now));
|
||||
}
|
||||
|
||||
public void testCanExecuteOnTrigger() throws Exception {
|
||||
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(0, 100000));
|
||||
long creationDate = randomNonNegativeLong();
|
||||
long now = creationDate + after.millis();
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder("test")
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L).put("index.creation_date", creationDate).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Phase phase = new Phase("test_phase", after, Collections.emptyList());
|
||||
|
||||
assertTrue(phase.canExecute(idxMeta, () -> now));
|
||||
}
|
||||
|
||||
public void testCanExecuteAfterTrigger() throws Exception {
|
||||
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(0, 100000));
|
||||
long creationDate = randomNonNegativeLong();
|
||||
long now = random().longs(creationDate + after.millis(), Long.MAX_VALUE).iterator().nextLong();
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder("test").settings(Settings.builder()
|
||||
.put("index.version.created", 7000001L)
|
||||
.put("index.creation_date", creationDate)
|
||||
.build())
|
||||
.numberOfShards(randomIntBetween(1, 5))
|
||||
.numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Phase phase = new Phase("test_phase", after, Collections.emptyList());
|
||||
|
||||
assertTrue(phase.canExecute(idxMeta, () -> now));
|
||||
}
|
||||
|
||||
public void testExecuteNewIndex() throws Exception {
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String phaseName = randomAlphaOfLengthBetween(1, 20);
|
||||
|
@ -168,40 +111,74 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
actions.add(thirdAction);
|
||||
Phase phase = new Phase(phaseName, after, actions);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName).settings(Settings.builder().put("index.version.created", 7000001L).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "") {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "first_action").build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called");
|
||||
}
|
||||
};
|
||||
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
phase.execute(context);
|
||||
|
||||
phase.execute(idxMeta, client);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(firstAction.getWriteableName(), context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
}
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
public void testExecuteNewIndexFailure() throws Exception {
|
||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||
String phaseName = randomAlphaOfLengthBetween(1, 20);
|
||||
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
|
||||
List<LifecycleAction> actions = new ArrayList<>();
|
||||
MockAction firstAction = new MockAction() {
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return "first_action";
|
||||
}
|
||||
};
|
||||
actions.add(firstAction);
|
||||
MockAction secondAction = new MockAction() {
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return "second_action";
|
||||
}
|
||||
};
|
||||
actions.add(secondAction);
|
||||
MockAction thirdAction = new MockAction() {
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return "third_action";
|
||||
}
|
||||
};
|
||||
actions.add(thirdAction);
|
||||
Phase phase = new Phase(phaseName, after, actions);
|
||||
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "") {
|
||||
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called");
|
||||
}
|
||||
};
|
||||
|
||||
RuntimeException exception = new RuntimeException();
|
||||
|
||||
context.failOnSetters(exception);
|
||||
|
||||
phase.execute(context);
|
||||
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
}
|
||||
|
||||
public void testExecuteNewIndexNoActions() throws Exception {
|
||||
|
@ -210,36 +187,19 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
|
||||
Phase phase = new Phase(phaseName, after, Collections.emptyList());
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName).settings(Settings.builder().put("index.version.created", 7000001L).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "") {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<UpdateSettingsResponse> listener = (ActionListener<UpdateSettingsResponse>) invocation.getArguments()[1];
|
||||
Settings expectedSettings = Settings.builder()
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), Phase.PHASE_COMPLETED).build();
|
||||
UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, indexName);
|
||||
listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true));
|
||||
return null;
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called");
|
||||
}
|
||||
};
|
||||
|
||||
}).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
|
||||
phase.execute(context);
|
||||
|
||||
phase.execute(idxMeta, client);
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
|
||||
}
|
||||
|
||||
public void testExecutePhaseAlreadyComplete() throws Exception {
|
||||
|
@ -269,25 +229,23 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
};
|
||||
Phase phase = new Phase(phaseName, after, Collections.emptyList());
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), Phase.PHASE_COMPLETED).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, Phase.PHASE_COMPLETED) {
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called");
|
||||
}
|
||||
};
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
phase.execute(context);
|
||||
|
||||
phase.execute(idxMeta, client);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
|
||||
}
|
||||
|
||||
public void testExecuteFirstAction() throws Exception {
|
||||
|
@ -318,25 +276,23 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
actions.add(thirdAction);
|
||||
Phase phase = new Phase(phaseName, after, actions);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), firstAction.getWriteableName()).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, firstAction.getWriteableName()) {
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called");
|
||||
}
|
||||
};
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
phase.execute(context);
|
||||
|
||||
phase.execute(idxMeta, client);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(firstAction.getWriteableName(), context.getAction());
|
||||
|
||||
assertTrue(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
|
||||
}
|
||||
|
||||
public void testExecuteSecondAction() throws Exception {
|
||||
|
@ -367,25 +323,23 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
actions.add(thirdAction);
|
||||
Phase phase = new Phase(phaseName, after, actions);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), secondAction.getWriteableName()).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, secondAction.getWriteableName()) {
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called");
|
||||
}
|
||||
};
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
phase.execute(context);
|
||||
|
||||
phase.execute(idxMeta, client);
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(secondAction.getWriteableName(), context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertTrue(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
|
||||
}
|
||||
|
||||
public void testExecuteThirdAction() throws Exception {
|
||||
|
@ -416,25 +370,24 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
actions.add(thirdAction);
|
||||
Phase phase = new Phase(phaseName, after, actions);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), thirdAction.getWriteableName()).build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, thirdAction.getWriteableName()) {
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called");
|
||||
}
|
||||
};
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
phase.execute(context);
|
||||
|
||||
phase.execute(idxMeta, client);
|
||||
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(thirdAction.getWriteableName(), context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertTrue(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
|
||||
}
|
||||
|
||||
public void testExecuteMissingAction() throws Exception {
|
||||
|
@ -465,27 +418,25 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
actions.add(thirdAction);
|
||||
Phase phase = new Phase(phaseName, after, actions);
|
||||
|
||||
IndexMetaData idxMeta = IndexMetaData.builder(indexName)
|
||||
.settings(Settings.builder().put("index.version.created", 7000001L)
|
||||
.put(IndexLifecycle.LIFECYCLE_TIMESERIES_ACTION_SETTING.getKey(), "does_not_exist").build())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "does_not_exist") {
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
@Override
|
||||
public boolean canExecute(Phase phase) {
|
||||
throw new AssertionError("canExecute should not have been called");
|
||||
}
|
||||
};
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> phase.execute(idxMeta, client));
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> phase.execute(context));
|
||||
assertEquals("Current action [" + "does_not_exist" + "] not found in phase [" + phaseName + "] for index [" + indexName + "]",
|
||||
exception.getMessage());
|
||||
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals("does_not_exist", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
|
||||
Mockito.verifyZeroInteractions(client, adminClient, indicesClient);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue