From 1c4f4a38ca958dbb8be911b7b7d6bf57722b7514 Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Thu, 19 Sep 2013 02:09:43 +0000 Subject: [PATCH 01/11] HADOOP-9929. Insufficient permission for a path reported as file not found. (Contributed by Colin Patrick McCabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524611 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../java/org/apache/hadoop/fs/Globber.java | 9 +- .../org/apache/hadoop/fs/TestGlobPaths.java | 94 ++++++++++++++++--- 3 files changed, 87 insertions(+), 19 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 74b962194ce..02c4fc0939f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -366,6 +366,9 @@ Release 2.3.0 - UNRELEASED HADOOP-9350. Hadoop not building against Java7 on OSX (Robert Kanter via stevel) + HADOOP-9929. Insufficient permissions for a path reported as file not found. + (Contributed by Colin Patrick McCabe) + Release 2.2.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java index bae801ef0a3..a23649fe482 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.fs; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -50,26 +51,26 @@ class Globber { this.filter = filter; } - private FileStatus getFileStatus(Path path) { + private FileStatus getFileStatus(Path path) throws IOException { try { if (fs != null) { return fs.getFileStatus(path); } else { return fc.getFileStatus(path); } - } catch (IOException e) { + } catch (FileNotFoundException e) { return null; } } - private FileStatus[] listStatus(Path path) { + private FileStatus[] listStatus(Path path) throws IOException { try { if (fs != null) { return fs.listStatus(path); } else { return fc.util().listStatus(path); } - } catch (IOException e) { + } catch (FileNotFoundException e) { return new FileStatus[0]; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestGlobPaths.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestGlobPaths.java index 820b00bb0b0..8e8124747c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestGlobPaths.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestGlobPaths.java @@ -20,6 +20,7 @@ package org.apache.hadoop.fs; import static org.junit.Assert.*; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; @@ -27,10 +28,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.*; public class TestGlobPaths { + private static final UserGroupInformation unprivilegedUser = + UserGroupInformation.createRemoteUser("myuser"); + static class RegexPathFilter implements 
PathFilter { private final String regex; @@ -47,17 +53,33 @@ public class TestGlobPaths { static private MiniDFSCluster dfsCluster; static private FileSystem fs; + static private FileSystem unprivilegedFs; static private FileContext fc; + static private FileContext unprivilegedFc; static final private int NUM_OF_PATHS = 4; static private String USER_DIR; private Path[] path = new Path[NUM_OF_PATHS]; @BeforeClass public static void setUp() throws Exception { - Configuration conf = new HdfsConfiguration(); + final Configuration conf = new HdfsConfiguration(); dfsCluster = new MiniDFSCluster.Builder(conf).build(); fs = FileSystem.get(conf); + unprivilegedFs = + unprivilegedUser.doAs(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws IOException { + return FileSystem.get(conf); + } + }); fc = FileContext.getFileContext(conf); + unprivilegedFc = + unprivilegedUser.doAs(new PrivilegedExceptionAction() { + @Override + public FileContext run() throws IOException { + return FileContext.getFileContext(conf); + } + }); USER_DIR = fs.getHomeDirectory().toUri().getPath().toString(); } @@ -781,8 +803,8 @@ public class TestGlobPaths { * A glob test that can be run on either FileContext or FileSystem. */ private static interface FSTestWrapperGlobTest { - void run(FSTestWrapper wrap, FileSystem fs, FileContext fc) - throws Exception; + void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrapper, + FileSystem fs, FileContext fc) throws Exception; } /** @@ -791,7 +813,8 @@ public class TestGlobPaths { private void testOnFileSystem(FSTestWrapperGlobTest test) throws Exception { try { fc.mkdir(new Path(USER_DIR), FsPermission.getDefault(), true); - test.run(new FileSystemTestWrapper(fs), fs, null); + test.run(new FileSystemTestWrapper(fs), + new FileSystemTestWrapper(unprivilegedFs), fs, null); } finally { fc.delete(new Path(USER_DIR), true); } @@ -803,7 +826,8 @@ public class TestGlobPaths { private void testOnFileContext(FSTestWrapperGlobTest test) throws Exception { try { fs.mkdirs(new Path(USER_DIR)); - test.run(new FileContextTestWrapper(fc), null, fc); + test.run(new FileContextTestWrapper(fc), + new FileContextTestWrapper(unprivilegedFc), null, fc); } finally { cleanupDFS(); } @@ -834,8 +858,8 @@ public class TestGlobPaths { * Test globbing through symlinks. */ private static class TestGlobWithSymlinks implements FSTestWrapperGlobTest { - public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc) - throws Exception { + public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap, + FileSystem fs, FileContext fc) throws Exception { // Test that globbing through a symlink to a directory yields a path // containing that symlink. 
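The narrowed catch in Globber above (FileNotFoundException instead of IOException) is what stops a permission failure from being reported as "no files matched". A minimal caller-side sketch of the behaviour the new access-denied test below exercises, assuming a directory such as /nopermission has had all permissions removed and the caller is not its owner:

  // Before HADOOP-9929 the AccessControlException raised while listing the
  // unreadable directory was swallowed inside Globber, so the glob silently
  // matched nothing; with the narrowed catch it propagates to the caller.
  try {
    FileStatus[] matches = fs.globStatus(new Path("/no*/*"));
  } catch (AccessControlException ace) {
    // expected: the caller now learns about the permission problem
  }
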
wrap.mkdir(new Path(USER_DIR + "/alpha"), FsPermission.getDirDefault(), @@ -886,8 +910,8 @@ public class TestGlobPaths { */ private static class TestGlobWithSymlinksToSymlinks implements FSTestWrapperGlobTest { - public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc) - throws Exception { + public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap, + FileSystem fs, FileContext fc) throws Exception { // Test that globbing through a symlink to a symlink to a directory // fully resolves wrap.mkdir(new Path(USER_DIR + "/alpha"), FsPermission.getDirDefault(), @@ -961,8 +985,8 @@ public class TestGlobPaths { */ private static class TestGlobSymlinksWithCustomPathFilter implements FSTestWrapperGlobTest { - public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc) - throws Exception { + public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap, + FileSystem fs, FileContext fc) throws Exception { // Test that globbing through a symlink to a symlink to a directory // fully resolves wrap.mkdir(new Path(USER_DIR + "/alpha"), FsPermission.getDirDefault(), @@ -1009,8 +1033,8 @@ public class TestGlobPaths { * Test that globStatus fills in the scheme even when it is not provided. */ private static class TestGlobFillsInScheme implements FSTestWrapperGlobTest { - public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc) - throws Exception { + public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap, + FileSystem fs, FileContext fc) throws Exception { // Verify that the default scheme is hdfs, when we don't supply one. wrap.mkdir(new Path(USER_DIR + "/alpha"), FsPermission.getDirDefault(), false); @@ -1052,8 +1076,8 @@ public class TestGlobPaths { * Test that globStatus works with relative paths. **/ private static class TestRelativePath implements FSTestWrapperGlobTest { - public void run(FSTestWrapper wrap, FileSystem fs, FileContext fc) - throws Exception { + public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap, + FileSystem fs, FileContext fc) throws Exception { String[] files = new String[] { "a", "abc", "abc.p", "bacd" }; Path[] path = new Path[files.length]; @@ -1086,4 +1110,44 @@ public class TestGlobPaths { public void testRelativePathOnFC() throws Exception { testOnFileContext(new TestRelativePath()); } + + /** + * Test that trying to glob through a directory we don't have permission + * to list fails with AccessControlException rather than succeeding or + * throwing any other exception. 
+ **/ + private static class TestGlobAccessDenied implements FSTestWrapperGlobTest { + public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap, + FileSystem fs, FileContext fc) throws Exception { + wrap.mkdir(new Path("/nopermission/val"), + new FsPermission((short)0777), true); + wrap.mkdir(new Path("/norestrictions/val"), + new FsPermission((short)0777), true); + wrap.setPermission(new Path("/nopermission"), + new FsPermission((short)0)); + try { + unprivilegedWrap.globStatus(new Path("/no*/*"), + new AcceptAllPathFilter()); + Assert.fail("expected to get an AccessControlException when " + + "globbing through a directory we don't have permissions " + + "to list."); + } catch (AccessControlException ioe) { + } + + Assert.assertEquals("/norestrictions/val", + TestPath.mergeStatuses(unprivilegedWrap.globStatus( + new Path("/norestrictions/*"), + new AcceptAllPathFilter()))); + } + } + + @Test + public void testGlobAccessDeniedOnFS() throws Exception { + testOnFileSystem(new TestGlobAccessDenied()); + } + + @Test + public void testGlobAccessDeniedOnFC() throws Exception { + testOnFileContext(new TestGlobAccessDenied()); + } } From 3720956a6afc2014fbe2b942300d0723486f7d79 Mon Sep 17 00:00:00 2001 From: Ivan Mitic Date: Thu, 19 Sep 2013 06:54:29 +0000 Subject: [PATCH 02/11] HADOOP-9791. Add a test case covering long paths for new FileUtil access check methods. Contributed by Ivan Mitic. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524631 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 ++ .../hadoop/io/nativeio/TestNativeIO.java | 39 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 02c4fc0939f..2148381bbef 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -369,6 +369,9 @@ Release 2.3.0 - UNRELEASED HADOOP-9929. Insufficient permissions for a path reported as file not found. (Contributed by Colin Patrick McCabe) + HADOOP-9791. 
Add a test case covering long paths for new FileUtil access + check methods (ivanmi) + Release 2.2.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java index 4d71e15c4b3..2e7c62c9339 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java @@ -254,6 +254,45 @@ public class TestNativeIO { File testFile = new File(TEST_DIR, "testfileaccess"); assertTrue(testFile.createNewFile()); + // Validate ACCESS_READ + FileUtil.setReadable(testFile, false); + assertFalse(NativeIO.Windows.access(testFile.getAbsolutePath(), + NativeIO.Windows.AccessRight.ACCESS_READ)); + + FileUtil.setReadable(testFile, true); + assertTrue(NativeIO.Windows.access(testFile.getAbsolutePath(), + NativeIO.Windows.AccessRight.ACCESS_READ)); + + // Validate ACCESS_WRITE + FileUtil.setWritable(testFile, false); + assertFalse(NativeIO.Windows.access(testFile.getAbsolutePath(), + NativeIO.Windows.AccessRight.ACCESS_WRITE)); + + FileUtil.setWritable(testFile, true); + assertTrue(NativeIO.Windows.access(testFile.getAbsolutePath(), + NativeIO.Windows.AccessRight.ACCESS_WRITE)); + + // Validate ACCESS_EXECUTE + FileUtil.setExecutable(testFile, false); + assertFalse(NativeIO.Windows.access(testFile.getAbsolutePath(), + NativeIO.Windows.AccessRight.ACCESS_EXECUTE)); + + FileUtil.setExecutable(testFile, true); + assertTrue(NativeIO.Windows.access(testFile.getAbsolutePath(), + NativeIO.Windows.AccessRight.ACCESS_EXECUTE)); + + // Validate that access checks work as expected for long paths + + // Assemble a path longer then 260 chars (MAX_PATH) + String testFileRelativePath = ""; + for (int i = 0; i < 15; ++i) { + testFileRelativePath += "testfileaccessfolder\\"; + } + testFileRelativePath += "testfileaccess"; + testFile = new File(TEST_DIR, testFileRelativePath); + assertTrue(testFile.getParentFile().mkdirs()); + assertTrue(testFile.createNewFile()); + // Validate ACCESS_READ FileUtil.setReadable(testFile, false); assertFalse(NativeIO.Windows.access(testFile.getAbsolutePath(), From 3a87e8fec08cd5b778e6a65baf3cece30a972b4d Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 19 Sep 2013 08:19:27 +0000 Subject: [PATCH 03/11] HADOOP-9974: document setting mvn heap size git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524650 13f79535-47bb-0310-9956-ffa450edef68 --- BUILDING.txt | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/BUILDING.txt b/BUILDING.txt index c9415dcedef..77d3e5a0a8d 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -4,8 +4,8 @@ Build instructions for Hadoop Requirements: * Unix System -* JDK 1.6 -* Maven 3.0 +* JDK 1.6+ +* Maven 3.0 or later * Findbugs 1.3.9 (if running findbugs) * ProtocolBuffer 2.5.0 * CMake 2.6 or newer (if compiling native code) @@ -149,6 +149,21 @@ Create a local staging version of the website (in /tmp/hadoop-site) ---------------------------------------------------------------------------------- +Handling out of memory errors in builds + +---------------------------------------------------------------------------------- + +If the build process fails with an out of memory error, you should be able to fix +it by increasing the memory used by maven -which can be done via the environment +variable 
MAVEN_OPTS. + +Here is an example setting to allocate between 256 and 512 MB of heap space to +Maven + +export MAVEN_OPTS="-Xms256m -Xmx512m" + +---------------------------------------------------------------------------------- + Building on OS/X ---------------------------------------------------------------------------------- From 8de47fc5700e70c69b85ba1202e1f6121b7a6a80 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 19 Sep 2013 08:21:06 +0000 Subject: [PATCH 04/11] HADOOP-9974: document setting mvn heap size - trailing space git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524651 13f79535-47bb-0310-9956-ffa450edef68 --- BUILDING.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BUILDING.txt b/BUILDING.txt index 77d3e5a0a8d..2deb2ac2790 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -155,7 +155,7 @@ Handling out of memory errors in builds If the build process fails with an out of memory error, you should be able to fix it by increasing the memory used by maven -which can be done via the environment -variable MAVEN_OPTS. +variable MAVEN_OPTS. Here is an example setting to allocate between 256 and 512 MB of heap space to Maven From 8628c1704b7d8c1a0ac01cea679c03245d11d330 Mon Sep 17 00:00:00 2001 From: Hitesh Shah Date: Thu, 19 Sep 2013 20:39:10 +0000 Subject: [PATCH 05/11] YARN-353. Add Zookeeper-based store implementation for RMStateStore. Contributed by Bikas Saha, Jian He and Karthik Kambatla. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524829 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/ha/ClientBaseWithFixes.java | 10 +- hadoop-yarn-project/CHANGES.txt | 5 + .../hadoop/yarn/conf/YarnConfiguration.java | 28 + .../src/main/resources/yarn-default.xml | 45 ++ .../pom.xml | 10 + .../recovery/FileSystemRMStateStore.java | 6 - .../recovery/RMStateStore.java | 13 +- .../recovery/ZKRMStateStore.java | 621 ++++++++++++++++++ .../recovery/TestRMStateStore.java | 87 ++- ...TestZKRMStateStoreZKClientConnections.java | 218 ++++++ 10 files changed, 1016 insertions(+), 27 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java index 89af8d06f1a..11d46574b23 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java @@ -53,6 +53,8 @@ import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Copy-paste of ClientBase from ZooKeeper, but without any of the * JMXEnv verification. 
There seems to be a bug ZOOKEEPER-1438 @@ -111,7 +113,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase { synchronized boolean isConnected() { return connected; } - synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException { + @VisibleForTesting + public synchronized void waitForConnected(long timeout) + throws InterruptedException, TimeoutException { long expire = Time.now() + timeout; long left = timeout; while(!connected && left > 0) { @@ -123,7 +127,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase { } } - synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException { + @VisibleForTesting + public synchronized void waitForDisconnected(long timeout) + throws InterruptedException, TimeoutException { long expire = Time.now() + timeout; long left = timeout; while(connected && left > 0) { diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ad38abc1739..7a506d3f877 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -27,10 +27,15 @@ Release 2.3.0 - UNRELEASED IMPROVEMENTS YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza) + YARN-1098. Separate out RM services into Always On and Active (Karthik Kambatla via bikas) + YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas) + YARN-353. Add Zookeeper-based store implementation for RMStateStore. + (Bikas Saha, Jian He and Karthik Kambatla via hitesh) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8142b78d58e..da23133cfa5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -276,12 +276,40 @@ public class YarnConfiguration extends Configuration { public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final boolean DEFAULT_RM_HA_ENABLED = false; + + //////////////////////////////// + // RM state store configs + //////////////////////////////// /** The class to use as the persistent store.*/ public static final String RM_STORE = RM_PREFIX + "store.class"; /** URI for FileSystemRMStateStore */ public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + "fs.state-store.uri"; + /** + * Comma separated host:port pairs, each corresponding to a ZK server for + * ZKRMStateStore + */ + public static final String ZK_STATE_STORE_PREFIX = + RM_PREFIX + "zk.state-store."; + public static final String ZK_RM_STATE_STORE_NUM_RETRIES = + ZK_STATE_STORE_PREFIX + "num-retries"; + public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3; + public static final String ZK_RM_STATE_STORE_ADDRESS = + ZK_STATE_STORE_PREFIX + "address"; + /** Timeout in millisec for ZK server connection for ZKRMStateStore */ + public static final String ZK_RM_STATE_STORE_TIMEOUT_MS = + ZK_STATE_STORE_PREFIX + "timeout.ms"; + public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000; + /** Parent znode path under which ZKRMStateStore will create znodes */ + public static final String ZK_RM_STATE_STORE_PARENT_PATH = + ZK_STATE_STORE_PREFIX + "parent-path"; + public static final String DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH = 
"/rmstore"; + /** ACL for znodes in ZKRMStateStore */ + public static final String ZK_RM_STATE_STORE_ACL = + ZK_STATE_STORE_PREFIX + "acl"; + public static final String DEFAULT_ZK_RM_STATE_STORE_ACL = + "world:anyone:rwcda"; /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c0f3a1ec417..8aa7d01735b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -258,6 +258,51 @@ org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore + + Host:Port of the ZooKeeper server where RM state will + be stored. This must be supplied when using + org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore + as the value for yarn.resourcemanager.store.class + yarn.resourcemanager.zk.state-store.address + + + + + Number of times ZKRMStateStore tries to connect to + ZooKeeper. This may be supplied when using + org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore + as the value for yarn.resourcemanager.store.class + yarn.resourcemanager.zk.state-store.num-retries + 3 + + + + Full path of the ZooKeeper znode where RM state will be + stored. This must be supplied when using + org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore + as the value for yarn.resourcemanager.store.class + yarn.resourcemanager.zk.state-store.parent-path + /rmstore + + + + Timeout when connecting to ZooKeeper. + This may be supplied when using + org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore + as the value for yarn.resourcemanager.store.class + yarn.resourcemanager.zk.state-store.timeout.ms + 60000 + + + + ACL's to be used for ZooKeeper znodes. + This may be supplied when using + org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore + as the value for yarn.resourcemanager.store.class + yarn.resourcemanager.zk.state-store.acl + world:anyone:rwcda + + URI pointing to the location of the FileSystem path where RM state will be stored. 
This must be supplied when using diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index eacd35ea859..1c138559016 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -41,6 +41,16 @@ org.apache.hadoop hadoop-yarn-server-web-proxy + + org.apache.zookeeper + zookeeper + + + org.apache.zookeeper + zookeeper + test-jar + test + org.apache.hadoop hadoop-hdfs diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 30d5d4108d4..062f5cc5532 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -63,12 +63,6 @@ public class FileSystemRMStateStore extends RMStateStore { public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); private static final String ROOT_DIR_NAME = "FSRMStateRoot"; - private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot"; - private static final String RM_APP_ROOT = "RMAppRoot"; - private static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; - private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; - private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = - "RMDTSequenceNumber_"; protected FileSystem fs; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 382ed97d61b..2f4b8960205 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -65,6 +65,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt */ public abstract class RMStateStore extends AbstractService { + // constants for RM App state and RMDTSecretManagerState. 
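Taken together, the new yarn.resourcemanager.zk.state-store.* keys are all that is needed to point the ResourceManager at ZooKeeper for recovery. A rough sketch of the minimal programmatic configuration, using the constants added to YarnConfiguration above (the quorum address is a placeholder; the remaining settings fall back to the defaults listed in yarn-default.xml):

  Configuration conf = new YarnConfiguration();
  // Select the ZooKeeper-backed store instead of FileSystemRMStateStore.
  conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
  // Required: comma-separated host:port pairs of the ZK quorum.
  conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, "zk1:2181,zk2:2181");
  // Optional: retries, session timeout, parent znode and ACLs have defaults.
  conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_NUM_RETRIES, 3);
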
+ protected static final String RM_APP_ROOT = "RMAppRoot"; + protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot"; + protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; + protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; + protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = + "RMDTSequenceNumber_"; + public static final Log LOG = LogFactory.getLog(RMStateStore.class); public RMStateStore() { @@ -464,8 +472,9 @@ public abstract class RMStateStore extends AbstractService { (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl .newApplicationAttemptStateData(attemptState.getAttemptId(), attemptState.getMasterContainer(), appAttemptTokens); - - LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); + } storeApplicationAttemptState(attemptState.getAttemptId().toString(), attemptStateData); } catch (Exception e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java new file mode 100644 index 00000000000..41c95d3ff85 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -0,0 +1,621 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ZKUtil; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Private +@Unstable +public class ZKRMStateStore extends RMStateStore { + + public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); + + private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; + private int numRetries; + + private String zkHostPort = null; + private int zkSessionTimeout; + private List zkAcl; + private String zkRootNodePath; + private String rmDTSecretManagerRoot; + private String rmAppRoot; + private String dtSequenceNumberPath = null; + + @VisibleForTesting + protected String znodeWorkingPath; + + @VisibleForTesting + protected ZooKeeper zkClient; + private ZooKeeper oldZkClient; + + @Override + public synchronized void initInternal(Configuration conf) throws Exception { + zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS); + if (zkHostPort == null) { + throw new YarnRuntimeException("No server address specified for " + + "zookeeper state store for Resource Manager recovery. 
" + + YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS + " is not configured."); + } + numRetries = + conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_NUM_RETRIES, + YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES); + znodeWorkingPath = + conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, + YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); + zkSessionTimeout = + conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, + YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS); + // Parse authentication from configuration. + String zkAclConf = + conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL, + YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_ACL); + zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); + + try { + zkAcl = ZKUtil.parseACLs(zkAclConf); + } catch (ZKUtil.BadAclFormatException bafe) { + LOG.error("Invalid format for " + YarnConfiguration.ZK_RM_STATE_STORE_ACL); + throw bafe; + } + + zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME; + rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; + rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT; + } + + @Override + public synchronized void startInternal() throws Exception { + // createConnection for future API calls + createConnection(); + + // ensure root dirs exist + createRootDir(znodeWorkingPath); + createRootDir(zkRootNodePath); + createRootDir(rmDTSecretManagerRoot); + createRootDir(rmAppRoot); + } + + private void createRootDir(String rootPath) throws Exception { + try { + createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT); + } catch (KeeperException ke) { + if (ke.code() != Code.NODEEXISTS) { + throw ke; + } + } + } + + private synchronized void closeZkClients() throws IOException { + if (zkClient != null) { + try { + zkClient.close(); + } catch (InterruptedException e) { + throw new IOException("Interrupted while closing ZK", e); + } + zkClient = null; + } + if (oldZkClient != null) { + try { + oldZkClient.close(); + } catch (InterruptedException e) { + throw new IOException("Interrupted while closing old ZK", e); + } + oldZkClient = null; + } + } + + @Override + protected synchronized void closeInternal() throws Exception { + closeZkClients(); + } + + @Override + public synchronized RMState loadState() throws Exception { + RMState rmState = new RMState(); + // recover DelegationTokenSecretManager + loadRMDTSecretManagerState(rmState); + // recover RM applications + loadRMAppState(rmState); + return rmState; + } + + private synchronized void loadRMDTSecretManagerState(RMState rmState) + throws Exception { + List childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true); + + for (String childNodeName : childNodes) { + if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { + rmState.rmSecretManagerState.dtSequenceNumber = + Integer.parseInt(childNodeName.split("_")[1]); + continue; + } + String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); + byte[] childData = getDataWithRetries(childNodePath, true); + + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + try { + if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) { + DelegationKey key = new DelegationKey(); + key.readFields(fsIn); + rmState.rmSecretManagerState.masterKeyState.add(key); + } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { + RMDelegationTokenIdentifier identifier = + new RMDelegationTokenIdentifier(); + identifier.readFields(fsIn); + long renewDate = fsIn.readLong(); + 
rmState.rmSecretManagerState.delegationTokenState.put(identifier, + renewDate); + } + } finally { + is.close(); + } + } + } + + private synchronized void loadRMAppState(RMState rmState) throws Exception { + List childNodes = zkClient.getChildren(rmAppRoot, true); + List attempts = + new ArrayList(); + for (String childNodeName : childNodes) { + String childNodePath = getNodePath(rmAppRoot, childNodeName); + byte[] childData = getDataWithRetries(childNodePath, true); + if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { + // application + LOG.info("Loading application from znode: " + childNodeName); + ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); + ApplicationStateDataPBImpl appStateData = + new ApplicationStateDataPBImpl( + ApplicationStateDataProto.parseFrom(childData)); + ApplicationState appState = + new ApplicationState(appStateData.getSubmitTime(), + appStateData.getApplicationSubmissionContext(), + appStateData.getUser()); + if (!appId.equals(appState.context.getApplicationId())) { + throw new YarnRuntimeException("The child node name is different " + + "from the application id"); + } + rmState.appState.put(appId, appState); + } else if (childNodeName + .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { + // attempt + LOG.info("Loading application attempt from znode: " + childNodeName); + ApplicationAttemptId attemptId = + ConverterUtils.toApplicationAttemptId(childNodeName); + ApplicationAttemptStateDataPBImpl attemptStateData = + new ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStateDataProto.parseFrom(childData)); + Credentials credentials = null; + if (attemptStateData.getAppAttemptTokens() != null) { + credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), credentials); + if (!attemptId.equals(attemptState.getAttemptId())) { + throw new YarnRuntimeException("The child node name is different " + + "from the application attempt id"); + } + attempts.add(attemptState); + } else { + LOG.info("Unknown child node with name: " + childNodeName); + } + } + + // go through all attempts and add them to their apps + for (ApplicationAttemptState attemptState : attempts) { + ApplicationId appId = attemptState.getAttemptId().getApplicationId(); + ApplicationState appState = rmState.appState.get(appId); + if (appState != null) { + appState.attempts.put(attemptState.getAttemptId(), attemptState); + } else { + // the application znode may have been removed when the application + // completed but the RM might have stopped before it could remove the + // application attempt znodes + LOG.info("Application node not found for attempt: " + + attemptState.getAttemptId()); + deleteWithRetries( + getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), + 0); + } + } + } + + @Override + public synchronized void storeApplicationState( + String appId, ApplicationStateDataPBImpl appStateDataPB) throws + Exception { + String nodeCreatePath = getNodePath(rmAppRoot, appId); + + if (LOG.isDebugEnabled()) { + LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); + } + byte[] appStateData = appStateDataPB.getProto().toByteArray(); + createWithRetries( + nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT); + } + + @Override + public synchronized void storeApplicationAttemptState( 
+ String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + throws Exception { + String nodeCreatePath = getNodePath(rmAppRoot, attemptId); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing info for attempt: " + attemptId + " at: " + + nodeCreatePath); + } + byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); + createWithRetries(nodeCreatePath, attemptStateData, zkAcl, + CreateMode.PERSISTENT); + } + + @Override + public synchronized void removeApplicationState(ApplicationState appState) + throws Exception { + String appId = appState.getAppId().toString(); + String nodeRemovePath = getNodePath(rmAppRoot, appId); + ArrayList opList = new ArrayList(); + opList.add(Op.delete(nodeRemovePath, 0)); + + for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { + String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString()); + opList.add(Op.delete(attemptRemovePath, 0)); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath + + " and its attempts."); + } + doMultiWithRetries(opList); + } + + @Override + protected synchronized void storeRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + ArrayList opList = new ArrayList(); + // store RM delegation token + String nodeCreatePath = + getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + + rmDTIdentifier.getSequenceNumber()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os); + try { + rmDTIdentifier.write(fsOut); + fsOut.writeLong(renewDate); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing RMDelegationToken_" + + rmDTIdentifier.getSequenceNumber()); + } + opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl, + CreateMode.PERSISTENT)); + } finally { + os.close(); + } + + // store sequence number + String latestSequenceNumberPath = + getNodePath(rmDTSecretManagerRoot, + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + + latestSequenceNumber); + } + + if (dtSequenceNumberPath != null) { + opList.add(Op.delete(dtSequenceNumberPath, 0)); + } + opList.add(Op.create(latestSequenceNumberPath, null, zkAcl, + CreateMode.PERSISTENT)); + dtSequenceNumberPath = latestSequenceNumberPath; + doMultiWithRetries(opList); + } + + @Override + protected synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + String nodeRemovePath = + getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + + rmDTIdentifier.getSequenceNumber()); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing RMDelegationToken_" + + rmDTIdentifier.getSequenceNumber()); + } + deleteWithRetries(nodeRemovePath, 0); + } + + @Override + protected synchronized void storeRMDTMasterKeyState( + DelegationKey delegationKey) throws Exception { + String nodeCreatePath = + getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + + delegationKey.getKeyId()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing RMDelegationKey_" + delegationKey.getKeyId()); + } + delegationKey.write(fsOut); + try { + createWithRetries(nodeCreatePath, os.toByteArray(), zkAcl, + CreateMode.PERSISTENT); + } finally { + os.close(); + } + } + + @Override + protected 
synchronized void removeRMDTMasterKeyState( + DelegationKey delegationKey) throws Exception { + String nodeRemovePath = + getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + + delegationKey.getKeyId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); + } + deleteWithRetries(nodeRemovePath, 0); + } + + // ZK related code + /** + * Watcher implementation which forward events to the ZKRMStateStore This + * hides the ZK methods of the store from its public interface + */ + private final class ForwardingWatcher implements Watcher { + + @Override + public void process(WatchedEvent event) { + try { + ZKRMStateStore.this.processWatchEvent(event); + } catch (Throwable t) { + LOG.error("Failed to process watcher event " + event + ": " + + StringUtils.stringifyException(t)); + } + } + } + + @VisibleForTesting + public synchronized void processWatchEvent(WatchedEvent event) + throws Exception { + Event.EventType eventType = event.getType(); + LOG.info("Watcher event type: " + eventType + " with state:" + + event.getState() + " for path:" + event.getPath() + " for " + this); + + if (eventType == Event.EventType.None) { + + // the connection state has changed + switch (event.getState()) { + case SyncConnected: + LOG.info("ZKRMStateStore Session connected"); + if (oldZkClient != null) { + // the SyncConnected must be from the client that sent Disconnected + zkClient = oldZkClient; + oldZkClient = null; + ZKRMStateStore.this.notifyAll(); + LOG.info("ZKRMStateStore Session restored"); + } + break; + case Disconnected: + LOG.info("ZKRMStateStore Session disconnected"); + oldZkClient = zkClient; + zkClient = null; + break; + case Expired: + // the connection got terminated because of session timeout + // call listener to reconnect + LOG.info("Session expired"); + createConnection(); + break; + default: + LOG.error("Unexpected Zookeeper" + + " watch event state: " + event.getState()); + break; + } + } + } + + @VisibleForTesting + String getNodePath(String root, String nodeName) { + return (root + "/" + nodeName); + } + + @VisibleForTesting + public String createWithRetries( + final String path, final byte[] data, final List acl, + final CreateMode mode) throws Exception { + return new ZKAction() { + @Override + public String run() throws KeeperException, InterruptedException { + return zkClient.create(path, data, acl, mode); + } + }.runWithRetries(); + } + + private void deleteWithRetries(final String path, final int version) + throws Exception { + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + /** + * Call exists() to leave a watch on the node denoted by path. + * Delete node if exists. To pass the existence information to the + * caller, call delete irrespective of whether node exists or not. 
+ */ + if (zkClient.exists(path, true) == null) { + LOG.error("Trying to delete a path (" + path + + ") that doesn't exist."); + } + zkClient.delete(path, version); + return null; + } + }.runWithRetries(); + } + + private void doMultiWithRetries(final ArrayList opList) throws Exception { + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + zkClient.multi(opList); + return null; + } + }.runWithRetries(); + } + + @VisibleForTesting + public void setDataWithRetries(final String path, final byte[] data, + final int version) throws Exception { + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + zkClient.setData(path, data, version); + return null; + } + }.runWithRetries(); + } + + @VisibleForTesting + public byte[] getDataWithRetries(final String path, final boolean watch) + throws Exception { + return new ZKAction() { + @Override + public byte[] run() throws KeeperException, InterruptedException { + Stat stat = new Stat(); + return zkClient.getData(path, watch, stat); + } + }.runWithRetries(); + } + + private abstract class ZKAction { + // run() expects synchronization on ZKRMStateStore.this + abstract T run() throws KeeperException, InterruptedException; + + T runWithCheck() throws Exception { + long startTime = System.currentTimeMillis(); + synchronized (ZKRMStateStore.this) { + while (zkClient == null) { + ZKRMStateStore.this.wait(zkSessionTimeout); + if (zkClient != null) { + break; + } + if (System.currentTimeMillis() - startTime > zkSessionTimeout) { + throw new IOException("Wait for ZKClient creation timed out"); + } + } + return run(); + } + } + + T runWithRetries() throws Exception { + int retry = 0; + while (true) { + try { + return runWithCheck(); + } catch (KeeperException ke) { + if (shouldRetry(ke.code()) && ++retry < numRetries) { + continue; + } + throw ke; + } + } + } + } + + private static boolean shouldRetry(Code code) { + switch (code) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + return true; + default: + break; + } + return false; + } + + private synchronized void createConnection() + throws IOException, InterruptedException { + closeZkClients(); + for (int retries = 0; retries < numRetries && zkClient == null; + retries++) { + try { + zkClient = getNewZooKeeper(); + } catch (IOException ioe) { + // Retry in case of network failures + LOG.info("Failed to connect to the ZooKeeper on attempt - " + + (retries + 1)); + ioe.printStackTrace(); + } + } + if (zkClient == null) { + LOG.error("Unable to connect to Zookeeper"); + throw new YarnRuntimeException("Unable to connect to Zookeeper"); + } + ZKRMStateStore.this.notifyAll(); + LOG.info("Created new ZK connection"); + } + + // protected to mock for testing + @VisibleForTesting + protected synchronized ZooKeeper getNewZooKeeper() + throws IOException, InterruptedException { + ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); + zk.register(new ForwardingWatcher()); + return zk; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index aef92d5fdcc..80d923f7216 100644 --- 
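Every ZooKeeper call in the store is funnelled through the ZKAction wrapper above, which synchronizes on the store, waits for a live client after a Disconnected event, and retries on CONNECTIONLOSS or OPERATIONTIMEOUT. A new operation only has to supply the run() body; a hypothetical exists check (not part of this patch, shown only to illustrate the pattern) would follow the same shape:

  private Stat existsWithRetries(final String path, final boolean watch)
      throws Exception {
    return new ZKAction<Stat>() {
      @Override
      public Stat run() throws KeeperException, InterruptedException {
        // Plain ZooKeeper call; reconnect and retry handling comes from runWithRetries().
        return zkClient.exists(path, watch);
      }
    }.runWithRetries();
  }
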
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -26,8 +26,10 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import javax.crypto.SecretKey; @@ -40,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.Text; @@ -67,13 +70,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; + +import org.apache.zookeeper.ZooKeeper; + import org.junit.Test; -public class TestRMStateStore { +public class TestRMStateStore extends ClientBaseWithFixes{ public static final Log LOG = LogFactory.getLog(TestRMStateStore.class); - class TestDispatcher implements Dispatcher, EventHandler { + static class TestDispatcher implements + Dispatcher, EventHandler { ApplicationAttemptId attemptId; Exception storedException; @@ -82,7 +89,8 @@ public class TestRMStateStore { @SuppressWarnings("rawtypes") @Override - public void register(Class eventType, EventHandler handler) { + public void register(Class eventType, + EventHandler handler) { } @Override @@ -108,10 +116,18 @@ public class TestRMStateStore { boolean isFinalStateValid() throws Exception; } + @Test + public void testZKRMStateStoreRealZK() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + testRMAppStateStore(zkTester); + testRMDTSecretManagerStateStore(zkTester); + } + @Test public void testFSRMStateStore() throws Exception { HdfsConfiguration conf = new HdfsConfiguration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); try { TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); testRMAppStateStore(fsTester); @@ -121,6 +137,41 @@ public class TestRMStateStore { } } + class TestZKRMStateStoreTester implements RMStateStoreHelper { + ZooKeeper client; + ZKRMStateStore store; + + class TestZKRMStateStore extends ZKRMStateStore { + public TestZKRMStateStore(Configuration conf, String workingZnode) + throws Exception { + init(conf); + start(); + assertTrue(znodeWorkingPath.equals(workingZnode)); + } + + @Override + public ZooKeeper getNewZooKeeper() throws IOException { + return client; + } + } + + public RMStateStore getRMStateStore() throws Exception { + String workingZnode = "/Test"; + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); + this.client = createClient(); + this.store = new 
TestZKRMStateStore(conf, workingZnode); + return this.store; + } + + @Override + public boolean isFinalStateValid() throws Exception { + List nodes = client.getChildren(store.znodeWorkingPath, false); + return nodes.size() == 1; + } + } + class TestFSRMStateStoreTester implements RMStateStoreHelper { Path workingDirPathURI; FileSystemRMStateStore store; @@ -149,7 +200,8 @@ public class TestRMStateStore { @Override public RMStateStore getRMStateStore() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, workingDirPathURI.toString()); + conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, + workingDirPathURI.toString()); this.store = new TestFileSystemRMStore(conf); return store; } @@ -158,11 +210,7 @@ public class TestRMStateStore { public boolean isFinalStateValid() throws Exception { FileSystem fs = cluster.getFileSystem(); FileStatus[] files = fs.listStatus(workingDirPathURI); - if(files.length == 1) { - // only store root directory should exist - return true; - } - return false; + return files.length == 1; } } @@ -183,9 +231,10 @@ public class TestRMStateStore { dispatcher.notified = false; } - void storeApp(RMStateStore store, ApplicationId appId, long time) - throws Exception { - ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); + void storeApp( + RMStateStore store, ApplicationId appId, long time) throws Exception { + ApplicationSubmissionContext context = + new ApplicationSubmissionContextPBImpl(); context.setApplicationId(appId); RMApp mockApp = mock(RMApp.class); @@ -216,7 +265,8 @@ public class TestRMStateStore { return container.getId(); } - void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { + void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) + throws Exception { long submitTime = System.currentTimeMillis(); Configuration conf = new YarnConfiguration(); RMStateStore store = stateStoreHelper.getRMStateStore(); @@ -271,7 +321,8 @@ public class TestRMStateStore { RMApp mockRemovedApp = mock(RMApp.class); HashMap attempts = new HashMap(); - ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); + ApplicationSubmissionContext context = + new ApplicationSubmissionContextPBImpl(); context.setApplicationId(appIdRemoved); when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime); when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context); @@ -288,7 +339,8 @@ public class TestRMStateStore { // load state store = stateStoreHelper.getRMStateStore(); RMState state = store.loadState(); - Map rmAppState = state.getApplicationState(); + Map rmAppState = + state.getApplicationState(); ApplicationState appState = rmAppState.get(appId1); // app is loaded @@ -362,7 +414,8 @@ public class TestRMStateStore { store.loadState().getRMDTSecretManagerState(); Assert.assertEquals(token1, secretManagerState.getTokenState()); Assert.assertEquals(keySet, secretManagerState.getMasterKeyState()); - Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + Assert.assertEquals(sequenceNumber, + secretManagerState.getDTSequenceNumber()); } private Token generateAMRMToken( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java new file mode 100644 index 00000000000..7c807a5b602 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ClientBaseWithFixes; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher; +import org.apache.hadoop.util.ZKUtil; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CyclicBarrier; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestZKRMStateStoreZKClientConnections extends + ClientBaseWithFixes { + private static final int ZK_OP_WAIT_TIME = 3000; + private Log LOG = + LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class); + + class TestZKClient { + ZKRMStateStore store; + boolean forExpire = false; + TestForwardingWatcher watcher; + CyclicBarrier syncBarrier = new CyclicBarrier(2); + + protected class TestZKRMStateStore extends ZKRMStateStore { + public TestZKRMStateStore(Configuration conf, String workingZnode) + throws Exception { + init(conf); + start(); + assertTrue(znodeWorkingPath.equals(workingZnode)); + } + + @Override + public ZooKeeper getNewZooKeeper() + throws IOException, InterruptedException { + return createClient(watcher, hostPort, 100); + } + + @Override + public synchronized void processWatchEvent(WatchedEvent event) + throws Exception { + + if (forExpire) { + // a hack... couldn't find a way to trigger expired event. 
+ WatchedEvent expriredEvent = new WatchedEvent( + Watcher.Event.EventType.None, + Watcher.Event.KeeperState.Expired, null); + super.processWatchEvent(expriredEvent); + forExpire = false; + syncBarrier.await(); + } else { + super.processWatchEvent(event); + } + } + } + + private class TestForwardingWatcher extends + ClientBaseWithFixes.CountdownWatcher { + public void process(WatchedEvent event) { + super.process(event); + try { + if (store != null) { + store.processWatchEvent(event); + } + } catch (Throwable t) { + LOG.error("Failed to process watcher event " + event + ": " + + StringUtils.stringifyException(t)); + } + } + } + + public RMStateStore getRMStateStore(Configuration conf) throws Exception { + String workingZnode = "/Test"; + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); + watcher = new TestForwardingWatcher(); + this.store = new TestZKRMStateStore(conf, workingZnode); + return this.store; + } + } + + @Test(timeout = 20000) + public void testZKClientDisconnectAndReconnect() + throws Exception { + + TestZKClient zkClientTester = new TestZKClient(); + String path = "/test"; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100); + ZKRMStateStore store = + (ZKRMStateStore) zkClientTester.getRMStateStore(conf); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + // trigger watch + store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + store.getDataWithRetries(path, true); + store.setDataWithRetries(path, "newBytes".getBytes(), 0); + + stopServer(); + zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME); + try { + store.getDataWithRetries(path, true); + fail("Expected ZKClient time out exception"); + } catch (Exception e) { + assertTrue(e.getMessage().contains( + "Wait for ZKClient creation timed out")); + } + + // ZKRMStateStore Session restored + startServer(); + zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME); + byte[] ret = null; + try { + ret = store.getDataWithRetries(path, true); + } catch (Exception e) { + String error = "ZKRMStateStore Session restore failed"; + LOG.error(error, e); + fail(error); + } + Assert.assertEquals("newBytes", new String(ret)); + } + + @Test(timeout = 20000) + public void testZKSessionTimeout() throws Exception { + + TestZKClient zkClientTester = new TestZKClient(); + String path = "/test"; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100); + ZKRMStateStore store = + (ZKRMStateStore) zkClientTester.getRMStateStore(conf); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + // a hack to trigger expired event + zkClientTester.forExpire = true; + + // trigger watch + store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + store.getDataWithRetries(path, true); + store.setDataWithRetries(path, "bytes".getBytes(), 0); + + zkClientTester.syncBarrier.await(); + // after this point, expired event has already been processed. 
+ + try { + byte[] ret = store.getDataWithRetries(path, false); + Assert.assertEquals("bytes", new String(ret)); + } catch (Exception e) { + String error = "New session creation failed"; + LOG.error(error, e); + fail(error); + } + } + + @Test (timeout = 20000) + public void testSetZKAcl() { + TestZKClient zkClientTester = new TestZKClient(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "world:anyone:rwca"); + try { + zkClientTester.store.zkClient.delete(zkClientTester.store + .znodeWorkingPath, -1); + fail("Shouldn't be able to delete path"); + } catch (Exception e) {/* expected behavior */} + } + + @Test (timeout = 20000) + public void testInvalidZKAclConfiguration() { + TestZKClient zkClientTester = new TestZKClient(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "randomstring&*"); + try { + zkClientTester.getRMStateStore(conf); + fail("ZKRMStateStore created with bad ACL"); + } catch (ZKUtil.BadAclFormatException bafe) { + // expected behavior + } catch (Exception e) { + String error = "Incorrect exception on BadAclFormat"; + LOG.error(error, e); + fail(error); + } + } +} From 6b1f5073a7c553cbe3cafebbe687c19bac8fb9f8 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 19 Sep 2013 20:58:18 +0000 Subject: [PATCH 06/11] MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta via tgraves) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524841 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 6 ++++++ .../java/org/apache/hadoop/mapreduce/TypeConverter.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2d4a38a9c2c..63b24f55dc8 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -190,6 +190,9 @@ Release 2.2.0 - UNRELEASED BUG FIXES + MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta + via tgraves) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES @@ -1365,6 +1368,9 @@ Release 0.23.10 - UNRELEASED MAPREDUCE-5475. MRClientService does not verify ACLs properly (jlowe) + MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta + via tgraves) + Release 0.23.9 - 2013-07-08 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index 57fa8aaecba..a28b5eb6eb1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -469,7 +469,7 @@ public class TypeConverter { QueueInfo toReturn = new QueueInfo(queueInfo.getQueueName(), "Capacity: " + queueInfo.getCapacity() * 100 + ", MaximumCapacity: " + (queueInfo.getMaximumCapacity() < 0 ? 
"UNDEFINED" : - queueInfo.getMaximumCapacity()) + ", CurrentCapacity: " + + queueInfo.getMaximumCapacity() * 100) + ", CurrentCapacity: " + queueInfo.getCurrentCapacity() * 100, fromYarn(queueInfo.getQueueState()), TypeConverter.fromYarnApps(queueInfo.getApplications(), conf)); List childQueues = new ArrayList(); From af78fd729c3b847c447d4a8edd758fb0c9b25b02 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 19 Sep 2013 22:35:12 +0000 Subject: [PATCH 07/11] MAPREDUCE-5488. Changed MR client to keep trying to reach the application when it sees that on attempt's AM is down. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524856 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../dev-support/findbugs-exclude.xml | 6 +++ .../apache/hadoop/mapreduce/MRJobConfig.java | 2 +- .../src/main/resources/mapred-default.xml | 4 +- .../hadoop/mapred/ClientServiceDelegate.java | 45 +++++++++++++++---- .../mapred/TestClientServiceDelegate.java | 42 +++++++++++++++++ 6 files changed, 91 insertions(+), 11 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 63b24f55dc8..70ff15eb64d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -193,6 +193,9 @@ Release 2.2.0 - UNRELEASED MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta via tgraves) + MAPREDUCE-5488. Changed MR client to keep trying to reach the application + when it sees that on attempt's AM is down. (Jian He via vinodkv) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml index 25fada378a4..11d4643066d 100644 --- a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml +++ b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml @@ -496,6 +496,12 @@ + + + + + + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 85f6b96e291..24c33994493 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -357,7 +357,7 @@ public interface MRJobConfig { public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3; /** - * The number of client retries to the RM/HS/AM before throwing exception. + * The number of client retries to the RM/HS before throwing exception. 
*/ public static final String MR_CLIENT_MAX_RETRIES = MR_PREFIX + "client.max-retries"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index cf28e4d288b..db63f4511f0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -982,7 +982,7 @@ yarn.app.mapreduce.client-am.ipc.max-retries - 1 + 3 The number of client retries to the AM - before reconnecting to the RM to fetch Application Status. @@ -990,7 +990,7 @@ yarn.app.mapreduce.client.max-retries 3 - The number of client retries to the RM/HS/AM before + The number of client retries to the RM/HS before throwing exception. This is a layer above the ipc. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index 77a0ad2b8c8..54c5b608865 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -26,6 +26,7 @@ import java.security.PrivilegedExceptionAction; import java.util.EnumSet; import java.util.HashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -77,6 +78,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; + public class ClientServiceDelegate { private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); private static final String UNAVAILABLE = "N/A"; @@ -93,7 +96,8 @@ public class ClientServiceDelegate { private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static String UNKNOWN_USER = "Unknown User"; private String trackingUrl; - + private AtomicBoolean usingAMProxy = new AtomicBoolean(false); + private int maxClientRetry; private boolean amAclDisabledStatusLogged = false; public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, @@ -287,6 +291,7 @@ public class ClientServiceDelegate { MRClientProtocol proxy = (MRClientProtocol) rpc.getProxy(MRClientProtocol.class, serviceAddr, conf); + usingAMProxy.set(true); LOG.trace("Connected to ApplicationMaster at: " + serviceAddr); return proxy; } @@ -301,13 +306,15 @@ public class ClientServiceDelegate { } catch (NoSuchMethodException e) { throw new YarnRuntimeException("Method name mismatch", e); } - int maxRetries = this.conf.getInt( + maxClientRetry = this.conf.getInt( MRJobConfig.MR_CLIENT_MAX_RETRIES, MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES); IOException lastException = null; - while (maxRetries > 0) { + while (maxClientRetry > 0) { + MRClientProtocol MRClientProxy = null; try { - return methodOb.invoke(getProxy(), args); + 
MRClientProxy = getProxy(); + return methodOb.invoke(MRClientProxy, args); } catch (InvocationTargetException e) { // Will not throw out YarnException anymore LOG.debug("Failed to contact AM/History for job " + jobId + @@ -315,22 +322,44 @@ public class ClientServiceDelegate { // Force reconnection by setting the proxy to null. realProxy = null; // HS/AMS shut down - maxRetries--; + // if it's AM shut down, do not decrement maxClientRetry as we wait for + // AM to be restarted. + if (!usingAMProxy.get()) { + maxClientRetry--; + } + usingAMProxy.set(false); lastException = new IOException(e.getTargetException()); - + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.warn("ClientServiceDelegate invoke call interrupted", ie); + throw new YarnRuntimeException(ie); + } } catch (Exception e) { LOG.debug("Failed to contact AM/History for job " + jobId + " Will retry..", e); // Force reconnection by setting the proxy to null. realProxy = null; // RM shutdown - maxRetries--; - lastException = new IOException(e.getMessage()); + maxClientRetry--; + lastException = new IOException(e.getMessage()); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.warn("ClientServiceDelegate invoke call interrupted", ie); + throw new YarnRuntimeException(ie); + } } } throw lastException; } + // Only for testing + @VisibleForTesting + public int getMaxClientRetry() { + return this.maxClientRetry; + } + public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index 7889774e487..7fbd2abab71 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -140,6 +140,48 @@ public class TestClientServiceDelegate { any(GetJobReportRequest.class)); } + @Test + public void testRetriesOnAMConnectionFailures() throws Exception { + if (!isAMReachableFromClient) { + return; + } + + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) + .thenReturn(getRunningApplicationReport("am1", 78)); + + // throw exception in 1st, 2nd, 3rd and 4th call of getJobReport, and + // succeed in the 5th call. 
+ final MRClientProtocol amProxy = mock(MRClientProtocol.class); + when(amProxy.getJobReport(any(GetJobReportRequest.class))) + .thenThrow(new RuntimeException("11")) + .thenThrow(new RuntimeException("22")) + .thenThrow(new RuntimeException("33")) + .thenThrow(new RuntimeException("44")).thenReturn(getJobReportResponse()); + Configuration conf = new YarnConfiguration(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, + !isAMReachableFromClient); + ClientServiceDelegate clientServiceDelegate = + new ClientServiceDelegate(conf, rm, oldJobId, null) { + @Override + MRClientProtocol instantiateAMProxy( + final InetSocketAddress serviceAddr) throws IOException { + super.instantiateAMProxy(serviceAddr); + return amProxy; + } + }; + + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + + Assert.assertNotNull(jobStatus); + // assert maxClientRetry is not decremented. + Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, + MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate + .getMaxClientRetry()); + verify(amProxy, times(5)).getJobReport(any(GetJobReportRequest.class)); + } + @Test public void testHistoryServerNotConfigured() throws Exception { //RM doesn't have app report and job History Server is not configured From 13420d01f2fb28a0e445dc87ad9ee9b607804d53 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 19 Sep 2013 23:35:58 +0000 Subject: [PATCH 08/11] YARN-1203. Changed YARN web-app proxy to handle http and https URLs from AM registration and finish correctly. Contributed by Omkar Vinit Joshi. MAPREDUCE-5515. Fixed MR AM's webapp to depend on a new config mapreduce.ssl.enabled to enable https and disabling it by default as MR AM needs to set up its own certificates etc and not depend on clusters'. Contributed by Omkar Vinit Joshi. 
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524864 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/http/HttpConfig.java | 4 +- hadoop-mapreduce-project/CHANGES.txt | 5 ++ .../hadoop/mapreduce/v2/app/MRAppMaster.java | 15 ++++++ .../v2/app/client/MRClientService.java | 3 ++ .../mapreduce/v2/app/rm/RMCommunicator.java | 23 ++++---- .../v2/app/webapp/AppController.java | 6 ++- .../mapreduce/v2/app/webapp/JobBlock.java | 2 +- .../mapreduce/v2/app/webapp/NavBlock.java | 2 +- .../mapreduce/v2/app/webapp/TaskPage.java | 4 +- .../mapreduce/v2/app/webapp/WebAppUtil.java | 39 ++++++++++++++ .../v2/app/webapp/dao/AMAttemptInfo.java | 3 +- .../org/apache/hadoop/mapreduce/MRConfig.java | 5 ++ .../src/main/resources/mapred-default.xml | 14 +++++ .../mapreduce/v2/hs/JobHistoryServer.java | 6 +++ .../mapreduce/v2/hs/webapp/HsJobBlock.java | 3 +- .../mapreduce/v2/hs/webapp/HsTaskPage.java | 3 +- hadoop-yarn-project/CHANGES.txt | 3 ++ .../FinishApplicationMasterRequest.java | 4 ++ .../RegisterApplicationMasterRequest.java | 4 ++ .../hadoop/yarn/conf/YarnConfiguration.java | 5 +- .../yarn/server/webproxy/ProxyUriUtils.java | 54 +++++++++++++++++-- .../server/webproxy/WebAppProxyServlet.java | 13 ++--- 22 files changed, 189 insertions(+), 31 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/WebAppUtil.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java index d906d9642a7..d9e219a332a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpConfig.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.http; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -38,8 +37,7 @@ public class HttpConfig { CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT); } - @VisibleForTesting - static void setSecure(boolean secure) { + public static void setSecure(boolean secure) { sslEnabled = secure; } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 70ff15eb64d..c10f96013fc 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -196,6 +196,11 @@ Release 2.2.0 - UNRELEASED MAPREDUCE-5488. Changed MR client to keep trying to reach the application when it sees that on attempt's AM is down. (Jian He via vinodkv) + MAPREDUCE-5515. Fixed MR AM's webapp to depend on a new config + mapreduce.ssl.enabled to enable https and disabling it by default as MR AM + needs to set up its own certificates etc and not depend on clusters'. 
+ (Omkar Vinit Joshi via vinodkv) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 24db757e4f6..6fffb2af6e3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -36,14 +36,17 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocalContainerLauncher; import org.apache.hadoop.mapred.TaskAttemptListenerImpl; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -101,6 +104,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; +import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; @@ -1313,6 +1317,7 @@ public class MRAppMaster extends CompositeService { containerId.getApplicationAttemptId(); long appSubmitTime = Long.parseLong(appSubmitTimeStr); + MRAppMaster appMaster = new MRAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString), @@ -1322,6 +1327,16 @@ public class MRAppMaster extends CompositeService { new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); JobConf conf = new JobConf(new YarnConfiguration()); conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE)); + + // Explicitly disabling SSL for map reduce task as we can't allow MR users + // to gain access to keystore file for opening SSL listener. We can trust + // RM/NM to issue SSL certificates but definitely not MR-AM as it is + // running in user-land. 
+ HttpConfig.setSecure(conf.getBoolean(MRConfig.SSL_ENABLED_KEY, + MRConfig.SSL_ENABLED_KEY_DEFAULT)); + WebAppUtil.setSSLEnabledInYARN(conf.getBoolean( + CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, + CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT)); // log the system properties String systemPropsToLog = MRApps.getSystemPropertiesToLog(conf); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java index 181fd3740a9..1661b8ada1c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java @@ -27,8 +27,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapreduce.JobACL; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; @@ -78,6 +80,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp; +import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 7ce4d5d1b6c..cc047619c2c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -28,7 +28,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -36,12 +38,10 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; -import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; -import 
org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -58,8 +58,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import com.sun.research.ws.wadl.Response; - /** * Registers/unregisters to RM and sends heartbeats to RM. */ @@ -148,7 +146,13 @@ public abstract class RMCommunicator extends AbstractService if (serviceAddr != null) { request.setHost(serviceAddr.getHostName()); request.setRpcPort(serviceAddr.getPort()); - request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort()); + String scheme = "http://"; + if (getConfig().getBoolean(MRConfig.SSL_ENABLED_KEY, + MRConfig.SSL_ENABLED_KEY_DEFAULT)) { + scheme = "https://"; + } + request.setTrackingUrl(scheme + serviceAddr.getHostName() + ":" + + clientService.getHttpPort()); } RegisterApplicationMasterResponse response = scheduler.registerApplicationMaster(request); @@ -190,10 +194,11 @@ public abstract class RMCommunicator extends AbstractService } LOG.info("Setting job diagnostics to " + sb.toString()); - String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(), - context.getApplicationID()); + String historyUrl = + WebAppUtil.getSchemePrefix() + + JobHistoryUtils.getHistoryUrl(getConfig(), + context.getApplicationID()); LOG.info("History url is " + historyUrl); - FinishApplicationMasterRequest request = FinishApplicationMasterRequest.newInstance(finishState, sb.toString(), historyUrl); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java index da537e5bc71..aff99531752 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.webapp.Controller; import org.apache.hadoop.yarn.webapp.View; +import com.google.common.base.Joiner; import com.google.inject.Inject; /** @@ -50,6 +51,7 @@ import com.google.inject.Inject; */ public class AppController extends Controller implements AMParams { private static final Log LOG = LogFactory.getLog(AppController.class); + private static final Joiner JOINER = Joiner.on(""); protected final App app; @@ -58,7 +60,9 @@ public class AppController extends Controller implements AMParams { super(ctx); this.app = app; set(APP_ID, app.context.getApplicationID().toString()); - set(RM_WEB, YarnConfiguration.getRMWebAppURL(conf)); + set(RM_WEB, + JOINER.join(WebAppUtil.getSchemePrefix(), + 
YarnConfiguration.getRMWebAppHostAndPort(conf))); } @Inject diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java index 6b80c8c7d07..ccc5e7ead7a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java @@ -104,7 +104,7 @@ public class JobBlock extends HtmlBlock { table.tr(). td(String.valueOf(attempt.getAttemptId())). td(new Date(attempt.getStartTime()).toString()). - td().a(".nodelink", url(HttpConfig.getSchemePrefix(), + td().a(".nodelink", url(WebAppUtil.getSchemePrefix(), attempt.getNodeHttpAddress()), attempt.getNodeHttpAddress())._(). td().a(".logslink", url(attempt.getLogsLink()), diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java index 686045568f1..a2b00e9672d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java @@ -63,7 +63,7 @@ public class NavBlock extends HtmlBlock { li().a(url("conf", jobid), "Configuration")._(). li().a(url("tasks", jobid, "m"), "Map tasks")._(). li().a(url("tasks", jobid, "r"), "Reduce tasks")._(). - li().a(".logslink", url(HttpConfig.getSchemePrefix(), + li().a(".logslink", url(WebAppUtil.getSchemePrefix(), nodeHttpAddress, "node", "containerlogs", thisAmInfo.getContainerId().toString(), app.getJob().getUserName()), diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java index 430117c4e25..26ef371f8a8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java @@ -86,12 +86,12 @@ public class TaskPage extends AppView { .append(ta.getState().toString()).append("\",\"") .append(nodeHttpAddr == null ? "N/A" : - "" + "" + nodeHttpAddr + "") .append("\",\"") .append(ta.getAssignedContainerId() == null ? 
"N/A" : - "logs") .append("\",\"") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/WebAppUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/WebAppUtil.java new file mode 100644 index 00000000000..f2ae7daa0bb --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/WebAppUtil.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.app.webapp; + + +public class WebAppUtil { + private static boolean isSSLEnabledInYARN; + + public static void setSSLEnabledInYARN(boolean isSSLEnabledInYARN) { + WebAppUtil.isSSLEnabledInYARN = isSSLEnabledInYARN; + } + + public static boolean isSSLEnabledInYARN() { + return isSSLEnabledInYARN; + } + + public static String getSchemePrefix() { + if (isSSLEnabledInYARN) { + return "https://"; + } else { + return "http://"; + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java index dd2e90a9001..a139b5176c2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/AMAttemptInfo.java @@ -26,6 +26,7 @@ import javax.xml.bind.annotation.XmlRootElement; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; +import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -63,7 +64,7 @@ public class AMAttemptInfo { ContainerId containerId = amInfo.getContainerId(); if (containerId != null) { this.containerId = containerId.toString(); - this.logsLink = join(HttpConfig.getSchemePrefix() + nodeHttpAddress, + this.logsLink = join(WebAppUtil.getSchemePrefix() + nodeHttpAddress, ujoin("node", "containerlogs", this.containerId, user)); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java index bbac5fcab9d..879f70d0983 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java @@ -84,6 +84,11 @@ public interface MRConfig { "mapreduce.shuffle.ssl.enabled"; public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false; + + public static final String SSL_ENABLED_KEY = + "mapreduce.ssl.enabled"; + + public static final boolean SSL_ENABLED_KEY_DEFAULT = false; public static final String SHUFFLE_CONSUMER_PLUGIN = "mapreduce.job.reduce.shuffle.consumer.plugin.class"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index db63f4511f0..b9796cb798a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -289,6 +289,20 @@ + + mapreduce.ssl.enabled + false + + If enabled, MapReduce application master's http server will be + started with SSL enabled. Map reduce AM by default doesn't support SSL. + If MapReduce jobs want SSL support, it is the user's responsibility to + create and manage certificates, keystores and trust-stores with appropriate + permissions. This is only for MapReduce application master and is not used + by job history server. To enable encrypted shuffle this property is not + required, instead refer to (mapreduce.shuffle.ssl.enabled) property. + + + mapreduce.shuffle.ssl.file.buffer.size 65536 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java index a5e2f1794a0..0e610d34e89 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java @@ -24,8 +24,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil; import org.apache.hadoop.mapreduce.v2.hs.server.HSAdminServer; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -73,6 +75,10 @@ public class JobHistoryServer extends CompositeService { config.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + // This is required for WebApps to use https if enabled. 
+ WebAppUtil.setSSLEnabledInYARN(conf.getBoolean( + CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, + CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT)); try { doSecureLogin(conf); } catch(IOException ie) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java index 25b22f0d2aa..5bc44d19fe2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsJobBlock.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil; import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo; @@ -132,7 +133,7 @@ public class HsJobBlock extends HtmlBlock { table.tr((odd = !odd) ? _ODD : _EVEN). td(String.valueOf(attempt.getAttemptId())). td(new Date(attempt.getStartTime()).toString()). - td().a(".nodelink", url(HttpConfig.getSchemePrefix(), + td().a(".nodelink", url(WebAppUtil.getSchemePrefix(), attempt.getNodeHttpAddress()), attempt.getNodeHttpAddress())._(). td().a(".logslink", url(attempt.getShortLogsLink()), diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java index f01ddc31ea9..ba8b68fec2d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java @@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.webapp.App; +import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.util.Times; @@ -148,7 +149,7 @@ public class HsTaskPage extends HsView { .append(sortId + " ").append(taid).append("\",\"") .append(ta.getState().toString()).append("\",\"") - .append("") + .append("") .append(nodeRackName + "/" + nodeHttpAddr + "\",\"") .append(" Date: Fri, 20 Sep 2013 19:03:51 +0000 Subject: [PATCH 09/11] HDFS-5234 Move RpcFrameDecoder out of the public API. 
Contributed by Haohui Mai git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1525104 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/nfs/nfs3/Nfs3Base.java | 5 +- .../org/apache/hadoop/oncrpc/RpcUtil.java | 58 ++++++++++++++++++- .../apache/hadoop/oncrpc/SimpleTcpClient.java | 5 +- .../apache/hadoop/oncrpc/SimpleTcpServer.java | 3 +- .../hadoop/oncrpc/SimpleTcpServerHandler.java | 2 +- .../hadoop/oncrpc/SimpleUdpServerHandler.java | 2 +- .../java/org/apache/hadoop/oncrpc/XDR.java | 4 +- .../hadoop/oncrpc/TestFrameDecoder.java | 6 +- .../org/apache/hadoop/oncrpc/TestXDR.java | 3 +- .../hadoop/hdfs/nfs/TestOutOfOrderWrite.java | 7 ++- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + 11 files changed, 78 insertions(+), 20 deletions(-) diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java index ec96e617585..dcd01c023ca 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java @@ -20,8 +20,8 @@ package org.apache.hadoop.nfs.nfs3; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mount.MountdBase; -import org.apache.hadoop.oncrpc.RpcFrameDecoder; import org.apache.hadoop.oncrpc.RpcProgram; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.SimpleTcpServer; import org.apache.hadoop.oncrpc.SimpleTcpServerHandler; import org.apache.hadoop.portmap.PortmapMapping; @@ -68,7 +68,8 @@ public abstract class Nfs3Base { return new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { - return Channels.pipeline(new RpcFrameDecoder(), + return Channels.pipeline( + RpcUtil.constructRpcFrameDecoder(), new SimpleTcpServerHandler(rpcProgram)); } }; diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java index 7186dd1359e..04ebbbc39bc 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java @@ -17,13 +17,65 @@ */ package org.apache.hadoop.oncrpc; -/** - * The XID in RPC call. It is used for starting with new seed after each reboot. - */ +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.frame.FrameDecoder; + public class RpcUtil { + /** + * The XID in RPC call. It is used for starting with new seed after each reboot. 
+ */ private static int xid = (int) (System.currentTimeMillis() / 1000) << 12; public static int getNewXid(String caller) { return xid = ++xid + caller.hashCode(); } + + public static FrameDecoder constructRpcFrameDecoder() { + return new RpcFrameDecoder(); + } + + static class RpcFrameDecoder extends FrameDecoder { + public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class); + private ChannelBuffer currentFrame; + + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, + ChannelBuffer buf) { + + if (buf.readableBytes() < 4) + return null; + + buf.markReaderIndex(); + + byte[] fragmentHeader = new byte[4]; + buf.readBytes(fragmentHeader); + int length = XDR.fragmentSize(fragmentHeader); + boolean isLast = XDR.isLastFragment(fragmentHeader); + + if (buf.readableBytes() < length) { + buf.resetReaderIndex(); + return null; + } + + ChannelBuffer newFragment = buf.readSlice(length); + if (currentFrame == null) { + currentFrame = newFragment; + } else { + currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment); + } + + if (isLast) { + ChannelBuffer completeFrame = currentFrame; + currentFrame = null; + return completeFrame; + } else { + return null; + } + } + } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java index 287aa9fa8ad..32e1b4b8392 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java @@ -20,8 +20,6 @@ package org.apache.hadoop.oncrpc; import java.net.InetSocketAddress; import java.util.concurrent.Executors; -import org.apache.hadoop.oncrpc.RpcFrameDecoder; -import org.apache.hadoop.oncrpc.XDR; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; @@ -55,7 +53,8 @@ public class SimpleTcpClient { this.pipelineFactory = new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { - return Channels.pipeline(new RpcFrameDecoder(), + return Channels.pipeline( + RpcUtil.constructRpcFrameDecoder(), new SimpleTcpClientHandler(request)); } }; diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java index e168ef406b9..6f668a21065 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java @@ -57,7 +57,8 @@ public class SimpleTcpServer { return new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { - return Channels.pipeline(new RpcFrameDecoder(), + return Channels.pipeline( + RpcUtil.constructRpcFrameDecoder(), new SimpleTcpServerHandler(rpcProgram)); } }; diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java index 71cce18f586..04e2930f60b 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java @@ -44,7 +44,7 @@ 
public class SimpleTcpServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); - XDR request = new XDR(buf.array()); + XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING); InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel() .getRemoteAddress()).getAddress(); diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java index 517b18d7d30..79a255b2616 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java @@ -43,7 +43,7 @@ public class SimpleUdpServerHandler extends SimpleChannelHandler { public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); - XDR request = new XDR(buf.array()); + XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING); InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress()) .getAddress(); diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java index 1472f6ebe62..df2b91f05f4 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java @@ -46,7 +46,7 @@ public final class XDR { private ByteBuffer buf; - private enum State { + public enum State { READING, WRITING, } @@ -66,7 +66,7 @@ public final class XDR { this(DEFAULT_INITIAL_CAPACITY); } - private XDR(ByteBuffer buf, State state) { + public XDR(ByteBuffer buf, State state) { this.buf = buf; this.state = state; } diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java index 189c24d6035..0c306861b50 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java @@ -18,12 +18,14 @@ package org.apache.hadoop.oncrpc; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.net.InetAddress; import java.nio.ByteBuffer; +import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder; import org.apache.hadoop.oncrpc.security.CredentialsNone; import org.apache.hadoop.oncrpc.security.VerifierNone; import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer; @@ -138,7 +140,7 @@ public class TestFrameDecoder { buf); assertTrue(channelBuffer != null); // Complete frame should have to total size 10+10=20 - assertTrue(channelBuffer.array().length == 20); + assertEquals(20, channelBuffer.readableBytes()); } @Test @@ -195,4 +197,4 @@ public class TestFrameDecoder { * static void testDump() { XDR xdr_out = new XDR(); * createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); } */ -} \ No newline at end of file +} diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java 
b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java index 48e148be8e0..98836db0b2f 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java @@ -17,10 +17,9 @@ */ package org.apache.hadoop.oncrpc; +import org.junit.Assert; import org.junit.Test; -import junit.framework.Assert; - public class TestXDR { private void serializeInt(int times) { XDR w = new XDR(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java index 7f3d8c58eea..2e8869e5a93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java @@ -33,8 +33,8 @@ import org.apache.hadoop.nfs.nfs3.request.SetAttr3; import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; import org.apache.hadoop.oncrpc.RegistrationClient; import org.apache.hadoop.oncrpc.RpcCall; -import org.apache.hadoop.oncrpc.RpcFrameDecoder; import org.apache.hadoop.oncrpc.RpcReply; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.SimpleTcpClient; import org.apache.hadoop.oncrpc.SimpleTcpClientHandler; import org.apache.hadoop.oncrpc.XDR; @@ -136,8 +136,9 @@ public class TestOutOfOrderWrite { protected ChannelPipelineFactory setPipelineFactory() { this.pipelineFactory = new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { - return Channels.pipeline(new RpcFrameDecoder(), new WriteHandler( - request)); + return Channels.pipeline( + RpcUtil.constructRpcFrameDecoder(), + new WriteHandler(request)); } }; return this.pipelineFactory; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index cd12395bcba..c21db104140 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -366,6 +366,9 @@ Release 2.1.1-beta - 2013-09-23 HDFS-5199 Add more debug trace for NFS READ and WRITE. (brandonli) + HDFS-5234 Move RpcFrameDecoder out of the public API. + (Haohui Mai via brandonli) + IMPROVEMENTS HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may From ab22245dc84a6fd84cf97b65adc92684db5743d9 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Fri, 20 Sep 2013 20:34:55 +0000 Subject: [PATCH 10/11] HDFS-5231. Fix broken links in the document of HDFS Federation. Contributed by Haohui Mai. 
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1525133 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../hadoop-hdfs/src/site/apt/Federation.apt.vm | 4 ++-- .../resources/images}/federation-background.gif | Bin .../src/site/resources/images}/federation.gif | Bin 4 files changed, 5 insertions(+), 2 deletions(-) rename {hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/resources => hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images}/federation-background.gif (100%) rename {hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/resources => hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images}/federation.gif (100%) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c21db104140..443090787df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -482,6 +482,9 @@ Release 2.1.1-beta - 2013-09-23 HDFS-5219. Add configuration keys for retry policy in WebHDFSFileSystem. (Haohui Mai via jing9) + HDFS-5231. Fix broken links in the document of HDFS Federation. (Haohui Mai + via jing9) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/Federation.apt.vm b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/Federation.apt.vm index c7c877053a0..f2ce08c6311 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/Federation.apt.vm +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/Federation.apt.vm @@ -28,7 +28,7 @@ HDFS Federation * {Background} -[./federation-background.gif] HDFS Layers +[./images/federation-background.gif] HDFS Layers HDFS has two main layers: @@ -72,7 +72,7 @@ HDFS Federation send periodic heartbeats and block reports and handles commands from the Namenodes. -[./federation.gif] HDFS Federation Architecture +[./images/federation.gif] HDFS Federation Architecture <> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/resources/federation-background.gif b/hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images/federation-background.gif similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/resources/federation-background.gif rename to hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images/federation-background.gif diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/resources/federation.gif b/hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images/federation.gif similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/resources/federation.gif rename to hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images/federation.gif From 9f6cf60ae04360ffa9202d0b34ccc29d41faaa3c Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Sun, 22 Sep 2013 01:46:58 +0000 Subject: [PATCH 11/11] YARN-1128. 
FifoPolicy.computeShares throws NPE on empty list of Schedulables (Karthik Kambatla via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1525313 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/policies/FifoPolicy.java | 5 ++ .../fair/policies/TestEmptyQueues.java | 57 +++++++++++++++++++ 3 files changed, 65 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9f234570ca3..4b41f79b697 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -55,6 +55,9 @@ Release 2.2.0 - UNRELEASED BUG FIXES + YARN-1128. FifoPolicy.computeShares throws NPE on empty list of Schedulables + (Karthik Kambatla via Sandy Ryza) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 90e1e6242f7..3451cfea4c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; import java.io.Serializable; import java.util.Collection; import java.util.Comparator; +import java.util.Iterator; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -73,6 +74,10 @@ public class FifoPolicy extends SchedulingPolicy { @Override public void computeShares(Collection<? extends Schedulable> schedulables, Resource totalResources) { + if (schedulables.isEmpty()) { + return; + } + Schedulable earliest = null; for (Schedulable schedulable : schedulables) { if (earliest == null || diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java new file mode 100644 index 00000000000..4636c5bbd8d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestEmptyQueues.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; + +public class TestEmptyQueues { + private Collection<Schedulable> schedulables; + + @Before + public void setup() { + schedulables = new ArrayList<Schedulable>(); + } + + private void testComputeShares(SchedulingPolicy policy) { + policy.computeShares(schedulables, Resources.none()); + } + + @Test (timeout = 1000) + public void testFifoPolicy() { + testComputeShares(SchedulingPolicy.getInstance(FifoPolicy.class)); + } + + @Test (timeout = 1000) + public void testFairSharePolicy() { + testComputeShares(SchedulingPolicy.getInstance(FairSharePolicy.class)); + } + + @Test (timeout = 1000) + public void testDRFPolicy() { + testComputeShares( + SchedulingPolicy.getInstance(DominantResourceFairnessPolicy.class)); + } +}
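
The guard added by YARN-1128 matters because FifoPolicy.computeShares first scans the schedulables for the one with the earliest start time and then dereferences the result; on an empty collection the scan leaves that reference null, so the method failed with a NullPointerException before it could return. The sketch below reproduces the pattern outside of Hadoop; the Item type, its startTime/share fields, and assignShareToEarliest are hypothetical names used only to illustrate the early-return guard, not code from the patch.

import java.util.ArrayList;
import java.util.Collection;

// Standalone illustration of the YARN-1128 style guard (hypothetical names,
// not the actual Hadoop classes).
public class EmptyCollectionGuardExample {

  static final class Item {
    final long startTime;
    long share;
    Item(long startTime) { this.startTime = startTime; }
  }

  // Pick the earliest element and hand it the whole share, mirroring the
  // shape of FifoPolicy.computeShares. Without the isEmpty() check,
  // 'earliest' stays null for an empty input and the final assignment
  // throws NullPointerException.
  static void assignShareToEarliest(Collection<Item> items, long share) {
    if (items.isEmpty()) {
      return;
    }
    Item earliest = null;
    for (Item item : items) {
      if (earliest == null || item.startTime < earliest.startTime) {
        earliest = item;
      }
    }
    earliest.share = share; // safe: the collection is known to be non-empty
  }

  public static void main(String[] args) {
    assignShareToEarliest(new ArrayList<Item>(), 100L); // returns quietly, no NPE
  }
}

Returning early keeps the remainder of the method free of null handling, which is the same choice the patch makes in FifoPolicy; the new TestEmptyQueues test drives exactly this empty-collection path for the Fifo, FairShare, and DRF policies.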