YARN-4380. TestResourceLocalizationService.testDownloadingResourcesOnContainerKill fails intermittently. Contributed by Varun Saxena.

(cherry picked from commit 0656d2dc83)
This commit is contained in:
Tsuyoshi Ozawa 2015-11-26 01:10:02 +09:00
parent 5794dc83b0
commit d76b523b02
2 changed files with 20 additions and 1 deletions

View File

@ -1054,6 +1054,9 @@ Release 2.7.3 - UNRELEASED
YARN-4365. FileSystemNodeLabelStore should check for root dir existence on YARN-4365. FileSystemNodeLabelStore should check for root dir existence on
startup (Kuhu Shukla via jlowe) startup (Kuhu Shukla via jlowe)
YARN-4380. TestResourceLocalizationService.testDownloadingResourcesOnContainerKill
fails intermittently. (Varun Saxena via ozawa)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -63,6 +63,7 @@ import java.util.Set;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
@ -1101,14 +1102,21 @@ public class TestResourceLocalizationService {
private static class DummyExecutor extends DefaultContainerExecutor { private static class DummyExecutor extends DefaultContainerExecutor {
private volatile boolean stopLocalization = false; private volatile boolean stopLocalization = false;
private AtomicInteger numLocalizers = new AtomicInteger(0);
@Override @Override
public void startLocalizer(LocalizerStartContext ctx) public void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException { throws IOException, InterruptedException {
numLocalizers.incrementAndGet();
while (!stopLocalization) { while (!stopLocalization) {
Thread.yield(); Thread.yield();
} }
} }
void setStopLocalization() { private void waitForLocalizers(int num) {
while (numLocalizers.intValue() < num) {
Thread.yield();
}
}
private void setStopLocalization() {
stopLocalization = true; stopLocalization = true;
} }
} }
@ -1251,6 +1259,10 @@ public class TestResourceLocalizationService {
spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1)); spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));
dispatcher.await(); dispatcher.await();
// Wait for localizers of both container c1 and c2 to begin.
exec.waitForLocalizers(2);
LocalizerRunner locC1 =
spyService.getLocalizerRunner(c1.getContainerId().toString());
final String containerIdStr = c1.getContainerId().toString(); final String containerIdStr = c1.getContainerId().toString();
// Heartbeats from container localizer // Heartbeats from container localizer
LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class); LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
@ -1318,6 +1330,10 @@ public class TestResourceLocalizationService {
Set<Path> paths = Set<Path> paths =
Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"), Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
new Path(locPath2), new Path(locPath2 + "_tmp")); new Path(locPath2), new Path(locPath2 + "_tmp"));
// Wait for localizer runner thread for container c1 to finish.
while (locC1.getState() != Thread.State.TERMINATED) {
Thread.sleep(50);
}
// Verify if downloading resources were submitted for deletion. // Verify if downloading resources were submitted for deletion.
verify(delService).delete(eq(user), verify(delService).delete(eq(user),
(Path) eq(null), argThat(new DownloadingPathsMatcher(paths))); (Path) eq(null), argThat(new DownloadingPathsMatcher(paths)));