YARN-4380. TestResourceLocalizationService.testDownloadingResourcesOnContainerKill fails intermittently. Contributed by Varun Saxena.
This commit is contained in:
parent
3e85542a7a
commit
0656d2dc83
|
@ -1106,6 +1106,9 @@ Release 2.7.3 - UNRELEASED
|
|||
YARN-4365. FileSystemNodeLabelStore should check for root dir existence on
|
||||
startup (Kuhu Shukla via jlowe)
|
||||
|
||||
YARN-4380. TestResourceLocalizationService.testDownloadingResourcesOnContainerKill
|
||||
fails intermittently. (Varun Saxena via ozawa)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -63,6 +63,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
|
||||
|
@ -1101,14 +1102,21 @@ public class TestResourceLocalizationService {
|
|||
|
||||
private static class DummyExecutor extends DefaultContainerExecutor {
|
||||
private volatile boolean stopLocalization = false;
|
||||
private AtomicInteger numLocalizers = new AtomicInteger(0);
|
||||
@Override
|
||||
public void startLocalizer(LocalizerStartContext ctx)
|
||||
throws IOException, InterruptedException {
|
||||
numLocalizers.incrementAndGet();
|
||||
while (!stopLocalization) {
|
||||
Thread.yield();
|
||||
}
|
||||
}
|
||||
void setStopLocalization() {
|
||||
private void waitForLocalizers(int num) {
|
||||
while (numLocalizers.intValue() < num) {
|
||||
Thread.yield();
|
||||
}
|
||||
}
|
||||
private void setStopLocalization() {
|
||||
stopLocalization = true;
|
||||
}
|
||||
}
|
||||
|
@ -1251,6 +1259,10 @@ public class TestResourceLocalizationService {
|
|||
spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));
|
||||
|
||||
dispatcher.await();
|
||||
// Wait for localizers of both container c1 and c2 to begin.
|
||||
exec.waitForLocalizers(2);
|
||||
LocalizerRunner locC1 =
|
||||
spyService.getLocalizerRunner(c1.getContainerId().toString());
|
||||
final String containerIdStr = c1.getContainerId().toString();
|
||||
// Heartbeats from container localizer
|
||||
LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
|
||||
|
@ -1318,6 +1330,10 @@ public class TestResourceLocalizationService {
|
|||
Set<Path> paths =
|
||||
Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
|
||||
new Path(locPath2), new Path(locPath2 + "_tmp"));
|
||||
// Wait for localizer runner thread for container c1 to finish.
|
||||
while (locC1.getState() != Thread.State.TERMINATED) {
|
||||
Thread.sleep(50);
|
||||
}
|
||||
// Verify if downloading resources were submitted for deletion.
|
||||
verify(delService).delete(eq(user),
|
||||
(Path) eq(null), argThat(new DownloadingPathsMatcher(paths)));
|
||||
|
|
Loading…
Reference in New Issue