diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index 54dcffab6e3..15fe23d58c7 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.persistent; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; @@ -37,13 +38,13 @@ import java.util.function.Predicate; */ public class AllocatedPersistentTask extends CancellableTask { + private static final Logger logger = LogManager.getLogger(AllocatedPersistentTask.class); private final AtomicReference state; private volatile String persistentTaskId; private volatile long allocationId; private volatile @Nullable Exception failure; private volatile PersistentTasksService persistentTasksService; - private volatile Logger logger; private volatile TaskManager taskManager; public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask, @@ -85,10 +86,9 @@ public class AllocatedPersistentTask extends CancellableTask { return persistentTaskId; } - void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long - allocationId) { + protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager, + String persistentTaskId, long allocationId) { this.persistentTasksService = persistentTasksService; - this.logger = logger; this.taskManager = taskManager; this.persistentTaskId = persistentTaskId; this.allocationId = allocationId; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 9b811a079ef..14ff29e1397 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -183,7 +183,7 @@ public class PersistentTasksNodeService implements ClusterStateListener { boolean processed = false; try { - task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId()); + task.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId()); logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(), task.getPersistentTaskId(), task.getAllocationId()); try { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 668d62a9718..4d2fb8d5d7a 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -22,7 +22,9 @@ import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -159,8 +161,11 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE private final RollupJob job; private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; - private final RollupIndexer indexer; - private AtomicBoolean upgradedDocumentID; + private final Client client; + private final IndexerState initialIndexerState; + private final Map initialPosition; + private RollupIndexer indexer; + private final AtomicBoolean upgradedDocumentID = new AtomicBoolean(false); RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) { @@ -168,35 +173,15 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE this.job = job; this.schedulerEngine = schedulerEngine; this.threadPool = threadPool; - - // We can assume the new ID scheme only for new jobs - this.upgradedDocumentID = new AtomicBoolean(true); - - // If status is not null, we are resuming rather than starting fresh. - Map initialPosition = null; - IndexerState initialState = IndexerState.STOPPED; - if (state != null) { - final IndexerState existingState = state.getIndexerState(); - logger.debug("We have existing state, setting state to [" + existingState + "] " + - "and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]"); - if (existingState.equals(IndexerState.INDEXING)) { - /* - * If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking - * it is indexing but without the actual indexing thread running. - */ - initialState = IndexerState.STARTED; - - } else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) { - // It shouldn't be possible to persist ABORTING, but if for some reason it does, - // play it safe and restore the job as STOPPED. An admin will have to clean it up, - // but it won't be running, and won't delete itself either. Safest option. - // If we were STOPPING, that means it persisted but was killed before finally stopped... so ok - // to restore as STOPPED - initialState = IndexerState.STOPPED; - } else { - initialState = existingState; - } - initialPosition = state.getPosition(); + this.client = client; + if (state == null) { + this.initialIndexerState = null; + this.initialPosition = null; + // We can assume the new ID scheme only for new jobs + this.upgradedDocumentID.set(true); + } else { + this.initialIndexerState = state.getIndexerState(); + this.initialPosition = state.getPosition(); // Since we have state, we are resuming a job/checkpoint. Although we are resuming // from something that was checkpointed, we can't guarantee it was the _final_ checkpoint @@ -207,8 +192,39 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE // be true if it actually finished a full checkpoint. this.upgradedDocumentID.set(state.isUpgradedDocumentID()); } + + } + + @Override + protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager, + String persistentTaskId, long allocationId) { + super.init(persistentTasksService, taskManager, persistentTaskId, allocationId); + + // If status is not null, we are resuming rather than starting fresh. + IndexerState initialState = IndexerState.STOPPED; + if (initialIndexerState != null) { + logger.debug("We have existing state, setting state to [" + initialIndexerState + "] " + + "and current position to [" + initialIndexerState + "] for job [" + job.getConfig().getId() + "]"); + if (initialIndexerState.equals(IndexerState.INDEXING)) { + /* + * If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking + * it is indexing but without the actual indexing thread running. + */ + initialState = IndexerState.STARTED; + + } else if (initialIndexerState.equals(IndexerState.ABORTING) || initialIndexerState.equals(IndexerState.STOPPING)) { + // It shouldn't be possible to persist ABORTING, but if for some reason it does, + // play it safe and restore the job as STOPPED. An admin will have to clean it up, + // but it won't be running, and won't delete itself either. Safest option. + // If we were STOPPING, that means it persisted but was killed before finally stopped... so ok + // to restore as STOPPED + initialState = IndexerState.STOPPED; + } else { + initialState = initialIndexerState; + } + } this.indexer = new ClientRollupPageManager(job, initialState, initialPosition, - new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())), upgradedDocumentID); + new ParentTaskAssigningClient(client, getParentTaskId()), upgradedDocumentID); } @Override diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 59073e763c2..1ebc65ef83a 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -67,8 +68,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -80,8 +83,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); + TaskId taskId = new TaskId("node", 123); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -93,8 +98,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -106,8 +113,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -119,12 +128,13 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); - assertFalse(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID()); } public void testInitialStatusIndexingNewID() { @@ -133,12 +143,13 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); - assertTrue(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID()); } public void testNoInitialStatus() { @@ -146,11 +157,12 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); - assertTrue(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID()); } public void testStartWhenStarted() throws InterruptedException { @@ -159,8 +171,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -190,8 +204,9 @@ public class RollupJobTaskTests extends ESTestCase { SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); AtomicInteger counter = new AtomicInteger(0); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, + null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, ActionListener> listener) { @@ -209,6 +224,7 @@ public class RollupJobTaskTests extends ESTestCase { counter.incrementAndGet(); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -267,7 +283,8 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -278,6 +295,7 @@ public class RollupJobTaskTests extends ESTestCase { new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -305,7 +323,8 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -316,6 +335,7 @@ public class RollupJobTaskTests extends ESTestCase { new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -346,7 +366,8 @@ public class RollupJobTaskTests extends ESTestCase { when(client.settings()).thenReturn(Settings.EMPTY); when(client.threadPool()).thenReturn(pool); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -357,6 +378,7 @@ public class RollupJobTaskTests extends ESTestCase { new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -414,7 +436,8 @@ public class RollupJobTaskTests extends ESTestCase { }).when(client).execute(anyObject(), anyObject(), anyObject()); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -435,6 +458,7 @@ public class RollupJobTaskTests extends ESTestCase { } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -502,7 +526,8 @@ public class RollupJobTaskTests extends ESTestCase { }).when(client).execute(anyObject(), anyObject(), anyObject()); SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -523,6 +548,7 @@ public class RollupJobTaskTests extends ESTestCase { } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -591,7 +617,8 @@ public class RollupJobTaskTests extends ESTestCase { SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, false); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -612,6 +639,7 @@ public class RollupJobTaskTests extends ESTestCase { } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -646,8 +674,10 @@ public class RollupJobTaskTests extends ESTestCase { Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()); + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); CountDownLatch latch = new CountDownLatch(1); @@ -674,7 +704,8 @@ public class RollupJobTaskTests extends ESTestCase { SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); AtomicInteger counter = new AtomicInteger(0); - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void updatePersistentTaskState(PersistentTaskState taskState, @@ -696,6 +727,7 @@ public class RollupJobTaskTests extends ESTestCase { } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); @@ -760,13 +792,15 @@ public class RollupJobTaskTests extends ESTestCase { // the task would end before stop could be called. But to help test out all pathways, // just in case, we can override markAsCompleted so it's a no-op and test how stop // handles the situation - RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override public void markAsCompleted() { latch.countDown(); } }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); task.onCancelled();