From 335e02124bd2145f3765c37313a8693e79f612b7 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 9 Apr 2013 01:35:05 +0000 Subject: [PATCH] YARN-99. Modify private distributed cache to localize files such that no local directory hits unix file count limits and thus prevent job failures. Contributed by Omkar Vinit Joshi. svn merge --ignore-ancestry -c 1465853 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1465854 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 + .../api/ResourceLocalizationSpec.java | 37 ++++++ .../pb/ResourceLocalizationSpecPBImpl.java | 118 ++++++++++++++++++ .../LocalizerHeartbeatResponse.java | 15 +-- .../pb/LocalizerHeartbeatResponsePBImpl.java | 96 ++++++-------- .../localizer/ContainerLocalizer.java | 37 ++---- .../localizer/LocalCacheDirectoryManager.java | 2 - .../ResourceLocalizationService.java | 98 ++++++++++++++- .../util/NodeManagerBuilderUtils.java | 39 ++++++ ...rn_server_nodemanager_service_protos.proto | 7 +- .../impl/pb/TestPBRecordImpl.java | 40 ++++-- .../MockLocalizerHeartbeatResponse.java | 28 +++-- .../localizer/TestContainerLocalizer.java | 84 +++++++++---- .../TestResourceLocalizationService.java | 91 ++++++++++---- 14 files changed, 524 insertions(+), 172 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e7f0913008b..062a5aef293 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -146,6 +146,10 @@ Release 2.0.5-beta - UNRELEASED to implement closeable so that they can be stopped when needed via RPC.stopProxy(). (Siddharth Seth via vinodkv) + YARN-99. Modify private distributed cache to localize files such that no + local directory hits unix file count limits and thus prevent job failures. + (Omkar Vinit Joshi via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java new file mode 100644 index 00000000000..63c3fd3fb8b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.URL; + +import com.google.common.annotations.VisibleForTesting; + +@Private +@VisibleForTesting +public interface ResourceLocalizationSpec { + + public void setResource(LocalResource rsrc); + + public LocalResource getResource(); + + public void setDestinationDirectory(URL destinationDirectory); + + public URL getDestinationDirectory(); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java new file mode 100644 index 00000000000..643d3a692eb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb; + +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProtoOrBuilder; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; + +public class ResourceLocalizationSpecPBImpl extends + ProtoBase implements + ResourceLocalizationSpec { + + private ResourceLocalizationSpecProto proto = ResourceLocalizationSpecProto + .getDefaultInstance(); + private ResourceLocalizationSpecProto.Builder builder = null; + private boolean viaProto; + private LocalResource resource = null; + private URL destinationDirectory = null; + + public ResourceLocalizationSpecPBImpl() { + builder = ResourceLocalizationSpecProto.newBuilder(); + } + + public ResourceLocalizationSpecPBImpl(ResourceLocalizationSpecProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public LocalResource getResource() { + ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder; + if (resource != null) { + return resource; + } + if (!p.hasResource()) { + return null; + } + resource = new LocalResourcePBImpl(p.getResource()); + return resource; + } + + @Override + public void setResource(LocalResource rsrc) { + maybeInitBuilder(); + resource = rsrc; + } + + @Override + public URL getDestinationDirectory() { + ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder; + if (destinationDirectory != null) { + return destinationDirectory; + } + if (!p.hasDestinationDirectory()) { + return null; + } + destinationDirectory = new URLPBImpl(p.getDestinationDirectory()); + return destinationDirectory; + } + + @Override + public void setDestinationDirectory(URL destinationDirectory) { + maybeInitBuilder(); + this.destinationDirectory = destinationDirectory; + } + + @Override + public ResourceLocalizationSpecProto getProto() { + mergeLocalToBuilder(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private synchronized void maybeInitBuilder() { + if (builder == null || viaProto) { + builder = ResourceLocalizationSpecProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + ResourceLocalizationSpecProtoOrBuilder l = viaProto ? proto : builder; + if (this.resource != null + && !(l.getResource() + .equals(((LocalResourcePBImpl) resource).getProto()))) { + maybeInitBuilder(); + builder.setResource(((LocalResourcePBImpl) resource).getProto()); + } + if (this.destinationDirectory != null + && !(l.getDestinationDirectory() + .equals(((URLPBImpl) destinationDirectory).getProto()))) { + maybeInitBuilder(); + builder.setDestinationDirectory(((URLPBImpl) destinationDirectory) + .getProto()); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java index b2f46c5ae38..9d2681a4474 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java @@ -18,18 +18,13 @@ package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords; import java.util.List; - -import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.server.nodemanager.api.*; public interface LocalizerHeartbeatResponse { - public LocalizerAction getLocalizerAction(); - public List getAllResources(); - public LocalResource getLocalResource(int i); + public LocalizerAction getLocalizerAction(); public void setLocalizerAction(LocalizerAction action); - public void addAllResources(List resources); - public void addResource(LocalResource resource); - public void removeResource(int index); - public void clearResources(); -} + public List getResourceSpecs(); + public void setResourceSpecs(List rsrcs); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java index 0b791c0c14f..d46ba56e22b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java @@ -21,13 +21,14 @@ import java.util.Iterator; import java.util.List; -import org.apache.hadoop.yarn.api.records.LocalResource; + import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerActionProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; +import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.ResourceLocalizationSpecPBImpl; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; @@ -40,13 +41,14 @@ public class LocalizerHeartbeatResponsePBImpl LocalizerHeartbeatResponseProto.Builder builder = null; boolean viaProto = false; - private List resources; + private List resourceSpecs; public LocalizerHeartbeatResponsePBImpl() { builder = LocalizerHeartbeatResponseProto.newBuilder(); } - public LocalizerHeartbeatResponsePBImpl(LocalizerHeartbeatResponseProto proto) { + public LocalizerHeartbeatResponsePBImpl( + LocalizerHeartbeatResponseProto proto) { this.proto = proto; viaProto = true; } @@ -59,7 +61,7 @@ public LocalizerHeartbeatResponseProto getProto() { } private void mergeLocalToBuilder() { - if (resources != null) { + if (resourceSpecs != null) { addResourcesToProto(); } } @@ -79,6 +81,7 @@ private void maybeInitBuilder() { viaProto = false; } + @Override public LocalizerAction getLocalizerAction() { LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasAction()) { @@ -87,14 +90,10 @@ public LocalizerAction getLocalizerAction() { return convertFromProtoFormat(p.getAction()); } - public List getAllResources() { + @Override + public List getResourceSpecs() { initResources(); - return this.resources; - } - - public LocalResource getLocalResource(int i) { - initResources(); - return this.resources.get(i); + return this.resourceSpecs; } public void setLocalizerAction(LocalizerAction action) { @@ -106,31 +105,39 @@ public void setLocalizerAction(LocalizerAction action) { builder.setAction(convertToProtoFormat(action)); } + public void setResourceSpecs(List rsrcs) { + maybeInitBuilder(); + if (rsrcs == null) { + builder.clearResources(); + return; + } + this.resourceSpecs = rsrcs; + } + private void initResources() { - if (this.resources != null) { + if (this.resourceSpecs != null) { return; } LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getResourcesList(); - this.resources = new ArrayList(); - - for (LocalResourceProto c : list) { - this.resources.add(convertFromProtoFormat(c)); + List list = p.getResourcesList(); + this.resourceSpecs = new ArrayList(); + for (ResourceLocalizationSpecProto c : list) { + this.resourceSpecs.add(convertFromProtoFormat(c)); } } private void addResourcesToProto() { maybeInitBuilder(); builder.clearResources(); - if (this.resources == null) + if (this.resourceSpecs == null) return; - Iterable iterable = - new Iterable() { + Iterable iterable = + new Iterable() { @Override - public Iterator iterator() { - return new Iterator() { + public Iterator iterator() { + return new Iterator() { - Iterator iter = resources.iterator(); + Iterator iter = resourceSpecs.iterator(); @Override public boolean hasNext() { @@ -138,8 +145,10 @@ public boolean hasNext() { } @Override - public LocalResourceProto next() { - return convertToProtoFormat(iter.next()); + public ResourceLocalizationSpecProto next() { + ResourceLocalizationSpec resource = iter.next(); + + return ((ResourceLocalizationSpecPBImpl)resource).getProto(); } @Override @@ -154,34 +163,10 @@ public void remove() { builder.addAllResources(iterable); } - public void addAllResources(List resources) { - if (resources == null) - return; - initResources(); - this.resources.addAll(resources); - } - public void addResource(LocalResource resource) { - initResources(); - this.resources.add(resource); - } - - public void removeResource(int index) { - initResources(); - this.resources.remove(index); - } - - public void clearResources() { - initResources(); - this.resources.clear(); - } - - private LocalResource convertFromProtoFormat(LocalResourceProto p) { - return new LocalResourcePBImpl(p); - } - - private LocalResourceProto convertToProtoFormat(LocalResource s) { - return ((LocalResourcePBImpl)s).getProto(); + private ResourceLocalizationSpec convertFromProtoFormat( + ResourceLocalizationSpecProto p) { + return new ResourceLocalizationSpecPBImpl(p); } private LocalizerActionProto convertToProtoFormat(LocalizerAction a) { @@ -191,5 +176,4 @@ private LocalizerActionProto convertToProtoFormat(LocalizerAction a) { private LocalizerAction convertFromProtoFormat(LocalizerActionProto a) { return LocalizerAction.valueOf(a.name()); } - -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 74d0227c918..71bd4982195 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -51,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; @@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; @@ -89,8 +91,6 @@ public class ContainerLocalizer { private final String localizerId; private final FileContext lfs; private final Configuration conf; - private final LocalDirAllocator appDirs; - private final LocalDirAllocator userDirs; private final RecordFactory recordFactory; private final Map> pendingResources; private final String appCacheDirContextName; @@ -112,8 +112,6 @@ public ContainerLocalizer(FileContext lfs, String user, String appId, this.recordFactory = recordFactory; this.conf = new Configuration(); this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId); - this.appDirs = new LocalDirAllocator(appCacheDirContextName); - this.userDirs = new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, user)); this.pendingResources = new HashMap>(); } @@ -197,10 +195,10 @@ CompletionService createCompletionService(ExecutorService exec) { return new ExecutorCompletionService(exec); } - Callable download(LocalDirAllocator lda, LocalResource rsrc, + Callable download(Path path, LocalResource rsrc, UserGroupInformation ugi) throws IOException { - Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf); - return new FSDownload(lfs, ugi, conf, destPath, rsrc, new Random()); + DiskChecker.checkDir(new File(path.toUri().getRawPath())); + return new FSDownload(lfs, ugi, conf, path, rsrc, new Random()); } static long getEstimatedSize(LocalResource rsrc) { @@ -238,25 +236,12 @@ protected void localizeFiles(LocalizationProtocol nodemanager, LocalizerHeartbeatResponse response = nodemanager.heartbeat(status); switch (response.getLocalizerAction()) { case LIVE: - List newResources = response.getAllResources(); - for (LocalResource r : newResources) { - if (!pendingResources.containsKey(r)) { - final LocalDirAllocator lda; - switch (r.getVisibility()) { - default: - LOG.warn("Unknown visibility: " + r.getVisibility() - + ", Using userDirs"); - //Falling back to userDirs for unknown visibility. - case PUBLIC: - case PRIVATE: - lda = userDirs; - break; - case APPLICATION: - lda = appDirs; - break; - } - // TODO: Synchronization?? - pendingResources.put(r, cs.submit(download(lda, r, ugi))); + List newRsrcs = response.getResourceSpecs(); + for (ResourceLocalizationSpec newRsrc : newRsrcs) { + if (!pendingResources.containsKey(newRsrc.getResource())) { + pendingResources.put(newRsrc.getResource(), cs.submit(download( + new Path(newRsrc.getDestinationDirectory().getFile()), + newRsrc.getResource(), ugi))); } } break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java index e32551faeaa..8a3b6bf2088 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java @@ -22,8 +22,6 @@ import java.util.LinkedList; import java.util.Queue; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.conf.YarnConfiguration; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 35b2fb5f477..c03590cc7c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -80,10 +80,12 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; @@ -105,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; +import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -326,7 +329,7 @@ private void handleInitApplicationResources(Application app) { // 0) Create application tracking structs String userName = app.getUser(); privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName, - dispatcher, false, super.getConfig())); + dispatcher, true, super.getConfig())); if (null != appRsrc.putIfAbsent( ConverterUtils.toString(app.getAppId()), new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super @@ -476,6 +479,21 @@ LocalResourcesTracker getLocalResourcesTracker( } } + private String getUserFileCachePath(String user) { + String path = + "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR + + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE; + return path; + } + + private String getUserAppCachePath(String user, String appId) { + String path = + "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR + + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE + + Path.SEPARATOR + appId; + return path; + } + /** * Sub-component handling the spawning of {@link ContainerLocalizer}s */ @@ -803,7 +821,20 @@ LocalizerHeartbeatResponse update( LocalResource next = findNextResource(); if (next != null) { response.setLocalizerAction(LocalizerAction.LIVE); - response.addResource(next); + try { + ArrayList rsrcs = + new ArrayList(); + ResourceLocalizationSpec rsrc = + NodeManagerBuilderUtils.newResourceLocalizationSpec(next, + getPathForLocalization(next)); + rsrcs.add(rsrc); + response.setResourceSpecs(rsrcs); + } catch (IOException e) { + LOG.error("local path for PRIVATE localization could not be found." + + "Disks might have failed.", e); + } catch (URISyntaxException e) { + // TODO fail? Already translated several times... + } } else if (pending.isEmpty()) { // TODO: Synchronization response.setLocalizerAction(LocalizerAction.DIE); @@ -812,7 +843,8 @@ LocalizerHeartbeatResponse update( } return response; } - + ArrayList rsrcs = + new ArrayList(); for (LocalResourceStatus stat : remoteResourceStatuses) { LocalResource rsrc = stat.getResource(); LocalResourceRequest req = null; @@ -835,6 +867,7 @@ LocalizerHeartbeatResponse update( new ResourceLocalizedEvent(req, ConverterUtils.getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize())); + localizationCompleted(stat); } catch (URISyntaxException e) { } if (pending.isEmpty()) { // TODO: Synchronization @@ -844,7 +877,17 @@ LocalizerHeartbeatResponse update( response.setLocalizerAction(LocalizerAction.LIVE); LocalResource next = findNextResource(); if (next != null) { - response.addResource(next); + try { + ResourceLocalizationSpec resource = + NodeManagerBuilderUtils.newResourceLocalizationSpec(next, + getPathForLocalization(next)); + rsrcs.add(resource); + } catch (IOException e) { + LOG.error("local path for PRIVATE localization could not be " + + "found. Disks might have failed.", e); + } catch (URISyntaxException e) { + //TODO fail? Already translated several times... + } } break; case FETCH_PENDING: @@ -854,6 +897,7 @@ LocalizerHeartbeatResponse update( LOG.info("DEBUG: FAILED " + req, stat.getException()); assoc.getResource().unlock(); response.setLocalizerAction(LocalizerAction.DIE); + localizationCompleted(stat); // TODO: Why is this event going directly to the container. Why not // the resource itself? What happens to the resource? Is it removed? dispatcher.getEventHandler().handle( @@ -869,9 +913,53 @@ LocalizerHeartbeatResponse update( break; } } + response.setResourceSpecs(rsrcs); return response; } + private void localizationCompleted(LocalResourceStatus stat) { + try { + LocalResource rsrc = stat.getResource(); + LocalResourceRequest key = new LocalResourceRequest(rsrc); + String user = context.getUser(); + ApplicationId appId = + context.getContainerId().getApplicationAttemptId() + .getApplicationId(); + LocalResourceVisibility vis = rsrc.getVisibility(); + LocalResourcesTracker tracker = + getLocalResourcesTracker(vis, user, appId); + if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) { + tracker.localizationCompleted(key, true); + } else { + tracker.localizationCompleted(key, false); + } + } catch (URISyntaxException e) { + LOG.error("Invalid resource URL specified", e); + } + } + + private Path getPathForLocalization(LocalResource rsrc) throws IOException, + URISyntaxException { + String user = context.getUser(); + ApplicationId appId = + context.getContainerId().getApplicationAttemptId().getApplicationId(); + LocalResourceVisibility vis = rsrc.getVisibility(); + LocalResourcesTracker tracker = + getLocalResourcesTracker(vis, user, appId); + String cacheDirectory = null; + if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only + cacheDirectory = getUserFileCachePath(user); + } else {// APPLICATION ONLY + cacheDirectory = getUserAppCachePath(user, appId.toString()); + } + Path dirPath = + dirsHandler.getLocalPathForWrite(cacheDirectory, + ContainerLocalizer.getEstimatedSize(rsrc), false); + return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), + dirPath); + + } + @Override @SuppressWarnings("unchecked") // dispatcher not typed public void run() { @@ -1033,4 +1121,4 @@ private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del, del.delete(null, dirPath, new Path[] {}); } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java new file mode 100644 index 00000000000..21cf1f27c24 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.util; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; + +public class NodeManagerBuilderUtils { + + public static ResourceLocalizationSpec newResourceLocalizationSpec( + LocalResource rsrc, Path path) { + URL local = ConverterUtils.getYarnUrlFromPath(path); + ResourceLocalizationSpec resourceLocalizationSpec = + Records.newRecord(ResourceLocalizationSpec.class); + resourceLocalizationSpec.setDestinationDirectory(local); + resourceLocalizationSpec.setResource(rsrc); + return resourceLocalizationSpec; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto index 500ebf835e1..b1d6ddc5925 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto @@ -47,7 +47,12 @@ enum LocalizerActionProto { DIE = 2; } +message ResourceLocalizationSpecProto { + optional LocalResourceProto resource = 1; + optional URLProto destination_directory = 2; +} + message LocalizerHeartbeatResponseProto { optional LocalizerActionProto action = 1; - repeated LocalResourceProto resources = 2; + repeated ResourceLocalizationSpecProto resources = 2; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java index 8996b1ebfd3..71c7f9f62d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java @@ -17,6 +17,13 @@ */ package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.net.URISyntaxException; +import java.util.ArrayList; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; @@ -31,15 +38,14 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; import org.apache.hadoop.yarn.util.ConverterUtils; - import org.junit.Test; -import static org.junit.Assert.*; public class TestPBRecordImpl { @@ -54,9 +60,8 @@ static RecordFactory createPBRecordFactory() { static LocalResource createResource() { LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); assertTrue(ret instanceof LocalResourcePBImpl); - ret.setResource( - ConverterUtils.getYarnUrlFromPath( - new Path("hdfs://y.ak:8020/foo/bar"))); + ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path( + "hdfs://y.ak:8020/foo/bar"))); ret.setSize(4344L); ret.setTimestamp(3141592653589793L); ret.setVisibility(LocalResourceVisibility.PUBLIC); @@ -90,16 +95,27 @@ static LocalizerStatus createLocalizerStatus() { return ret; } - static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() { + static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() + throws URISyntaxException { LocalizerHeartbeatResponse ret = recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl); ret.setLocalizerAction(LocalizerAction.LIVE); - ret.addResource(createResource()); + LocalResource rsrc = createResource(); + ArrayList rsrcs = + new ArrayList(); + ResourceLocalizationSpec resource = + recordFactory.newRecordInstance(ResourceLocalizationSpec.class); + resource.setResource(rsrc); + resource.setDestinationDirectory(ConverterUtils + .getYarnUrlFromPath(new Path("/tmp" + System.currentTimeMillis()))); + rsrcs.add(resource); + ret.setResourceSpecs(rsrcs); + System.out.println(resource); return ret; } - @Test + @Test(timeout=10000) public void testLocalResourceStatusSerDe() throws Exception { LocalResourceStatus rsrcS = createLocalResourceStatus(); assertTrue(rsrcS instanceof LocalResourceStatusPBImpl); @@ -119,7 +135,7 @@ public void testLocalResourceStatusSerDe() throws Exception { assertEquals(createResource(), rsrcD.getResource()); } - @Test + @Test(timeout=10000) public void testLocalizerStatusSerDe() throws Exception { LocalizerStatus rsrcS = createLocalizerStatus(); assertTrue(rsrcS instanceof LocalizerStatusPBImpl); @@ -141,7 +157,7 @@ public void testLocalizerStatusSerDe() throws Exception { assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0)); } - @Test + @Test(timeout=10000) public void testLocalizerHeartbeatResponseSerDe() throws Exception { LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse(); assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl); @@ -158,8 +174,8 @@ public void testLocalizerHeartbeatResponseSerDe() throws Exception { new LocalizerHeartbeatResponsePBImpl(rsrcPbD); assertEquals(rsrcS, rsrcD); - assertEquals(createResource(), rsrcS.getLocalResource(0)); - assertEquals(createResource(), rsrcD.getLocalResource(0)); + assertEquals(createResource(), rsrcS.getResourceSpecs().get(0).getResource()); + assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java index ca4739400de..1fcf5bf4cfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; @@ -28,28 +28,30 @@ public class MockLocalizerHeartbeatResponse implements LocalizerHeartbeatResponse { LocalizerAction action; - List rsrc; + List resourceSpecs; MockLocalizerHeartbeatResponse() { - rsrc = new ArrayList(); + resourceSpecs = new ArrayList(); } MockLocalizerHeartbeatResponse( - LocalizerAction action, List rsrc) { + LocalizerAction action, List resources) { this.action = action; - this.rsrc = rsrc; + this.resourceSpecs = resources; } public LocalizerAction getLocalizerAction() { return action; } - public List getAllResources() { return rsrc; } - public LocalResource getLocalResource(int i) { return rsrc.get(i); } public void setLocalizerAction(LocalizerAction action) { this.action = action; } - public void addAllResources(List resources) { - rsrc.addAll(resources); - } - public void addResource(LocalResource resource) { rsrc.add(resource); } - public void removeResource(int index) { rsrc.remove(index); } - public void clearResources() { rsrc.clear(); } + + @Override + public List getResourceSpecs() { + return resourceSpecs; +} + + @Override + public void setResourceSpecs(List resourceSpecs) { + this.resourceSpecs = resourceSpecs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java index f16c68c11ef..69501f71153 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java @@ -50,7 +50,6 @@ import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataInputBuffer; @@ -66,9 +65,11 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -95,12 +96,33 @@ public class TestContainerLocalizer { public void testContainerLocalizerMain() throws Exception { ContainerLocalizer localizer = setupContainerLocalizerForTest(); + // verify created cache + List privCacheList = new ArrayList(); + List appCacheList = new ArrayList(); + for (Path p : localDirs) { + Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser); + Path privcache = new Path(base, ContainerLocalizer.FILECACHE); + privCacheList.add(privcache); + Path appDir = + new Path(base, new Path(ContainerLocalizer.APPCACHE, appId)); + Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE); + appCacheList.add(appcache); + } + // mock heartbeat responses from NM - LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE); - LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE); - LocalResource rsrcC = getMockRsrc(random, - LocalResourceVisibility.APPLICATION); - LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE); + ResourceLocalizationSpec rsrcA = + getMockRsrc(random, LocalResourceVisibility.PRIVATE, + privCacheList.get(0)); + ResourceLocalizationSpec rsrcB = + getMockRsrc(random, LocalResourceVisibility.PRIVATE, + privCacheList.get(0)); + ResourceLocalizationSpec rsrcC = + getMockRsrc(random, LocalResourceVisibility.APPLICATION, + appCacheList.get(0)); + ResourceLocalizationSpec rsrcD = + getMockRsrc(random, LocalResourceVisibility.PRIVATE, + privCacheList.get(0)); + when(nmProxy.heartbeat(isA(LocalizerStatus.class))) .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE, Collections.singletonList(rsrcA))) @@ -111,27 +133,33 @@ public void testContainerLocalizerMain() throws Exception { .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE, Collections.singletonList(rsrcD))) .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE, - Collections.emptyList())) + Collections.emptyList())) .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE, null)); - doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when( - localizer).download(isA(LocalDirAllocator.class), eq(rsrcA), + LocalResource tRsrcA = rsrcA.getResource(); + LocalResource tRsrcB = rsrcB.getResource(); + LocalResource tRsrcC = rsrcC.getResource(); + LocalResource tRsrcD = rsrcD.getResource(); + doReturn( + new FakeDownload(rsrcA.getResource().getResource().getFile(), true)) + .when(localizer).download(isA(Path.class), eq(tRsrcA), isA(UserGroupInformation.class)); - doReturn(new FakeDownload(rsrcB.getResource().getFile(), true)).when( - localizer).download(isA(LocalDirAllocator.class), eq(rsrcB), + doReturn( + new FakeDownload(rsrcB.getResource().getResource().getFile(), true)) + .when(localizer).download(isA(Path.class), eq(tRsrcB), isA(UserGroupInformation.class)); - doReturn(new FakeDownload(rsrcC.getResource().getFile(), true)).when( - localizer).download(isA(LocalDirAllocator.class), eq(rsrcC), + doReturn( + new FakeDownload(rsrcC.getResource().getResource().getFile(), true)) + .when(localizer).download(isA(Path.class), eq(tRsrcC), isA(UserGroupInformation.class)); - doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when( - localizer).download(isA(LocalDirAllocator.class), eq(rsrcD), + doReturn( + new FakeDownload(rsrcD.getResource().getResource().getFile(), true)) + .when(localizer).download(isA(Path.class), eq(tRsrcD), isA(UserGroupInformation.class)); // run localization assertEquals(0, localizer.runLocalization(nmAddr)); - - // verify created cache for (Path p : localDirs) { Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser); Path privcache = new Path(base, ContainerLocalizer.FILECACHE); @@ -147,15 +175,14 @@ public void testContainerLocalizerMain() throws Exception { Path appcacheAfsPath = new Path(appcache.toUri().getPath()); verify(spylfs).mkdir(eq(appcacheAfsPath), isA(FsPermission.class), eq(false)); } - // verify tokens read at expected location verify(spylfs).open(tokenPath); // verify downloaded resources reported to NM - verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA))); - verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB))); - verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC))); - verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD))); + verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA.getResource()))); + verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB.getResource()))); + verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC.getResource()))); + verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD.getResource()))); // verify all HB use localizerID provided verify(nmProxy, never()).heartbeat(argThat( @@ -310,10 +337,12 @@ public LocalizerStatus answer(InvocationOnMock invoc) return mockRF; } - static LocalResource getMockRsrc(Random r, - LocalResourceVisibility vis) { - LocalResource rsrc = mock(LocalResource.class); + static ResourceLocalizationSpec getMockRsrc(Random r, + LocalResourceVisibility vis, Path p) { + ResourceLocalizationSpec resourceLocalizationSpec = + mock(ResourceLocalizationSpec.class); + LocalResource rsrc = mock(LocalResource.class); String name = Long.toHexString(r.nextLong()); URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class); when(uri.getScheme()).thenReturn("file"); @@ -326,7 +355,10 @@ static LocalResource getMockRsrc(Random r, when(rsrc.getType()).thenReturn(LocalResourceType.FILE); when(rsrc.getVisibility()).thenReturn(vis); - return rsrc; + when(resourceLocalizationSpec.getResource()).thenReturn(rsrc); + when(resourceLocalizationSpec.getDestinationDirectory()). + thenReturn(ConverterUtils.getYarnUrlFromPath(p)); + return resourceLocalizationSpec; } @SuppressWarnings({ "rawtypes", "unchecked" }) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 7ca2c91e3c7..51f2ca3bf02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; @@ -35,6 +36,7 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import java.io.IOException; import java.net.InetSocketAddress; @@ -375,7 +377,7 @@ public void testResourceRelease() throws Exception { } } - @Test + @Test( timeout = 10000) @SuppressWarnings("unchecked") // mocked generics public void testLocalizationHeartbeat() throws Exception { Configuration conf = new YarnConfiguration(); @@ -386,12 +388,17 @@ public void testLocalizationHeartbeat() throws Exception { isA(Path.class), isA(FsPermission.class), anyBoolean()); List localDirs = new ArrayList(); - String[] sDirs = new String[4]; - for (int i = 0; i < 4; ++i) { - localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); - sDirs[i] = localDirs.get(i).toString(); - } + String[] sDirs = new String[1]; + // Making sure that we have only one local disk so that it will only be + // selected for consecutive resource localization calls. This is required + // to test LocalCacheDirectoryManager. + localDirs.add(lfs.makeQualified(new Path(basedir, 0 + ""))); + sDirs[0] = localDirs.get(0).toString(); + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // Adding configuration to make sure there is only one file per + // directory + conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37"); String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString(); conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); DrainDispatcher dispatcher = new DrainDispatcher(); @@ -452,12 +459,23 @@ public boolean matches(Object o) { doReturn(out).when(spylfs).createInternal(isA(Path.class), isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(), anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean()); - final LocalResource resource = getPrivateMockedResource(r); - final LocalResourceRequest req = new LocalResourceRequest(resource); + final LocalResource resource1 = getPrivateMockedResource(r); + LocalResource resource2 = null; + do { + resource2 = getPrivateMockedResource(r); + } while (resource2 == null || resource2.equals(resource1)); + // above call to make sure we don't get identical resources. + + final LocalResourceRequest req1 = new LocalResourceRequest(resource1); + final LocalResourceRequest req2 = new LocalResourceRequest(resource2); Map> rsrcs = new HashMap>(); - rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req)); + List privateResourceList = + new ArrayList(); + privateResourceList.add(req1); + privateResourceList.add(req2); + rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList); spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); // Sigh. Thread init of private localizer not accessible Thread.sleep(1000); @@ -471,33 +489,64 @@ public boolean matches(Object o) { Path localizationTokenPath = tokenPathCaptor.getValue(); // heartbeat from localizer - LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class); + LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class); + LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class); LocalizerStatus stat = mock(LocalizerStatus.class); when(stat.getLocalizerId()).thenReturn(ctnrStr); - when(rsrcStat.getResource()).thenReturn(resource); - when(rsrcStat.getLocalSize()).thenReturn(4344L); + when(rsrcStat1.getResource()).thenReturn(resource1); + when(rsrcStat2.getResource()).thenReturn(resource2); + when(rsrcStat1.getLocalSize()).thenReturn(4344L); + when(rsrcStat2.getLocalSize()).thenReturn(2342L); URL locPath = getPath("/cache/private/blah"); - when(rsrcStat.getLocalPath()).thenReturn(locPath); - when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); + when(rsrcStat1.getLocalPath()).thenReturn(locPath); + when(rsrcStat2.getLocalPath()).thenReturn(locPath); + when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); + when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); when(stat.getResources()) .thenReturn(Collections.emptyList()) - .thenReturn(Collections.singletonList(rsrcStat)) + .thenReturn(Collections.singletonList(rsrcStat1)) + .thenReturn(Collections.singletonList(rsrcStat2)) .thenReturn(Collections.emptyList()); - // get rsrc + String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE + + Path.SEPARATOR + "user0" + Path.SEPARATOR + + ContainerLocalizer.FILECACHE; + + // get first resource LocalizerHeartbeatResponse response = spyService.heartbeat(stat); assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); - assertEquals(req, new LocalResourceRequest(response.getLocalResource(0))); + assertEquals(1, response.getResourceSpecs().size()); + assertEquals(req1, + new LocalResourceRequest(response.getResourceSpecs().get(0).getResource())); + URL localizedPath = + response.getResourceSpecs().get(0).getDestinationDirectory(); + assertTrue(localizedPath.getFile().endsWith(localPath)); + + // get second resource + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + assertEquals(1, response.getResourceSpecs().size()); + assertEquals(req2, new LocalResourceRequest(response.getResourceSpecs() + .get(0).getResource())); + localizedPath = + response.getResourceSpecs().get(0).getDestinationDirectory(); + // Resource's destination path should be now inside sub directory 0 as + // LocalCacheDirectoryManager will be used and we have restricted number + // of files per directory to 1. + assertTrue(localizedPath.getFile().endsWith( + localPath + Path.SEPARATOR + "0")); // empty rsrc response = spyService.heartbeat(stat); assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); - assertEquals(0, response.getAllResources().size()); + assertEquals(0, response.getResourceSpecs().size()); // get shutdown response = spyService.heartbeat(stat); assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); + + dispatcher.await(); // verify container notification ArgumentMatcher matchesContainerLoc = new ArgumentMatcher() { @@ -508,9 +557,9 @@ public boolean matches(Object o) { && c.getContainerID() == evt.getContainerID(); } }; - dispatcher.await(); - verify(containerBus).handle(argThat(matchesContainerLoc)); - + // total 2 resource localzation calls. one for each resource. + verify(containerBus, times(2)).handle(argThat(matchesContainerLoc)); + // Verify deletion of localization token. verify(delService).delete((String)isNull(), eq(localizationTokenPath)); } finally {