From ff1b13ded58b640ace4a6ccb6c5a3565ecdc6485 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 12 Nov 2014 09:31:05 -0800 Subject: [PATCH] YARN-2236. [YARN-1492] Shared Cache uploader service on the Node Manager. (Chris Trezzo and Sanjin Lee via kasha) (cherry picked from commit a04143039e7fe310d807f40584633096181cfada) --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/records/LocalResource.java | 38 +++ .../hadoop/yarn/conf/YarnConfiguration.java | 19 ++ .../src/main/proto/yarn_protos.proto | 1 + .../records/impl/pb/LocalResourcePBImpl.java | 20 ++ .../yarn/sharedcache/ChecksumSHA256Impl.java | 37 +++ .../yarn/sharedcache/SharedCacheChecksum.java | 43 +++ .../SharedCacheChecksumFactory.java | 84 +++++ .../apache/hadoop/yarn/util/FSDownload.java | 5 +- .../src/main/resources/yarn-default.xml | 18 ++ .../yarn/server/utils/BuilderUtils.java | 10 +- .../ContainerManagerImpl.java | 13 + .../container/ContainerImpl.java | 72 ++++- .../localizer/LocalResourceRequest.java | 11 + .../sharedcache/SharedCacheUploadEvent.java | 58 ++++ .../SharedCacheUploadEventType.java | 28 ++ .../sharedcache/SharedCacheUploadService.java | 126 ++++++++ .../sharedcache/SharedCacheUploader.java | 289 ++++++++++++++++++ .../container/TestContainer.java | 2 +- .../TestResourceLocalizationService.java | 2 +- .../TestSharedCacheUploadService.java | 50 +++ .../sharedcache/TestSharedCacheUploader.java | 241 +++++++++++++++ 22 files changed, 1158 insertions(+), 12 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f9a00fafafd..b2549a93883 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -18,6 +18,9 @@ Release 2.7.0 - UNRELEASED YARN-2186. [YARN-1492] Node Manager uploader service for cache manager. (Chris Trezzo and Sangjin Lee via kasha) + YARN-2236. [YARN-1492] Shared Cache uploader service on the Node + Manager. (Chris Trezzo and Sanjin Lee via kasha) + IMPROVEMENTS YARN-1979. TestDirectoryCollection fails when the umask is unusual. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java index f14a136d30d..726d9699c27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.util.Records; @@ -48,6 +49,14 @@ public abstract class LocalResource { public static LocalResource newInstance(URL url, LocalResourceType type, LocalResourceVisibility visibility, long size, long timestamp, String pattern) { + return newInstance(url, type, visibility, size, timestamp, pattern, false); + } + + @Public + @Unstable + public static LocalResource newInstance(URL url, LocalResourceType type, + LocalResourceVisibility visibility, long size, long timestamp, + String pattern, boolean shouldBeUploadedToSharedCache) { LocalResource resource = Records.newRecord(LocalResource.class); resource.setResource(url); resource.setType(type); @@ -55,6 +64,7 @@ public abstract class LocalResource { resource.setSize(size); resource.setTimestamp(timestamp); resource.setPattern(pattern); + resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); return resource; } @@ -65,6 +75,15 @@ public abstract class LocalResource { return newInstance(url, type, visibility, size, timestamp, null); } + @Public + @Unstable + public static LocalResource newInstance(URL url, LocalResourceType type, + LocalResourceVisibility visibility, long size, long timestamp, + boolean shouldBeUploadedToSharedCache) { + return newInstance(url, type, visibility, size, timestamp, null, + shouldBeUploadedToSharedCache); + } + /** * Get the location of the resource to be localized. * @return location of the resource to be localized @@ -170,4 +189,23 @@ public abstract class LocalResource { @Public @Stable public abstract void setPattern(String pattern); + + /** + * NM uses it to decide whether if it is necessary to upload the resource to + * the shared cache + */ + @Public + @Unstable + public abstract boolean getShouldBeUploadedToSharedCache(); + + /** + * Inform NM whether upload to SCM is needed. + * + * @param shouldBeUploadedToSharedCache shouldBeUploadedToSharedCache + * of this request + */ + @Public + @Unstable + public abstract void setShouldBeUploadedToSharedCache( + boolean shouldBeUploadedToSharedCache); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7168068d0da..fada0ea1c14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1472,6 +1472,25 @@ public class YarnConfiguration extends Configuration { SHARED_CACHE_PREFIX + "uploader.server.thread-count"; public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50; + /** the checksum algorithm implementation **/ + public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL = + SHARED_CACHE_PREFIX + "checksum.algo.impl"; + public static final String DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL = + "org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl"; + + // node manager (uploader) configs + /** + * The replication factor for the node manager uploader for the shared cache. + */ + public static final String SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR = + SHARED_CACHE_PREFIX + "nm.uploader.replication.factor"; + public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR = + 10; + + public static final String SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = + SHARED_CACHE_PREFIX + "nm.uploader.thread-count"; + public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 5c86c2dfca5..c4e756d81c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -159,6 +159,7 @@ message LocalResourceProto { optional LocalResourceTypeProto type = 4; optional LocalResourceVisibilityProto visibility = 5; optional string pattern = 6; + optional bool should_be_uploaded_to_shared_cache = 7; } message ApplicationResourceUsageReportProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java index 16bd59740d2..560b081c016 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LocalResourcePBImpl.java @@ -192,6 +192,26 @@ public class LocalResourcePBImpl extends LocalResource { builder.setPattern(pattern); } + @Override + public synchronized boolean getShouldBeUploadedToSharedCache() { + LocalResourceProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasShouldBeUploadedToSharedCache()) { + return false; + } + return p.getShouldBeUploadedToSharedCache(); + } + + @Override + public synchronized void setShouldBeUploadedToSharedCache( + boolean shouldBeUploadedToSharedCache) { + maybeInitBuilder(); + if (!shouldBeUploadedToSharedCache) { + builder.clearShouldBeUploadedToSharedCache(); + return; + } + builder.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); + } + private LocalResourceTypeProto convertToProtoFormat(LocalResourceType e) { return ProtoUtils.convertToProtoFormat(e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java new file mode 100644 index 00000000000..24ceeae248c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/ChecksumSHA256Impl.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sharedcache; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Private +@Evolving +/** + * The SHA-256 implementation of the shared cache checksum interface. + */ +public class ChecksumSHA256Impl implements SharedCacheChecksum { + public String computeChecksum(InputStream in) throws IOException { + return DigestUtils.sha256Hex(in); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java new file mode 100644 index 00000000000..7e6fddaa263 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksum.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sharedcache; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Public +@Evolving +/** + * An interface to calculate a checksum for a resource in the shared cache. The + * checksum implementation should be thread safe. + */ +public interface SharedCacheChecksum { + + /** + * Calculate the checksum of the passed input stream. + * + * @param in InputStream to be checksumed + * @return the message digest of the input stream + * @throws IOException + */ + public String computeChecksum(InputStream in) throws IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java new file mode 100644 index 00000000000..cbfd95db5b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/sharedcache/SharedCacheChecksumFactory.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sharedcache; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +@SuppressWarnings("unchecked") +@Public +@Evolving +/** + * A factory class for creating checksum objects based on a configurable + * algorithm implementation + */ +public class SharedCacheChecksumFactory { + private static final + ConcurrentMap,SharedCacheChecksum> + instances = + new ConcurrentHashMap, + SharedCacheChecksum>(); + + private static final Class defaultAlgorithm; + + static { + try { + defaultAlgorithm = (Class) + Class.forName( + YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL); + } catch (Exception e) { + // cannot happen + throw new ExceptionInInitializerError(e); + } + } + + /** + * Get a new SharedCacheChecksum object based on the configurable + * algorithm implementation + * (see yarn.sharedcache.checksum.algo.impl) + * + * @return SharedCacheChecksum object + */ + public static SharedCacheChecksum getChecksum(Configuration conf) { + Class clazz = + conf.getClass(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL, + defaultAlgorithm, SharedCacheChecksum.class); + SharedCacheChecksum checksum = instances.get(clazz); + if (checksum == null) { + try { + checksum = ReflectionUtils.newInstance(clazz, conf); + SharedCacheChecksum old = instances.putIfAbsent(clazz, checksum); + if (old != null) { + checksum = old; + } + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + return checksum; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index 2737cce196b..436cb312938 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -32,6 +32,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; @@ -134,8 +135,8 @@ public class FSDownload implements Callable { * @return true if the path in the current path is visible to all, false * otherwise */ - @VisibleForTesting - static boolean isPublic(FileSystem fs, Path current, FileStatus sStat, + @Private + public static boolean isPublic(FileSystem fs, Path current, FileStatus sStat, LoadingCache> statCache) throws IOException { current = fs.makeQualified(current); //the leaf level file should be readable by others diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index bae77ed8d18..73aa81634e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1457,6 +1457,24 @@ 50 + + The algorithm used to compute checksums of files (SHA-256 by default) + yarn.sharedcache.checksum.algo.impl + org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl + + + + The replication factor for the node manager uploader for the shared cache (10 by default) + yarn.sharedcache.nm.uploader.replication.factor + 10 + + + + The number of threads used to upload files from a node manager instance (20 by default) + yarn.sharedcache.nm.uploader.thread-count + 20 + + The interval that the yarn client library uses to poll the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index a7e5d9cd820..1b326716af8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -70,6 +70,7 @@ import com.google.common.annotations.VisibleForTesting; * Builder utilities to construct various objects. * */ +@Private public class BuilderUtils { private static final RecordFactory recordFactory = RecordFactoryProvider @@ -94,7 +95,8 @@ public class BuilderUtils { } public static LocalResource newLocalResource(URL url, LocalResourceType type, - LocalResourceVisibility visibility, long size, long timestamp) { + LocalResourceVisibility visibility, long size, long timestamp, + boolean shouldBeUploadedToSharedCache) { LocalResource resource = recordFactory.newRecordInstance(LocalResource.class); resource.setResource(url); @@ -102,14 +104,15 @@ public class BuilderUtils { resource.setVisibility(visibility); resource.setSize(size); resource.setTimestamp(timestamp); + resource.setShouldBeUploadedToSharedCache(shouldBeUploadedToSharedCache); return resource; } public static LocalResource newLocalResource(URI uri, LocalResourceType type, LocalResourceVisibility visibility, long size, - long timestamp) { + long timestamp, boolean shouldBeUploadedToSharedCache) { return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type, - visibility, size, timestamp); + visibility, size, timestamp, shouldBeUploadedToSharedCache); } public static ApplicationId newApplicationId(RecordFactory recordFactory, @@ -245,7 +248,6 @@ public class BuilderUtils { return newToken(Token.class, identifier, kind, password, service); } - @Private @VisibleForTesting public static Token newContainerToken(NodeId nodeId, byte[] password, ContainerTokenIdentifier tokenIdentifier) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 35b232fea3d..bb277d94b8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -119,6 +119,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler; @@ -227,6 +229,13 @@ public class ContainerManagerImpl extends CompositeService implements addIfService(logHandler); dispatcher.register(LogHandlerEventType.class, logHandler); + // add the shared cache upload service (it will do nothing if the shared + // cache is disabled) + SharedCacheUploadService sharedCacheUploader = + createSharedCacheUploaderService(); + addService(sharedCacheUploader); + dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader); + waitForContainersOnShutdownMillis = conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + @@ -367,6 +376,10 @@ public class ContainerManagerImpl extends CompositeService implements deletionContext, dirsHandler, context); } + protected SharedCacheUploadService createSharedCacheUploaderService() { + return new SharedCacheUploadService(); + } + protected ContainersLauncher createContainersLauncher(Context context, ContainerExecutor exec) { return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index fa54ee19b9e..6b65a544bb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -27,6 +27,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -59,6 +60,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; @@ -104,6 +107,10 @@ public class ContainerImpl implements Container { new ArrayList(); private final List appRsrcs = new ArrayList(); + private final Map resourcesToBeUploaded = + new ConcurrentHashMap(); + private final Map resourcesUploadPolicies = + new ConcurrentHashMap(); // whether container has been recovered after a restart private RecoveredContainerStatus recoveredStatus = @@ -637,6 +644,8 @@ public class ContainerImpl implements Container { container.pendingResources.put(req, links); } links.add(rsrc.getKey()); + storeSharedCacheUploadPolicy(container, req, rsrc.getValue() + .getShouldBeUploadedToSharedCache()); switch (rsrc.getValue().getVisibility()) { case PUBLIC: container.publicRsrcs.add(req); @@ -685,31 +694,77 @@ public class ContainerImpl implements Container { } } + /** + * Store the resource's shared cache upload policies + * Given LocalResourceRequest can be shared across containers in + * LocalResourcesTrackerImpl, we preserve the upload policies here. + * In addition, it is possible for the application to create several + * "identical" LocalResources as part of + * ContainerLaunchContext.setLocalResources with different symlinks. + * There is a corner case where these "identical" local resources have + * different upload policies. For that scenario, upload policy will be set to + * true as long as there is at least one LocalResource entry with + * upload policy set to true. + */ + private static void storeSharedCacheUploadPolicy(ContainerImpl container, + LocalResourceRequest resourceRequest, Boolean uploadPolicy) { + Boolean storedUploadPolicy = + container.resourcesUploadPolicies.get(resourceRequest); + if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) { + container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy); + } + } + /** * Transition when one of the requested resources for this container * has been successfully localized. */ static class LocalizedTransition implements MultipleArcTransition { + @SuppressWarnings("unchecked") @Override public ContainerState transition(ContainerImpl container, ContainerEvent event) { ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; - List syms = - container.pendingResources.remove(rsrcEvent.getResource()); + LocalResourceRequest resourceRequest = rsrcEvent.getResource(); + Path location = rsrcEvent.getLocation(); + List syms = container.pendingResources.remove(resourceRequest); if (null == syms) { - LOG.warn("Localized unknown resource " + rsrcEvent.getResource() + + LOG.warn("Localized unknown resource " + resourceRequest + " for container " + container.containerId); assert false; // fail container? return ContainerState.LOCALIZING; } - container.localizedResources.put(rsrcEvent.getLocation(), syms); + container.localizedResources.put(location, syms); + + // check to see if this resource should be uploaded to the shared cache + // as well + if (shouldBeUploadedToSharedCache(container, resourceRequest)) { + container.resourcesToBeUploaded.put(resourceRequest, location); + } if (!container.pendingResources.isEmpty()) { return ContainerState.LOCALIZING; } container.sendLaunchEvent(); + + // If this is a recovered container that has already launched, skip + // uploading resources to the shared cache. We do this to avoid uploading + // the same resources multiple times. The tradeoff is that in the case of + // a recovered container, there is a chance that resources don't get + // uploaded into the shared cache. This is OK because resources are not + // acknowledged by the SCM until they have been uploaded by the node + // manager. + if (container.recoveredStatus != RecoveredContainerStatus.LAUNCHED + && container.recoveredStatus != RecoveredContainerStatus.COMPLETED) { + // kick off uploads to the shared cache + container.dispatcher.getEventHandler().handle( + new SharedCacheUploadEvent(container.resourcesToBeUploaded, container + .getLaunchContext(), container.getUser(), + SharedCacheUploadEventType.UPLOAD)); + } + container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } @@ -1018,4 +1073,13 @@ public class ContainerImpl implements Container { private boolean hasDefaultExitCode() { return (this.exitCode == ContainerExitStatus.INVALID); } + + /** + * Returns whether the specific resource should be uploaded to the shared + * cache. + */ + private static boolean shouldBeUploadedToSharedCache(ContainerImpl container, + LocalResourceRequest resource) { + return container.resourcesUploadPolicies.get(resource); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java index 70bead7320a..607d0b40866 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java @@ -151,6 +151,17 @@ public class LocalResourceRequest return pattern; } + @Override + public boolean getShouldBeUploadedToSharedCache() { + throw new UnsupportedOperationException(); + } + + @Override + public void setShouldBeUploadedToSharedCache( + boolean shouldBeUploadedToSharedCache) { + throw new UnsupportedOperationException(); + } + @Override public void setResource(URL resource) { throw new UnsupportedOperationException(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java new file mode 100644 index 00000000000..2be080e846d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEvent.java @@ -0,0 +1,58 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; + +@Private +@Unstable +public class SharedCacheUploadEvent extends + AbstractEvent { + private final Map resources; + private final ContainerLaunchContext context; + private final String user; + + public SharedCacheUploadEvent(Map resources, + ContainerLaunchContext context, String user, + SharedCacheUploadEventType eventType) { + super(eventType); + this.resources = resources; + this.context = context; + this.user = user; + } + + public Map getResources() { + return resources; + } + + public ContainerLaunchContext getContainerLaunchContext() { + return context; + } + + public String getUser() { + return user; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java new file mode 100644 index 00000000000..5ba7e1b6b18 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadEventType.java @@ -0,0 +1,28 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +@Private +@Unstable +public enum SharedCacheUploadEventType { + UPLOAD +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java new file mode 100644 index 00000000000..cb11f99c553 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java @@ -0,0 +1,126 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +@Private +@Unstable +/** + * Service that uploads localized files to the shared cache. The upload is + * considered not critical, and is done on a best-effort basis. Failure to + * upload is not fatal. + */ +public class SharedCacheUploadService extends AbstractService implements + EventHandler { + private static final Log LOG = + LogFactory.getLog(SharedCacheUploadService.class); + + private boolean enabled; + private FileSystem fs; + private FileSystem localFs; + private ExecutorService uploaderPool; + private SCMUploaderProtocol scmClient; + + public SharedCacheUploadService() { + super(SharedCacheUploadService.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + enabled = conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, + YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED); + if (enabled) { + int threadCount = + conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT, + YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT); + uploaderPool = Executors.newFixedThreadPool(threadCount, + new ThreadFactoryBuilder(). + setNameFormat("Shared cache uploader #%d"). + build()); + scmClient = createSCMClient(conf); + try { + fs = FileSystem.get(conf); + localFs = FileSystem.getLocal(conf); + } catch (IOException e) { + LOG.error("Unexpected exception in getting the filesystem", e); + throw new RuntimeException(e); + } + } + super.serviceInit(conf); + } + + private SCMUploaderProtocol createSCMClient(Configuration conf) { + YarnRPC rpc = YarnRPC.create(conf); + InetSocketAddress scmAddress = + conf.getSocketAddr(YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT); + return (SCMUploaderProtocol)rpc.getProxy( + SCMUploaderProtocol.class, scmAddress, conf); + } + + @Override + protected void serviceStop() throws Exception { + if (enabled) { + uploaderPool.shutdown(); + RPC.stopProxy(scmClient); + } + super.serviceStop(); + } + + @Override + public void handle(SharedCacheUploadEvent event) { + if (enabled) { + Map resources = event.getResources(); + for (Map.Entry e: resources.entrySet()) { + SharedCacheUploader uploader = + new SharedCacheUploader(e.getKey(), e.getValue(), event.getUser(), + getConfig(), scmClient, fs, localFs); + // fire off an upload task + uploaderPool.submit(uploader); + } + } + } + + public boolean isEnabled() { + return enabled; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java new file mode 100644 index 00000000000..050d5315b4f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploader.java @@ -0,0 +1,289 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.URISyntaxException; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest; +import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; +import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum; +import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.FSDownload; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The callable class that handles the actual upload to the shared cache. + */ +class SharedCacheUploader implements Callable { + // rwxr-xr-x + static final FsPermission DIRECTORY_PERMISSION = + new FsPermission((short)00755); + // r-xr-xr-x + static final FsPermission FILE_PERMISSION = + new FsPermission((short)00555); + + private static final Log LOG = LogFactory.getLog(SharedCacheUploader.class); + private static final ThreadLocal randomTl = + new ThreadLocal() { + @Override + protected Random initialValue() { + return new Random(System.nanoTime()); + } + }; + + private final LocalResource resource; + private final Path localPath; + private final String user; + private final Configuration conf; + private final SCMUploaderProtocol scmClient; + private final FileSystem fs; + private final FileSystem localFs; + private final String sharedCacheRootDir; + private final int nestedLevel; + private final SharedCacheChecksum checksum; + private final RecordFactory recordFactory; + + public SharedCacheUploader(LocalResource resource, Path localPath, + String user, Configuration conf, SCMUploaderProtocol scmClient) + throws IOException { + this(resource, localPath, user, conf, scmClient, + FileSystem.get(conf), localPath.getFileSystem(conf)); + } + + /** + * @param resource the local resource that contains the original remote path + * @param localPath the path in the local filesystem where the resource is + * localized + * @param fs the filesystem of the shared cache + * @param localFs the local filesystem + */ + public SharedCacheUploader(LocalResource resource, Path localPath, + String user, Configuration conf, SCMUploaderProtocol scmClient, + FileSystem fs, FileSystem localFs) { + this.resource = resource; + this.localPath = localPath; + this.user = user; + this.conf = conf; + this.scmClient = scmClient; + this.fs = fs; + this.sharedCacheRootDir = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + this.nestedLevel = SharedCacheUtil.getCacheDepth(conf); + this.checksum = SharedCacheChecksumFactory.getChecksum(conf); + this.localFs = localFs; + this.recordFactory = RecordFactoryProvider.getRecordFactory(null); + } + + /** + * Uploads the file under the shared cache, and notifies the shared cache + * manager. If it is unable to upload the file because it already exists, it + * returns false. + */ + @Override + public Boolean call() throws Exception { + Path tempPath = null; + try { + if (!verifyAccess()) { + LOG.warn("User " + user + " is not authorized to upload file " + + localPath.getName()); + return false; + } + + // first determine the actual local path that will be used for upload + Path actualPath = getActualPath(); + // compute the checksum + String checksumVal = computeChecksum(actualPath); + // create the directory (if it doesn't exist) + Path directoryPath = + new Path(SharedCacheUtil.getCacheEntryPath(nestedLevel, + sharedCacheRootDir, checksumVal)); + // let's not check if the directory already exists: in the vast majority + // of the cases, the directory does not exist; as long as mkdirs does not + // error out if it exists, we should be fine + fs.mkdirs(directoryPath, DIRECTORY_PERMISSION); + // create the temporary file + tempPath = new Path(directoryPath, getTemporaryFileName(actualPath)); + if (!uploadFile(actualPath, tempPath)) { + LOG.warn("Could not copy the file to the shared cache at " + tempPath); + return false; + } + + // set the permission so that it is readable but not writable + fs.setPermission(tempPath, FILE_PERMISSION); + // rename it to the final filename + Path finalPath = new Path(directoryPath, actualPath.getName()); + if (!fs.rename(tempPath, finalPath)) { + LOG.warn("The file already exists under " + finalPath + + ". Ignoring this attempt."); + deleteTempFile(tempPath); + return false; + } + + // notify the SCM + if (!notifySharedCacheManager(checksumVal, actualPath.getName())) { + // the shared cache manager rejected the upload (as it is likely + // uploaded under a different name + // clean up this file and exit + fs.delete(finalPath, false); + return false; + } + + // set the replication factor + short replication = + (short)conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR, + YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR); + fs.setReplication(finalPath, replication); + LOG.info("File " + actualPath.getName() + + " was uploaded to the shared cache at " + finalPath); + return true; + } catch (IOException e) { + LOG.warn("Exception while uploading the file " + localPath.getName(), e); + // in case an exception is thrown, delete the temp file + deleteTempFile(tempPath); + throw e; + } + } + + @VisibleForTesting + Path getActualPath() throws IOException { + Path path = localPath; + FileStatus status = localFs.getFileStatus(path); + if (status != null && status.isDirectory()) { + // for certain types of resources that get unpacked, the original file may + // be found under the directory with the same name (see + // FSDownload.unpack); check if the path is a directory and if so look + // under it + path = new Path(path, path.getName()); + } + return path; + } + + private void deleteTempFile(Path tempPath) { + try { + if (tempPath != null && fs.exists(tempPath)) { + fs.delete(tempPath, false); + } + } catch (IOException ignore) {} + } + + /** + * Checks that the (original) remote file is either owned by the user who + * started the app or public. + */ + @VisibleForTesting + boolean verifyAccess() throws IOException { + // if it is in the public cache, it's trivially OK + if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { + return true; + } + + final Path remotePath; + try { + remotePath = ConverterUtils.getPathFromYarnURL(resource.getResource()); + } catch (URISyntaxException e) { + throw new IOException("Invalid resource", e); + } + + // get the file status of the HDFS file + FileSystem remoteFs = remotePath.getFileSystem(conf); + FileStatus status = remoteFs.getFileStatus(remotePath); + // check to see if the file has been modified in any way + if (status.getModificationTime() != resource.getTimestamp()) { + LOG.warn("The remote file " + remotePath + + " has changed since it's localized; will not consider it for upload"); + return false; + } + + // check for the user ownership + if (status.getOwner().equals(user)) { + return true; // the user owns the file + } + // check if the file is publicly readable otherwise + return fileIsPublic(remotePath, remoteFs, status); + } + + @VisibleForTesting + boolean fileIsPublic(final Path remotePath, FileSystem remoteFs, + FileStatus status) throws IOException { + return FSDownload.isPublic(remoteFs, remotePath, status, null); + } + + /** + * Uploads the file to the shared cache under a temporary name, and returns + * the result. + */ + @VisibleForTesting + boolean uploadFile(Path sourcePath, Path tempPath) throws IOException { + return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf); + } + + @VisibleForTesting + String computeChecksum(Path path) throws IOException { + InputStream is = localFs.open(path); + try { + return checksum.computeChecksum(is); + } finally { + try { is.close(); } catch (IOException ignore) {} + } + } + + private String getTemporaryFileName(Path path) { + return path.getName() + "-" + randomTl.get().nextLong(); + } + + @VisibleForTesting + boolean notifySharedCacheManager(String checksumVal, String fileName) + throws IOException { + try { + SCMUploaderNotifyRequest request = + recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class); + request.setResourceKey(checksumVal); + request.setFilename(fileName); + return scmClient.notify(request).getAccepted(); + } catch (YarnException e) { + throw new IOException(e); + } catch (UndeclaredThrowableException e) { + // retrieve the cause of the exception and throw it as an IOException + throw new IOException(e.getCause() == null ? e : e.getCause()); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 8f7fa782996..c28d691a99d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -642,7 +642,7 @@ public class TestContainer { URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name); LocalResource rsrc = BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis, - r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L); + r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false); return new SimpleEntry(name, rsrc); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index bf36651e941..1051e7acfc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -1760,7 +1760,7 @@ public class TestResourceLocalizationService { URL url = getPath("/local/PRIVATE/" + name); LocalResource rsrc = BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis, - r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L); + r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false); return rsrc; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java new file mode 100644 index 00000000000..1b2b2f0715d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploadService.java @@ -0,0 +1,50 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import static org.junit.Assert.assertSame; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Test; + +public class TestSharedCacheUploadService { + + @Test + public void testInitDisabled() { + testInit(false); + } + + @Test + public void testInitEnabled() { + testInit(true); + } + + public void testInit(boolean enabled) { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, enabled); + + SharedCacheUploadService service = new SharedCacheUploadService(); + service.init(conf); + assertSame(enabled, service.isEnabled()); + + service.stop(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java new file mode 100644 index 00000000000..9234c62fd55 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/TestSharedCacheUploader.java @@ -0,0 +1,241 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyResponse; +import org.junit.Test; + +public class TestSharedCacheUploader { + + /** + * If verifyAccess fails, the upload should fail + */ + @Test + public void testFailVerifyAccess() throws Exception { + SharedCacheUploader spied = createSpiedUploader(); + doReturn(false).when(spied).verifyAccess(); + + assertFalse(spied.call()); + } + + /** + * If rename fails, the upload should fail + */ + @Test + public void testRenameFail() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + Path localPath = mock(Path.class); + when(localPath.getName()).thenReturn("foo.jar"); + String user = "joe"; + SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class); + SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class); + when(response.getAccepted()).thenReturn(true); + when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))). + thenReturn(response); + FileSystem fs = mock(FileSystem.class); + // return false when rename is called + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(false); + FileSystem localFs = FileSystem.getLocal(conf); + SharedCacheUploader spied = + createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + // stub verifyAccess() to return true + doReturn(true).when(spied).verifyAccess(); + // stub getActualPath() + doReturn(localPath).when(spied).getActualPath(); + // stub computeChecksum() + doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class)); + // stub uploadFile() to return true + doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class)); + + assertFalse(spied.call()); + } + + /** + * If verifyAccess, uploadFile, rename, and notification succeed, the upload + * should succeed + */ + @Test + public void testSuccess() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + Path localPath = mock(Path.class); + when(localPath.getName()).thenReturn("foo.jar"); + String user = "joe"; + SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class); + SCMUploaderNotifyResponse response = mock(SCMUploaderNotifyResponse.class); + when(response.getAccepted()).thenReturn(true); + when(scmClient.notify(isA(SCMUploaderNotifyRequest.class))). + thenReturn(response); + FileSystem fs = mock(FileSystem.class); + // return false when rename is called + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true); + FileSystem localFs = FileSystem.getLocal(conf); + SharedCacheUploader spied = + createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + // stub verifyAccess() to return true + doReturn(true).when(spied).verifyAccess(); + // stub getActualPath() + doReturn(localPath).when(spied).getActualPath(); + // stub computeChecksum() + doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class)); + // stub uploadFile() to return true + doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class)); + // stub notifySharedCacheManager to return true + doReturn(true).when(spied).notifySharedCacheManager(isA(String.class), + isA(String.class)); + + assertTrue(spied.call()); + } + + /** + * If verifyAccess, uploadFile, and rename succed, but it receives a nay from + * SCM, the file should be deleted + */ + @Test + public void testNotifySCMFail() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + Path localPath = mock(Path.class); + when(localPath.getName()).thenReturn("foo.jar"); + String user = "joe"; + FileSystem fs = mock(FileSystem.class); + // return false when rename is called + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true); + FileSystem localFs = FileSystem.getLocal(conf); + SharedCacheUploader spied = + createSpiedUploader(resource, localPath, user, conf, null, fs, + localFs); + // stub verifyAccess() to return true + doReturn(true).when(spied).verifyAccess(); + // stub getActualPath() + doReturn(localPath).when(spied).getActualPath(); + // stub computeChecksum() + doReturn("abcdef0123456789").when(spied).computeChecksum(isA(Path.class)); + // stub uploadFile() to return true + doReturn(true).when(spied).uploadFile(isA(Path.class), isA(Path.class)); + // stub notifySharedCacheManager to return true + doReturn(false).when(spied).notifySharedCacheManager(isA(String.class), + isA(String.class)); + + assertFalse(spied.call()); + verify(fs).delete(isA(Path.class), anyBoolean()); + } + + /** + * If resource is public, verifyAccess should succeed + */ + @Test + public void testVerifyAccessPublicResource() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + // give public visibility + when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC); + Path localPath = mock(Path.class); + when(localPath.getName()).thenReturn("foo.jar"); + String user = "joe"; + SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class); + FileSystem fs = mock(FileSystem.class); + FileSystem localFs = FileSystem.getLocal(conf); + SharedCacheUploader spied = + createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + + assertTrue(spied.verifyAccess()); + } + + /** + * If the localPath does not exists, getActualPath should get to one level + * down + */ + @Test + public void testGetActualPath() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + // give public visibility + when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC); + Path localPath = new Path("foo.jar"); + String user = "joe"; + SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class); + FileSystem fs = mock(FileSystem.class); + FileSystem localFs = mock(FileSystem.class); + // stub it to return a status that indicates a directory + FileStatus status = mock(FileStatus.class); + when(status.isDirectory()).thenReturn(true); + when(localFs.getFileStatus(localPath)).thenReturn(status); + SharedCacheUploader spied = + createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + + Path actualPath = spied.getActualPath(); + assertEquals(actualPath.getName(), localPath.getName()); + assertEquals(actualPath.getParent().getName(), localPath.getName()); + } + + private SharedCacheUploader createSpiedUploader() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + LocalResource resource = mock(LocalResource.class); + Path localPath = mock(Path.class); + String user = "foo"; + SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class); + FileSystem fs = FileSystem.get(conf); + FileSystem localFs = FileSystem.getLocal(conf); + return createSpiedUploader(resource, localPath, user, conf, scmClient, fs, + localFs); + } + + private SharedCacheUploader createSpiedUploader(LocalResource resource, Path localPath, + String user, Configuration conf, SCMUploaderProtocol scmClient, + FileSystem fs, FileSystem localFs) + throws IOException { + SharedCacheUploader uploader = new SharedCacheUploader(resource, localPath, user, conf, scmClient, + fs, localFs); + return spy(uploader); + } +}