Fix testTrackingChannelTask (#57061)

A task might not be canceled on disconnection if it is completed before the cancellation
is started. We need to relax the assertion in this test.

Closes #56746
This commit is contained in:
Nhat Nguyen 2020-05-25 09:35:56 -04:00
parent ea2012778e
commit 4511611802
1 changed files with 6 additions and 4 deletions

View File

@ -48,6 +48,8 @@ import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.in;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
public class TaskManagerTests extends ESTestCase { public class TaskManagerTests extends ESTestCase {
@ -76,10 +78,9 @@ public class TaskManagerTests extends ESTestCase {
assertEquals(600000L, total); assertEquals(600000L, total);
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/56746")
public void testTrackingChannelTask() throws Exception { public void testTrackingChannelTask() throws Exception {
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
Set<CancellableTask> cancelledTasks = ConcurrentCollections.newConcurrentSet(); Set<Task> cancelledTasks = ConcurrentCollections.newConcurrentSet();
taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) { taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) {
@Override @Override
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) { void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
@ -111,13 +112,14 @@ public class TaskManagerTests extends ESTestCase {
pendingTasks.computeIfAbsent(channel, k -> new HashSet<>()).add(task); pendingTasks.computeIfAbsent(channel, k -> new HashSet<>()).add(task);
stopTrackingTasks.add(() -> { stopTrackingTasks.add(() -> {
stopTracking.close(); stopTracking.close();
pendingTasks.get(channel).remove(task); assertTrue(pendingTasks.get(channel).remove(task));
expectedCancelledTasks.remove(task);
}); });
} else { } else {
expectedCancelledTasks.add(task); expectedCancelledTasks.add(task);
} }
} }
assertBusy(() -> assertThat(cancelledTasks, equalTo(expectedCancelledTasks)), 30, TimeUnit.SECONDS); assertBusy(() -> assertThat(expectedCancelledTasks, everyItem(in(cancelledTasks))), 30, TimeUnit.SECONDS);
for (FakeTcpChannel channel : channels) { for (FakeTcpChannel channel : channels) {
channel.close(); channel.close();
} }