Persistent Tasks: check the current state in waitForPersistentTaskStatus (#935)
Add a check for the current state waitForPersistentTaskStatus before waiting for the next one. This fixes sporadic failure in testPersistentActionStatusUpdate test. Fixes #928
This commit is contained in:
parent
a5acb556b0
commit
5b45b167bd
|
@ -130,11 +130,15 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
}
|
||||
|
||||
/**
|
||||
* Waits for the persistent task with giving id (taskId) to achieve the desired status.
|
||||
* Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't
|
||||
* waits of it.
|
||||
*/
|
||||
public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
|
||||
WaitForPersistentTaskStatusListener listener) {
|
||||
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
|
||||
if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) {
|
||||
listener.onResponse(taskId);
|
||||
} else {
|
||||
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
|
@ -144,7 +148,6 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -153,6 +156,7 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
}
|
||||
}, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)));
|
||||
}
|
||||
}
|
||||
|
||||
public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener {
|
||||
default void onTimeout(TimeValue timeout) {
|
||||
|
|
Loading…
Reference in New Issue