Remove AllocatedPersistentTask.getState() (#30858)

This commit removes the method AllocatedPersistentTask.getState() that
exposes the internal state of an AllocatedPersistentTask and replaces
it with a new isCompleted() method. Related to #29608.
This commit is contained in:
Tanguy Leroux 2018-05-29 09:26:02 +02:00 committed by GitHub
parent 6577f5b0d1
commit 6e480663d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 28 deletions

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
@ -38,18 +37,16 @@ import java.util.function.Predicate;
* Represents a executor node operation that corresponds to a persistent task
*/
public class AllocatedPersistentTask extends CancellableTask {
private volatile String persistentTaskId;
private volatile long allocationId;
private final AtomicReference<State> state;
@Nullable
private volatile Exception failure;
private volatile String persistentTaskId;
private volatile long allocationId;
private volatile @Nullable Exception failure;
private volatile PersistentTasksService persistentTasksService;
private volatile Logger logger;
private volatile TaskManager taskManager;
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
@ -101,24 +98,10 @@ public class AllocatedPersistentTask extends CancellableTask {
return failure;
}
boolean markAsCancelled() {
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL);
}
public State getState() {
return state.get();
}
public long getAllocationId() {
return allocationId;
}
public enum State {
STARTED, // the task is currently running
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
COMPLETED // the task is done running and trying to notify caller
}
/**
* Waits for this persistent task to have the desired state.
*/
@ -128,6 +111,14 @@ public class AllocatedPersistentTask extends CancellableTask {
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener);
}
final boolean isCompleted() {
return state.get() == State.COMPLETED;
}
boolean markAsCancelled() {
return state.compareAndSet(State.STARTED, State.PENDING_CANCEL);
}
public void markAsCompleted() {
completeAndNotifyIfNeeded(null);
}
@ -138,11 +129,10 @@ public class AllocatedPersistentTask extends CancellableTask {
} else {
completeAndNotifyIfNeeded(e);
}
}
private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
final State prevState = state.getAndSet(State.COMPLETED);
if (prevState == State.COMPLETED) {
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
} else {
@ -173,4 +163,10 @@ public class AllocatedPersistentTask extends CancellableTask {
}
}
}
public enum State {
STARTED, // the task is currently running
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
COMPLETED // the task is done running and trying to notify caller
}
}

View File

@ -123,7 +123,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
for (Long id : notVisitedTasks) {
AllocatedPersistentTask task = runningTasks.get(id);
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
if (task.isCompleted()) {
// Result was sent to the caller and the caller acknowledged acceptance of the result
logger.trace("Found completed persistent task [{}] with id [{}] and allocation id [{}] - removing",
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());

View File

@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
@ -73,7 +74,6 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
threadPool = new TestThreadPool(getClass().getName());
}
@Override
@After
public void tearDown() throws Exception {
@ -95,7 +95,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
return state.build();
}
public void testStartTask() throws Exception {
public void testStartTask() {
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
@ -131,8 +131,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
if (added == false) {
logger.info("No local node action was added");
}
MetaData.Builder metaData = MetaData.builder(state.metaData());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
@ -149,6 +149,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Make sure action wasn't called again
assertThat(executor.executions.size(), equalTo(1));
assertThat(executor.get(0).task.isCompleted(), is(false));
// Start another task on this node
state = newClusterState;
@ -157,10 +158,15 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Make sure action was called this time
assertThat(executor.size(), equalTo(2));
assertThat(executor.get(1).task.isCompleted(), is(false));
// Finish both tasks
executor.get(0).task.markAsFailed(new RuntimeException());
executor.get(1).task.markAsCompleted();
assertThat(executor.get(0).task.isCompleted(), is(true));
assertThat(executor.get(1).task.isCompleted(), is(true));
String failedTaskId = executor.get(0).task.getPersistentTaskId();
String finishedTaskId = executor.get(1).task.getPersistentTaskId();
executor.clear();
@ -186,7 +192,6 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Make sure action was only allocated on this node once
assertThat(executor.size(), equalTo(1));
}
}
public void testParamsStatusAndNodeTaskAreDelegated() throws Exception {
@ -300,7 +305,6 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Check the the task is now removed from task manager
assertThat(taskManager.getTasks().values(), empty());
}
private <Params extends PersistentTaskParams> ClusterState addTask(ClusterState state, String action, Params params,