[Rollup] improve stopping tests (#55666)

improve tests related to stopping using a client that answers and can be
synchronized with the test thread in order to test special situations

relates #55011
This commit is contained in:
Hendrik Muhs 2020-04-24 08:46:05 +02:00
parent 30f8c326fe
commit b213209f0c
1 changed files with 179 additions and 137 deletions

View File

@ -6,6 +6,9 @@
package org.elasticsearch.xpack.rollup.job; package org.elasticsearch.xpack.rollup.job;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -19,6 +22,7 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregati
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IndexerState;
@ -198,9 +202,10 @@ public class RollupJobTaskTests extends ESTestCase {
public void testStartWhenStopping() throws InterruptedException { public void testStartWhenStopping() throws InterruptedException {
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY); final CountDownLatch block = new CountDownLatch(1);
when(client.threadPool()).thenReturn(pool); final CountDownLatch unblock = new CountDownLatch(1);
try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) {
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
AtomicInteger counter = new AtomicInteger(0); AtomicInteger counter = new AtomicInteger(0);
@ -216,8 +221,10 @@ public class RollupJobTaskTests extends ESTestCase {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) { } else if (c == 1) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 2) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else { } else {
fail("Should not have updated persistent statuses > 2 times"); fail("Should not have updated persistent statuses > 3 times");
} }
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
@ -248,6 +255,8 @@ public class RollupJobTaskTests extends ESTestCase {
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L)); assertThat(task.getStats().getNumInvocations(), equalTo(1L));
// wait until the search request is send, this is unblocked in the client
block.await(3, TimeUnit.SECONDS);
task.stop(new ActionListener<StopRollupJobAction.Response>() { task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override @Override
public void onResponse(StopRollupJobAction.Response response) { public void onResponse(StopRollupJobAction.Response response) {
@ -260,6 +269,9 @@ public class RollupJobTaskTests extends ESTestCase {
} }
}); });
// we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state
assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING));
CountDownLatch latch2 = new CountDownLatch(1); CountDownLatch latch2 = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() { task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override @Override
@ -275,6 +287,10 @@ public class RollupJobTaskTests extends ESTestCase {
} }
}); });
latch2.await(3, TimeUnit.SECONDS); latch2.await(3, TimeUnit.SECONDS);
// the the client answer
unblock.countDown();
}
} }
public void testStartWhenStopped() throws InterruptedException { public void testStartWhenStopped() throws InterruptedException {
@ -698,9 +714,9 @@ public class RollupJobTaskTests extends ESTestCase {
public void testStopWhenStopping() throws InterruptedException { public void testStopWhenStopping() throws InterruptedException {
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class); final CountDownLatch block = new CountDownLatch(1);
when(client.settings()).thenReturn(Settings.EMPTY); final CountDownLatch unblock = new CountDownLatch(1);
when(client.threadPool()).thenReturn(pool); try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) {
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
AtomicInteger counter = new AtomicInteger(0); AtomicInteger counter = new AtomicInteger(0);
@ -718,13 +734,14 @@ public class RollupJobTaskTests extends ESTestCase {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 2) { } else if (c == 2) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 3) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else { } else {
fail("Should not have updated persistent statuses > 3 times"); fail("Should not have updated persistent statuses > 4 times");
} }
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
counter.incrementAndGet(); counter.incrementAndGet();
} }
}; };
task.init(null, mock(TaskManager.class), taskId.toString(), 123); task.init(null, mock(TaskManager.class), taskId.toString(), 123);
@ -751,6 +768,9 @@ public class RollupJobTaskTests extends ESTestCase {
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L)); assertThat(task.getStats().getNumInvocations(), equalTo(1L));
// wait until the search request is send, this is unblocked in the client
block.await(3, TimeUnit.SECONDS);
task.stop(new ActionListener<StopRollupJobAction.Response>() { task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override @Override
public void onResponse(StopRollupJobAction.Response response) { public void onResponse(StopRollupJobAction.Response response) {
@ -763,6 +783,9 @@ public class RollupJobTaskTests extends ESTestCase {
} }
}); });
// we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state
assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING));
CountDownLatch latch2 = new CountDownLatch(1); CountDownLatch latch2 = new CountDownLatch(1);
task.stop(new ActionListener<StopRollupJobAction.Response>() { task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override @Override
@ -777,6 +800,8 @@ public class RollupJobTaskTests extends ESTestCase {
} }
}); });
latch2.await(3, TimeUnit.SECONDS); latch2.await(3, TimeUnit.SECONDS);
unblock.countDown();
}
} }
public void testStopWhenAborting() throws InterruptedException { public void testStopWhenAborting() throws InterruptedException {
@ -820,4 +845,21 @@ public class RollupJobTaskTests extends ESTestCase {
}); });
latch.await(3, TimeUnit.SECONDS); latch.await(3, TimeUnit.SECONDS);
} }
private NoOpClient getEmptySearchResponseClient(CountDownLatch unblock, CountDownLatch block) {
return new NoOpClient(getTestName()) {
@SuppressWarnings("unchecked")
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
try {
unblock.countDown();
block.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
fail("Should not have timed out");
}
listener.onResponse((Response) mock(SearchResponse.class));
}
};
}
} }