YARN-99. Modify private distributed cache to localize files such that no local directory hits unix file count limits and thus prevent job failures. Contributed by Omkar Vinit Joshi.

svn merge --ignore-ancestry -c 1465853 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1465854 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-09 01:35:05 +00:00
parent b287d8de6d
commit 335e02124b
14 changed files with 524 additions and 172 deletions

View File

@ -146,6 +146,10 @@ Release 2.0.5-beta - UNRELEASED
to implement closeable so that they can be stopped when needed via
RPC.stopProxy(). (Siddharth Seth via vinodkv)
YARN-99. Modify private distributed cache to localize files such that no
local directory hits unix file count limits and thus prevent job failures.
(Omkar Vinit Joshi via vinodkv)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.URL;
import com.google.common.annotations.VisibleForTesting;
@Private
@VisibleForTesting
public interface ResourceLocalizationSpec {
public void setResource(LocalResource rsrc);
public LocalResource getResource();
public void setDestinationDirectory(URL destinationDirectory);
public URL getDestinationDirectory();
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProtoOrBuilder;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
public class ResourceLocalizationSpecPBImpl extends
ProtoBase<ResourceLocalizationSpecProto> implements
ResourceLocalizationSpec {
private ResourceLocalizationSpecProto proto = ResourceLocalizationSpecProto
.getDefaultInstance();
private ResourceLocalizationSpecProto.Builder builder = null;
private boolean viaProto;
private LocalResource resource = null;
private URL destinationDirectory = null;
public ResourceLocalizationSpecPBImpl() {
builder = ResourceLocalizationSpecProto.newBuilder();
}
public ResourceLocalizationSpecPBImpl(ResourceLocalizationSpecProto proto) {
this.proto = proto;
viaProto = true;
}
@Override
public LocalResource getResource() {
ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder;
if (resource != null) {
return resource;
}
if (!p.hasResource()) {
return null;
}
resource = new LocalResourcePBImpl(p.getResource());
return resource;
}
@Override
public void setResource(LocalResource rsrc) {
maybeInitBuilder();
resource = rsrc;
}
@Override
public URL getDestinationDirectory() {
ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder;
if (destinationDirectory != null) {
return destinationDirectory;
}
if (!p.hasDestinationDirectory()) {
return null;
}
destinationDirectory = new URLPBImpl(p.getDestinationDirectory());
return destinationDirectory;
}
@Override
public void setDestinationDirectory(URL destinationDirectory) {
maybeInitBuilder();
this.destinationDirectory = destinationDirectory;
}
@Override
public ResourceLocalizationSpecProto getProto() {
mergeLocalToBuilder();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private synchronized void maybeInitBuilder() {
if (builder == null || viaProto) {
builder = ResourceLocalizationSpecProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
ResourceLocalizationSpecProtoOrBuilder l = viaProto ? proto : builder;
if (this.resource != null
&& !(l.getResource()
.equals(((LocalResourcePBImpl) resource).getProto()))) {
maybeInitBuilder();
builder.setResource(((LocalResourcePBImpl) resource).getProto());
}
if (this.destinationDirectory != null
&& !(l.getDestinationDirectory()
.equals(((URLPBImpl) destinationDirectory).getProto()))) {
maybeInitBuilder();
builder.setDestinationDirectory(((URLPBImpl) destinationDirectory)
.getProto());
}
}
}

View File

@ -18,18 +18,13 @@
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords;
import java.util.List;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.server.nodemanager.api.*;
public interface LocalizerHeartbeatResponse {
public LocalizerAction getLocalizerAction();
public List<LocalResource> getAllResources();
public LocalResource getLocalResource(int i);
public LocalizerAction getLocalizerAction();
public void setLocalizerAction(LocalizerAction action);
public void addAllResources(List<LocalResource> resources);
public void addResource(LocalResource resource);
public void removeResource(int index);
public void clearResources();
}
public List<ResourceLocalizationSpec> getResourceSpecs();
public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs);
}

View File

@ -21,13 +21,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerActionProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.ResourceLocalizationSpecPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@ -40,13 +41,14 @@ public class LocalizerHeartbeatResponsePBImpl
LocalizerHeartbeatResponseProto.Builder builder = null;
boolean viaProto = false;
private List<LocalResource> resources;
private List<ResourceLocalizationSpec> resourceSpecs;
public LocalizerHeartbeatResponsePBImpl() {
builder = LocalizerHeartbeatResponseProto.newBuilder();
}
public LocalizerHeartbeatResponsePBImpl(LocalizerHeartbeatResponseProto proto) {
public LocalizerHeartbeatResponsePBImpl(
LocalizerHeartbeatResponseProto proto) {
this.proto = proto;
viaProto = true;
}
@ -59,7 +61,7 @@ public class LocalizerHeartbeatResponsePBImpl
}
private void mergeLocalToBuilder() {
if (resources != null) {
if (resourceSpecs != null) {
addResourcesToProto();
}
}
@ -79,6 +81,7 @@ public class LocalizerHeartbeatResponsePBImpl
viaProto = false;
}
@Override
public LocalizerAction getLocalizerAction() {
LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasAction()) {
@ -87,14 +90,10 @@ public class LocalizerHeartbeatResponsePBImpl
return convertFromProtoFormat(p.getAction());
}
public List<LocalResource> getAllResources() {
@Override
public List<ResourceLocalizationSpec> getResourceSpecs() {
initResources();
return this.resources;
}
public LocalResource getLocalResource(int i) {
initResources();
return this.resources.get(i);
return this.resourceSpecs;
}
public void setLocalizerAction(LocalizerAction action) {
@ -106,31 +105,39 @@ public class LocalizerHeartbeatResponsePBImpl
builder.setAction(convertToProtoFormat(action));
}
public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs) {
maybeInitBuilder();
if (rsrcs == null) {
builder.clearResources();
return;
}
this.resourceSpecs = rsrcs;
}
private void initResources() {
if (this.resources != null) {
if (this.resourceSpecs != null) {
return;
}
LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<LocalResourceProto> list = p.getResourcesList();
this.resources = new ArrayList<LocalResource>();
for (LocalResourceProto c : list) {
this.resources.add(convertFromProtoFormat(c));
List<ResourceLocalizationSpecProto> list = p.getResourcesList();
this.resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
for (ResourceLocalizationSpecProto c : list) {
this.resourceSpecs.add(convertFromProtoFormat(c));
}
}
private void addResourcesToProto() {
maybeInitBuilder();
builder.clearResources();
if (this.resources == null)
if (this.resourceSpecs == null)
return;
Iterable<LocalResourceProto> iterable =
new Iterable<LocalResourceProto>() {
Iterable<ResourceLocalizationSpecProto> iterable =
new Iterable<ResourceLocalizationSpecProto>() {
@Override
public Iterator<LocalResourceProto> iterator() {
return new Iterator<LocalResourceProto>() {
public Iterator<ResourceLocalizationSpecProto> iterator() {
return new Iterator<ResourceLocalizationSpecProto>() {
Iterator<LocalResource> iter = resources.iterator();
Iterator<ResourceLocalizationSpec> iter = resourceSpecs.iterator();
@Override
public boolean hasNext() {
@ -138,8 +145,10 @@ public class LocalizerHeartbeatResponsePBImpl
}
@Override
public LocalResourceProto next() {
return convertToProtoFormat(iter.next());
public ResourceLocalizationSpecProto next() {
ResourceLocalizationSpec resource = iter.next();
return ((ResourceLocalizationSpecPBImpl)resource).getProto();
}
@Override
@ -154,34 +163,10 @@ public class LocalizerHeartbeatResponsePBImpl
builder.addAllResources(iterable);
}
public void addAllResources(List<LocalResource> resources) {
if (resources == null)
return;
initResources();
this.resources.addAll(resources);
}
public void addResource(LocalResource resource) {
initResources();
this.resources.add(resource);
}
public void removeResource(int index) {
initResources();
this.resources.remove(index);
}
public void clearResources() {
initResources();
this.resources.clear();
}
private LocalResource convertFromProtoFormat(LocalResourceProto p) {
return new LocalResourcePBImpl(p);
}
private LocalResourceProto convertToProtoFormat(LocalResource s) {
return ((LocalResourcePBImpl)s).getProto();
private ResourceLocalizationSpec convertFromProtoFormat(
ResourceLocalizationSpecProto p) {
return new ResourceLocalizationSpecPBImpl(p);
}
private LocalizerActionProto convertToProtoFormat(LocalizerAction a) {
@ -191,5 +176,4 @@ public class LocalizerHeartbeatResponsePBImpl
private LocalizerAction convertFromProtoFormat(LocalizerActionProto a) {
return LocalizerAction.valueOf(a.name());
}
}
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
@ -89,8 +91,6 @@ public class ContainerLocalizer {
private final String localizerId;
private final FileContext lfs;
private final Configuration conf;
private final LocalDirAllocator appDirs;
private final LocalDirAllocator userDirs;
private final RecordFactory recordFactory;
private final Map<LocalResource,Future<Path>> pendingResources;
private final String appCacheDirContextName;
@ -112,8 +112,6 @@ public class ContainerLocalizer {
this.recordFactory = recordFactory;
this.conf = new Configuration();
this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
this.appDirs = new LocalDirAllocator(appCacheDirContextName);
this.userDirs = new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, user));
this.pendingResources = new HashMap<LocalResource,Future<Path>>();
}
@ -197,10 +195,10 @@ public class ContainerLocalizer {
return new ExecutorCompletionService<Path>(exec);
}
Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
Callable<Path> download(Path path, LocalResource rsrc,
UserGroupInformation ugi) throws IOException {
Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
return new FSDownload(lfs, ugi, conf, destPath, rsrc, new Random());
DiskChecker.checkDir(new File(path.toUri().getRawPath()));
return new FSDownload(lfs, ugi, conf, path, rsrc, new Random());
}
static long getEstimatedSize(LocalResource rsrc) {
@ -238,25 +236,12 @@ public class ContainerLocalizer {
LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
switch (response.getLocalizerAction()) {
case LIVE:
List<LocalResource> newResources = response.getAllResources();
for (LocalResource r : newResources) {
if (!pendingResources.containsKey(r)) {
final LocalDirAllocator lda;
switch (r.getVisibility()) {
default:
LOG.warn("Unknown visibility: " + r.getVisibility()
+ ", Using userDirs");
//Falling back to userDirs for unknown visibility.
case PUBLIC:
case PRIVATE:
lda = userDirs;
break;
case APPLICATION:
lda = appDirs;
break;
}
// TODO: Synchronization??
pendingResources.put(r, cs.submit(download(lda, r, ugi)));
List<ResourceLocalizationSpec> newRsrcs = response.getResourceSpecs();
for (ResourceLocalizationSpec newRsrc : newRsrcs) {
if (!pendingResources.containsKey(newRsrc.getResource())) {
pendingResources.put(newRsrc.getResource(), cs.submit(download(
new Path(newRsrc.getDestinationDirectory().getFile()),
newRsrc.getResource(), ugi)));
}
}
break;

View File

@ -22,8 +22,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

View File

@ -80,10 +80,12 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@ -105,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -326,7 +329,7 @@ public class ResourceLocalizationService extends CompositeService
// 0) Create application tracking structs
String userName = app.getUser();
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
dispatcher, false, super.getConfig()));
dispatcher, true, super.getConfig()));
if (null != appRsrc.putIfAbsent(
ConverterUtils.toString(app.getAppId()),
new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
@ -476,6 +479,21 @@ public class ResourceLocalizationService extends CompositeService
}
}
private String getUserFileCachePath(String user) {
String path =
"." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+ user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
return path;
}
private String getUserAppCachePath(String user, String appId) {
String path =
"." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+ user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
+ Path.SEPARATOR + appId;
return path;
}
/**
* Sub-component handling the spawning of {@link ContainerLocalizer}s
*/
@ -803,7 +821,20 @@ public class ResourceLocalizationService extends CompositeService
LocalResource next = findNextResource();
if (next != null) {
response.setLocalizerAction(LocalizerAction.LIVE);
response.addResource(next);
try {
ArrayList<ResourceLocalizationSpec> rsrcs =
new ArrayList<ResourceLocalizationSpec>();
ResourceLocalizationSpec rsrc =
NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
getPathForLocalization(next));
rsrcs.add(rsrc);
response.setResourceSpecs(rsrcs);
} catch (IOException e) {
LOG.error("local path for PRIVATE localization could not be found."
+ "Disks might have failed.", e);
} catch (URISyntaxException e) {
// TODO fail? Already translated several times...
}
} else if (pending.isEmpty()) {
// TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
@ -812,7 +843,8 @@ public class ResourceLocalizationService extends CompositeService
}
return response;
}
ArrayList<ResourceLocalizationSpec> rsrcs =
new ArrayList<ResourceLocalizationSpec>();
for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource();
LocalResourceRequest req = null;
@ -835,6 +867,7 @@ public class ResourceLocalizationService extends CompositeService
new ResourceLocalizedEvent(req,
ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
stat.getLocalSize()));
localizationCompleted(stat);
} catch (URISyntaxException e) { }
if (pending.isEmpty()) {
// TODO: Synchronization
@ -844,7 +877,17 @@ public class ResourceLocalizationService extends CompositeService
response.setLocalizerAction(LocalizerAction.LIVE);
LocalResource next = findNextResource();
if (next != null) {
response.addResource(next);
try {
ResourceLocalizationSpec resource =
NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
getPathForLocalization(next));
rsrcs.add(resource);
} catch (IOException e) {
LOG.error("local path for PRIVATE localization could not be " +
"found. Disks might have failed.", e);
} catch (URISyntaxException e) {
//TODO fail? Already translated several times...
}
}
break;
case FETCH_PENDING:
@ -854,6 +897,7 @@ public class ResourceLocalizationService extends CompositeService
LOG.info("DEBUG: FAILED " + req, stat.getException());
assoc.getResource().unlock();
response.setLocalizerAction(LocalizerAction.DIE);
localizationCompleted(stat);
// TODO: Why is this event going directly to the container. Why not
// the resource itself? What happens to the resource? Is it removed?
dispatcher.getEventHandler().handle(
@ -869,9 +913,53 @@ public class ResourceLocalizationService extends CompositeService
break;
}
}
response.setResourceSpecs(rsrcs);
return response;
}
private void localizationCompleted(LocalResourceStatus stat) {
try {
LocalResource rsrc = stat.getResource();
LocalResourceRequest key = new LocalResourceRequest(rsrc);
String user = context.getUser();
ApplicationId appId =
context.getContainerId().getApplicationAttemptId()
.getApplicationId();
LocalResourceVisibility vis = rsrc.getVisibility();
LocalResourcesTracker tracker =
getLocalResourcesTracker(vis, user, appId);
if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) {
tracker.localizationCompleted(key, true);
} else {
tracker.localizationCompleted(key, false);
}
} catch (URISyntaxException e) {
LOG.error("Invalid resource URL specified", e);
}
}
private Path getPathForLocalization(LocalResource rsrc) throws IOException,
URISyntaxException {
String user = context.getUser();
ApplicationId appId =
context.getContainerId().getApplicationAttemptId().getApplicationId();
LocalResourceVisibility vis = rsrc.getVisibility();
LocalResourcesTracker tracker =
getLocalResourcesTracker(vis, user, appId);
String cacheDirectory = null;
if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
cacheDirectory = getUserFileCachePath(user);
} else {// APPLICATION ONLY
cacheDirectory = getUserAppCachePath(user, appId.toString());
}
Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory,
ContainerLocalizer.getEstimatedSize(rsrc), false);
return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
dirPath);
}
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
@ -1033,4 +1121,4 @@ public class ResourceLocalizationService extends CompositeService
del.delete(null, dirPath, new Path[] {});
}
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
public class NodeManagerBuilderUtils {
public static ResourceLocalizationSpec newResourceLocalizationSpec(
LocalResource rsrc, Path path) {
URL local = ConverterUtils.getYarnUrlFromPath(path);
ResourceLocalizationSpec resourceLocalizationSpec =
Records.newRecord(ResourceLocalizationSpec.class);
resourceLocalizationSpec.setDestinationDirectory(local);
resourceLocalizationSpec.setResource(rsrc);
return resourceLocalizationSpec;
}
}

