[7.x Backport] Refactor AllocatedPersistentTask#init(), move rollup ctor logic (#46406)

This makes the AllocatedPersistentTask#init() method protected so that
implementing classes can perform their initialization logic there,
instead of the constructor.  Rollup's task is adjusted to use this
init method.

It also slightly refactors the methods to se a static logger in the
AllocatedTask instead of passing it in via an argument.  This is
simpler, logged messages come from the task instead of the
service, and is easier for tests
This commit is contained in:
Zachary Tong 2019-09-11 17:00:28 -04:00 committed by GitHub
parent ffeeb41066
commit 6dc8ed5d57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 58 deletions

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.persistent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
@ -37,13 +38,13 @@ import java.util.function.Predicate;
*/
public class AllocatedPersistentTask extends CancellableTask {
private static final Logger logger = LogManager.getLogger(AllocatedPersistentTask.class);
private final AtomicReference<State> state;
private volatile String persistentTaskId;
private volatile long allocationId;
private volatile @Nullable Exception failure;
private volatile PersistentTasksService persistentTasksService;
private volatile Logger logger;
private volatile TaskManager taskManager;
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
@ -85,10 +86,9 @@ public class AllocatedPersistentTask extends CancellableTask {
return persistentTaskId;
}
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
allocationId) {
protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
String persistentTaskId, long allocationId) {
this.persistentTasksService = persistentTasksService;
this.logger = logger;
this.taskManager = taskManager;
this.persistentTaskId = persistentTaskId;
this.allocationId = allocationId;

View File

@ -183,7 +183,7 @@ public class PersistentTasksNodeService implements ClusterStateListener {
boolean processed = false;
try {
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
task.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId());
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
task.getPersistentTaskId(), task.getAllocationId());
try {

View File

@ -22,7 +22,9 @@ import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
@ -159,8 +161,11 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
private final RollupJob job;
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final RollupIndexer indexer;
private AtomicBoolean upgradedDocumentID;
private final Client client;
private final IndexerState initialIndexerState;
private final Map<String, Object> initialPosition;
private RollupIndexer indexer;
private final AtomicBoolean upgradedDocumentID = new AtomicBoolean(false);
RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state,
Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) {
@ -168,35 +173,15 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
this.job = job;
this.schedulerEngine = schedulerEngine;
this.threadPool = threadPool;
// We can assume the new ID scheme only for new jobs
this.upgradedDocumentID = new AtomicBoolean(true);
// If status is not null, we are resuming rather than starting fresh.
Map<String, Object> initialPosition = null;
IndexerState initialState = IndexerState.STOPPED;
if (state != null) {
final IndexerState existingState = state.getIndexerState();
logger.debug("We have existing state, setting state to [" + existingState + "] " +
"and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]");
if (existingState.equals(IndexerState.INDEXING)) {
/*
* If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking
* it is indexing but without the actual indexing thread running.
*/
initialState = IndexerState.STARTED;
} else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) {
// It shouldn't be possible to persist ABORTING, but if for some reason it does,
// play it safe and restore the job as STOPPED. An admin will have to clean it up,
// but it won't be running, and won't delete itself either. Safest option.
// If we were STOPPING, that means it persisted but was killed before finally stopped... so ok
// to restore as STOPPED
initialState = IndexerState.STOPPED;
} else {
initialState = existingState;
}
initialPosition = state.getPosition();
this.client = client;
if (state == null) {
this.initialIndexerState = null;
this.initialPosition = null;
// We can assume the new ID scheme only for new jobs
this.upgradedDocumentID.set(true);
} else {
this.initialIndexerState = state.getIndexerState();
this.initialPosition = state.getPosition();
// Since we have state, we are resuming a job/checkpoint. Although we are resuming
// from something that was checkpointed, we can't guarantee it was the _final_ checkpoint
@ -207,8 +192,39 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
// be true if it actually finished a full checkpoint.
this.upgradedDocumentID.set(state.isUpgradedDocumentID());
}
}
@Override
protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
String persistentTaskId, long allocationId) {
super.init(persistentTasksService, taskManager, persistentTaskId, allocationId);
// If status is not null, we are resuming rather than starting fresh.
IndexerState initialState = IndexerState.STOPPED;
if (initialIndexerState != null) {
logger.debug("We have existing state, setting state to [" + initialIndexerState + "] " +
"and current position to [" + initialIndexerState + "] for job [" + job.getConfig().getId() + "]");
if (initialIndexerState.equals(IndexerState.INDEXING)) {
/*
* If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking
* it is indexing but without the actual indexing thread running.
*/
initialState = IndexerState.STARTED;
} else if (initialIndexerState.equals(IndexerState.ABORTING) || initialIndexerState.equals(IndexerState.STOPPING)) {
// It shouldn't be possible to persist ABORTING, but if for some reason it does,
// play it safe and restore the job as STOPPED. An admin will have to clean it up,
// but it won't be running, and won't delete itself either. Safest option.
// If we were STOPPING, that means it persisted but was killed before finally stopped... so ok
// to restore as STOPPED
initialState = IndexerState.STOPPED;
} else {
initialState = initialIndexerState;
}
}
this.indexer = new ClientRollupPageManager(job, initialState, initialPosition,
new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())), upgradedDocumentID);
new ParentTaskAssigningClient(client, getParentTaskId()), upgradedDocumentID);
}
@Override

