YARN-2236. [YARN-1492] Shared Cache uploader service on the Node Manager. (Chris Trezzo and Sanjin Lee via kasha)

(cherry picked from commit a04143039e)
This commit is contained in:
Karthik Kambatla 2014-11-12 09:31:05 -08:00
parent 6984d899e3
commit ff1b13ded5
22 changed files with 1158 additions and 12 deletions

View File

@ -18,6 +18,9 @@ Release 2.7.0 - UNRELEASED
YARN-2186. [YARN-1492] Node Manager uploader service for cache manager.
(Chris Trezzo and Sangjin Lee via kasha)
YARN-2236. [YARN-1492] Shared Cache uploader service on the Node
Manager. (Chris Trezzo and Sanjin Lee via kasha)
IMPROVEMENTS
YARN-1979. TestDirectoryCollection fails when the umask is unusual.

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.util.Records;
@ -48,6 +49,14 @@ public abstract class LocalResource {
public static LocalResource newInstance(URL url, LocalResourceType type,
LocalResourceVisibility visibility, long size, long timestamp,
String pattern) {
return newInstance(url, type, visibility, size, timestamp, pattern, false);
}
@Public
@Unstable
public static LocalResource newInstance(URL url, LocalResourceType type,
LocalResourceVisibility visibility, long size, long timestamp,
String pattern, boolean shouldBeUploadedToSharedCache) {
LocalResource resource = Records.newRecord(LocalResource.class);
resource.setResource(url);
resource.setType(type);
@ -55,6 +64,7 @@ public abstract class LocalResource {
resource.setSize(size);
resource.setTimestamp(timestamp);
resource.setPattern(pattern);
resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
return resource;
}
@ -65,6 +75,15 @@ public abstract class LocalResource {
return newInstance(url, type, visibility, size, timestamp, null);
}
@Public
@Unstable
public static LocalResource newInstance(URL url, LocalResourceType type,
LocalResourceVisibility visibility, long size, long timestamp,
boolean shouldBeUploadedToSharedCache) {
return newInstance(url, type, visibility, size, timestamp, null,
shouldBeUploadedToSharedCache);
}
/**
* Get the <em>location</em> of the resource to be localized.
* @return <em>location</em> of the resource to be localized
@ -170,4 +189,23 @@ public abstract class LocalResource {
@Public
@Stable
public abstract void setPattern(String pattern);
/**
* NM uses it to decide whether if it is necessary to upload the resource to
* the shared cache
*/
@Public
@Unstable
public abstract boolean getShouldBeUploadedToSharedCache();
/**
* Inform NM whether upload to SCM is needed.
*
* @param shouldBeUploadedToSharedCache <em>shouldBeUploadedToSharedCache</em>
* of this request
*/
@Public
@Unstable
public abstract void setShouldBeUploadedToSharedCache(
boolean shouldBeUploadedToSharedCache);
}

View File

@ -1472,6 +1472,25 @@ public class YarnConfiguration extends Configuration {
SHARED_CACHE_PREFIX + "uploader.server.thread-count";
public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50;
/** the checksum algorithm implementation **/
public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL =
SHARED_CACHE_PREFIX + "checksum.algo.impl";
public static final String DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL =
"org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl";
// node manager (uploader) configs
/**
* The replication factor for the node manager uploader for the shared cache.
*/
public static final String SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
SHARED_CACHE_PREFIX + "nm.uploader.replication.factor";
public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR =
10;
public static final String SHARED_CACHE_NM_UPLOADER_THREAD_COUNT =
SHARED_CACHE_PREFIX + "nm.uploader.thread-count";
public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20;
////////////////////////////////
// Other Configs
////////////////////////////////

View File

@ -159,6 +159,7 @@ message LocalResourceProto {
optional LocalResourceTypeProto type = 4;
optional LocalResourceVisibilityProto visibility = 5;
optional string pattern = 6;
optional bool should_be_uploaded_to_shared_cache = 7;
}
message ApplicationResourceUsageReportProto {

View File

@ -192,6 +192,26 @@ public class LocalResourcePBImpl extends LocalResource {
builder.setPattern(pattern);
}
@Override
public synchronized boolean getShouldBeUploadedToSharedCache() {
LocalResourceProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasShouldBeUploadedToSharedCache()) {
return false;
}
return p.getShouldBeUploadedToSharedCache();
}
@Override
public synchronized void setShouldBeUploadedToSharedCache(
boolean shouldBeUploadedToSharedCache) {
maybeInitBuilder();
if (!shouldBeUploadedToSharedCache) {
builder.clearShouldBeUploadedToSharedCache();
return;
}
builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
}
private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) {
return ProtoUtils.convertToProtoFormat(e);
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sharedcache;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@Private
@Evolving
/**
* The SHA-256 implementation of the shared cache checksum interface.
*/
public class ChecksumSHA256Impl implements SharedCacheChecksum {
public String computeChecksum(InputStream in) throws IOException {
return DigestUtils.sha256Hex(in);
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sharedcache;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@Public
@Evolving
/**
* An interface to calculate a checksum for a resource in the shared cache. The
* checksum implementation should be thread safe.
*/
public interface SharedCacheChecksum {
/**
* Calculate the checksum of the passed input stream.
*
* @param in <code>InputStream</code> to be checksumed
* @return the message digest of the input stream
* @throws IOException
*/
public String computeChecksum(InputStream in) throws IOException;
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sharedcache;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@SuppressWarnings("unchecked")
@Public
@Evolving
/**
* A factory class for creating checksum objects based on a configurable
* algorithm implementation
*/
public class SharedCacheChecksumFactory {
private static final
ConcurrentMap<Class<? extends SharedCacheChecksum>,SharedCacheChecksum>
instances =
new ConcurrentHashMap<Class<? extends SharedCacheChecksum>,
SharedCacheChecksum>();
private static final Class<? extends SharedCacheChecksum> defaultAlgorithm;
static {
try {
defaultAlgorithm = (Class<? extends SharedCacheChecksum>)
Class.forName(
YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
} catch (Exception e) {
// cannot happen
throw new ExceptionInInitializerError(e);
}
}
/**
* Get a new <code>SharedCacheChecksum</code> object based on the configurable
* algorithm implementation
* (see <code>yarn.sharedcache.checksum.algo.impl</code>)
*
* @return <code>SharedCacheChecksum</code> object
*/
public static SharedCacheChecksum getChecksum(Configuration conf) {
Class<? extends SharedCacheChecksum> clazz =
conf.getClass(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
defaultAlgorithm, SharedCacheChecksum.class);
SharedCacheChecksum checksum = instances.get(clazz);
if (checksum == null) {
try {
checksum = ReflectionUtils.newInstance(clazz, conf);
SharedCacheChecksum old = instances.putIfAbsent(clazz, checksum);
if (old != null) {
checksum = old;
}
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
}
return checksum;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
@ -134,8 +135,8 @@ public class FSDownload implements Callable<Path> {
* @return true if the path in the current path is visible to all, false
* otherwise
*/
@VisibleForTesting
static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
@Private
public static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
current = fs.makeQualified(current);
//the leaf level file should be readable by others

View File

@ -1457,6 +1457,24 @@
<value>50</value>
</property>
<property>
<description>The algorithm used to compute checksums of files (SHA-256 by default)</description>
<name>yarn.sharedcache.checksum.algo.impl</name>
<value>org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl</value>
</property>
<property>
<description>The replication factor for the node manager uploader for the shared cache (10 by default)</description>
<name>yarn.sharedcache.nm.uploader.replication.factor</name>
<value>10</value>
</property>
<property>
<description>The number of threads used to upload files from a node manager instance (20 by default)</description>
<name>yarn.sharedcache.nm.uploader.thread-count</name>
<value>20</value>
</property>
<!-- Other configuration -->
<property>
<description>The interval that the yarn client library uses to poll the

View File

@ -70,6 +70,7 @@ import com.google.common.annotations.VisibleForTesting;
* Builder utilities to construct various objects.
*
*/
@Private
public class BuilderUtils {
private static final RecordFactory recordFactory = RecordFactoryProvider
@ -94,7 +95,8 @@ public class BuilderUtils {
}
public static LocalResource newLocalResource(URL url, LocalResourceType type,
LocalResourceVisibility visibility, long size, long timestamp) {
LocalResourceVisibility visibility, long size, long timestamp,
boolean shouldBeUploadedToSharedCache) {
LocalResource resource =
recordFactory.newRecordInstance(LocalResource.class);
resource.setResource(url);
@ -102,14 +104,15 @@ public class BuilderUtils {
resource.setVisibility(visibility);
resource.setSize(size);
resource.setTimestamp(timestamp);
resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache);
return resource;
}
public static LocalResource newLocalResource(URI uri,
LocalResourceType type, LocalResourceVisibility visibility, long size,
long timestamp) {
long timestamp, boolean shouldBeUploadedToSharedCache) {
return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type,
visibility, size, timestamp);
visibility, size, timestamp, shouldBeUploadedToSharedCache);
}
public static ApplicationId newApplicationId(RecordFactory recordFactory,
@ -245,7 +248,6 @@ public class BuilderUtils {
return newToken(Token.class, identifier, kind, password, service);
}
@Private
@VisibleForTesting
public static Token newContainerToken(NodeId nodeId,
byte[] password, ContainerTokenIdentifier tokenIdentifier) {

View File

@ -119,6 +119,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
@ -227,6 +229,13 @@ public class ContainerManagerImpl extends CompositeService implements
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
// add the shared cache upload service (it will do nothing if the shared
// cache is disabled)
SharedCacheUploadService sharedCacheUploader =
createSharedCacheUploaderService();
addService(sharedCacheUploader);
dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@ -367,6 +376,10 @@ public class ContainerManagerImpl extends CompositeService implements
deletionContext, dirsHandler, context);
}
protected SharedCacheUploadService createSharedCacheUploaderService() {
return new SharedCacheUploadService();
}
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);

View File

@ -27,6 +27,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -59,6 +60,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
@ -104,6 +107,10 @@ public class ContainerImpl implements Container {
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<LocalResourceRequest>();
private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
new ConcurrentHashMap<LocalResourceRequest, Path>();
private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
new ConcurrentHashMap<LocalResourceRequest, Boolean>();
// whether container has been recovered after a restart
private RecoveredContainerStatus recoveredStatus =
@ -637,6 +644,8 @@ public class ContainerImpl implements Container {
container.pendingResources.put(req, links);
}
links.add(rsrc.getKey());
storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
.getShouldBeUploadedToSharedCache());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
container.publicRsrcs.add(req);
@ -685,31 +694,77 @@ public class ContainerImpl implements Container {
}
}
/**
* Store the resource's shared cache upload policies
* Given LocalResourceRequest can be shared across containers in
* LocalResourcesTrackerImpl, we preserve the upload policies here.
* In addition, it is possible for the application to create several
* "identical" LocalResources as part of
* ContainerLaunchContext.setLocalResources with different symlinks.
* There is a corner case where these "identical" local resources have
* different upload policies. For that scenario, upload policy will be set to
* true as long as there is at least one LocalResource entry with
* upload policy set to true.
*/
private static void storeSharedCacheUploadPolicy(ContainerImpl container,
LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
Boolean storedUploadPolicy =
container.resourcesUploadPolicies.get(resourceRequest);
if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
}
}
/**
* Transition when one of the requested resources for this container
* has been successfully localized.
*/
static class LocalizedTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@SuppressWarnings("unchecked")
@Override
public ContainerState transition(ContainerImpl container,
ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
List<String> syms =
container.pendingResources.remove(rsrcEvent.getResource());
LocalResourceRequest resourceRequest = rsrcEvent.getResource();
Path location = rsrcEvent.getLocation();
List<String> syms = container.pendingResources.remove(resourceRequest);
if (null == syms) {
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
LOG.warn("Localized unknown resource " + resourceRequest +
" for container " + container.containerId);
assert false;
// fail container?
return ContainerState.LOCALIZING;
}
container.localizedResources.put(rsrcEvent.getLocation(), syms);
container.localizedResources.put(location, syms);
// check to see if this resource should be uploaded to the shared cache
// as well
if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
container.resourcesToBeUploaded.put(resourceRequest, location);
}
if (!container.pendingResources.isEmpty()) {
return ContainerState.LOCALIZING;
}
container.sendLaunchEvent();
// If this is a recovered container that has already launched, skip
// uploading resources to the shared cache. We do this to avoid uploading
// the same resources multiple times. The tradeoff is that in the case of
// a recovered container, there is a chance that resources don't get
// uploaded into the shared cache. This is OK because resources are not
// acknowledged by the SCM until they have been uploaded by the node
// manager.
if (container.recoveredStatus != RecoveredContainerStatus.LAUNCHED
&& container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
// kick off uploads to the shared cache
container.dispatcher.getEventHandler().handle(
new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
.getLaunchContext(), container.getUser(),
SharedCacheUploadEventType.UPLOAD));
}
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
@ -1018,4 +1073,13 @@ public class ContainerImpl implements Container {
private boolean hasDefaultExitCode() {
return (this.exitCode == ContainerExitStatus.INVALID);
}
/**
* Returns whether the specific resource should be uploaded to the shared
* cache.
*/
private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
LocalResourceRequest resource) {
return container.resourcesUploadPolicies.get(resource);
}
}

View File

@ -151,6 +151,17 @@ public class LocalResourceRequest
return pattern;
}
@Override
public boolean getShouldBeUploadedToSharedCache() {
throw new UnsupportedOperationException();
}
@Override
public void setShouldBeUploadedToSharedCache(
boolean shouldBeUploadedToSharedCache) {
throw new UnsupportedOperationException();
}
@Override
public void setResource(URL resource) {
throw new UnsupportedOperationException();

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
@Private
@Unstable
public class SharedCacheUploadEvent extends
AbstractEvent<SharedCacheUploadEventType> {
private final Map<LocalResourceRequest,Path> resources;
private final ContainerLaunchContext context;
private final String user;
public SharedCacheUploadEvent(Map<LocalResourceRequest,Path> resources,
ContainerLaunchContext context, String user,
SharedCacheUploadEventType eventType) {
super(eventType);
this.resources = resources;
this.context = context;
this.user = user;
}
public Map<LocalResourceRequest,Path> getResources() {
return resources;
}
public ContainerLaunchContext getContainerLaunchContext() {
return context;
}
public String getUser() {
return user;
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@Private
@Unstable
public enum SharedCacheUploadEventType {
UPLOAD
}

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@Private
@Unstable
/**
* Service that uploads localized files to the shared cache. The upload is
* considered not critical, and is done on a best-effort basis. Failure to
* upload is not fatal.
*/
public class SharedCacheUploadService extends AbstractService implements
EventHandler<SharedCacheUploadEvent> {
private static final Log LOG =
LogFactory.getLog(SharedCacheUploadService.class);
private boolean enabled;
private FileSystem fs;
private FileSystem localFs;
private ExecutorService uploaderPool;
private SCMUploaderProtocol scmClient;
public SharedCacheUploadService() {
super(SharedCacheUploadService.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
enabled = conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED,
YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED);
if (enabled) {
int threadCount =
conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT,
YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT);
uploaderPool = Executors.newFixedThreadPool(threadCount,
new ThreadFactoryBuilder().
setNameFormat("Shared cache uploader #%d").
build());
scmClient = createSCMClient(conf);
try {
fs = FileSystem.get(conf);
localFs = FileSystem.getLocal(conf);
} catch (IOException e) {
LOG.error("Unexpected exception in getting the filesystem", e);
throw new RuntimeException(e);
}
}
super.serviceInit(conf);
}
private SCMUploaderProtocol createSCMClient(Configuration conf) {
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress scmAddress =
conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
return (SCMUploaderProtocol)rpc.getProxy(
SCMUploaderProtocol.class, scmAddress, conf);
}
@Override
protected void serviceStop() throws Exception {
if (enabled) {
uploaderPool.shutdown();
RPC.stopProxy(scmClient);
}
super.serviceStop();
}
@Override
public void handle(SharedCacheUploadEvent event) {
if (enabled) {
Map<LocalResourceRequest,Path> resources = event.getResources();
for (Map.Entry<LocalResourceRequest,Path> e: resources.entrySet()) {
SharedCacheUploader uploader =
new SharedCacheUploader(e.getKey(), e.getValue(), event.getUser(),
getConfig(), scmClient, fs, localFs);
// fire off an upload task
uploaderPool.submit(uploader);
}
}
}
public boolean isEnabled() {
return enabled;
}
}

View File

@ -0,0 +1,289 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URISyntaxException;
import java.util.Random;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum;
import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.annotations.VisibleForTesting;
/**
* The callable class that handles the actual upload to the shared cache.
*/
class SharedCacheUploader implements Callable<Boolean> {
// rwxr-xr-x
static final FsPermission DIRECTORY_PERMISSION =
new FsPermission((short)00755);
// r-xr-xr-x
static final FsPermission FILE_PERMISSION =
new FsPermission((short)00555);
private static final Log LOG = LogFactory.getLog(SharedCacheUploader.class);
private static final ThreadLocal<Random> randomTl =
new ThreadLocal<Random>() {
@Override
protected Random initialValue() {
return new Random(System.nanoTime());
}
};
private final LocalResource resource;
private final Path localPath;
private final String user;
private final Configuration conf;
private final SCMUploaderProtocol scmClient;
private final FileSystem fs;
private final FileSystem localFs;
private final String sharedCacheRootDir;
private final int nestedLevel;
private final SharedCacheChecksum checksum;
private final RecordFactory recordFactory;
public SharedCacheUploader(LocalResource resource, Path localPath,
String user, Configuration conf, SCMUploaderProtocol scmClient)
throws IOException {
this(resource, localPath, user, conf, scmClient,
FileSystem.get(conf), localPath.getFileSystem(conf));
}
/**
* @param resource the local resource that contains the original remote path
* @param localPath the path in the local filesystem where the resource is
* localized
* @param fs the filesystem of the shared cache
* @param localFs the local filesystem
*/
public SharedCacheUploader(LocalResource resource, Path localPath,
String user, Configuration conf, SCMUploaderProtocol scmClient,
FileSystem fs, FileSystem localFs) {
this.resource = resource;
this.localPath = localPath;
this.user = user;
this.conf = conf;
this.scmClient = scmClient;
this.fs = fs;
this.sharedCacheRootDir =
conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
this.nestedLevel = SharedCacheUtil.getCacheDepth(conf);
this.checksum = SharedCacheChecksumFactory.getChecksum(conf);
this.localFs = localFs;
this.recordFactory = RecordFactoryProvider.getRecordFactory(null);
}
/**
* Uploads the file under the shared cache, and notifies the shared cache
* manager. If it is unable to upload the file because it already exists, it
* returns false.
*/
@Override
public Boolean call() throws Exception {
Path tempPath = null;
try {
if (!verifyAccess()) {
LOG.warn("User " + user + " is not authorized to upload file " +
localPath.getName());
return false;
}
// first determine the actual local path that will be used for upload
Path actualPath = getActualPath();
// compute the checksum
String checksumVal = computeChecksum(actualPath);
// create the directory (if it doesn't exist)
Path directoryPath =
new Path(SharedCacheUtil.getCacheEntryPath(nestedLevel,
sharedCacheRootDir, checksumVal));
// let's not check if the directory already exists: in the vast majority
// of the cases, the directory does not exist; as long as mkdirs does not
// error out if it exists, we should be fine
fs.mkdirs(directoryPath, DIRECTORY_PERMISSION);
// create the temporary file
tempPath = new Path(directoryPath, getTemporaryFileName(actualPath));
if (!uploadFile(actualPath, tempPath)) {
LOG.warn("Could not copy the file to the shared cache at " + tempPath);
return false;
}
// set the permission so that it is readable but not writable
fs.setPermission(tempPath, FILE_PERMISSION);
// rename it to the final filename
Path finalPath = new Path(directoryPath, actualPath.getName());
if (!fs.rename(tempPath, finalPath)) {
LOG.warn("The file already exists under " + finalPath +
". Ignoring this attempt.");
deleteTempFile(tempPath);
return false;
}
// notify the SCM
if (!notifySharedCacheManager(checksumVal, actualPath.getName())) {
// the shared cache manager rejected the upload (as it is likely
// uploaded under a different name
// clean up this file and exit
fs.delete(finalPath, false);
return false;
}
// set the replication factor
short replication =
(short)conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR,
YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR);
fs.setReplication(finalPath, replication);
LOG.info("File " + actualPath.getName() +
" was uploaded to the shared cache at " + finalPath);
return true;
} catch (IOException e) {
LOG.warn("Exception while uploading the file " + localPath.getName(), e);
// in case an exception is thrown, delete the temp file
deleteTempFile(tempPath);
throw e;
}
}
@VisibleForTesting
Path getActualPath() throws IOException {
Path path = localPath;
FileStatus status = localFs.getFileStatus(path);
if (status != null && status.isDirectory()) {
// for certain types of resources that get unpacked, the original file may
// be found under the directory with the same name (see
// FSDownload.unpack); check if the path is a directory and if so look
// under it
path = new Path(path, path.getName());
}
return path;
}
private void deleteTempFile(Path tempPath) {
try {
if (tempPath != null && fs.exists(tempPath)) {
fs.delete(tempPath, false);
}
} catch (IOException ignore) {}
}
/**
* Checks that the (original) remote file is either owned by the user who
* started the app or public.
*/
@VisibleForTesting
boolean verifyAccess() throws IOException {
// if it is in the public cache, it's trivially OK
if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
return true;
}
final Path remotePath;
try {
remotePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
} catch (URISyntaxException e) {
throw new IOException("Invalid resource", e);
}
// get the file status of the HDFS file
FileSystem remoteFs = remotePath.getFileSystem(conf);
FileStatus status = remoteFs.getFileStatus(remotePath);
// check to see if the file has been modified in any way
if (status.getModificationTime() != resource.getTimestamp()) {
LOG.warn("The remote file " + remotePath +
" has changed since it's localized; will not consider it for upload");
return false;
}
// check for the user ownership
if (status.getOwner().equals(user)) {
return true; // the user owns the file
}
// check if the file is publicly readable otherwise
return fileIsPublic(remotePath, remoteFs, status);
}
@VisibleForTesting
boolean fileIsPublic(final Path remotePath, FileSystem remoteFs,
FileStatus status) throws IOException {
return FSDownload.isPublic(remoteFs, remotePath, status, null);
}
/**
* Uploads the file to the shared cache under a temporary name, and returns
* the result.
*/
@VisibleForTesting
boolean uploadFile(Path sourcePath, Path tempPath) throws IOException {
return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf);
}
@VisibleForTesting
String computeChecksum(Path path) throws IOException {
InputStream is = localFs.open(path);
try {
return checksum.computeChecksum(is);
} finally {
try { is.close(); } catch (IOException ignore) {}
}
}
private String getTemporaryFileName(Path path) {
return path.getName() + "-" + randomTl.get().nextLong();
}
@VisibleForTesting
boolean notifySharedCacheManager(String checksumVal, String fileName)
throws IOException {
try {
SCMUploaderNotifyRequest request =
recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
request.setResourceKey(checksumVal);
request.setFilename(fileName);
return scmClient.notify(request).getAccepted();
} catch (YarnException e) {
throw new IOException(e);
} catch (UndeclaredThrowableException e) {
// retrieve the cause of the exception and throw it as an IOException
throw new IOException(e.getCause() == null ? e : e.getCause());
}
}
}

View File

@ -642,7 +642,7 @@ public class TestContainer {
URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
return new SimpleEntry<String, LocalResource>(name, rsrc);
}

View File

@ -1760,7 +1760,7 @@ public class TestResourceLocalizationService {
URL url = getPath("/local/PRIVATE/" + name);
LocalResource rsrc =
BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
return rsrc;
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
import static org.junit.Assert.assertSame;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
public class TestSharedCacheUploadService {
@Test
public void testInitDisabled() {
testInit(false);
}
@Test
public void testInitEnabled() {
testInit(true);
}
public void testInit(boolean enabled) {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, enabled);
SharedCacheUploadService service = new SharedCacheUploadService();
service.init(conf);
assertSame(enabled, service.isEnabled());
service.stop();
}
}

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse;
import org.junit.Test;
public class TestSharedCacheUploader {
/**
* If verifyAccess fails, the upload should fail
*/
@Test
public void testFailVerifyAccess() throws Exception {
SharedCacheUploader spied = createSpiedUploader();
doReturn(false).when(spied).verifyAccess();
assertFalse(spied.call());
}
/**
* If rename fails, the upload should fail
*/
@Test
public void testRenameFail() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
LocalResource resource = mock(LocalResource.class);
Path localPath = mock(Path.class);
when(localPath.getName()).thenReturn("foo.jar");
String user = "joe";
SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class);
when(response.getAccepted()).thenReturn(true);
when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))).
thenReturn(response);
FileSystem fs = mock(FileSystem.class);
// return false when rename is called
when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(false);
FileSystem localFs = FileSystem.getLocal(conf);
SharedCacheUploader spied =
createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
localFs);
// stub verifyAccess() to return true
doReturn(true).when(spied).verifyAccess();
// stub getActualPath()
doReturn(localPath).when(spied).getActualPath();
// stub computeChecksum()
doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
// stub uploadFile() to return true
doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
assertFalse(spied.call());
}
/**
* If verifyAccess, uploadFile, rename, and notification succeed, the upload
* should succeed
*/
@Test
public void testSuccess() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
LocalResource resource = mock(LocalResource.class);
Path localPath = mock(Path.class);
when(localPath.getName()).thenReturn("foo.jar");
String user = "joe";
SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class);
when(response.getAccepted()).thenReturn(true);
when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))).
thenReturn(response);
FileSystem fs = mock(FileSystem.class);
// return false when rename is called
when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
FileSystem localFs = FileSystem.getLocal(conf);
SharedCacheUploader spied =
createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
localFs);
// stub verifyAccess() to return true
doReturn(true).when(spied).verifyAccess();
// stub getActualPath()
doReturn(localPath).when(spied).getActualPath();
// stub computeChecksum()
doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
// stub uploadFile() to return true
doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
// stub notifySharedCacheManager to return true
doReturn(true).when(spied).notifySharedCacheManager(isA(String.class),
isA(String.class));
assertTrue(spied.call());
}
/**
* If verifyAccess, uploadFile, and rename succed, but it receives a nay from
* SCM, the file should be deleted
*/
@Test
public void testNotifySCMFail() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
LocalResource resource = mock(LocalResource.class);
Path localPath = mock(Path.class);
when(localPath.getName()).thenReturn("foo.jar");
String user = "joe";
FileSystem fs = mock(FileSystem.class);
// return false when rename is called
when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
FileSystem localFs = FileSystem.getLocal(conf);
SharedCacheUploader spied =
createSpiedUploader(resource, localPath, user, conf, null, fs,
localFs);
// stub verifyAccess() to return true
doReturn(true).when(spied).verifyAccess();
// stub getActualPath()
doReturn(localPath).when(spied).getActualPath();
// stub computeChecksum()
doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class));
// stub uploadFile() to return true
doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class));
// stub notifySharedCacheManager to return true
doReturn(false).when(spied).notifySharedCacheManager(isA(String.class),
isA(String.class));
assertFalse(spied.call());
verify(fs).delete(isA(Path.class), anyBoolean());
}
/**
* If resource is public, verifyAccess should succeed
*/
@Test
public void testVerifyAccessPublicResource() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
LocalResource resource = mock(LocalResource.class);
// give public visibility
when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
Path localPath = mock(Path.class);
when(localPath.getName()).thenReturn("foo.jar");
String user = "joe";
SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
FileSystem fs = mock(FileSystem.class);
FileSystem localFs = FileSystem.getLocal(conf);
SharedCacheUploader spied =
createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
localFs);
assertTrue(spied.verifyAccess());
}
/**
* If the localPath does not exists, getActualPath should get to one level
* down
*/
@Test
public void testGetActualPath() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
LocalResource resource = mock(LocalResource.class);
// give public visibility
when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
Path localPath = new Path("foo.jar");
String user = "joe";
SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
FileSystem fs = mock(FileSystem.class);
FileSystem localFs = mock(FileSystem.class);
// stub it to return a status that indicates a directory
FileStatus status = mock(FileStatus.class);
when(status.isDirectory()).thenReturn(true);
when(localFs.getFileStatus(localPath)).thenReturn(status);
SharedCacheUploader spied =
createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
localFs);
Path actualPath = spied.getActualPath();
assertEquals(actualPath.getName(), localPath.getName());
assertEquals(actualPath.getParent().getName(), localPath.getName());
}
private SharedCacheUploader createSpiedUploader() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
LocalResource resource = mock(LocalResource.class);
Path localPath = mock(Path.class);
String user = "foo";
SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
FileSystem fs = FileSystem.get(conf);
FileSystem localFs = FileSystem.getLocal(conf);
return createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
localFs);
}
private SharedCacheUploader createSpiedUploader(LocalResource resource, Path localPath,
String user, Configuration conf, SCMUploaderProtocol scmClient,
FileSystem fs, FileSystem localFs)
throws IOException {
SharedCacheUploader uploader = new SharedCacheUploader(resource, localPath, user, conf, scmClient,
fs, localFs);
return spy(uploader);
}
}