Persistent Tasks: check the current state in waitForPersistentTaskStatus (elastic/x-pack-elasticsearch#935)

Add a check for the current state waitForPersistentTaskStatus before waiting for the next one. This fixes sporadic failure in testPersistentActionStatusUpdate test.

relates elastic/x-pack-elasticsearch#928

Original commit: elastic/x-pack-elasticsearch@0db4ac92d2
This commit is contained in:
Igor Motov 2017-04-04 09:44:56 -04:00 committed by GitHub
parent 5716b2bf16
commit eb79be392c

View File

@ -117,28 +117,32 @@ public class PersistentTasksService extends AbstractComponent {
} }
/** /**
* Waits for the persistent task with giving id (taskId) to achieve the desired status. * Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't
* waits of it.
*/ */
public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout, public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
WaitForPersistentTaskStatusListener listener) { WaitForPersistentTaskStatusListener listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) {
@Override listener.onResponse(taskId);
public void onNewClusterState(ClusterState state) { } else {
listener.onResponse(taskId); stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
} @Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(taskId);
}
@Override @Override
public void onClusterServiceClose() { public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode())); listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
} @Override
public void onTimeout(TimeValue timeout) {
@Override listener.onTimeout(timeout);
public void onTimeout(TimeValue timeout) { }
listener.onTimeout(timeout); }, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)));
} }
}, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)));
} }
public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener { public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener {