diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 003e6a381c2..7950b09078b 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -8,9 +8,6 @@ Trunk (Unreleased) FSDataOutputStream.sync() and Syncable.sync(). (szetszwo) NEW FEATURES - - HADOOP-10184. Hadoop Common changes required to support HDFS ACLs. (See - breakdown of tasks below for features and contributors) IMPROVEMENTS @@ -300,41 +297,6 @@ Trunk (Unreleased) HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia) - BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS - - HADOOP-10185. FileSystem API for ACLs. (cnauroth) - - HADOOP-10186. Remove AclReadFlag and AclWriteFlag in FileSystem API. - (Haohui Mai via cnauroth) - - HADOOP-10187. FsShell CLI: add getfacl and setfacl with minimal support for - getting and setting ACLs. (Vinay via cnauroth) - - HADOOP-10192. FileSystem#getAclStatus has incorrect JavaDocs. (cnauroth) - - HADOOP-10220. Add ACL indicator bit to FsPermission. (cnauroth) - - HADOOP-10241. Clean up output of FsShell getfacl. (Chris Nauroth via wheat9) - - HADOOP-10213. Fix bugs parsing ACL spec in FsShell setfacl. - (Vinay via cnauroth) - - HADOOP-10277. setfacl -x fails to parse ACL spec if trying to remove the - mask entry. (Vinay via cnauroth) - - HADOOP-10270. getfacl does not display effective permissions of masked - entries. (cnauroth) - - HADOOP-10344. Fix TestAclCommands after merging HADOOP-10338 patch. - (cnauroth) - - HADOOP-10352. Recursive setfacl erroneously attempts to apply default ACL to - files. (cnauroth) - - HADOOP-10354. TestWebHDFS fails after merge of HDFS-4685 to trunk. (cnauroth) - - HADOOP-10361. Correct alignment in CLI output for ACLs. (cnauroth) - OPTIMIZATIONS HADOOP-7761. Improve the performance of raw comparisons. (todd) @@ -362,6 +324,9 @@ Release 2.4.0 - UNRELEASED NEW FEATURES + HADOOP-10184. Hadoop Common changes required to support HDFS ACLs. (See + breakdown of tasks below for features and contributors) + IMPROVEMENTS HADOOP-10139. Update and improve the Single Cluster Setup document. @@ -378,6 +343,9 @@ Release 2.4.0 - UNRELEASED HADOOP-10348. Deprecate hadoop.ssl.configuration in branch-2, and remove it in trunk. (Haohui Mai via jing9) + HADOOP-9454. Support multipart uploads for s3native. (Jordan Mendelson and + Akira AJISAKA via atm) + OPTIMIZATIONS BUG FIXES @@ -413,6 +381,44 @@ Release 2.4.0 - UNRELEASED HADOOP-10070. RPC client doesn't use per-connection conf to determine server's expected Kerberos principal name. (atm) + HADOOP-10368. InputStream is not closed in VersionInfo ctor. + (Tsuyoshi OZAWA via szetszwo) + + BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS + + HADOOP-10185. FileSystem API for ACLs. (cnauroth) + + HADOOP-10186. Remove AclReadFlag and AclWriteFlag in FileSystem API. + (Haohui Mai via cnauroth) + + HADOOP-10187. FsShell CLI: add getfacl and setfacl with minimal support for + getting and setting ACLs. (Vinay via cnauroth) + + HADOOP-10192. FileSystem#getAclStatus has incorrect JavaDocs. (cnauroth) + + HADOOP-10220. Add ACL indicator bit to FsPermission. (cnauroth) + + HADOOP-10241. Clean up output of FsShell getfacl. (Chris Nauroth via wheat9) + + HADOOP-10213. Fix bugs parsing ACL spec in FsShell setfacl. + (Vinay via cnauroth) + + HADOOP-10277. setfacl -x fails to parse ACL spec if trying to remove the + mask entry. (Vinay via cnauroth) + + HADOOP-10270. 
getfacl does not display effective permissions of masked + entries. (cnauroth) + + HADOOP-10344. Fix TestAclCommands after merging HADOOP-10338 patch. + (cnauroth) + + HADOOP-10352. Recursive setfacl erroneously attempts to apply default ACL to + files. (cnauroth) + + HADOOP-10354. TestWebHDFS fails after merge of HDFS-4685 to trunk. (cnauroth) + + HADOOP-10361. Correct alignment in CLI output for ACLs. (cnauroth) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java index 49266187057..f6a88338fe0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java @@ -28,6 +28,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,10 +44,13 @@ import org.jets3t.service.S3ServiceException; import org.jets3t.service.ServiceException; import org.jets3t.service.StorageObjectsChunk; import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.model.MultipartPart; +import org.jets3t.service.model.MultipartUpload; import org.jets3t.service.model.S3Bucket; import org.jets3t.service.model.S3Object; import org.jets3t.service.model.StorageObject; import org.jets3t.service.security.AWSCredentials; +import org.jets3t.service.utils.MultipartUtils; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -52,6 +58,12 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore { private S3Service s3Service; private S3Bucket bucket; + + private long multipartBlockSize; + private boolean multipartEnabled; + private long multipartCopyBlockSize; + static final long MAX_PART_SIZE = (long)5 * 1024 * 1024 * 1024; + public static final Log LOG = LogFactory.getLog(Jets3tNativeFileSystemStore.class); @@ -67,13 +79,27 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore { } catch (S3ServiceException e) { handleS3ServiceException(e); } + multipartEnabled = + conf.getBoolean("fs.s3n.multipart.uploads.enabled", false); + multipartBlockSize = Math.min( + conf.getLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024), + MAX_PART_SIZE); + multipartCopyBlockSize = Math.min( + conf.getLong("fs.s3n.multipart.copy.block.size", MAX_PART_SIZE), + MAX_PART_SIZE); + bucket = new S3Bucket(uri.getHost()); } @Override public void storeFile(String key, File file, byte[] md5Hash) throws IOException { - + + if (multipartEnabled && file.length() >= multipartBlockSize) { + storeLargeFile(key, file, md5Hash); + return; + } + BufferedInputStream in = null; try { in = new BufferedInputStream(new FileInputStream(file)); @@ -98,6 +124,31 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore { } } + public void storeLargeFile(String key, File file, byte[] md5Hash) + throws IOException { + S3Object object = new S3Object(key); + object.setDataInputFile(file); + object.setContentType("binary/octet-stream"); + object.setContentLength(file.length()); + if (md5Hash != null) { + object.setMd5Hash(md5Hash); + } + + List 
<S3Object> objectsToUploadAsMultipart =
+        new ArrayList<S3Object>();
+    objectsToUploadAsMultipart.add(object);
+    MultipartUtils mpUtils = new MultipartUtils(multipartBlockSize);
+
+    try {
+      mpUtils.uploadObjects(bucket.getName(), s3Service,
+          objectsToUploadAsMultipart, null);
+    } catch (ServiceException e) {
+      handleServiceException(e);
+    } catch (Exception e) {
+      throw new S3Exception(e);
+    }
+  }
+
   @Override
   public void storeEmptyFile(String key) throws IOException {
     try {
@@ -152,11 +203,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       }
       S3Object object = s3Service.getObject(bucket.getName(), key);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -180,11 +228,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       S3Object object = s3Service.getObject(bucket, key, null, null, null,
           null, byteRangeStart, null);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -244,8 +289,16 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
     }
     s3Service.deleteObject(bucket, key);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
+    } catch (ServiceException e) {
+      handleServiceException(key, e);
+    }
+  }
+
+  public void rename(String srcKey, String dstKey) throws IOException {
+    try {
+      s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey));
+    } catch (ServiceException e) {
+      handleServiceException(e);
+    }
   }
@@ -255,10 +308,52 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName());
     }
+    if (multipartEnabled) {
+      S3Object object = s3Service.getObjectDetails(bucket, srcKey, null,
+          null, null, null);
+      if (multipartCopyBlockSize > 0 &&
+          object.getContentLength() > multipartCopyBlockSize) {
+        copyLargeFile(object, dstKey);
+        return;
+      }
+    }
     s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
         new S3Object(dstKey), false);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(srcKey, e);
+    } catch (ServiceException e) {
+      handleServiceException(srcKey, e);
+    }
+  }
+
+  public void copyLargeFile(S3Object srcObject, String dstKey) throws IOException {
+    try {
+      long partCount = srcObject.getContentLength() / multipartCopyBlockSize +
+          (srcObject.getContentLength() % multipartCopyBlockSize > 0 ? 1 : 0);
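+      // Ceiling division without floating point: e.g. a 12 GB source copied
+      // with 5 GB parts yields partCount = 3 (two full parts plus a 2 GB tail).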
+
+      MultipartUpload multipartUpload = s3Service.multipartStartUpload
+          (bucket.getName(), dstKey, srcObject.getMetadataMap());
+
+      List<MultipartPart> listedParts = new ArrayList<MultipartPart>();
+      for (int i = 0; i < partCount; i++) {
+        long byteRangeStart = i * multipartCopyBlockSize;
+        long byteLength;
+        if (i < partCount - 1) {
+          byteLength = multipartCopyBlockSize;
+        } else {
+          byteLength = srcObject.getContentLength() % multipartCopyBlockSize;
+          if (byteLength == 0) {
+            byteLength = multipartCopyBlockSize;
+          }
+        }
+
+        MultipartPart copiedPart = s3Service.multipartUploadPartCopy
+            (multipartUpload, i + 1, bucket.getName(), srcObject.getKey(),
+            null, null, null, null, byteRangeStart,
+            byteRangeStart + byteLength - 1, null);
+        listedParts.add(copiedPart);
+      }
+
+      Collections.reverse(listedParts);
+      s3Service.multipartCompleteUpload(multipartUpload, listedParts);
     } catch (ServiceException e) {
       handleServiceException(e);
     }
@@ -291,11 +386,11 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     System.out.println(sb);
   }

-  private void handleS3ServiceException(String key, S3ServiceException e) throws IOException {
-    if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+  private void handleServiceException(String key, ServiceException e) throws IOException {
+    if ("NoSuchKey".equals(e.getErrorCode())) {
       throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
     } else {
-      handleS3ServiceException(e);
+      handleServiceException(e);
     }
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/VersionInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/VersionInfo.java
index 1547577b864..1768567bd66 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/VersionInfo.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/VersionInfo.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
+import org.apache.hadoop.io.IOUtils;

 /**
  * This class returns build information about Hadoop components.
@@ -45,16 +46,19 @@ public class VersionInfo {
   protected VersionInfo(String component) {
     info = new Properties();
     String versionInfoFile = component + "-version-info.properties";
+    InputStream is = null;
     try {
-      InputStream is = Thread.currentThread().getContextClassLoader()
+      is = Thread.currentThread().getContextClassLoader()
         .getResourceAsStream(versionInfoFile);
       if (is == null) {
         throw new IOException("Resource not found");
       }
       info.load(is);
     } catch (IOException ex) {
-      LogFactory.getLog(getClass()).warn("Could not read '" +
-          versionInfoFile + "', " + ex.toString(), ex);
+      LogFactory.getLog(getClass()).warn("Could not read '" +
+          versionInfoFile + "', " + ex.toString(), ex);
+    } finally {
+      IOUtils.closeStream(is);
     }
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index a94d0c144b2..c7db1d472d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -532,6 +532,31 @@
   filesystem (s3n: URIs).
 </property>

+<property>
+  <name>fs.s3n.multipart.uploads.enabled</name>
+  <value>false</value>
+  <description>Setting this property to true enables multipart uploads to
+  the native S3 filesystem. When uploading a file, it is split into blocks
+  if the size is larger than fs.s3n.multipart.uploads.block.size.
+  </description>
+</property>
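For quick reference, a minimal sketch of setting the three new keys programmatically instead of in core-site.xml; the bucket URI is a placeholder and AWS credentials must be configured separately:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3nMultipartConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The keys added to core-default.xml by this patch:
    conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
    conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024);      // 64 MB upload parts
    conf.setLong("fs.s3n.multipart.copy.block.size", 5L * 1024 * 1024 * 1024);  // 5 GB copy parts
    // "s3n://example-bucket/" is a placeholder, not part of the patch.
    FileSystem fs = FileSystem.get(URI.create("s3n://example-bucket/"), conf);
    System.out.println("Initialized " + fs.getUri());
  }
}
```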
+
+<property>
+  <name>fs.s3n.multipart.uploads.block.size</name>
+  <value>67108864</value>
+  <description>The block size for multipart uploads to native S3 filesystem.
+  Default size is 64MB.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.multipart.copy.block.size</name>
+  <value>5368709120</value>
+  <description>The block size for multipart copy in native S3 filesystem.
+  Default size is 5GB.
+  </description>
+</property>
+
 <property>
   <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java
new file mode 100644
index 00000000000..b1078a45144
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+public class TestJets3tNativeFileSystemStore {
+  private Configuration conf;
+  private Jets3tNativeFileSystemStore store;
+  private NativeS3FileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    store = new Jets3tNativeFileSystemStore();
+    fs = new NativeS3FileSystem(store);
+    conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
+    conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024);
+    fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      store.purge("test");
+    } catch (Exception e) {}
+  }
+
+  @BeforeClass
+  public static void checkSettings() throws Exception {
+    Configuration conf = new Configuration();
+    assumeNotNull(conf.get("fs.s3n.awsAccessKeyId"));
+    assumeNotNull(conf.get("fs.s3n.awsSecretAccessKey"));
+    assumeNotNull(conf.get("test.fs.s3n.name"));
+  }
+
+  protected void writeRenameReadCompare(Path path, long len)
+      throws IOException, NoSuchAlgorithmException {
+    // If len > fs.s3n.multipart.uploads.block.size,
+    // we'll use a multipart upload copy
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    OutputStream out = new BufferedOutputStream(
+        new DigestOutputStream(fs.create(path, false), digest));
+
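+    // DigestOutputStream updates 'digest' as bytes pass through, so the MD5
+    // of the original data is computed during this single write pass.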
+    for (long i = 0; i < len; i++) {
+      out.write('Q');
+    }
+    out.flush();
+    out.close();
+
+    assertTrue("Exists", fs.exists(path));
+
+    // Depending on whether this file is over 5 GB or not,
+    // rename will cause a multipart upload copy
+    Path copyPath = path.suffix(".copy");
+    fs.rename(path, copyPath);
+
+    assertTrue("Copy exists", fs.exists(copyPath));
+
+    // Download file from S3 and compare the digest against the original
+    MessageDigest digest2 = MessageDigest.getInstance("MD5");
+    InputStream in = new BufferedInputStream(
+        new DigestInputStream(fs.open(copyPath), digest2));
+    long copyLen = 0;
+    while (in.read() != -1) {copyLen++;}
+    in.close();
+
+    assertEquals("Copy length matches original", len, copyLen);
+    assertArrayEquals("Digests match", digest.digest(), digest2.digest());
+  }
+
+  @Test
+  public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
+    // Regular upload, regular copy
+    writeRenameReadCompare(new Path("/test/small"), 16384);
+  }
+
+  @Test
+  public void testMediumUpload() throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, regular copy
+    writeRenameReadCompare(new Path("/test/medium"), 33554432); // 32 MB
+  }
+
+  @Test
+  public void testExtraLargeUpload()
+      throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, multipart copy
+    writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties b/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties
new file mode 100644
index 00000000000..09cc46396ab
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties
@@ -0,0 +1,16 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Speed up the s3native jets3t test
+
+s3service.max-thread-count=10
+threaded-service.max-thread-count=10
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index dfbc75bcc04..d220f506711 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -13,9 +13,6 @@ Trunk (Unreleased)

     HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)

-    HDFS-4685. Implementation of ACLs in HDFS. (See breakdown of tasks below for
-    features and contributors)
-
   IMPROVEMENTS

     HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@@ -256,86 +253,6 @@ Trunk (Unreleased)

     HDFS-5794. Fix the inconsistency of layout version number of
     ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9)

-  BREAKDOWN OF HDFS-4685 SUBTASKS AND RELATED JIRAS
-
-    HDFS-5596. Implement RPC stubs. (Haohui Mai via cnauroth)
-
-    HDFS-5685. Implement ACL as a INode feature. (Haohui Mai via cnauroth)
-
-    HDFS-5618. NameNode: persist ACLs in fsimage. (Haohui Mai via cnauroth)
-
-    HDFS-5619. NameNode: record ACL modifications to edit log.
-    (Haohui Mai via cnauroth)
-
-    HDFS-5673. Implement logic for modification of ACLs. (cnauroth)
-
-    HDFS-5758.
NameNode: complete implementation of inode modifications for - ACLs. (Chris Nauroth via wheat9) - - HDFS-5612. NameNode: change all permission checks to enforce ACLs in - addition to permissions. (Chris Nauroth via wheat9) - - HDFS-5613. NameNode: implement handling of ACLs in combination with - symlinks. (Chris Nauroth via wheat9) - - HDFS-5615. NameNode: implement handling of ACLs in combination with sticky - bit. (Chris Nauroth via wheat9) - - HDFS-5702. FsShell Cli: Add XML based End-to-End test for getfacl and - setfacl commands. (Vinay via cnauroth) - - HDFS-5608. WebHDFS: implement ACL APIs. - (Sachin Jose and Renil Joseph via cnauroth) - - HDFS-5614. NameNode: implement handling of ACLs in combination with - snapshots. (cnauroth) - - HDFS-5858. Refactor common ACL test cases to be run through multiple - FileSystem implementations. (cnauroth) - - HDFS-5860. Refactor INodeDirectory getDirectoryXFeature methods to use - common getFeature helper method. (Jing Zhao via cnauroth) - - HDFS-5861. Add CLI test for Ls output for extended ACL marker. - (Vinay via cnauroth) - - HDFS-5616. NameNode: implement default ACL handling. (cnauroth) - - HDFS-5899. Add configuration flag to disable/enable support for ACLs. - (cnauroth) - - HDFS-5914. Incorporate ACLs with the changes from HDFS-5698. - (Haohui Mai via cnauroth) - - HDFS-5625. Write end user documentation for HDFS ACLs. (cnauroth) - - HDFS-5925. ACL configuration flag must only reject ACL API calls, not ACLs - present in fsimage or edits. (cnauroth) - - HDFS-5923. Do not persist the ACL bit in the FsPermission. - (Haohui Mai via cnauroth) - - HDFS-5933. Optimize the FSImage layout for ACLs (Haohui Mai via cnauroth) - - HDFS-5932. Ls should display the ACL bit (Chris Nauroth via wheat9) - - HDFS-5937. Fix TestOfflineEditsViewer on HDFS-4685 branch. (cnauroth) - - HDFS-5737. Replacing only the default ACL can fail to copy unspecified base - entries from the access ACL. (cnauroth) - - HDFS-5739. ACL RPC must allow null name or unspecified permissions in ACL - entries. (cnauroth) - - HDFS-5799. Make audit logging consistent across ACL APIs. (cnauroth) - - HDFS-5849. Removing ACL from an inode fails if it has only a default ACL. - (cnauroth) - - HDFS-5623. NameNode: add tests for skipping ACL enforcement when permission - checks are disabled, user is superuser or user is member of supergroup. - (cnauroth) - Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -359,6 +276,9 @@ Release 2.4.0 - UNRELEASED HDFS-5776 Support 'hedged' reads in DFSClient (Liang Xie via stack) + HDFS-4685. Implementation of ACLs in HDFS. (See breakdown of tasks below for + features and contributors) + IMPROVEMENTS HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and @@ -439,6 +359,9 @@ Release 2.4.0 - UNRELEASED HDFS-6006. Remove duplicate code in FSNameSystem#getFileInfo. (Akira Ajisaka via cnauroth) + HDFS-6018. Exception recorded in LOG when IPCLoggerChannel#close is called. + (jing9) + OPTIMIZATIONS HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery @@ -563,6 +486,14 @@ Release 2.4.0 - UNRELEASED HDFS-5988. Bad fsimage always generated after upgrade. (wang) + HDFS-5922. DN heartbeat thread can get stuck in tight loop. (Arpit Agarwal) + + HDFS-6008. Namenode dead node link is giving HTTP error 500. + (Benoy Antony via cnauroth) + + HDFS-5936. MiniDFSCluster does not clean data left behind by + SecondaryNameNode. (Binglin Chang via cnauroth) + BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS HDFS-5717. 
Save FSImage header in protobuf. (Haohui Mai via jing9) @@ -623,7 +554,88 @@ Release 2.4.0 - UNRELEASED HDFS-5981. PBImageXmlWriter generates malformed XML. (Haohui Mai via cnauroth) - HDFS-5922. DN heartbeat thread can get stuck in tight loop. (Arpit Agarwal) + BREAKDOWN OF HDFS-4685 SUBTASKS AND RELATED JIRAS + + HDFS-5596. Implement RPC stubs. (Haohui Mai via cnauroth) + + HDFS-5685. Implement ACL as a INode feature. (Haohui Mai via cnauroth) + + HDFS-5618. NameNode: persist ACLs in fsimage. (Haohui Mai via cnauroth) + + HDFS-5619. NameNode: record ACL modifications to edit log. + (Haohui Mai via cnauroth) + + HDFS-5673. Implement logic for modification of ACLs. (cnauroth) + + HDFS-5758. NameNode: complete implementation of inode modifications for + ACLs. (Chris Nauroth via wheat9) + + HDFS-5612. NameNode: change all permission checks to enforce ACLs in + addition to permissions. (Chris Nauroth via wheat9) + + HDFS-5613. NameNode: implement handling of ACLs in combination with + symlinks. (Chris Nauroth via wheat9) + + HDFS-5615. NameNode: implement handling of ACLs in combination with sticky + bit. (Chris Nauroth via wheat9) + + HDFS-5702. FsShell Cli: Add XML based End-to-End test for getfacl and + setfacl commands. (Vinay via cnauroth) + + HDFS-5608. WebHDFS: implement ACL APIs. + (Sachin Jose and Renil Joseph via cnauroth) + + HDFS-5614. NameNode: implement handling of ACLs in combination with + snapshots. (cnauroth) + + HDFS-5858. Refactor common ACL test cases to be run through multiple + FileSystem implementations. (cnauroth) + + HDFS-5860. Refactor INodeDirectory getDirectoryXFeature methods to use + common getFeature helper method. (Jing Zhao via cnauroth) + + HDFS-5861. Add CLI test for Ls output for extended ACL marker. + (Vinay via cnauroth) + + HDFS-5616. NameNode: implement default ACL handling. (cnauroth) + + HDFS-5899. Add configuration flag to disable/enable support for ACLs. + (cnauroth) + + HDFS-5914. Incorporate ACLs with the changes from HDFS-5698. + (Haohui Mai via cnauroth) + + HDFS-5625. Write end user documentation for HDFS ACLs. (cnauroth) + + HDFS-5925. ACL configuration flag must only reject ACL API calls, not ACLs + present in fsimage or edits. (cnauroth) + + HDFS-5923. Do not persist the ACL bit in the FsPermission. + (Haohui Mai via cnauroth) + + HDFS-5933. Optimize the FSImage layout for ACLs (Haohui Mai via cnauroth) + + HDFS-5932. Ls should display the ACL bit (Chris Nauroth via wheat9) + + HDFS-5937. Fix TestOfflineEditsViewer on HDFS-4685 branch. (cnauroth) + + HDFS-5737. Replacing only the default ACL can fail to copy unspecified base + entries from the access ACL. (cnauroth) + + HDFS-5739. ACL RPC must allow null name or unspecified permissions in ACL + entries. (cnauroth) + + HDFS-5799. Make audit logging consistent across ACL APIs. (cnauroth) + + HDFS-5849. Removing ACL from an inode fails if it has only a default ACL. + (cnauroth) + + HDFS-5623. NameNode: add tests for skipping ACL enforcement when permission + checks are disabled, user is superuser or user is member of supergroup. + (cnauroth) + + HDFS-5908. Change AclFeature to capture list of ACL entries in an + ImmutableList. 
(cnauroth)

Release 2.3.1 - UNRELEASED

  INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 0235ec31038..7b9f2743bd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -182,7 +182,6 @@ public class IPCLoggerChannel implements AsyncLogger {

   @Override
   public void close() {
-    QuorumJournalManager.LOG.info("Closing", new Exception());
     // No more tasks may be submitted after this point.
     executor.shutdown();
     if (proxy != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
index 0946d041817..615543ec16d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
@@ -147,7 +147,9 @@ public class JspHelper {
    */
  public static final class Url {
    public static String authority(String scheme, DatanodeID d) {
-      String fqdn = canonicalize(d.getIpAddr());
+      String fqdn = (d.getIpAddr() != null && !d.getIpAddr().isEmpty())?
+          canonicalize(d.getIpAddr()):
+          d.getHostName();
      if (scheme.equals("http")) {
        return fqdn + ":" + d.getInfoPort();
      } else if (scheme.equals("https")) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
index 1849c9f28d6..1c5f469b3b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclFeature.java
@@ -18,26 +18,26 @@

 package org.apache.hadoop.hdfs.server.namenode;

-import java.util.Collections;
-import java.util.List;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.AclEntry;

+import com.google.common.collect.ImmutableList;
+
 /**
  * Feature that represents the ACLs of the inode.
  */
 @InterfaceAudience.Private
 public class AclFeature implements INode.Feature {
-  public static final List<AclEntry> EMPTY_ENTRY_LIST = Collections.emptyList();
+  public static final ImmutableList<AclEntry> EMPTY_ENTRY_LIST =
+    ImmutableList.of();

-  private final List<AclEntry> entries;
+  private final ImmutableList<AclEntry> entries;

-  public AclFeature(List<AclEntry> entries) {
+  public AclFeature(ImmutableList<AclEntry> entries) {
     this.entries = entries;
   }

-  public List<AclEntry> getEntries() {
+  public ImmutableList<AclEntry> getEntries() {
     return entries;
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
index c40bd637322..a79bb393f9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java
@@ -328,7 +328,7 @@ final class AclStorage {

     // Add all default entries to the feature.
     featureEntries.addAll(defaultEntries);

-    return new AclFeature(Collections.unmodifiableList(featureEntries));
+    return new AclFeature(ImmutableList.copyOf(featureEntries));
   }

   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 97c9a456a9b..2bf11033af9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -817,6 +818,14 @@ public class MiniDFSCluster {
           throw new IOException("Could not fully delete " + nameDir);
         }
       }
+      Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
+          .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
+      for (URI checkpointDirUri : checkpointDirs) {
+        File checkpointDir = new File(checkpointDirUri);
+        if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
+          throw new IOException("Could not fully delete " + checkpointDir);
+        }
+      }
     }
     boolean formatThisOne = format;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
index 1f27a0b8428..e94d69ab55b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.common;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -641,5 +642,20 @@ public class TestJspHelper {
     assertTrue(upgradeStatusReport.getStatusText(true).equals(
         MessageFormat.format(EXPECTED__NOTF_PATTERN, version)));
   }
+
+  @Test
+  public void testAuthority() {
+    DatanodeID dnWithIp = new DatanodeID("127.0.0.1", "hostName", null,
+        50020, 50075, 50076, 50010);
+    assertNotNull(JspHelper.Url.authority("http", dnWithIp));
+
+    DatanodeID dnWithNullIp = new DatanodeID(null, "hostName", null,
+        50020, 50075, 50076, 50010);
+    assertNotNull(JspHelper.Url.authority("http", dnWithNullIp));
+
+    DatanodeID dnWithEmptyIp = new DatanodeID("", "hostName", null,
+        50020, 50075, 50076, 50010);
+    assertNotNull(JspHelper.Url.authority("http", dnWithEmptyIp));
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
index 5b03c7df13d..d3dc844d015 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java
@@ -48,6 +48,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;

+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;

 /**
@@ -1272,6 +1273,12 @@ public abstract class FSAclBaseTest {
     AclFeature aclFeature = inode.getAclFeature();
     if (expectAclFeature) {
       assertNotNull(aclFeature);
+      // Intentionally capturing a reference to the entries, not using nested
+      // calls. This way, we get compile-time enforcement that the entries are
+      // stored in an ImmutableList.
+      ImmutableList<AclEntry> entries = aclFeature.getEntries();
+      assertNotNull(entries);
+      assertFalse(entries.isEmpty());
     } else {
       assertNull(aclFeature);
     }
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f3157c6d6fa..e75fbba9b48 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -30,6 +30,8 @@ Release 2.5.0 - UNRELEASED

     YARN-1678. Fair scheduler gabs incessantly about reservations (Sandy Ryza)

+    YARN-1561. Fix a generic type warning in FairScheduler. (Chen He via junping_du)
+
   OPTIMIZATIONS

   BUG FIXES
@@ -144,6 +146,10 @@ Release 2.4.0 - UNRELEASED

     YARN-1497. Command line additions for moving apps between queues (Sandy Ryza)

+    YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of
+    transferred containers from previous app-attempts to new AMs after YARN-1490.
+    (Jian He via vinodkv)
+
   IMPROVEMENTS

     YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
index 363d87265ee..79f9f3a442c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;

@@ -55,13 +56,15 @@ public abstract class RegisterApplicationMasterResponse {
   public static RegisterApplicationMasterResponse newInstance(
       Resource minCapability, Resource maxCapability,
       Map<ApplicationAccessType, String> acls, ByteBuffer key,
-      List<Container> containersFromPreviousAttempt, String queue) {
+      List<Container> containersFromPreviousAttempt, String queue,
+      List<NMToken> nmTokensFromPreviousAttempts) {
     RegisterApplicationMasterResponse response =
         Records.newRecord(RegisterApplicationMasterResponse.class);
     response.setMaximumResourceCapability(maxCapability);
     response.setApplicationACLs(acls);
     response.setClientToAMTokenMasterKey(key);
-    response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
+    response.setContainersFromPreviousAttempts(containersFromPreviousAttempt);
+    response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts);
     response.setQueue(queue);
     return response;
   }
@@ -129,26 +132,52 @@ public abstract class RegisterApplicationMasterResponse {

   /**
    * <p>
    * Get the list of running containers as viewed by
-   * <code>ResourceManager</code> from previous application attempt.
+   * <code>ResourceManager</code> from previous application attempts.
    * </p>
    *
    * @return the list of running containers as viewed by
-   *         <code>ResourceManager</code> from previous application attempt
+   *         <code>ResourceManager</code> from previous application attempts
+   * @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempts()
    */
   @Public
   @Unstable
-  public abstract List<Container> getContainersFromPreviousAttempt();
+  public abstract List<Container> getContainersFromPreviousAttempts();

   /**
    * Set the list of running containers as viewed by
-   * <code>ResourceManager</code> from previous application attempt.
+   * <code>ResourceManager</code> from previous application attempts.
    *
    * @param containersFromPreviousAttempt
    *          the list of running containers as viewed by
-   *          <code>ResourceManager</code> from previous application attempt.
+   *          <code>ResourceManager</code> from previous application attempts.
    */
   @Private
   @Unstable
-  public abstract void setContainersFromPreviousAttempt(
+  public abstract void setContainersFromPreviousAttempts(
       List<Container> containersFromPreviousAttempt);
+
+  /**
+   * Get the list of NMTokens for communicating with the NMs where the
+   * containers of previous application attempts are running.
+   *
+   * @return the list of NMTokens for communicating with the NMs where the
+   *         containers of previous application attempts are running.
+   *
+   * @see RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()
+   */
+  @Public
+  @Stable
+  public abstract List<NMToken> getNMTokensFromPreviousAttempts();
+
+  /**
+   * Set the list of NMTokens for communicating with the NMs where the
+   * containers of previous application attempts are running.
+   *
+   * @param nmTokens
+   *          the list of NMTokens for communicating with the NMs where the
+   *          containers of previous application attempts are running.
+   */
+  @Private
+  @Unstable
+  public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
index d8bf0e66f7a..ea8bec2e8d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
@@ -72,4 +72,37 @@ public abstract class NMToken {
   @Stable
   public abstract void setToken(Token token);

+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result =
+        prime * result + ((getNodeId() == null) ? 0 : getNodeId().hashCode());
+    result =
+        prime * result + ((getToken() == null) ? 0 : getToken().hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    NMToken other = (NMToken) obj;
+    if (getNodeId() == null) {
+      if (other.getNodeId() != null)
+        return false;
+    } else if (!getNodeId().equals(other.getNodeId()))
+      return false;
+    if (getToken() == null) {
+      if (other.getToken() != null)
+        return false;
+    } else if (!getToken().equals(other.getToken()))
+      return false;
+    return true;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index eff5cd7ae82..a1f6d2e211d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -44,8 +44,9 @@ message RegisterApplicationMasterResponseProto {
   optional ResourceProto maximumCapability = 1;
   optional bytes client_to_am_token_master_key = 2;
   repeated ApplicationACLMapProto application_ACLs = 3;
-  repeated ContainerProto containers_from_previous_attempt = 4;
+  repeated ContainerProto containers_from_previous_attempts = 4;
   optional string queue = 5;
+  repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
 }

 message FinishApplicationMasterRequestProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 8d869a225cd..228f1845b46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -542,7 +543,7 @@ public class ApplicationMaster {
     }

     List<Container> previousAMRunningContainers =
-        response.getContainersFromPreviousAttempt();
+        response.getContainersFromPreviousAttempts();
     LOG.info("Received " + previousAMRunningContainers.size()
         + " previous AM's running containers on AM registration.");
     numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
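The value-based equals()/hashCode() added to NMToken above is what lets the tests further down compare whole token lists with a single assertEquals. A minimal standalone sketch (class name and values are illustrative, not part of the patch; NMToken.newInstance is a @Private factory backed by the PB records):

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;

public class NMTokenEqualitySketch {
  public static void main(String[] args) {
    Token t = Token.newInstance(new byte[0], "NMToken", new byte[0],
        "127.0.0.1:1234");
    NMToken a = NMToken.newInstance(NodeId.newInstance("127.0.0.1", 1234), t);
    NMToken b = NMToken.newInstance(NodeId.newInstance("127.0.0.1", 1234), t);
    // Before this patch both checks relied on object identity; now two tokens
    // with equal NodeId and Token compare equal and collapse in hashed sets.
    System.out.println(a.equals(b)); // true
    Set<NMToken> set = new HashSet<NMToken>();
    set.add(a);
    set.add(b);
    System.out.println(set.size()); // 1
  }
}
```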
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index f7ed0ac3140..8c90d398389 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -195,6 +195,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           appTrackingUrl);
     RegisterApplicationMasterResponse response =
         rmClient.registerApplicationMaster(request);
+
+    synchronized (this) {
+      if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
+        populateNMTokens(response.getNMTokensFromPreviousAttempts());
+      }
+    }
     return response;
   }

@@ -250,7 +256,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         lastResponseId = allocateResponse.getResponseId();
         clusterAvailableResources = allocateResponse.getAvailableResources();
         if (!allocateResponse.getNMTokens().isEmpty()) {
-          populateNMTokens(allocateResponse);
+          populateNMTokens(allocateResponse.getNMTokens());
         }
       }
     } finally {
@@ -284,13 +290,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {

   @Private
   @VisibleForTesting
-  protected void populateNMTokens(AllocateResponse allocateResponse) {
-    for (NMToken token : allocateResponse.getNMTokens()) {
+  protected void populateNMTokens(List<NMToken> nmTokens) {
+    for (NMToken token : nmTokens) {
       String nodeId = token.getNodeId().toString();
       if (getNMTokenCache().containsToken(nodeId)) {
-        LOG.debug("Replacing token for : " + nodeId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Replacing token for : " + nodeId);
+        }
       } else {
-        LOG.debug("Received new token for : " + nodeId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received new token for : " + nodeId);
+        }
       }
       getNMTokenCache().setToken(nodeId, token.getToken());
     }
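populateNMTokens() writes into the client's NMTokenCache; tokens recovered at registration are only useful if the AM's NMClient reads from the same cache. A minimal sketch of that wiring using the standard yarn-client API (not part of this patch):

```java
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;

public class SharedTokenCacheSketch {
  public static void main(String[] args) {
    // One cache shared by both clients, so NM tokens cached by
    // AMRMClientImpl#populateNMTokens are visible when starting containers.
    NMTokenCache tokenCache = new NMTokenCache();
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.setNMTokenCache(tokenCache);
    NMClient nmClient = NMClient.createNMClient();
    nmClient.setNMTokenCache(tokenCache);
    // init()/start() and AM registration follow as usual.
  }
}
```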
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
index ae488c452f3..06a637a3e62 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
@@ -31,13 +31,16 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;

@@ -56,7 +59,8 @@ public class RegisterApplicationMasterResponsePBImpl extends

   private Resource maximumResourceCapability;
   private Map<ApplicationAccessType, String> applicationACLS = null;
-  private List<Container> containersFromPreviousAttempt = null;
+  private List<Container> containersFromPreviousAttempts = null;
+  private List<NMToken> nmTokens = null;

   public RegisterApplicationMasterResponsePBImpl() {
     builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -110,8 +114,13 @@ public class RegisterApplicationMasterResponsePBImpl extends
     if (this.applicationACLS != null) {
       addApplicationACLs();
     }
-    if (this.containersFromPreviousAttempt != null) {
-      addRunningContainersToProto();
+    if (this.containersFromPreviousAttempts != null) {
+      addContainersFromPreviousAttemptToProto();
+    }
+    if (nmTokens != null) {
+      builder.clearNmTokensFromPreviousAttempts();
+      Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
+      builder.addAllNmTokensFromPreviousAttempts(iterable);
     }
   }

@@ -236,21 +245,22 @@ public class RegisterApplicationMasterResponsePBImpl extends
   }

   @Override
-  public List<Container> getContainersFromPreviousAttempt() {
-    if (this.containersFromPreviousAttempt != null) {
-      return this.containersFromPreviousAttempt;
+  public List<Container> getContainersFromPreviousAttempts() {
+    if (this.containersFromPreviousAttempts != null) {
+      return this.containersFromPreviousAttempts;
     }
-    initRunningContainersList();
-    return this.containersFromPreviousAttempt;
+    initContainersPreviousAttemptList();
+    return this.containersFromPreviousAttempts;
   }

   @Override
-  public void setContainersFromPreviousAttempt(final List<Container> containers) {
+  public void
+      setContainersFromPreviousAttempts(final List<Container> containers) {
     if (containers == null) {
       return;
     }
-    this.containersFromPreviousAttempt = new ArrayList<Container>();
-    this.containersFromPreviousAttempt.addAll(containers);
+    this.containersFromPreviousAttempts = new ArrayList<Container>();
+    this.containersFromPreviousAttempts.addAll(containers);
   }

   @Override
@@ -272,25 +282,88 @@ public class RegisterApplicationMasterResponsePBImpl extends
     }
   }

-  private void initRunningContainersList() {
-    RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<ContainerProto> list = p.getContainersFromPreviousAttemptList();
-    containersFromPreviousAttempt = new ArrayList<Container>();
+
+  private void initContainersPreviousAttemptList() {
+    RegisterApplicationMasterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    List<ContainerProto> list = p.getContainersFromPreviousAttemptsList();
+    containersFromPreviousAttempts = new ArrayList<Container>();
     for (ContainerProto c : list) {
-      containersFromPreviousAttempt.add(convertFromProtoFormat(c));
+      containersFromPreviousAttempts.add(convertFromProtoFormat(c));
     }
   }

-  private void addRunningContainersToProto() {
+  private void addContainersFromPreviousAttemptToProto() {
     maybeInitBuilder();
-    builder.clearContainersFromPreviousAttempt();
+    builder.clearContainersFromPreviousAttempts();
     List<ContainerProto> list = new ArrayList<ContainerProto>();
-    for (Container c : containersFromPreviousAttempt) {
+    for (Container c : containersFromPreviousAttempts) {
       list.add(convertToProtoFormat(c));
     }
-    builder.addAllContainersFromPreviousAttempt(list);
+    builder.addAllContainersFromPreviousAttempts(list);
+  }
+
+
+  @Override
+  public List<NMToken> getNMTokensFromPreviousAttempts() {
+    if (nmTokens != null) {
+      return nmTokens;
+    }
+    initLocalNewNMTokenList();
+    return nmTokens;
   }

+  @Override
+  public void setNMTokensFromPreviousAttempts(final List<NMToken> nmTokens) {
+    if (nmTokens == null || nmTokens.isEmpty()) {
+      if (this.nmTokens != null) {
+        this.nmTokens.clear();
+      }
+      builder.clearNmTokensFromPreviousAttempts();
+      return;
+    }
+    this.nmTokens = new ArrayList<NMToken>();
+    this.nmTokens.addAll(nmTokens);
+  }
+
+  private synchronized void initLocalNewNMTokenList() {
+    RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<NMTokenProto> list = p.getNmTokensFromPreviousAttemptsList();
+    nmTokens = new ArrayList<NMToken>();
+    for (NMTokenProto t : list) {
+      nmTokens.add(convertFromProtoFormat(t));
+    }
+  }
+
+  private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
+      final List<NMToken> nmTokenList) {
+    maybeInitBuilder();
+    return new Iterable<NMTokenProto>() {
+      @Override
+      public synchronized Iterator<NMTokenProto> iterator() {
+        return new Iterator<NMTokenProto>() {
+
+          Iterator<NMToken> iter = nmTokenList.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public NMTokenProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+
   private Resource convertFromProtoFormat(ResourceProto resource) {
     return new ResourcePBImpl(resource);
   }
@@ -306,4 +379,12 @@ public class RegisterApplicationMasterResponsePBImpl extends
   private ContainerProto convertToProtoFormat(Container t) {
     return ((ContainerPBImpl) t).getProto();
   }
+
+  private NMTokenProto convertToProtoFormat(NMToken token) {
+    return ((NMTokenPBImpl) token).getProto();
+  }
+
+  private NMToken convertFromProtoFormat(NMTokenProto proto) {
+    return new NMTokenPBImpl(proto);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
index bcbf0a3f8af..a269641504a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
@@ -47,7 +47,7 @@ public class NMTokenPBImpl extends NMToken{
     this.proto = proto;
     viaProto = true;
   }
-
+
   @Override
   public synchronized NodeId getNodeId() {
     NMTokenProtoOrBuilder p = viaProto ? proto : builder;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index db81dd86afa..b5f90c6f38a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;

 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
@@ -280,10 +282,32 @@ public class ApplicationMasterService extends AbstractService implements
             .getMasterKey(applicationAttemptId).getEncoded()));
       }

-      List<Container> containerList =
+      // For work-preserving AM restart, retrieve previous attempts' containers
+      // and corresponding NM tokens.
+      List<Container> transferredContainers =
           ((AbstractYarnScheduler) rScheduler)
             .getTransferredContainers(applicationAttemptId);
-      response.setContainersFromPreviousAttempt(containerList);
+      if (!transferredContainers.isEmpty()) {
+        response.setContainersFromPreviousAttempts(transferredContainers);
+        List<NMToken> nmTokens = new ArrayList<NMToken>();
+        for (Container container : transferredContainers) {
+          try {
+            nmTokens.add(rmContext.getNMTokenSecretManager()
+                .createAndGetNMToken(app.getUser(), applicationAttemptId,
+                    container));
+          } catch (IllegalArgumentException e) {
+            // if it's a DNS issue, throw UnknownHostException directly and
+            // that will be automatically retried by RMProxy in RPC layer.
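+            // Other IllegalArgumentExceptions are swallowed: the container is
+            // still reported to the new AM, just without an NM token for it.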
+            if (e.getCause() instanceof UnknownHostException) {
+              throw (UnknownHostException) e.getCause();
+            }
+          }
+        }
+        response.setNMTokensFromPreviousAttempts(nmTokens);
+        LOG.info("Application " + appID + " retrieved "
+            + transferredContainers.size() + " containers from previous"
+            + " attempts and " + nmTokens.size() + " NM tokens.");
+      }
       return response;
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index f35f76ff63d..919d5616371 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -385,9 +385,8 @@ public class SchedulerApplicationAttempt {
         }
       } catch (IllegalArgumentException e) {
         // DNS might be down, skip returning this container.
-        LOG.error(
-            "Error trying to assign container token to allocated container "
-                + container.getId(), e);
+        LOG.error("Error trying to assign container token and NM token to" +
+            " an allocated container " + container.getId(), e);
         continue;
       }
       returnContainerList.add(container);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index a27a86d3075..dbadd2d10a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -175,7 +175,7 @@ public class FairScheduler extends AbstractYarnScheduler {
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
   protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
-  private Comparator nodeAvailableResourceComparator =
+  private Comparator<NodeId> nodeAvailableResourceComparator =
       new NodeAvailableResourceComparator(); // Node available resource comparator
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
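The FairScheduler change above is the whole of YARN-1561: parameterizing the raw comparator removes an unchecked-conversion warning at the sort call. A generic illustration of the same warning using plain JDK types (nothing from the patch):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class RawComparatorWarningSketch {
  public static void main(String[] args) {
    List<String> names = new ArrayList<String>();
    Comparator raw = String.CASE_INSENSITIVE_ORDER;         // raw type
    Collections.sort(names, raw);                           // unchecked warning
    Comparator<String> typed = String.CASE_INSENSITIVE_ORDER;
    Collections.sort(names, typed);                         // compiles cleanly
  }
}
```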
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 63efe8fe454..c91ce350159 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -486,6 +486,7 @@ public class MockRM extends ResourceManager {
 
   public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     nm.nodeHeartbeat(true);
     MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
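
The extra waitForState call above closes a race: until the app reaches ACCEPTED and a fresh attempt is created, getCurrentAppAttempt() may still return the previous (failed) attempt. A sketch of typical test usage, assuming the MockRM helpers shown in this patch.

    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
    import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
    import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
    import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;

    public class LaunchAMSketch {
      static void launch() throws Exception {
        MockRM rm = new MockRM(new YarnConfiguration());
        rm.start();
        RMApp app = rm.submitApp(200);
        MockNM nm = new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
        nm.registerNode();
        // launchAM now waits for ACCEPTED internally before reading the attempt
        MockAM am = MockRM.launchAM(app, rm, nm);
        am.registerAppAttempt();
        rm.stop();
      }
    }
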
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index ca9befd599c..66d7acb9792 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -160,11 +162,11 @@ public class TestAMRestart {
     am2.registerAppAttempt();
 
     // Assert two containers are running: container2 and container3;
-    Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt()
+    Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempts()
       .size());
     boolean containerId2Exists = false, containerId3Exists = false;
     for (Container container : registerResponse
-      .getContainersFromPreviousAttempt()) {
+      .getContainersFromPreviousAttempts()) {
       if (container.getId().equals(containerId2)) {
         containerId2Exists = true;
       }
@@ -232,4 +234,100 @@ public class TestAMRestart {
     rm1.stop();
   }
+
+  @Test
+  public void testNMTokensRebindOnAMRestart() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    RMApp app1 =
+        rm1.submitApp(200, "myname", "myuser",
+          new HashMap<ApplicationAccessType, String>(), false, "default", -1,
+          null, "MAPREDUCE", false, true);
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    MockNM nm2 =
+        new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService());
+    nm2.registerNode();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    int NUM_CONTAINERS = 1;
+    List<Container> containers = new ArrayList<Container>();
+    // nmTokens keeps track of all the NM tokens issued in the allocate calls.
+    List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
+
+    // am1 allocates 1 container on nm1.
+    while (true) {
+      AllocateResponse response =
+          am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
+            new ArrayList<ContainerId>());
+      nm1.nodeHeartbeat(true);
+      containers.addAll(response.getAllocatedContainers());
+      expectedNMTokens.addAll(response.getNMTokens());
+      if (containers.size() == NUM_CONTAINERS) {
+        break;
+      }
+      Thread.sleep(200);
+      System.out.println("Waiting for container to be allocated.");
+    }
+    // launch the container
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+    ContainerId containerId2 =
+        ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+    // fail am1
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // restart the am
+    MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
+    RegisterApplicationMasterResponse registerResponse =
+        am2.registerAppAttempt();
+    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    // check that am2 gets the NM token issued to am1.
+    Assert.assertEquals(expectedNMTokens,
+        registerResponse.getNMTokensFromPreviousAttempts());
+
+    // am2 allocates 1 container on nm2.
+    containers = new ArrayList<Container>();
+    while (true) {
+      AllocateResponse allocateResponse =
+          am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS,
+            new ArrayList<ContainerId>());
+      nm2.nodeHeartbeat(true);
+      containers.addAll(allocateResponse.getAllocatedContainers());
+      expectedNMTokens.addAll(allocateResponse.getNMTokens());
+      if (containers.size() == NUM_CONTAINERS) {
+        break;
+      }
+      Thread.sleep(200);
+      System.out.println("Waiting for container to be allocated.");
+    }
+    nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+    ContainerId am2ContainerId2 =
+        ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, am2ContainerId2, RMContainerState.RUNNING);
+
+    // fail am2.
+    nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+
+    // restart the am
+    MockAM am3 = MockRM.launchAM(app1, rm1, nm1);
+    registerResponse = am3.registerAppAttempt();
+    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    // check that am3 gets the NM tokens from both am1 and am2.
+    List<NMToken> transferredTokens =
+        registerResponse.getNMTokensFromPreviousAttempts();
+    Assert.assertEquals(2, transferredTokens.size());
+    Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
+    rm1.stop();
+  }
 }
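
For completeness, a sketch of the client-side opt-in that drives this behavior. The test passes keepContainers=true through MockRM.submitApp; in a real client this corresponds to the submission-context flag below, assuming the ApplicationSubmissionContext API of this release.

    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.util.Records;

    public class SubmissionSketch {
      static ApplicationSubmissionContext newContext() {
        ApplicationSubmissionContext context =
            Records.newRecord(ApplicationSubmissionContext.class);
        context.setMaxAppAttempts(3);
        // keep running containers (and get their NM tokens back) across AM restarts
        context.setKeepContainersAcrossApplicationAttempts(true);
        return context;
      }
    }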