View File

@ -17,6 +17,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -67,8 +68,10 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap());
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -80,8 +83,10 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -93,8 +98,10 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap());
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -106,8 +113,10 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap());
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -119,12 +128,13 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap());
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
assertFalse(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID());
}
public void testInitialStatusIndexingNewID() {
@ -133,12 +143,13 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap());
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
assertTrue(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID());
}
public void testNoInitialStatus() {
@ -146,11 +157,12 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap());
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
assertTrue(((RollupJobStatus) task.getStatus()).isUpgradedDocumentID());
}
public void testStartWhenStarted() throws InterruptedException {
@ -159,8 +171,10 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap());
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -190,8 +204,9 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
AtomicInteger counter = new AtomicInteger(0);
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
@ -209,6 +224,7 @@ public class RollupJobTaskTests extends ESTestCase {
counter.incrementAndGet();
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
@ -267,7 +283,8 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
@ -278,6 +295,7 @@ public class RollupJobTaskTests extends ESTestCase {
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -305,7 +323,8 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
@ -316,6 +335,7 @@ public class RollupJobTaskTests extends ESTestCase {
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -346,7 +366,8 @@ public class RollupJobTaskTests extends ESTestCase {
when(client.settings()).thenReturn(Settings.EMPTY);
when(client.threadPool()).thenReturn(pool);
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
@ -357,6 +378,7 @@ public class RollupJobTaskTests extends ESTestCase {
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
@ -414,7 +436,8 @@ public class RollupJobTaskTests extends ESTestCase {
}).when(client).execute(anyObject(), anyObject(), anyObject());
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
@ -435,6 +458,7 @@ public class RollupJobTaskTests extends ESTestCase {
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
@ -502,7 +526,8 @@ public class RollupJobTaskTests extends ESTestCase {
}).when(client).execute(anyObject(), anyObject(), anyObject());
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
@ -523,6 +548,7 @@ public class RollupJobTaskTests extends ESTestCase {
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
@ -591,7 +617,8 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, false);
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
@ -612,6 +639,7 @@ public class RollupJobTaskTests extends ESTestCase {
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
@ -646,8 +674,10 @@ public class RollupJobTaskTests extends ESTestCase {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap());
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
CountDownLatch latch = new CountDownLatch(1);
@ -674,7 +704,8 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
AtomicInteger counter = new AtomicInteger(0);
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
@ -696,6 +727,7 @@ public class RollupJobTaskTests extends ESTestCase {
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
@ -760,13 +792,15 @@ public class RollupJobTaskTests extends ESTestCase {
// the task would end before stop could be called. But to help test out all pathways,
// just in case, we can override markAsCompleted so it's a no-op and test how stop
// handles the situation
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
status, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void markAsCompleted() {
latch.countDown();
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
task.onCancelled();