YARN-9527. Prevent rogue Localizer Runner from downloading same file repeatedly.
Contributed by Jim Brennan
This commit is contained in:
parent
a79564fed0
commit
6ff0453ede
|
@ -141,6 +141,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
|
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
|
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.FSDownload;
|
import org.apache.hadoop.yarn.util.FSDownload;
|
||||||
|
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.cache.CacheBuilder;
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
@ -722,6 +723,8 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
|
|
||||||
private final PublicLocalizer publicLocalizer;
|
private final PublicLocalizer publicLocalizer;
|
||||||
private final Map<String,LocalizerRunner> privLocalizers;
|
private final Map<String,LocalizerRunner> privLocalizers;
|
||||||
|
private final Map<String, String> recentlyCleanedLocalizers;
|
||||||
|
private final int maxRecentlyCleaned = 128;
|
||||||
|
|
||||||
LocalizerTracker(Configuration conf) {
|
LocalizerTracker(Configuration conf) {
|
||||||
this(conf, new HashMap<String,LocalizerRunner>());
|
this(conf, new HashMap<String,LocalizerRunner>());
|
||||||
|
@ -732,6 +735,8 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
super(LocalizerTracker.class.getName());
|
super(LocalizerTracker.class.getName());
|
||||||
this.publicLocalizer = new PublicLocalizer(conf);
|
this.publicLocalizer = new PublicLocalizer(conf);
|
||||||
this.privLocalizers = privLocalizers;
|
this.privLocalizers = privLocalizers;
|
||||||
|
this.recentlyCleanedLocalizers =
|
||||||
|
new LRUCacheHashMap<String, String>(maxRecentlyCleaned, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -783,14 +788,24 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
synchronized (privLocalizers) {
|
synchronized (privLocalizers) {
|
||||||
LocalizerRunner localizer = privLocalizers.get(locId);
|
LocalizerRunner localizer = privLocalizers.get(locId);
|
||||||
if (localizer != null && localizer.killContainerLocalizer.get()) {
|
if (localizer != null && localizer.killContainerLocalizer.get()) {
|
||||||
// Old localizer thread has been stopped, remove it and creates
|
// Old localizer thread has been stopped, remove it and create
|
||||||
// a new localizer thread.
|
// a new localizer thread.
|
||||||
LOG.info("New " + event.getType() + " localize request for "
|
LOG.info("New " + event.getType() + " localize request for "
|
||||||
+ locId + ", remove old private localizer.");
|
+ locId + ", remove old private localizer.");
|
||||||
cleanupPrivLocalizers(locId);
|
privLocalizers.remove(locId);
|
||||||
|
localizer.interrupt();
|
||||||
localizer = null;
|
localizer = null;
|
||||||
}
|
}
|
||||||
if (null == localizer) {
|
if (null == localizer) {
|
||||||
|
// Don't create a new localizer if this one has been recently
|
||||||
|
// cleaned up - this can happen if localization requests come
|
||||||
|
// in after cleanupPrivLocalizers has been called.
|
||||||
|
if (recentlyCleanedLocalizers.containsKey(locId)) {
|
||||||
|
LOG.info(
|
||||||
|
"Skipping localization request for recently cleaned " +
|
||||||
|
"localizer " + locId + " resource:" + req.getResource());
|
||||||
|
break;
|
||||||
|
}
|
||||||
LOG.info("Created localizer for " + locId);
|
LOG.info("Created localizer for " + locId);
|
||||||
localizer = new LocalizerRunner(req.getContext(), locId);
|
localizer = new LocalizerRunner(req.getContext(), locId);
|
||||||
privLocalizers.put(locId, localizer);
|
privLocalizers.put(locId, localizer);
|
||||||
|
@ -808,6 +823,7 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
public void cleanupPrivLocalizers(String locId) {
|
public void cleanupPrivLocalizers(String locId) {
|
||||||
synchronized (privLocalizers) {
|
synchronized (privLocalizers) {
|
||||||
LocalizerRunner localizer = privLocalizers.get(locId);
|
LocalizerRunner localizer = privLocalizers.get(locId);
|
||||||
|
recentlyCleanedLocalizers.put(locId, locId);
|
||||||
if (null == localizer) {
|
if (null == localizer) {
|
||||||
return; // ignore; already gone
|
return; // ignore; already gone
|
||||||
}
|
}
|
||||||
|
@ -1047,44 +1063,74 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
*
|
*
|
||||||
* @return the next resource to be localized
|
* @return the next resource to be localized
|
||||||
*/
|
*/
|
||||||
private LocalResource findNextResource() {
|
private ResourceLocalizationSpec findNextResource(
|
||||||
|
String user, ApplicationId applicationId) {
|
||||||
synchronized (pending) {
|
synchronized (pending) {
|
||||||
for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
|
for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
|
||||||
i.hasNext();) {
|
i.hasNext();) {
|
||||||
LocalizerResourceRequestEvent evt = i.next();
|
LocalizerResourceRequestEvent evt = i.next();
|
||||||
LocalizedResource nRsrc = evt.getResource();
|
LocalizedResource nRsrc = evt.getResource();
|
||||||
// Resource download should take place ONLY if resource is in
|
// Resource download should take place ONLY if resource is in
|
||||||
// Downloading state
|
// Downloading state
|
||||||
if (nRsrc.getState() != ResourceState.DOWNLOADING) {
|
if (nRsrc.getState() != ResourceState.DOWNLOADING) {
|
||||||
i.remove();
|
i.remove();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
* Multiple containers will try to download the same resource. So the
|
* Multiple containers will try to download the same resource. So the
|
||||||
* resource download should start only if
|
* resource download should start only if
|
||||||
* 1) We can acquire a non blocking semaphore lock on resource
|
* 1) We can acquire a non blocking semaphore lock on resource
|
||||||
* 2) Resource is still in DOWNLOADING state
|
* 2) Resource is still in DOWNLOADING state
|
||||||
*/
|
*/
|
||||||
if (nRsrc.tryAcquire()) {
|
if (nRsrc.tryAcquire()) {
|
||||||
if (nRsrc.getState() == ResourceState.DOWNLOADING) {
|
if (nRsrc.getState() == ResourceState.DOWNLOADING) {
|
||||||
LocalResourceRequest nextRsrc = nRsrc.getRequest();
|
LocalResourceRequest nextRsrc = nRsrc.getRequest();
|
||||||
LocalResource next =
|
LocalResource next =
|
||||||
recordFactory.newRecordInstance(LocalResource.class);
|
recordFactory.newRecordInstance(LocalResource.class);
|
||||||
next.setResource(URL.fromPath(nextRsrc
|
next.setResource(URL.fromPath(nextRsrc.getPath()));
|
||||||
.getPath()));
|
next.setTimestamp(nextRsrc.getTimestamp());
|
||||||
next.setTimestamp(nextRsrc.getTimestamp());
|
next.setType(nextRsrc.getType());
|
||||||
next.setType(nextRsrc.getType());
|
next.setVisibility(evt.getVisibility());
|
||||||
next.setVisibility(evt.getVisibility());
|
next.setPattern(evt.getPattern());
|
||||||
next.setPattern(evt.getPattern());
|
ResourceLocalizationSpec nextSpec = null;
|
||||||
scheduled.put(nextRsrc, evt);
|
try {
|
||||||
return next;
|
LocalResourcesTracker tracker = getLocalResourcesTracker(
|
||||||
} else {
|
next.getVisibility(), user, applicationId);
|
||||||
// Need to release acquired lock
|
if (tracker != null) {
|
||||||
nRsrc.unlock();
|
Path localPath = getPathForLocalization(next, tracker);
|
||||||
}
|
if (localPath != null) {
|
||||||
}
|
nextSpec = NodeManagerBuilderUtils.
|
||||||
}
|
newResourceLocalizationSpec(next, localPath);
|
||||||
return null;
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("local path for PRIVATE localization could not be " +
|
||||||
|
"found. Disks might have failed.", e);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
LOG.error("Incorrect path for PRIVATE localization."
|
||||||
|
+ next.getResource().getFile(), e);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
LOG.error(
|
||||||
|
"Got exception in parsing URL of LocalResource:"
|
||||||
|
+ next.getResource(), e);
|
||||||
|
}
|
||||||
|
if (nextSpec != null) {
|
||||||
|
scheduled.put(nextRsrc, evt);
|
||||||
|
return nextSpec;
|
||||||
|
} else {
|
||||||
|
// We failed to get a path for this, don't try to localize this
|
||||||
|
// resource again.
|
||||||
|
nRsrc.unlock();
|
||||||
|
i.remove();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Need to release acquired lock
|
||||||
|
nRsrc.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1170,29 +1216,9 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
* TODO : It doesn't support multiple downloads per ContainerLocalizer
|
* TODO : It doesn't support multiple downloads per ContainerLocalizer
|
||||||
* at the same time. We need to think whether we should support this.
|
* at the same time. We need to think whether we should support this.
|
||||||
*/
|
*/
|
||||||
LocalResource next = findNextResource();
|
ResourceLocalizationSpec next = findNextResource(user, applicationId);
|
||||||
if (next != null) {
|
if (next != null) {
|
||||||
try {
|
rsrcs.add(next);
|
||||||
LocalResourcesTracker tracker = getLocalResourcesTracker(
|
|
||||||
next.getVisibility(), user, applicationId);
|
|
||||||
if (tracker != null) {
|
|
||||||
Path localPath = getPathForLocalization(next, tracker);
|
|
||||||
if (localPath != null) {
|
|
||||||
rsrcs.add(NodeManagerBuilderUtils.newResourceLocalizationSpec(
|
|
||||||
next, localPath));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("local path for PRIVATE localization could not be " +
|
|
||||||
"found. Disks might have failed.", e);
|
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
LOG.error("Incorrect path for PRIVATE localization."
|
|
||||||
+ next.getResource().getFile(), e);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
LOG.error(
|
|
||||||
"Got exception in parsing URL of LocalResource:"
|
|
||||||
+ next.getResource(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
response.setLocalizerAction(LocalizerAction.LIVE);
|
response.setLocalizerAction(LocalizerAction.LIVE);
|
||||||
|
|
|
@ -140,6 +140,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
|
@ -1108,9 +1110,21 @@ public class TestResourceLocalizationService {
|
||||||
Thread.yield();
|
Thread.yield();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
private void yieldForLocalizers(int num) {
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
private void setStopLocalization() {
|
private void setStopLocalization() {
|
||||||
stopLocalization = true;
|
stopLocalization = true;
|
||||||
}
|
}
|
||||||
|
private int getNumLocalizers() {
|
||||||
|
return numLocalizers.get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 20000)
|
@Test(timeout = 20000)
|
||||||
|
@ -1137,7 +1151,8 @@ public class TestResourceLocalizationService {
|
||||||
|
|
||||||
ResourceLocalizationService spyService = spy(rawService);
|
ResourceLocalizationService spyService = spy(rawService);
|
||||||
doReturn(mockServer).when(spyService).createServer();
|
doReturn(mockServer).when(spyService).createServer();
|
||||||
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
|
doReturn(lfs).when(spyService).
|
||||||
|
getLocalFileContext(isA(Configuration.class));
|
||||||
FsPermission defaultPermission =
|
FsPermission defaultPermission =
|
||||||
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
|
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
|
||||||
FsPermission nmPermission =
|
FsPermission nmPermission =
|
||||||
|
@ -1184,6 +1199,78 @@ public class TestResourceLocalizationService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testResourceLocalizationReqsAfterContainerKill()
|
||||||
|
throws Exception {
|
||||||
|
List<Path> localDirs = new ArrayList<Path>();
|
||||||
|
String[] sDirs = new String[1];
|
||||||
|
localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
|
||||||
|
sDirs[0] = localDirs.get(0).toString();
|
||||||
|
|
||||||
|
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
||||||
|
|
||||||
|
DummyExecutor exec = new DummyExecutor();
|
||||||
|
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||||
|
dirsHandler.init(conf);
|
||||||
|
|
||||||
|
DeletionService delServiceReal = new DeletionService(exec);
|
||||||
|
DeletionService delService = spy(delServiceReal);
|
||||||
|
delService.init(new Configuration());
|
||||||
|
delService.start();
|
||||||
|
|
||||||
|
DrainDispatcher dispatcher = getDispatcher(conf);
|
||||||
|
ResourceLocalizationService rawService = new ResourceLocalizationService(
|
||||||
|
dispatcher, exec, delService, dirsHandler, nmContext, metrics);
|
||||||
|
|
||||||
|
ResourceLocalizationService spyService = spy(rawService);
|
||||||
|
doReturn(mockServer).when(spyService).createServer();
|
||||||
|
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
|
||||||
|
FsPermission defaultPermission =
|
||||||
|
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
|
||||||
|
FsPermission nmPermission =
|
||||||
|
ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
|
||||||
|
final Path userDir =
|
||||||
|
new Path(sDirs[0].substring("file:".length()),
|
||||||
|
ContainerLocalizer.USERCACHE);
|
||||||
|
final Path fileDir =
|
||||||
|
new Path(sDirs[0].substring("file:".length()),
|
||||||
|
ContainerLocalizer.FILECACHE);
|
||||||
|
final Path sysDir =
|
||||||
|
new Path(sDirs[0].substring("file:".length()),
|
||||||
|
ResourceLocalizationService.NM_PRIVATE_DIR);
|
||||||
|
final FileStatus fs =
|
||||||
|
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
|
||||||
|
defaultPermission, "", "", new Path(sDirs[0]));
|
||||||
|
final FileStatus nmFs =
|
||||||
|
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
|
||||||
|
nmPermission, "", "", sysDir);
|
||||||
|
|
||||||
|
doAnswer(new Answer<FileStatus>() {
|
||||||
|
@Override
|
||||||
|
public FileStatus answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
Object[] args = invocation.getArguments();
|
||||||
|
if (args.length > 0) {
|
||||||
|
if (args[0].equals(userDir) || args[0].equals(fileDir)) {
|
||||||
|
return fs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nmFs;
|
||||||
|
}
|
||||||
|
}).when(spylfs).getFileStatus(isA(Path.class));
|
||||||
|
|
||||||
|
try {
|
||||||
|
spyService.init(conf);
|
||||||
|
spyService.start();
|
||||||
|
|
||||||
|
doLocalizationAfterCleanup(spyService, dispatcher, exec, delService);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
spyService.stop();
|
||||||
|
dispatcher.stop();
|
||||||
|
delService.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private DrainDispatcher getDispatcher(Configuration config) {
|
private DrainDispatcher getDispatcher(Configuration config) {
|
||||||
DrainDispatcher dispatcher = new DrainDispatcher();
|
DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
dispatcher.init(config);
|
dispatcher.init(config);
|
||||||
|
@ -1342,6 +1429,149 @@ public class TestResourceLocalizationService {
|
||||||
assertNull(rsrc3);
|
assertNull(rsrc3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void doLocalizationAfterCleanup(
|
||||||
|
ResourceLocalizationService spyService,
|
||||||
|
DrainDispatcher dispatcher, DummyExecutor exec,
|
||||||
|
DeletionService delService)
|
||||||
|
throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
final Application app = mock(Application.class);
|
||||||
|
final ApplicationId appId =
|
||||||
|
BuilderUtils.newApplicationId(314159265358979L, 3);
|
||||||
|
String user = "user0";
|
||||||
|
when(app.getUser()).thenReturn(user);
|
||||||
|
when(app.getAppId()).thenReturn(appId);
|
||||||
|
List<LocalResource> resources = initializeLocalizer(appId);
|
||||||
|
LocalResource resource1 = resources.get(0);
|
||||||
|
LocalResource resource2 = resources.get(1);
|
||||||
|
LocalResource resource3 = resources.get(2);
|
||||||
|
final Container c1 = getMockContainer(appId, 42, "user0");
|
||||||
|
final Container c2 = getMockContainer(appId, 43, "user0");
|
||||||
|
|
||||||
|
EventHandler<ApplicationEvent> applicationBus =
|
||||||
|
getApplicationBus(dispatcher);
|
||||||
|
EventHandler<ContainerEvent> containerBus = getContainerBus(dispatcher);
|
||||||
|
initApp(spyService, applicationBus, app, appId, dispatcher);
|
||||||
|
|
||||||
|
// Send localization requests for container c1 and c2.
|
||||||
|
final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
|
||||||
|
final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
|
||||||
|
final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
|
||||||
|
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
|
||||||
|
new HashMap<LocalResourceVisibility,
|
||||||
|
Collection<LocalResourceRequest>>();
|
||||||
|
List<LocalResourceRequest> privateResourceList =
|
||||||
|
new ArrayList<LocalResourceRequest>();
|
||||||
|
rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
|
||||||
|
|
||||||
|
// Start Localization without any resources (so we can simulate the
|
||||||
|
// resource requests being delayed until after cleanup).
|
||||||
|
spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// Kill c1 which leads to cleanup
|
||||||
|
spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// Now we will send the resource requests and releases directly to tracker
|
||||||
|
privateResourceList.add(req1);
|
||||||
|
privateResourceList.add(req2);
|
||||||
|
privateResourceList.add(req3);
|
||||||
|
|
||||||
|
rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
|
||||||
|
LocalizerContext locCtx =
|
||||||
|
new LocalizerContext(user, c1.getContainerId(), c1.getCredentials());
|
||||||
|
LocalResourcesTracker tracker =
|
||||||
|
spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
|
||||||
|
user, null);
|
||||||
|
for (LocalResourceRequest req : privateResourceList) {
|
||||||
|
tracker.handle(
|
||||||
|
new ResourceRequestEvent(req, LocalResourceVisibility.PRIVATE,
|
||||||
|
locCtx));
|
||||||
|
}
|
||||||
|
dispatcher.await();
|
||||||
|
for (LocalResourceRequest req : privateResourceList) {
|
||||||
|
tracker.handle(
|
||||||
|
new ResourceReleaseEvent(req, c1.getContainerId()));
|
||||||
|
}
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// Now start a second container with the same list of resources
|
||||||
|
spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs));
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// Wait for localizers to begin (should only be one for container2)
|
||||||
|
exec.yieldForLocalizers(2);
|
||||||
|
assertThat(exec.getNumLocalizers()).isEqualTo(1);
|
||||||
|
|
||||||
|
LocalizerRunner locC2 =
|
||||||
|
spyService.getLocalizerRunner(c2.getContainerId().toString());
|
||||||
|
LocalizerStatus stat = mockLocalizerStatus(c2, resource1, resource2);
|
||||||
|
|
||||||
|
// First heartbeat which schedules first resource.
|
||||||
|
LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
|
||||||
|
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
|
||||||
|
|
||||||
|
// Second heartbeat which reports first resource as success.
|
||||||
|
// Second resource is scheduled.
|
||||||
|
response = spyService.heartbeat(stat);
|
||||||
|
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
|
||||||
|
final String locPath1 =
|
||||||
|
response.getResourceSpecs().get(0).getDestinationDirectory().getFile();
|
||||||
|
|
||||||
|
// Third heartbeat which reports second resource as pending.
|
||||||
|
// Third resource is scheduled.
|
||||||
|
response = spyService.heartbeat(stat);
|
||||||
|
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
|
||||||
|
final String locPath2 =
|
||||||
|
response.getResourceSpecs().get(0).getDestinationDirectory().getFile();
|
||||||
|
|
||||||
|
// Container c2 is killed which leads to cleanup
|
||||||
|
spyService.handle(new ContainerLocalizationCleanupEvent(c2, rsrcs));
|
||||||
|
|
||||||
|
// This heartbeat will indicate to container localizer to die as localizer
|
||||||
|
// runner has stopped.
|
||||||
|
response = spyService.heartbeat(stat);
|
||||||
|
assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
|
||||||
|
|
||||||
|
exec.setStopLocalization();
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// verify container notification
|
||||||
|
ArgumentMatcher<ContainerEvent> successContainerLoc =
|
||||||
|
evt -> evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
|
||||||
|
&& c2.getContainerId() == evt.getContainerID();
|
||||||
|
// Only one resource gets localized for container c2.
|
||||||
|
verify(containerBus).handle(argThat(successContainerLoc));
|
||||||
|
|
||||||
|
Set<Path> paths =
|
||||||
|
Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
|
||||||
|
new Path(locPath2), new Path(locPath2 + "_tmp"));
|
||||||
|
// Wait for localizer runner thread for container c2 to finish.
|
||||||
|
while (locC2.getState() != Thread.State.TERMINATED) {
|
||||||
|
Thread.sleep(50);
|
||||||
|
}
|
||||||
|
// Verify if downloading resources were submitted for deletion.
|
||||||
|
verify(delService, times(3)).delete(argThat(new FileDeletionMatcher(
|
||||||
|
delService, user, null, new ArrayList<>(paths))));
|
||||||
|
|
||||||
|
// Container c2 was killed but this resource was localized before kill
|
||||||
|
// hence it's not removed despite ref cnt being 0.
|
||||||
|
LocalizedResource rsrc1 = tracker.getLocalizedResource(req1);
|
||||||
|
assertNotNull(rsrc1);
|
||||||
|
assertThat(rsrc1.getState()).isEqualTo(ResourceState.LOCALIZED);
|
||||||
|
assertThat(rsrc1.getRefCount()).isEqualTo(0);
|
||||||
|
|
||||||
|
// Containers c1 and c2 were killed before these finished downloading;
|
||||||
|
// these should no longer be there.
|
||||||
|
LocalizedResource rsrc2 = tracker.getLocalizedResource(req2);
|
||||||
|
assertNull(rsrc2);
|
||||||
|
LocalizedResource rsrc3 = tracker.getLocalizedResource(req3);
|
||||||
|
assertNull(rsrc3);
|
||||||
|
|
||||||
|
// Double-check that we never created a Localizer for C1
|
||||||
|
assertThat(exec.getNumLocalizers()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
private LocalizerStatus mockLocalizerStatus(Container c1,
|
private LocalizerStatus mockLocalizerStatus(Container c1,
|
||||||
LocalResource resource1, LocalResource resource2) {
|
LocalResource resource1, LocalResource resource2) {
|
||||||
final String containerIdStr = c1.getContainerId().toString();
|
final String containerIdStr = c1.getContainerId().toString();
|
||||||
|
|
Loading…
Reference in New Issue