YARN-4380. TestResourceLocalizationService.testDownloadingResourcesOnContainerKill fails intermittently. Contributed by Varun Saxena.
(cherry picked from commit 0656d2dc83
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
This commit is contained in:
parent
b68f527b9f
commit
f50f889c17
|
@ -34,6 +34,9 @@ Release 2.7.3 - UNRELEASED
|
||||||
YARN-4365. FileSystemNodeLabelStore should check for root dir existence on
|
YARN-4365. FileSystemNodeLabelStore should check for root dir existence on
|
||||||
startup (Kuhu Shukla via jlowe)
|
startup (Kuhu Shukla via jlowe)
|
||||||
|
|
||||||
|
YARN-4380. TestResourceLocalizationService.testDownloadingResourcesOnContainerKill
|
||||||
|
fails intermittently. (Varun Saxena via ozawa)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -63,6 +63,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.BrokenBarrierException;
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -1091,16 +1092,23 @@ public class TestResourceLocalizationService {
|
||||||
|
|
||||||
private static class DummyExecutor extends DefaultContainerExecutor {
|
private static class DummyExecutor extends DefaultContainerExecutor {
|
||||||
private volatile boolean stopLocalization = false;
|
private volatile boolean stopLocalization = false;
|
||||||
|
private AtomicInteger numLocalizers = new AtomicInteger(0);
|
||||||
@Override
|
@Override
|
||||||
public void startLocalizer(Path nmPrivateContainerTokensPath,
|
public void startLocalizer(Path nmPrivateContainerTokensPath,
|
||||||
InetSocketAddress nmAddr, String user, String appId, String locId,
|
InetSocketAddress nmAddr, String user, String appId, String locId,
|
||||||
LocalDirsHandlerService dirsHandler) throws IOException,
|
LocalDirsHandlerService dirsHandler) throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
numLocalizers.incrementAndGet();
|
||||||
while (!stopLocalization) {
|
while (!stopLocalization) {
|
||||||
Thread.yield();
|
Thread.yield();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void setStopLocalization() {
|
private void waitForLocalizers(int num) {
|
||||||
|
while (numLocalizers.intValue() < num) {
|
||||||
|
Thread.yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private void setStopLocalization() {
|
||||||
stopLocalization = true;
|
stopLocalization = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1243,6 +1251,10 @@ public class TestResourceLocalizationService {
|
||||||
spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));
|
spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));
|
||||||
|
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
// Wait for localizers of both container c1 and c2 to begin.
|
||||||
|
exec.waitForLocalizers(2);
|
||||||
|
LocalizerRunner locC1 =
|
||||||
|
spyService.getLocalizerRunner(c1.getContainerId().toString());
|
||||||
final String containerIdStr = c1.getContainerId().toString();
|
final String containerIdStr = c1.getContainerId().toString();
|
||||||
// Heartbeats from container localizer
|
// Heartbeats from container localizer
|
||||||
LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
|
LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
|
||||||
|
@ -1310,6 +1322,10 @@ public class TestResourceLocalizationService {
|
||||||
Set<Path> paths =
|
Set<Path> paths =
|
||||||
Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
|
Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
|
||||||
new Path(locPath2), new Path(locPath2 + "_tmp"));
|
new Path(locPath2), new Path(locPath2 + "_tmp"));
|
||||||
|
// Wait for localizer runner thread for container c1 to finish.
|
||||||
|
while (locC1.getState() != Thread.State.TERMINATED) {
|
||||||
|
Thread.sleep(50);
|
||||||
|
}
|
||||||
// Verify if downloading resources were submitted for deletion.
|
// Verify if downloading resources were submitted for deletion.
|
||||||
verify(delService).delete(eq(user),
|
verify(delService).delete(eq(user),
|
||||||
(Path) eq(null), argThat(new DownloadingPathsMatcher(paths)));
|
(Path) eq(null), argThat(new DownloadingPathsMatcher(paths)));
|
||||||
|
|
Loading…
Reference in New Issue