View File

@ -47,7 +47,12 @@ enum LocalizerActionProto {
DIE = 2;
}
message ResourceLocalizationSpecProto {
optional LocalResourceProto resource = 1;
optional URLProto destination_directory = 2;
}
message LocalizerHeartbeatResponseProto {
optional LocalizerActionProto action = 1;
repeated LocalResourceProto resources = 2;
repeated ResourceLocalizationSpecProto resources = 2;
}

View File

@ -17,6 +17,13 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.net.URISyntaxException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
@ -31,15 +38,14 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestPBRecordImpl {
@ -54,9 +60,8 @@ public class TestPBRecordImpl {
static LocalResource createResource() {
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
assertTrue(ret instanceof LocalResourcePBImpl);
ret.setResource(
ConverterUtils.getYarnUrlFromPath(
new Path("hdfs://y.ak:8020/foo/bar")));
ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(
"hdfs://y.ak:8020/foo/bar")));
ret.setSize(4344L);
ret.setTimestamp(3141592653589793L);
ret.setVisibility(LocalResourceVisibility.PUBLIC);
@ -90,16 +95,27 @@ public class TestPBRecordImpl {
return ret;
}
static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() {
static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse()
throws URISyntaxException {
LocalizerHeartbeatResponse ret =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl);
ret.setLocalizerAction(LocalizerAction.LIVE);
ret.addResource(createResource());
LocalResource rsrc = createResource();
ArrayList<ResourceLocalizationSpec> rsrcs =
new ArrayList<ResourceLocalizationSpec>();
ResourceLocalizationSpec resource =
recordFactory.newRecordInstance(ResourceLocalizationSpec.class);
resource.setResource(rsrc);
resource.setDestinationDirectory(ConverterUtils
.getYarnUrlFromPath(new Path("/tmp" + System.currentTimeMillis())));
rsrcs.add(resource);
ret.setResourceSpecs(rsrcs);
System.out.println(resource);
return ret;
}
@Test
@Test(timeout=10000)
public void testLocalResourceStatusSerDe() throws Exception {
LocalResourceStatus rsrcS = createLocalResourceStatus();
assertTrue(rsrcS instanceof LocalResourceStatusPBImpl);
@ -119,7 +135,7 @@ public class TestPBRecordImpl {
assertEquals(createResource(), rsrcD.getResource());
}
@Test
@Test(timeout=10000)
public void testLocalizerStatusSerDe() throws Exception {
LocalizerStatus rsrcS = createLocalizerStatus();
assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
@ -141,7 +157,7 @@ public class TestPBRecordImpl {
assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
}
@Test
@Test(timeout=10000)
public void testLocalizerHeartbeatResponseSerDe() throws Exception {
LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse();
assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl);
@ -158,8 +174,8 @@ public class TestPBRecordImpl {
new LocalizerHeartbeatResponsePBImpl(rsrcPbD);
assertEquals(rsrcS, rsrcD);
assertEquals(createResource(), rsrcS.getLocalResource(0));
assertEquals(createResource(), rsrcD.getLocalResource(0));
assertEquals(createResource(), rsrcS.getResourceSpecs().get(0).getResource());
assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource());
}
}

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@ -28,28 +28,30 @@ public class MockLocalizerHeartbeatResponse
implements LocalizerHeartbeatResponse {
LocalizerAction action;
List<LocalResource> rsrc;
List<ResourceLocalizationSpec> resourceSpecs;
MockLocalizerHeartbeatResponse() {
rsrc = new ArrayList<LocalResource>();
resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
}
MockLocalizerHeartbeatResponse(
LocalizerAction action, List<LocalResource> rsrc) {
LocalizerAction action, List<ResourceLocalizationSpec> resources) {
this.action = action;
this.rsrc = rsrc;
this.resourceSpecs = resources;
}
public LocalizerAction getLocalizerAction() { return action; }
public List<LocalResource> getAllResources() { return rsrc; }
public LocalResource getLocalResource(int i) { return rsrc.get(i); }
public void setLocalizerAction(LocalizerAction action) {
this.action = action;
}
public void addAllResources(List<LocalResource> resources) {
rsrc.addAll(resources);
}
public void addResource(LocalResource resource) { rsrc.add(resource); }
public void removeResource(int index) { rsrc.remove(index); }
public void clearResources() { rsrc.clear(); }
@Override
public List<ResourceLocalizationSpec> getResourceSpecs() {
return resourceSpecs;
}
@Override
public void setResourceSpecs(List<ResourceLocalizationSpec> resourceSpecs) {
this.resourceSpecs = resourceSpecs;
}
}

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataInputBuffer;
@ -66,9 +65,11 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
@ -95,12 +96,33 @@ public class TestContainerLocalizer {
public void testContainerLocalizerMain() throws Exception {
ContainerLocalizer localizer = setupContainerLocalizerForTest();
// verify created cache
List<Path> privCacheList = new ArrayList<Path>();
List<Path> appCacheList = new ArrayList<Path>();
for (Path p : localDirs) {
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
privCacheList.add(privcache);
Path appDir =
new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
appCacheList.add(appcache);
}
// mock heartbeat responses from NM
LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
LocalResource rsrcC = getMockRsrc(random,
LocalResourceVisibility.APPLICATION);
LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
ResourceLocalizationSpec rsrcA =
getMockRsrc(random, LocalResourceVisibility.PRIVATE,
privCacheList.get(0));
ResourceLocalizationSpec rsrcB =
getMockRsrc(random, LocalResourceVisibility.PRIVATE,
privCacheList.get(0));
ResourceLocalizationSpec rsrcC =
getMockRsrc(random, LocalResourceVisibility.APPLICATION,
appCacheList.get(0));
ResourceLocalizationSpec rsrcD =
getMockRsrc(random, LocalResourceVisibility.PRIVATE,
privCacheList.get(0));
when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcA)))
@ -111,27 +133,33 @@ public class TestContainerLocalizer {
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.singletonList(rsrcD)))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
Collections.<LocalResource>emptyList()))
Collections.<ResourceLocalizationSpec>emptyList()))
.thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
null));
doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
LocalResource tRsrcA = rsrcA.getResource();
LocalResource tRsrcB = rsrcB.getResource();
LocalResource tRsrcC = rsrcC.getResource();
LocalResource tRsrcD = rsrcD.getResource();
doReturn(
new FakeDownload(rsrcA.getResource().getResource().getFile(), true))
.when(localizer).download(isA(Path.class), eq(tRsrcA),
isA(UserGroupInformation.class));
doReturn(new FakeDownload(rsrcB.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcB),
doReturn(
new FakeDownload(rsrcB.getResource().getResource().getFile(), true))
.when(localizer).download(isA(Path.class), eq(tRsrcB),
isA(UserGroupInformation.class));
doReturn(new FakeDownload(rsrcC.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcC),
doReturn(
new FakeDownload(rsrcC.getResource().getResource().getFile(), true))
.when(localizer).download(isA(Path.class), eq(tRsrcC),
isA(UserGroupInformation.class));
doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
doReturn(
new FakeDownload(rsrcD.getResource().getResource().getFile(), true))
.when(localizer).download(isA(Path.class), eq(tRsrcD),
isA(UserGroupInformation.class));
// run localization
assertEquals(0, localizer.runLocalization(nmAddr));
// verify created cache
for (Path p : localDirs) {
Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
@ -147,15 +175,14 @@ public class TestContainerLocalizer {
Path appcacheAfsPath = new Path(appcache.toUri().getPath());
verify(spylfs).mkdir(eq(appcacheAfsPath), isA(FsPermission.class), eq(false));
}
// verify tokens read at expected location
verify(spylfs).open(tokenPath);
// verify downloaded resources reported to NM
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC)));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD)));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA.getResource())));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB.getResource())));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC.getResource())));
verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD.getResource())));
// verify all HB use localizerID provided
verify(nmProxy, never()).heartbeat(argThat(
@ -310,10 +337,12 @@ public class TestContainerLocalizer {
return mockRF;
}
static LocalResource getMockRsrc(Random r,
LocalResourceVisibility vis) {
LocalResource rsrc = mock(LocalResource.class);
static ResourceLocalizationSpec getMockRsrc(Random r,
LocalResourceVisibility vis, Path p) {
ResourceLocalizationSpec resourceLocalizationSpec =
mock(ResourceLocalizationSpec.class);
LocalResource rsrc = mock(LocalResource.class);
String name = Long.toHexString(r.nextLong());
URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
when(uri.getScheme()).thenReturn("file");
@ -326,7 +355,10 @@ public class TestContainerLocalizer {
when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
when(rsrc.getVisibility()).thenReturn(vis);
return rsrc;
when(resourceLocalizationSpec.getResource()).thenReturn(rsrc);
when(resourceLocalizationSpec.getDestinationDirectory()).
thenReturn(ConverterUtils.getYarnUrlFromPath(p));
return resourceLocalizationSpec;
}
@SuppressWarnings({ "rawtypes", "unchecked" })

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
@ -35,6 +36,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -375,7 +377,7 @@ public class TestResourceLocalizationService {
}
}
@Test
@Test( timeout = 10000)
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
Configuration conf = new YarnConfiguration();
@ -386,12 +388,17 @@ public class TestResourceLocalizationService {
isA(Path.class), isA(FsPermission.class), anyBoolean());
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
String[] sDirs = new String[1];
// Making sure that we have only one local disk so that it will only be
// selected for consecutive resource localization calls. This is required
// to test LocalCacheDirectoryManager.
localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
sDirs[0] = localDirs.get(0).toString();
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// Adding configuration to make sure there is only one file per
// directory
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
DrainDispatcher dispatcher = new DrainDispatcher();
@ -452,12 +459,23 @@ public class TestResourceLocalizationService {
doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
final LocalResource resource = getPrivateMockedResource(r);
final LocalResourceRequest req = new LocalResourceRequest(resource);
final LocalResource resource1 = getPrivateMockedResource(r);
LocalResource resource2 = null;
do {
resource2 = getPrivateMockedResource(r);
} while (resource2 == null || resource2.equals(resource1));
// above call to make sure we don't get identical resources.
final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
List<LocalResourceRequest> privateResourceList =
new ArrayList<LocalResourceRequest>();
privateResourceList.add(req1);
privateResourceList.add(req2);
rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
Thread.sleep(1000);
@ -471,33 +489,64 @@ public class TestResourceLocalizationService {
Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer
LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class);
LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class);
LocalizerStatus stat = mock(LocalizerStatus.class);
when(stat.getLocalizerId()).thenReturn(ctnrStr);
when(rsrcStat.getResource()).thenReturn(resource);
when(rsrcStat.getLocalSize()).thenReturn(4344L);
when(rsrcStat1.getResource()).thenReturn(resource1);
when(rsrcStat2.getResource()).thenReturn(resource2);
when(rsrcStat1.getLocalSize()).thenReturn(4344L);
when(rsrcStat2.getLocalSize()).thenReturn(2342L);
URL locPath = getPath("/cache/private/blah");
when(rsrcStat.getLocalPath()).thenReturn(locPath);
when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
when(rsrcStat1.getLocalPath()).thenReturn(locPath);
when(rsrcStat2.getLocalPath()).thenReturn(locPath);
when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
when(stat.getResources())
.thenReturn(Collections.<LocalResourceStatus>emptyList())
.thenReturn(Collections.singletonList(rsrcStat))
.thenReturn(Collections.singletonList(rsrcStat1))
.thenReturn(Collections.singletonList(rsrcStat2))
.thenReturn(Collections.<LocalResourceStatus>emptyList());
// get rsrc
String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
Path.SEPARATOR + "user0" + Path.SEPARATOR +
ContainerLocalizer.FILECACHE;
// get first resource
LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(req, new LocalResourceRequest(response.getLocalResource(0)));
assertEquals(1, response.getResourceSpecs().size());
assertEquals(req1,
new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
URL localizedPath =
response.getResourceSpecs().get(0).getDestinationDirectory();
assertTrue(localizedPath.getFile().endsWith(localPath));
// get second resource
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(1, response.getResourceSpecs().size());
assertEquals(req2, new LocalResourceRequest(response.getResourceSpecs()
.get(0).getResource()));
localizedPath =
response.getResourceSpecs().get(0).getDestinationDirectory();
// Resource's destination path should be now inside sub directory 0 as
// LocalCacheDirectoryManager will be used and we have restricted number
// of files per directory to 1.
assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "0"));
// empty rsrc
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
assertEquals(0, response.getAllResources().size());
assertEquals(0, response.getResourceSpecs().size());
// get shutdown
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
dispatcher.await();
// verify container notification
ArgumentMatcher<ContainerEvent> matchesContainerLoc =
new ArgumentMatcher<ContainerEvent>() {
@ -508,9 +557,9 @@ public class TestResourceLocalizationService {
&& c.getContainerID() == evt.getContainerID();
}
};
dispatcher.await();
verify(containerBus).handle(argThat(matchesContainerLoc));
// total 2 resource localzation calls. one for each resource.
verify(containerBus, times(2)).handle(argThat(matchesContainerLoc));
// Verify deletion of localization token.
verify(delService).delete((String)isNull(), eq(localizationTokenPath));
} finally {