diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 329d4c5d0e0..4c8375e759d 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.rollup.job; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; @@ -19,6 +22,7 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregati import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -198,83 +202,95 @@ public class RollupJobTaskTests extends ESTestCase { public void testStartWhenStopping() throws InterruptedException { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); - Client client = mock(Client.class); - when(client.settings()).thenReturn(Settings.EMPTY); - when(client.threadPool()).thenReturn(pool); - SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - AtomicInteger counter = new AtomicInteger(0); - TaskId taskId = new TaskId("node", 123); - RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { - @Override - public void updatePersistentTaskState(PersistentTaskState taskState, - ActionListener> listener) { - assertThat(taskState, instanceOf(RollupJobStatus.class)); - int c = counter.get(); - if (c == 0) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); - } else if (c == 1) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); - } else { - fail("Should not have updated persistent statuses > 2 times"); + final CountDownLatch block = new CountDownLatch(1); + final CountDownLatch unblock = new CountDownLatch(1); + try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) { + SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); + + AtomicInteger counter = new AtomicInteger(0); + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, + null, client, schedulerEngine, pool, Collections.emptyMap()) { + @Override + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + int c = counter.get(); + if (c == 0) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); + } else if (c == 1) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else if (c == 2) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else { + fail("Should not have updated persistent statuses > 3 times"); + } + listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, + new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); + counter.incrementAndGet(); } - listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); - counter.incrementAndGet(); - } - }; - task.init(null, mock(TaskManager.class), taskId.toString(), 123); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); - assertNull(((RollupJobStatus)task.getStatus()).getPosition()); + }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); + assertNull(((RollupJobStatus)task.getStatus()).getPosition()); - CountDownLatch latch = new CountDownLatch(1); - task.start(new ActionListener() { - @Override - public void onResponse(StartRollupJobAction.Response response) { - assertTrue(response.isStarted()); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); - latch.countDown(); - } + CountDownLatch latch = new CountDownLatch(1); + task.start(new ActionListener() { + @Override + public void onResponse(StartRollupJobAction.Response response) { + assertTrue(response.isStarted()); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); + latch.countDown(); + } - @Override - public void onFailure(Exception e) { - fail("Should not have entered onFailure"); - } - }); - latch.await(3, TimeUnit.SECONDS); + @Override + public void onFailure(Exception e) { + fail("Should not have entered onFailure"); + } + }); + latch.await(3, TimeUnit.SECONDS); - task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); - assertThat(task.getStats().getNumInvocations(), equalTo(1L)); + task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); + assertThat(task.getStats().getNumInvocations(), equalTo(1L)); - task.stop(new ActionListener() { - @Override - public void onResponse(StopRollupJobAction.Response response) { - assertTrue(response.isStopped()); - } + // wait until the search request is send, this is unblocked in the client + block.await(3, TimeUnit.SECONDS); + task.stop(new ActionListener() { + @Override + public void onResponse(StopRollupJobAction.Response response) { + assertTrue(response.isStopped()); + } - @Override - public void onFailure(Exception e) { - fail("should not have entered onFailure"); - } - }); + @Override + public void onFailure(Exception e) { + fail("should not have entered onFailure"); + } + }); - CountDownLatch latch2 = new CountDownLatch(1); - task.start(new ActionListener() { - @Override - public void onResponse(StartRollupJobAction.Response response) { - fail("should not have entered onResponse"); - } + // we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state + assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING)); - @Override - public void onFailure(Exception e) { - assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job [" - + job.getConfig().getId() + "] because state was [STOPPING]")); - latch2.countDown(); - } - }); - latch2.await(3, TimeUnit.SECONDS); + CountDownLatch latch2 = new CountDownLatch(1); + task.start(new ActionListener() { + @Override + public void onResponse(StartRollupJobAction.Response response) { + fail("should not have entered onResponse"); + } + + @Override + public void onFailure(Exception e) { + assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job [" + + job.getConfig().getId() + "] because state was [STOPPING]")); + latch2.countDown(); + } + }); + latch2.await(3, TimeUnit.SECONDS); + + // the the client answer + unblock.countDown(); + } } public void testStartWhenStopped() throws InterruptedException { @@ -698,85 +714,94 @@ public class RollupJobTaskTests extends ESTestCase { public void testStopWhenStopping() throws InterruptedException { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); - Client client = mock(Client.class); - when(client.settings()).thenReturn(Settings.EMPTY); - when(client.threadPool()).thenReturn(pool); - SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); + final CountDownLatch block = new CountDownLatch(1); + final CountDownLatch unblock = new CountDownLatch(1); + try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) { + SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - AtomicInteger counter = new AtomicInteger(0); - TaskId taskId = new TaskId("node", 123); - RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { - @Override - public void updatePersistentTaskState(PersistentTaskState taskState, - ActionListener> listener) { - assertThat(taskState, instanceOf(RollupJobStatus.class)); - int c = counter.get(); - if (c == 0) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); - } else if (c == 1) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); - } else if (c == 2) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); - } else { - fail("Should not have updated persistent statuses > 3 times"); + AtomicInteger counter = new AtomicInteger(0); + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, + null, client, schedulerEngine, pool, Collections.emptyMap()) { + @Override + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + int c = counter.get(); + if (c == 0) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); + } else if (c == 1) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else if (c == 2) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else if (c == 3) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else { + fail("Should not have updated persistent statuses > 4 times"); + } + listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, + new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); + counter.incrementAndGet(); } - listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); - counter.incrementAndGet(); + }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); + assertNull(((RollupJobStatus)task.getStatus()).getPosition()); - } - }; - task.init(null, mock(TaskManager.class), taskId.toString(), 123); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); - assertNull(((RollupJobStatus)task.getStatus()).getPosition()); + CountDownLatch latch = new CountDownLatch(1); + task.start(new ActionListener() { + @Override + public void onResponse(StartRollupJobAction.Response response) { + assertTrue(response.isStarted()); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); + latch.countDown(); + } - CountDownLatch latch = new CountDownLatch(1); - task.start(new ActionListener() { - @Override - public void onResponse(StartRollupJobAction.Response response) { - assertTrue(response.isStarted()); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); - latch.countDown(); - } + @Override + public void onFailure(Exception e) { + fail("Should not have entered onFailure"); + } + }); + latch.await(3, TimeUnit.SECONDS); - @Override - public void onFailure(Exception e) { - fail("Should not have entered onFailure"); - } - }); - latch.await(3, TimeUnit.SECONDS); + task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); + assertThat(task.getStats().getNumInvocations(), equalTo(1L)); - task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); - assertThat(task.getStats().getNumInvocations(), equalTo(1L)); + // wait until the search request is send, this is unblocked in the client + block.await(3, TimeUnit.SECONDS); - task.stop(new ActionListener() { - @Override - public void onResponse(StopRollupJobAction.Response response) { - assertTrue(response.isStopped()); - } + task.stop(new ActionListener() { + @Override + public void onResponse(StopRollupJobAction.Response response) { + assertTrue(response.isStopped()); + } - @Override - public void onFailure(Exception e) { - fail("should not have entered onFailure"); - } - }); + @Override + public void onFailure(Exception e) { + fail("should not have entered onFailure"); + } + }); - CountDownLatch latch2 = new CountDownLatch(1); - task.stop(new ActionListener() { - @Override - public void onResponse(StopRollupJobAction.Response response) { - assertTrue(response.isStopped()); - latch2.countDown(); - } + // we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state + assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING)); - @Override - public void onFailure(Exception e) { - fail("Should not have entered onFailure"); - } - }); - latch2.await(3, TimeUnit.SECONDS); + CountDownLatch latch2 = new CountDownLatch(1); + task.stop(new ActionListener() { + @Override + public void onResponse(StopRollupJobAction.Response response) { + assertTrue(response.isStopped()); + latch2.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("Should not have entered onFailure"); + } + }); + latch2.await(3, TimeUnit.SECONDS); + unblock.countDown(); + } } public void testStopWhenAborting() throws InterruptedException { @@ -820,4 +845,21 @@ public class RollupJobTaskTests extends ESTestCase { }); latch.await(3, TimeUnit.SECONDS); } + + private NoOpClient getEmptySearchResponseClient(CountDownLatch unblock, CountDownLatch block) { + return new NoOpClient(getTestName()) { + @SuppressWarnings("unchecked") + @Override + protected + void doExecute(ActionType action, Request request, ActionListener listener) { + try { + unblock.countDown(); + block.await(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { + fail("Should not have timed out"); + } + listener.onResponse((Response) mock(SearchResponse.class)); + } + }; + } }