Adds listeners to Lifecycle actions
This means that the result of the action can now be async and we can then implement moving immediately to the next action if the current one is complete
This commit is contained in:
parent
26ad2142d5
commit
152d5d45e5
|
@ -21,7 +21,7 @@ import org.elasticsearch.index.Index;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
public class DeleteAction extends LifecycleAction {
|
||||
public class DeleteAction implements LifecycleAction {
|
||||
public static final String NAME = "delete";
|
||||
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(DeleteAction.class);
|
||||
|
@ -54,16 +54,18 @@ public class DeleteAction extends LifecycleAction {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void execute(Index index, Client client) {
|
||||
public void execute(Index index, Client client, Listener listener) {
|
||||
client.admin().indices().delete(new DeleteIndexRequest(index.getName()), new ActionListener<DeleteIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(DeleteIndexResponse deleteIndexResponse) {
|
||||
logger.error(deleteIndexResponse);
|
||||
listener.onSuccess(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error(e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ public interface IndexLifecycleContext {
|
|||
|
||||
boolean canExecute(Phase phase);
|
||||
|
||||
public void executeAction(LifecycleAction action);
|
||||
public void executeAction(LifecycleAction action, LifecycleAction.Listener listener);
|
||||
|
||||
public static interface Listener {
|
||||
|
||||
|
|
|
@ -60,8 +60,8 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext {
|
|||
return (indexCreated + phase.getAfter().millis()) <= now;
|
||||
}
|
||||
|
||||
public void executeAction(LifecycleAction action) {
|
||||
action.execute(idxMeta.getIndex(), client);
|
||||
public void executeAction(LifecycleAction action, LifecycleAction.Listener listener) {
|
||||
action.execute(idxMeta.getIndex(), client, listener);
|
||||
}
|
||||
|
||||
private void writeSettings(String index, Settings settings, Listener listener) {
|
||||
|
|
|
@ -10,8 +10,14 @@ import org.elasticsearch.common.io.stream.NamedWriteable;
|
|||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
public abstract class LifecycleAction implements ToXContentObject, NamedWriteable {
|
||||
public interface LifecycleAction extends ToXContentObject, NamedWriteable {
|
||||
|
||||
protected abstract void execute(Index index, Client client);
|
||||
void execute(Index index, Client client, Listener listener);
|
||||
|
||||
public static interface Listener {
|
||||
|
||||
void onSuccess(boolean completed);
|
||||
|
||||
void onFailure(Exception e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,6 +105,7 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
|
|||
@Override
|
||||
public void onSuccess() {
|
||||
logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]");
|
||||
nextPhase.execute(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -90,16 +90,39 @@ public class Phase implements ToXContentObject, Writeable {
|
|||
String indexName = context.getLifecycleTarget();
|
||||
if (Strings.isNullOrEmpty(currentActionName)) {
|
||||
String firstActionName;
|
||||
LifecycleAction firstAction;
|
||||
if (actions.isEmpty()) {
|
||||
firstAction = null;
|
||||
firstActionName = PHASE_COMPLETED;
|
||||
} else {
|
||||
firstActionName = actions.get(0).getWriteableName();
|
||||
firstAction = actions.get(0);
|
||||
firstActionName = firstAction.getWriteableName();
|
||||
}
|
||||
context.setAction(firstActionName, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
logger.info("Successfully initialised action [" + firstActionName + "] for index [" + indexName + "]");
|
||||
if (firstActionName.equals(PHASE_COMPLETED) == false) {
|
||||
context.executeAction(firstAction, new LifecycleAction.Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(boolean completed) {
|
||||
if (completed) {
|
||||
logger.info("Action [" + firstActionName + "] for index [" + indexName
|
||||
+ "] executed sucessfully but is not yet complete");
|
||||
} else {
|
||||
logger.info("Action [" + firstActionName + "] for index [" + indexName
|
||||
+ "] complete, moving to next action");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.info("Action [" + firstActionName + "] for index [" + indexName + "] failed", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -111,7 +134,23 @@ public class Phase implements ToXContentObject, Writeable {
|
|||
LifecycleAction currentAction = actions.stream().filter(action -> action.getWriteableName().equals(currentActionName)).findAny()
|
||||
.orElseThrow(() -> new IllegalStateException("Current action [" + currentActionName + "] not found in phase ["
|
||||
+ getName() + "] for index [" + indexName + "]"));
|
||||
context.executeAction(currentAction);
|
||||
context.executeAction(currentAction, new LifecycleAction.Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(boolean completed) {
|
||||
if (completed) {
|
||||
logger.info("Action [" + currentActionName + "] for index [" + indexName
|
||||
+ "] executed sucessfully but is not yet complete");
|
||||
} else {
|
||||
logger.info("Action [" + currentActionName + "] for index [" + indexName + "] complete, moving to next action");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.info("Action [" + currentActionName + "] for index [" + indexName + "] failed", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
|
||||
|
@ -15,6 +16,7 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
|
|||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.xpack.indexlifecycle.LifecycleAction.Listener;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -63,8 +65,72 @@ public class DeleteActionTests extends AbstractSerializingTestCase<DeleteAction>
|
|||
|
||||
}).when(indicesClient).delete(Mockito.any(), Mockito.any());
|
||||
|
||||
SetOnce<Boolean> actionCompleted = new SetOnce<>();
|
||||
DeleteAction action = new DeleteAction();
|
||||
action.execute(index, client);
|
||||
action.execute(index, client, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(boolean completed) {
|
||||
actionCompleted.set(completed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected method call", e);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, actionCompleted.get());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
Mockito.verify(indicesClient, Mockito.only()).delete(Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
public void testExecuteFailure() throws Exception {
|
||||
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||
Exception exception = new RuntimeException();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
Mockito.doAnswer(new Answer<Void>() {
|
||||
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
DeleteIndexRequest request = (DeleteIndexRequest) invocation.getArguments()[0];
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<DeleteIndexResponse> listener = (ActionListener<DeleteIndexResponse>) invocation.getArguments()[1];
|
||||
assertNotNull(request);
|
||||
assertEquals(1, request.indices().length);
|
||||
assertEquals(index.getName(), request.indices()[0]);
|
||||
listener.onFailure(exception);
|
||||
;
|
||||
return null;
|
||||
}
|
||||
|
||||
}).when(indicesClient).delete(Mockito.any(), Mockito.any());
|
||||
|
||||
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
|
||||
DeleteAction action = new DeleteAction();
|
||||
action.execute(index, client, new Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(boolean completed) {
|
||||
throw new AssertionError("Unexpected method call");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertEquals(exception, e);
|
||||
exceptionThrown.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
assertEquals(true, exceptionThrown.get());
|
||||
|
||||
Mockito.verify(client, Mockito.only()).admin();
|
||||
Mockito.verify(adminClient, Mockito.only()).indices();
|
||||
|
|
|
@ -482,10 +482,26 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
|
|||
|
||||
MockAction action = new MockAction();
|
||||
|
||||
assertFalse(action.wasExecuted());
|
||||
assertFalse(action.wasCompleted());
|
||||
assertEquals(0L, action.getExecutedCount());
|
||||
|
||||
context.executeAction(action);
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
|
||||
assertTrue(action.wasExecuted());
|
||||
context.executeAction(action, new LifecycleAction.Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(boolean completed) {
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected method call", e);
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(action.wasCompleted());
|
||||
assertEquals(1L, action.getExecutedCount());
|
||||
assertEquals(true, listenerCalled.get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -131,9 +131,12 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
assertEquals("", context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteNewIndexAfterTrigger() throws Exception {
|
||||
|
@ -176,11 +179,14 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(firstPhase.getName(), context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
assertEquals(MockAction.NAME, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertTrue(firstAction.wasCompleted());
|
||||
assertEquals(1L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteNewIndexAfterTriggerFailure() throws Exception {
|
||||
|
@ -229,9 +235,12 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
assertEquals("", context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteFirstPhase() throws Exception {
|
||||
|
@ -272,9 +281,12 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
assertEquals(firstPhase.getName(), context.getPhase());
|
||||
assertEquals(MockAction.NAME, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertTrue(firstAction.wasCompleted());
|
||||
assertEquals(1L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteSecondPhase() throws Exception {
|
||||
|
@ -315,9 +327,12 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
assertEquals(secondPhase.getName(), context.getPhase());
|
||||
assertEquals(MockAction.NAME, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertTrue(secondAction.wasCompleted());
|
||||
assertEquals(1L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteThirdPhase() throws Exception {
|
||||
|
@ -358,9 +373,12 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
assertEquals(thirdPhase.getName(), context.getPhase());
|
||||
assertEquals(MockAction.NAME, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertTrue(thirdAction.wasCompleted());
|
||||
assertEquals(1L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteMissingPhase() throws Exception {
|
||||
|
@ -404,9 +422,12 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
assertEquals("does_not_exist", context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteFirstPhaseCompletedBeforeTrigger() throws Exception {
|
||||
|
@ -451,9 +472,12 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
assertEquals(firstPhase.getName(), context.getPhase());
|
||||
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteFirstPhaseCompletedAfterTrigger() throws Exception {
|
||||
|
@ -496,11 +520,14 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(secondPhase.getName(), context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
assertEquals(MockAction.NAME, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertTrue(secondAction.wasCompleted());
|
||||
assertEquals(1L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteSecondPhaseCompletedBeforeTrigger() throws Exception {
|
||||
|
@ -545,9 +572,12 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
assertEquals(secondPhase.getName(), context.getPhase());
|
||||
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteSecondPhaseCompletedAfterTrigger() throws Exception {
|
||||
|
@ -590,11 +620,14 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
|
||||
assertEquals(indexName, context.getLifecycleTarget());
|
||||
assertEquals(thirdPhase.getName(), context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
assertEquals(MockAction.NAME, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertTrue(thirdAction.wasCompleted());
|
||||
assertEquals(1L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteThirdPhaseCompleted() throws Exception {
|
||||
|
@ -635,9 +668,12 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
assertEquals(thirdPhase.getName(), context.getPhase());
|
||||
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.indexlifecycle;
|
|||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
|
@ -17,17 +18,23 @@ import org.elasticsearch.index.Index;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class MockAction extends LifecycleAction {
|
||||
public static final ParseField EXECUTED_FIELD = new ParseField("executed");
|
||||
public class MockAction implements LifecycleAction {
|
||||
public static final ParseField COMPLETED_FIELD = new ParseField("completed");
|
||||
public static final ParseField EXECUTED_COUNT_FIELD = new ParseField("executed_count");
|
||||
public static final ParseField NAME_FIELD = new ParseField("name");
|
||||
public static final String NAME = "TEST_ACTION";
|
||||
private final SetOnce<Boolean> executed = new SetOnce<>();
|
||||
private final SetOnce<Boolean> completed = new SetOnce<>();
|
||||
private final AtomicLong executedCount;
|
||||
private Exception exceptionToThrow = null;
|
||||
private boolean completeOnExecute = true;
|
||||
|
||||
private static final ConstructingObjectParser<MockAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
|
||||
a -> new MockAction((Boolean) a[1]));
|
||||
a -> new MockAction((Boolean) a[0], (Long) a[1]));
|
||||
static {
|
||||
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), EXECUTED_FIELD);
|
||||
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), COMPLETED_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), EXECUTED_COUNT_FIELD);
|
||||
}
|
||||
|
||||
public static MockAction parse(XContentParser parser) {
|
||||
|
@ -35,27 +42,31 @@ public class MockAction extends LifecycleAction {
|
|||
}
|
||||
|
||||
public MockAction() {
|
||||
this(null, 0);
|
||||
}
|
||||
|
||||
private MockAction(Boolean executed) {
|
||||
if (executed != null) {
|
||||
this.executed.set(executed);
|
||||
MockAction(Boolean completed, long executedCount) {
|
||||
if (completed != null) {
|
||||
this.completed.set(completed);
|
||||
}
|
||||
this.executedCount = new AtomicLong(executedCount);
|
||||
}
|
||||
|
||||
public MockAction(StreamInput in) throws IOException {
|
||||
Boolean executed = in.readOptionalBoolean();
|
||||
if (executed != null) {
|
||||
this.executed.set(executed);
|
||||
this.completed.set(executed);
|
||||
}
|
||||
this.executedCount = new AtomicLong(in.readLong());
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (executed.get() != null) {
|
||||
builder.field("executed", executed.get());
|
||||
if (completed.get() != null) {
|
||||
builder.field(COMPLETED_FIELD.getPreferredName(), completed.get());
|
||||
}
|
||||
builder.field(EXECUTED_COUNT_FIELD.getPreferredName(), executedCount.get());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -65,23 +76,44 @@ public class MockAction extends LifecycleAction {
|
|||
return NAME;
|
||||
}
|
||||
|
||||
public Boolean wasExecuted() {
|
||||
return executed.get() != null && executed.get();
|
||||
public void setCompleteOnExecute(boolean completeOnExecute) {
|
||||
this.completeOnExecute = completeOnExecute;
|
||||
}
|
||||
|
||||
public void setExceptionToThrow(Exception exceptionToThrow) {
|
||||
this.exceptionToThrow = exceptionToThrow;
|
||||
}
|
||||
|
||||
public boolean wasCompleted() {
|
||||
return completed.get() != null && completed.get();
|
||||
}
|
||||
|
||||
public long getExecutedCount() {
|
||||
return executedCount.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeOptionalBoolean(executed.get());
|
||||
out.writeOptionalBoolean(completed.get());
|
||||
out.writeLong(executedCount.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void execute(Index index, Client client) {
|
||||
executed.set(true);
|
||||
public void execute(Index index, Client client, Listener listener) {
|
||||
executedCount.incrementAndGet();
|
||||
if (exceptionToThrow == null) {
|
||||
if (completeOnExecute) {
|
||||
completed.set(true);
|
||||
}
|
||||
listener.onSuccess(completeOnExecute);
|
||||
} else {
|
||||
listener.onFailure(exceptionToThrow);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(executed.get());
|
||||
return Objects.hash(completed.get(), executedCount.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -89,11 +121,17 @@ public class MockAction extends LifecycleAction {
|
|||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (obj.getClass() == getClass()) {
|
||||
if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
MockAction other = (MockAction) obj;
|
||||
return Objects.equals(executed.get(), other.executed.get());
|
||||
return Objects.equals(completed.get(), other.completed.get()) &&
|
||||
Objects.equals(executedCount.get(), other.executedCount.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class MockActionTests extends AbstractSerializingTestCase<MockAction> {
|
||||
|
||||
@Override
|
||||
protected MockAction createTestInstance() {
|
||||
return new MockAction(randomBoolean() ? null : randomBoolean(), randomLong());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MockAction doParseInstance(XContentParser parser) throws IOException {
|
||||
return MockAction.parse(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Reader<MockAction> instanceReader() {
|
||||
return MockAction::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MockAction mutateInstance(MockAction instance) throws IOException {
|
||||
boolean completed = instance.wasCompleted();
|
||||
long executedCount = instance.getExecutedCount();
|
||||
switch (randomIntBetween(0, 1)) {
|
||||
case 0:
|
||||
completed = completed == false;
|
||||
break;
|
||||
case 1:
|
||||
executedCount = executedCount + randomInt(1000);
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Illegal randomisation branch");
|
||||
}
|
||||
return new MockAction(completed, executedCount);
|
||||
}
|
||||
|
||||
public void testExecuteNotComplete() {
|
||||
|
||||
MockAction action = new MockAction();
|
||||
action.setCompleteOnExecute(false);
|
||||
|
||||
assertFalse(action.wasCompleted());
|
||||
assertEquals(0L, action.getExecutedCount());
|
||||
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
|
||||
action.execute(null, null, new LifecycleAction.Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(boolean completed) {
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected method call", e);
|
||||
}
|
||||
});
|
||||
|
||||
assertFalse(action.wasCompleted());
|
||||
assertEquals(1L, action.getExecutedCount());
|
||||
assertEquals(true, listenerCalled.get());
|
||||
}
|
||||
|
||||
public void testExecuteComplete() {
|
||||
|
||||
MockAction action = new MockAction();
|
||||
action.setCompleteOnExecute(true);
|
||||
|
||||
assertFalse(action.wasCompleted());
|
||||
assertEquals(0L, action.getExecutedCount());
|
||||
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
|
||||
action.execute(null, null, new LifecycleAction.Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(boolean completed) {
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected method call", e);
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(action.wasCompleted());
|
||||
assertEquals(1L, action.getExecutedCount());
|
||||
assertEquals(true, listenerCalled.get());
|
||||
}
|
||||
|
||||
public void testExecuteFailure() {
|
||||
Exception exception = new RuntimeException();
|
||||
|
||||
MockAction action = new MockAction();
|
||||
action.setCompleteOnExecute(randomBoolean());
|
||||
action.setExceptionToThrow(exception);
|
||||
|
||||
assertFalse(action.wasCompleted());
|
||||
assertEquals(0L, action.getExecutedCount());
|
||||
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
|
||||
action.execute(null, null, new LifecycleAction.Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(boolean completed) {
|
||||
throw new AssertionError("Unexpected method call");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertSame(exception, e);
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
assertFalse(action.wasCompleted());
|
||||
assertEquals(1L, action.getExecutedCount());
|
||||
assertEquals(true, listenerCalled.get());
|
||||
}
|
||||
|
||||
}
|
|
@ -59,8 +59,8 @@ public abstract class MockIndexLifecycleContext implements IndexLifecycleContext
|
|||
}
|
||||
|
||||
@Override
|
||||
public void executeAction(LifecycleAction action) {
|
||||
action.execute(null, null);
|
||||
public void executeAction(LifecycleAction action, LifecycleAction.Listener listener) {
|
||||
action.execute(null, null, listener);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -138,10 +138,26 @@ public class MockIndexLifecycleContextTests extends ESTestCase {
|
|||
|
||||
MockAction action = new MockAction();
|
||||
|
||||
assertFalse(action.wasExecuted());
|
||||
assertFalse(action.wasCompleted());
|
||||
assertEquals(0L, action.getExecutedCount());
|
||||
|
||||
context.executeAction(action);
|
||||
SetOnce<Boolean> listenerCalled = new SetOnce<>();
|
||||
|
||||
assertTrue(action.wasExecuted());
|
||||
context.executeAction(action, new LifecycleAction.Listener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(boolean completed) {
|
||||
listenerCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new AssertionError("Unexpected method call", e);
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(action.wasCompleted());
|
||||
assertEquals(1L, action.getExecutedCount());
|
||||
assertEquals(true, listenerCalled.get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -125,9 +125,12 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(firstAction.getWriteableName(), context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertTrue(firstAction.wasCompleted());
|
||||
assertEquals(1L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteNewIndexFailure() throws Exception {
|
||||
|
@ -176,9 +179,12 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals("", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteNewIndexNoActions() throws Exception {
|
||||
|
@ -243,9 +249,12 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteFirstAction() throws Exception {
|
||||
|
@ -290,9 +299,12 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(firstAction.getWriteableName(), context.getAction());
|
||||
|
||||
assertTrue(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertTrue(firstAction.wasCompleted());
|
||||
assertEquals(1L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteSecondAction() throws Exception {
|
||||
|
@ -337,9 +349,12 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(secondAction.getWriteableName(), context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertTrue(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertTrue(secondAction.wasCompleted());
|
||||
assertEquals(1L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteThirdAction() throws Exception {
|
||||
|
@ -385,9 +400,12 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals(thirdAction.getWriteableName(), context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertTrue(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertTrue(thirdAction.wasCompleted());
|
||||
assertEquals(1L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
public void testExecuteMissingAction() throws Exception {
|
||||
|
@ -434,9 +452,12 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||
assertEquals(phaseName, context.getPhase());
|
||||
assertEquals("does_not_exist", context.getAction());
|
||||
|
||||
assertFalse(firstAction.wasExecuted());
|
||||
assertFalse(secondAction.wasExecuted());
|
||||
assertFalse(thirdAction.wasExecuted());
|
||||
assertFalse(firstAction.wasCompleted());
|
||||
assertEquals(0L, firstAction.getExecutedCount());
|
||||
assertFalse(secondAction.wasCompleted());
|
||||
assertEquals(0L, secondAction.getExecutedCount());
|
||||
assertFalse(thirdAction.wasCompleted());
|
||||
assertEquals(0L, thirdAction.getExecutedCount());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue