From 6dc8ed5d573cf705d8664025c63edd9306f30607 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 11 Sep 2019 17:00:28 -0400 Subject: [PATCH] [7.x Backport] Refactor AllocatedPersistentTask#init(), move rollup ctor logic (#46406) This makes the AllocatedPersistentTask#init() method protected so that implementing classes can perform their initialization logic there, instead of the constructor. Rollup's task is adjusted to use this init method. It also slightly refactors the methods to se a static logger in the AllocatedTask instead of passing it in via an argument. This is simpler, logged messages come from the task instead of the service, and is easier for tests --- .../persistent/AllocatedPersistentTask.java | 8 +- .../PersistentTasksNodeService.java | 2 +- .../xpack/rollup/job/RollupJobTask.java | 80 +++++++++++-------- .../xpack/rollup/job/RollupJobTaskTests.java | 76 +++++++++++++----- 4 files changed, 108 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index 54dcffab6e3..15fe23d58c7 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.persistent; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; @@ -37,13 +38,13 @@ import java.util.function.Predicate; */ public class AllocatedPersistentTask extends CancellableTask { + private static final Logger logger = LogManager.getLogger(AllocatedPersistentTask.class); private final AtomicReference state; private volatile String persistentTaskId; private volatile long allocationId; private volatile @Nullable Exception failure; private volatile PersistentTasksService persistentTasksService; - private volatile Logger logger; private volatile TaskManager taskManager; public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask, @@ -85,10 +86,9 @@ public class AllocatedPersistentTask extends CancellableTask { return persistentTaskId; } - void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long - allocationId) { + protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager, + String persistentTaskId, long allocationId) { this.persistentTasksService = persistentTasksService; - this.logger = logger; this.taskManager = taskManager; this.persistentTaskId = persistentTaskId; this.allocationId = allocationId; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 9b811a079ef..14ff29e1397 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -183,7 +183,7 @@ public class PersistentTasksNodeService implements ClusterStateListener { boolean processed = false; try { - task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId()); + task.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId()); logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(), task.getPersistentTaskId(), task.getAllocationId()); try { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 668d62a9718..4d2fb8d5d7a 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -22,7 +22,9 @@ import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -159,8 +161,11 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE private final RollupJob job; private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; - private final RollupIndexer indexer; - private AtomicBoolean upgradedDocumentID; + private final Client client; + private final IndexerState initialIndexerState; + private final Map initialPosition; + private RollupIndexer indexer; + private final AtomicBoolean upgradedDocumentID = new AtomicBoolean(false); RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) { @@ -168,35 +173,15 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE this.job = job; this.schedulerEngine = schedulerEngine; this.threadPool = threadPool; - - // We can assume the new ID scheme only for new jobs - this.upgradedDocumentID = new AtomicBoolean(true); - - // If status is not null, we are resuming rather than starting fresh. - Map initialPosition = null; - IndexerState initialState = IndexerState.STOPPED; - if (state != null) { - final IndexerState existingState = state.getIndexerState(); - logger.debug("We have existing state, setting state to [" + existingState + "] " + - "and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]"); - if (existingState.equals(IndexerState.INDEXING)) { - /* - * If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking - * it is indexing but without the actual indexing thread running. - */ - initialState = IndexerState.STARTED; - - } else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) { - // It shouldn't be possible to persist ABORTING, but if for some reason it does, - // play it safe and restore the job as STOPPED. An admin will have to clean it up, - // but it won't be running, and won't delete itself either. Safest option. - // If we were STOPPING, that means it persisted but was killed before finally stopped... so ok - // to restore as STOPPED - initialState = IndexerState.STOPPED; - } else { - initialState = existingState; - } - initialPosition = state.getPosition(); + this.client = client; + if (state == null) { + this.initialIndexerState = null; + this.initialPosition = null; + // We can assume the new ID scheme only for new jobs + this.upgradedDocumentID.set(true); + } else { + this.initialIndexerState = state.getIndexerState(); + this.initialPosition = state.getPosition(); // Since we have state, we are resuming a job/checkpoint. Although we are resuming // from something that was checkpointed, we can't guarantee it was the _final_ checkpoint @@ -207,8 +192,39 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE // be true if it actually finished a full checkpoint. this.upgradedDocumentID.set(state.isUpgradedDocumentID()); } + + } + + @Override + protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager, + String persistentTaskId, long allocationId) { + super.init(persistentTasksService, taskManager, persistentTaskId, allocationId); + + // If status is not null, we are resuming rather than starting fresh. + IndexerState initialState = IndexerState.STOPPED; + if (initialIndexerState != null) { + logger.debug("We have existing state, setting state to [" + initialIndexerState + "] " + + "and current position to [" + initialIndexerState + "] for job [" + job.getConfig().getId() + "]"); + if (initialIndexerState.equals(IndexerState.INDEXING)) { + /* + * If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking + * it is indexing but without the actual indexing thread running. + */ + initialState = IndexerState.STARTED; + + } else if (initialIndexerState.equals(IndexerState.ABORTING) || initialIndexerState.equals(IndexerState.STOPPING)) { + // It shouldn't be possible to persist ABORTING, but if for some reason it does, + // play it safe and restore the job as STOPPED. An admin will have to clean it up, + // but it won't be running, and won't delete itself either. Safest option. + // If we were STOPPING, that means it persisted but was killed before finally stopped... so ok + // to restore as STOPPED + initialState = IndexerState.STOPPED; + } else { + initialState = initialIndexerState; + } + } this.indexer = new ClientRollupPageManager(job, initialState, initialPosition, - new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())), upgradedDocumentID); + new ParentTaskAssigningClient(client, getParentTaskId()), upgradedDocumentID); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 59073e763c2..1ebc65ef83a 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -67,8 +68,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -80,8 +83,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); + TaskId taskId = new TaskId("node", 123); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -93,8 +98,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -106,8 +113,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -119,12 +128,13 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); - assertFalse(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID()); } public void testInitialStatusIndexingNewID() { @@ -133,12 +143,13 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); - assertTrue(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID()); } public void testNoInitialStatus() { @@ -146,11 +157,12 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); - assertTrue(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID()); } public void testStartWhenStarted() throws InterruptedException { @@ -159,8 +171,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -190,8 +204,9 @@ public class RollupJobTaskTests extends ESTestCase { SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); AtomicInteger counter = new AtomicInteger(0); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, + null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, ActionListener> listener) { @@ -209,6 +224,7 @@ public class RollupJobTaskTests extends ESTestCase { counter.incrementAndGet(); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -267,7 +283,8 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -278,6 +295,7 @@ public class RollupJobTaskTests extends ESTestCase { new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -305,7 +323,8 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -316,6 +335,7 @@ public class RollupJobTaskTests extends ESTestCase { new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -346,7 +366,8 @@ public class RollupJobTaskTests extends ESTestCase { when(client.settings()).thenReturn(Settings.EMPTY); when(client.threadPool()).thenReturn(pool); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -357,6 +378,7 @@ public class RollupJobTaskTests extends ESTestCase { new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -414,7 +436,8 @@ public class RollupJobTaskTests extends ESTestCase { }).when(client).execute(anyObject(), anyObject(), anyObject()); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -435,6 +458,7 @@ public class RollupJobTaskTests extends ESTestCase { } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -502,7 +526,8 @@ public class RollupJobTaskTests extends ESTestCase { }).when(client).execute(anyObject(), anyObject(), anyObject()); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -523,6 +548,7 @@ public class RollupJobTaskTests extends ESTestCase { } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -591,7 +617,8 @@ public class RollupJobTaskTests extends ESTestCase { SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, false); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -612,6 +639,7 @@ public class RollupJobTaskTests extends ESTestCase { } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -646,8 +674,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); CountDownLatch latch = new CountDownLatch(1); @@ -674,7 +704,8 @@ public class RollupJobTaskTests extends ESTestCase { SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); AtomicInteger counter = new AtomicInteger(0); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -696,6 +727,7 @@ public class RollupJobTaskTests extends ESTestCase { } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -760,13 +792,15 @@ public class RollupJobTaskTests extends ESTestCase { // the task would end before stop could be called. But to help test out all pathways, // just in case, we can override markAsCompleted so it's a no-op and test how stop // handles the situation - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void markAsCompleted() { latch.countDown(); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); task.onCancelled();