From 9c95015bb47f362e592393f0cec899fd8fdd3b04 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 3 Dec 2013 19:28:02 +0000 Subject: [PATCH 1/6] YARN-895. Changed RM state-store to not crash immediately if RM restarts while the state-store is down. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547538 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../dev-support/findbugs-exclude.xml | 5 ++ .../hadoop/yarn/conf/YarnConfiguration.java | 18 +++++-- .../src/main/resources/yarn-default.xml | 41 ++++++++++++---- .../recovery/FileSystemRMStateStore.java | 9 +++- .../recovery/ZKRMStateStore.java | 7 +++ .../recovery/TestFSRMStateStore.java | 49 +++++++++++++++++++ ...TestZKRMStateStoreZKClientConnections.java | 32 ++++++++++++ 8 files changed, 148 insertions(+), 16 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5cd354cfac9..adbcdd00500 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -191,6 +191,9 @@ Release 2.4.0 - UNRELEASED YARN-1416. Fixed a few invalid transitions in RMApp, RMAppAttempt and in some tests. (Jian He via vinodkv) + YARN-895. Changed RM state-store to not crash immediately if RM restarts while + the state-store is down. (Jian He via vinodkv) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index b25466533a3..80598a43bb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -305,4 +305,9 @@ + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index af48f26298a..0480cab7d2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -301,22 +301,30 @@ public class YarnConfiguration extends Configuration { public static final String RM_STORE = RM_PREFIX + "store.class"; /** URI for FileSystemRMStateStore */ - public static final String FS_RM_STATE_STORE_URI = - RM_PREFIX + "fs.state-store.uri"; + public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + + "fs.state-store.uri"; + public static final String FS_RM_STATE_STORE_RETRY_POLICY_SPEC = RM_PREFIX + + "fs.state-store.retry-policy-spec"; + public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC = + "2000, 500"; /** * Comma separated host:port pairs, each corresponding to a ZK server for * ZKRMStateStore */ public static final String ZK_STATE_STORE_PREFIX = - RM_PREFIX + "zk.state-store."; + RM_PREFIX + "zk-state-store."; public static final String ZK_RM_STATE_STORE_NUM_RETRIES = ZK_STATE_STORE_PREFIX + "num-retries"; - public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3; + public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 500; + /** retry interval when connecting to zookeeper*/ + public static final String ZK_RM_STATE_STORE_RETRY_INTERVAL_MS = + ZK_STATE_STORE_PREFIX + "retry-interval-ms"; + public static final long DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS = 2000; public static final String ZK_RM_STATE_STORE_ADDRESS = ZK_STATE_STORE_PREFIX + "address"; /** Timeout in millisec for ZK server connection for ZKRMStateStore */ public static final String ZK_RM_STATE_STORE_TIMEOUT_MS = - ZK_STATE_STORE_PREFIX + "timeout.ms"; + ZK_STATE_STORE_PREFIX + "timeout-ms"; public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000; /** Parent znode path under which ZKRMStateStore will create znodes */ public static final String ZK_RM_STATE_STORE_PARENT_PATH = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 7f6e050d2b5..d798f4c9767 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -283,8 +283,8 @@ is implicitly fenced, meaning a single ResourceManager is able to use the store at any point in time. More details on this, along with setting up appropriate ACLs is discussed under the description for - yarn.resourcemanager.zk.state-store.root-node.acl. - yarn.resourcemanager.zk.state-store.address + yarn.resourcemanager.zk-state-store.root-node.acl. + yarn.resourcemanager.zk-state-store.address @@ -293,8 +293,15 @@ ZooKeeper. This may be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class - yarn.resourcemanager.zk.state-store.num-retries - 3 + yarn.resourcemanager.zk-state-store.num-retries + 500 + + + + Retry interval in milliseconds when ZKRMStateStore tries to + connect to ZooKeeper. + yarn.resourcemanager.zk-state-store.retry-interval-ms + 2000 @@ -302,16 +309,20 @@ stored. This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class - yarn.resourcemanager.zk.state-store.parent-path + yarn.resourcemanager.zk-state-store.parent-path /rmstore - Timeout when connecting to ZooKeeper. + ZooKeeper session timeout in milliseconds. Session expiration + is managed by the ZooKeeper cluster itself, not by the client. This value is + used by the cluster to determine when the client's session expires. + Expirations happens when the cluster does not hear from the client within + the specified session timeout period (i.e. no heartbeat). This may be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class - yarn.resourcemanager.zk.state-store.timeout.ms + yarn.resourcemanager.zk-state-store.timeout-ms 60000 @@ -320,7 +331,7 @@ This may be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class - yarn.resourcemanager.zk.state-store.acl + yarn.resourcemanager.zk-state-store.acl world:anyone:rwcda @@ -336,7 +347,7 @@ permissions. By default, when this property is not set, we use the ACLs from - yarn.resourcemanager.zk.state-store.acl for shared admin access and + yarn.resourcemanager.zk-state-store.acl for shared admin access and rm-address:cluster-timestamp for username-based exclusive create-delete access. @@ -346,7 +357,7 @@ ResourceManagers have shared admin access and the Active ResourceManger takes over (exclusively) the create-delete access. - yarn.resourcemanager.zk.state-store.root-node.acl + yarn.resourcemanager.zk-state-store.root-node.acl @@ -359,6 +370,16 @@ + + hdfs client retry policy specification. hdfs client retry + is always enabled. Specified in pairs of sleep-time and number-of-retries + and (t0, n0), (t1, n1), ..., the first n0 retries sleep t0 milliseconds on + average, the following n1 retries sleep t1 milliseconds on average, and so on. + + yarn.resourcemanager.fs.state-store.retry-policy-spec + 2000, 500 + + Enable RM high-availability. When enabled, (1) The RM starts in the Standby mode by default, and transitions to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 2ef6bcd62c0..23cefd334db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -94,7 +94,14 @@ protected synchronized void startInternal() throws Exception { // create filesystem only now, as part of service-start. By this time, RM is // authenticated with kerberos so we are good to create a file-system // handle. - fs = fsWorkingPath.getFileSystem(getConfig()); + Configuration conf = new Configuration(getConfig()); + conf.setBoolean("dfs.client.retry.policy.enabled", true); + String retryPolicy = + conf.get(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, + YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC); + conf.set("dfs.client.retry.policy.spec", retryPolicy); + + fs = fsWorkingPath.getFileSystem(conf); fs.mkdirs(rmDTSecretManagerRoot); fs.mkdirs(rmAppRoot); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 1621d8327a5..f419ff09888 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -82,6 +82,7 @@ public class ZKRMStateStore extends RMStateStore { private String zkHostPort = null; private int zkSessionTimeout; + private long zkRetryInterval; private List zkAcl; private String zkRootNodePath; private String rmDTSecretManagerRoot; @@ -161,6 +162,9 @@ public synchronized void initInternal(Configuration conf) throws Exception { zkSessionTimeout = conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS); + zkRetryInterval = + conf.getLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS); // Parse authentication from configuration. String zkAclConf = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL, @@ -810,6 +814,9 @@ T runWithRetries() throws Exception { } } catch (KeeperException ke) { if (shouldRetry(ke.code()) && ++retry < numRetries) { + LOG.info("Waiting for zookeeper to be connected, retry no. + " + + retry); + Thread.sleep(zkRetryInterval); continue; } throw ke; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 63fe97557c5..4df1c3b0c83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import static org.junit.Assert.assertTrue; + +import java.util.concurrent.atomic.AtomicBoolean; + import junit.framework.Assert; import org.apache.commons.logging.Log; @@ -33,7 +36,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -81,6 +86,8 @@ public RMStateStore getRMStateStore() throws Exception { YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, workingDirPathURI.toString()); + conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, + "100,6000"); this.store = new TestFileSystemRMStore(conf); return store; } @@ -139,4 +146,46 @@ public void testFSRMStateStore() throws Exception { cluster.shutdown(); } } + + @Test (timeout = 30000) + public void testFSRMStateStoreClientRetry() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); + try { + TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); + final RMStateStore store = fsTester.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); + cluster.shutdownNameNodes(); + + Thread clientThread = new Thread() { + @Override + public void run() { + try { + store.storeApplicationStateInternal("application1", + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newApplicationStateData(111, 111, "user", null, + RMAppState.ACCEPTED, "diagnostics", 333)); + } catch (Exception e) { + // TODO 0 datanode exception will not be retried by dfs client, fix + // that separately. + if (!e.getMessage().contains("could only be replicated" + + " to 0 nodes instead of minReplication (=1)")) { + assertionFailedInThread.set(true); + } + e.printStackTrace(); + } + } + }; + Thread.sleep(2000); + clientThread.start(); + cluster.restartNameNode(); + clientThread.join(); + Assert.assertFalse(assertionFailedInThread.get()); + } finally { + cluster.shutdown(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java index 82e550c9173..3def83c0ad7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -114,6 +115,37 @@ public RMStateStore getRMStateStore(Configuration conf) throws Exception { } } + @Test (timeout = 20000) + public void testZKClientRetry() throws Exception { + TestZKClient zkClientTester = new TestZKClient(); + final String path = "/test"; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100); + conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100); + final ZKRMStateStore store = + (ZKRMStateStore) zkClientTester.getRMStateStore(conf); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); + + stopServer(); + Thread clientThread = new Thread() { + @Override + public void run() { + try { + store.getDataWithRetries(path, true); + } catch (Exception e) { + e.printStackTrace(); + assertionFailedInThread.set(true); + } + } + }; + Thread.sleep(2000); + startServer(); + clientThread.join(); + Assert.assertFalse(assertionFailedInThread.get()); + } + @Test(timeout = 20000) public void testZKClientDisconnectAndReconnect() throws Exception { From 5afc1242ea323d5d160b8a1c676e499af81f21b4 Mon Sep 17 00:00:00 2001 From: Jonathan Turner Eagles Date: Tue, 3 Dec 2013 22:44:07 +0000 Subject: [PATCH 2/6] MAPREDUCE-5645. TestFixedLengthInputFormat fails with native libs (Mit Desai via jeagles) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547624 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../mapred/TestFixedLengthInputFormat.java | 22 +++++++++++-------- .../lib/input/TestFixedLengthInputFormat.java | 18 +++++++++------ 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1c2509584a1..a9527bc4bac 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -226,6 +226,9 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5631. TestJobEndNotifier.testNotifyRetries fails with Should have taken more than 5 seconds in jdk7 (Jonathan Eagles via jlowe) + MAPREDUCE-5645. TestFixedLengthInputFormat fails with native libs (Mit + Desai via jeagles) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java index a5f19eeb25d..d0bd7eecdf3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java @@ -197,17 +197,17 @@ public void testPartialRecordUncompressedIn() throws IOException { public void testGzipWithTwoInputs() throws IOException { CompressionCodec gzip = new GzipCodec(); localFs.delete(workDir, true); - // Create files with fixed length records with 5 byte long records. - writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, - "one two threefour five six seveneightnine ten "); - writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, - "ten nine eightsevensix five four threetwo one "); FixedLengthInputFormat format = new FixedLengthInputFormat(); JobConf job = new JobConf(defaultConf); format.setRecordLength(job, 5); FileInputFormat.setInputPaths(job, workDir); ReflectionUtils.setConf(gzip, job); format.configure(job); + // Create files with fixed length records with 5 byte long records. + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "one two threefour five six seveneightnine ten "); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "ten nine eightsevensix five four threetwo one "); InputSplit[] splits = format.getSplits(job, 100); assertEquals("compressed splits == 2", 2, splits.length); FileSplit tmp = (FileSplit) splits[0]; @@ -283,12 +283,16 @@ private void runRandomTests(CompressionCodec codec) throws IOException { int fileSize = (totalRecords * recordLength); LOG.info("totalRecords=" + totalRecords + " recordLength=" + recordLength); + // Create the job + JobConf job = new JobConf(defaultConf); + if (codec != null) { + ReflectionUtils.setConf(codec, job); + } // Create the test file ArrayList recordList = createFile(file, codec, recordLength, totalRecords); assertTrue(localFs.exists(file)); - // Create the job and set the fixed length record length config property - JobConf job = new JobConf(defaultConf); + //set the fixed length record length config property for the job FixedLengthInputFormat.setRecordLength(job, recordLength); int numSplits = 1; @@ -383,8 +387,6 @@ private void runPartialRecordTest(CompressionCodec codec) throws IOException { if (codec != null) { fileName.append(".gz"); } - writeFile(localFs, new Path(workDir, fileName.toString()), codec, - "one two threefour five six seveneightnine ten"); FixedLengthInputFormat format = new FixedLengthInputFormat(); JobConf job = new JobConf(defaultConf); format.setRecordLength(job, 5); @@ -393,6 +395,8 @@ private void runPartialRecordTest(CompressionCodec codec) throws IOException { ReflectionUtils.setConf(codec, job); } format.configure(job); + writeFile(localFs, new Path(workDir, fileName.toString()), codec, + "one two threefour five six seveneightnine ten"); InputSplit[] splits = format.getSplits(job, 100); if (codec != null) { assertEquals("compressed splits == 1", 1, splits.length); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java index f00b1a92ac1..cf7c61c09b5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java @@ -225,16 +225,16 @@ public void testPartialRecordUncompressedIn() throws Exception { public void testGzipWithTwoInputs() throws Exception { CompressionCodec gzip = new GzipCodec(); localFs.delete(workDir, true); - // Create files with fixed length records with 5 byte long records. - writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, - "one two threefour five six seveneightnine ten "); - writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, - "ten nine eightsevensix five four threetwo one "); Job job = Job.getInstance(defaultConf); FixedLengthInputFormat format = new FixedLengthInputFormat(); format.setRecordLength(job.getConfiguration(), 5); ReflectionUtils.setConf(gzip, job.getConfiguration()); FileInputFormat.setInputPaths(job, workDir); + // Create files with fixed length records with 5 byte long records. + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "one two threefour five six seveneightnine ten "); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "ten nine eightsevensix five four threetwo one "); List splits = format.getSplits(job); assertEquals("compressed splits == 2", 2, splits.size()); FileSplit tmp = (FileSplit) splits.get(0); @@ -310,12 +310,16 @@ private void runRandomTests(CompressionCodec codec) throws Exception { int fileSize = (totalRecords * recordLength); LOG.info("totalRecords=" + totalRecords + " recordLength=" + recordLength); + // Create the job + Job job = Job.getInstance(defaultConf); + if (codec != null) { + ReflectionUtils.setConf(codec, job.getConfiguration()); + } // Create the test file ArrayList recordList = createFile(file, codec, recordLength, totalRecords); assertTrue(localFs.exists(file)); - // Create the job and set the fixed length record length config property - Job job = Job.getInstance(defaultConf); + //set the fixed length record length config property for the job FixedLengthInputFormat.setRecordLength(job.getConfiguration(), recordLength); From f7fe50d55f51257c7b38724b37b9976a96f98c38 Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Tue, 3 Dec 2013 22:44:46 +0000 Subject: [PATCH 3/6] HADOOP-10127. Add ipc.client.connect.retry.interval to control the frequency of connection retries (Karthik Kambatla via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547626 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ .../apache/hadoop/fs/CommonConfigurationKeysPublic.java | 5 +++++ .../src/main/java/org/apache/hadoop/ipc/Client.java | 7 ++++++- .../hadoop-common/src/main/resources/core-default.xml | 8 ++++++++ 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 61d29f43aeb..23e82b79462 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -390,6 +390,9 @@ Release 2.4.0 - UNRELEASED HADOOP-10126. LightWeightGSet log message is confusing. (Vinay via suresh) + HADOOP-10127. Add ipc.client.connect.retry.interval to control the frequency + of connection retries (Karthik Kambatla via Sandy Ryza) + OPTIMIZATIONS HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 5a75cbdf15b..87746bbe396 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -192,6 +192,11 @@ public class CommonConfigurationKeysPublic { /** Default value for IPC_CLIENT_CONNECT_MAX_RETRIES_KEY */ public static final int IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10; /** See core-default.xml */ + public static final String IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY = + "ipc.client.connect.retry.interval"; + /** Default value for IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY */ + public static final int IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT = 1000; + /** See core-default.xml */ public static final String IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "ipc.client.connect.max.retries.on.timeouts"; /** Default value for IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index b444a115cb5..764c7dca5ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1562,8 +1562,13 @@ static ConnectionId getConnectionId(InetSocketAddress addr, final int max = conf.getInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT); + final int retryInterval = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, + CommonConfigurationKeysPublic + .IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT); + connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( - max, 1, TimeUnit.SECONDS); + max, retryInterval, TimeUnit.MILLISECONDS); } boolean doPing = diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 746d209715a..c7671c5451f 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -618,6 +618,14 @@ + + ipc.client.connect.retry.interval + 1000 + Indicates the number of milliseconds a client will wait for + before retrying to establish a server connection. + + + ipc.client.connect.timeout 20000 From 61c45aae4d0a6cc82b0e9253753ec73cbb0abb7b Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Tue, 3 Dec 2013 23:13:02 +0000 Subject: [PATCH 4/6] HDFS-4997. libhdfs does not return correct error code in most cases (cmccabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547637 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../contrib/libwebhdfs/src/hdfs_json_parser.c | 9 -- .../src/main/native/libhdfs/exception.c | 33 ++++- .../native/libhdfs/test_libhdfs_threaded.c | 123 +++++++++++------- 4 files changed, 106 insertions(+), 61 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1177771ec98..4ac7567a988 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -767,6 +767,8 @@ Release 2.3.0 - UNRELEASED HDFS-5563. NFS gateway should commit the buffered data when read request comes after write to the same file (brandonli) + HDFS-4997. libhdfs doesn't return correct error codes in most cases (cmccabe) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c index a883f06f7c6..178fb9de664 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libwebhdfs/src/hdfs_json_parser.c @@ -39,14 +39,6 @@ struct jsonException { const char *message; }; -static void dotsToSlashes(char *str) -{ - for (; *str != '\0'; str++) { - if (*str == '.') - *str = '/'; - } -} - /** Print out the JSON exception information */ static int printJsonExceptionV(struct jsonException *exc, int noPrintFlags, const char *fmt, va_list ap) @@ -62,7 +54,6 @@ static int printJsonExceptionV(struct jsonException *exc, int noPrintFlags, fprintf(stderr, "printJsonExceptionV: internal out of memory error\n"); return EINTERNAL; } - dotsToSlashes(javaClassName); getExceptionInfo(javaClassName, noPrintFlags, &excErrno, &shouldPrint); free(javaClassName); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c index 6a50c981b96..d7e17208454 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c @@ -35,36 +35,55 @@ struct ExceptionInfo { static const struct ExceptionInfo gExceptionInfo[] = { { - .name = "java/io/FileNotFoundException", + .name = "java.io.FileNotFoundException", .noPrintFlag = NOPRINT_EXC_FILE_NOT_FOUND, .excErrno = ENOENT, }, { - .name = "org/apache/hadoop/security/AccessControlException", + .name = "org.apache.hadoop.security.AccessControlException", .noPrintFlag = NOPRINT_EXC_ACCESS_CONTROL, .excErrno = EACCES, }, { - .name = "org/apache/hadoop/fs/UnresolvedLinkException", + .name = "org.apache.hadoop.fs.UnresolvedLinkException", .noPrintFlag = NOPRINT_EXC_UNRESOLVED_LINK, .excErrno = ENOLINK, }, { - .name = "org/apache/hadoop/fs/ParentNotDirectoryException", + .name = "org.apache.hadoop.fs.ParentNotDirectoryException", .noPrintFlag = NOPRINT_EXC_PARENT_NOT_DIRECTORY, .excErrno = ENOTDIR, }, { - .name = "java/lang/IllegalArgumentException", + .name = "java.lang.IllegalArgumentException", .noPrintFlag = NOPRINT_EXC_ILLEGAL_ARGUMENT, .excErrno = EINVAL, }, { - .name = "java/lang/OutOfMemoryError", + .name = "java.lang.OutOfMemoryError", .noPrintFlag = 0, .excErrno = ENOMEM, }, - + { + .name = "org.apache.hadoop.hdfs.server.namenode.SafeModeException", + .noPrintFlag = 0, + .excErrno = EROFS, + }, + { + .name = "org.apache.hadoop.fs.FileAlreadyExistsException", + .noPrintFlag = 0, + .excErrno = EEXIST, + }, + { + .name = "org.apache.hadoop.hdfs.protocol.QuotaExceededException", + .noPrintFlag = 0, + .excErrno = EDQUOT, + }, + { + .name = "org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException", + .noPrintFlag = 0, + .excErrno = ESTALE, + }, }; void getExceptionInfo(const char *excName, int noPrintFlags, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c index cf2bf963922..3a8f31dccac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c @@ -48,7 +48,8 @@ struct tlhThreadInfo { pthread_t thread; }; -static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs) +static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs, + const char *username) { int ret, port; hdfsFS hdfs; @@ -70,6 +71,9 @@ static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs TO_STR(TLH_DEFAULT_BLOCK_SIZE)); hdfsBuilderConfSetStr(bld, "dfs.blocksize", TO_STR(TLH_DEFAULT_BLOCK_SIZE)); + if (username) { + hdfsBuilderSetUserName(bld, username); + } hdfs = hdfsBuilderConnect(bld); if (!hdfs) { ret = -errno; @@ -110,36 +114,58 @@ static int doTestGetDefaultBlockSize(hdfsFS fs, const char *path) return 0; } -static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs) +struct tlhPaths { + char prefix[256]; + char file1[256]; + char file2[256]; +}; + +static int setupPaths(const struct tlhThreadInfo *ti, struct tlhPaths *paths) { - char prefix[256], tmp[256]; + memset(paths, sizeof(*paths), 0); + if (snprintf(paths->prefix, sizeof(paths->prefix), "/tlhData%04d", + ti->threadIdx) >= sizeof(paths->prefix)) { + return ENAMETOOLONG; + } + if (snprintf(paths->file1, sizeof(paths->file1), "%s/file1", + paths->prefix) >= sizeof(paths->file1)) { + return ENAMETOOLONG; + } + if (snprintf(paths->file2, sizeof(paths->file2), "%s/file2", + paths->prefix) >= sizeof(paths->file2)) { + return ENAMETOOLONG; + } + return 0; +} + +static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs, + const struct tlhPaths *paths) +{ + char tmp[4096]; hdfsFile file; int ret, expected; hdfsFileInfo *fileInfo; struct hdfsReadStatistics *readStats = NULL; - snprintf(prefix, sizeof(prefix), "/tlhData%04d", ti->threadIdx); - - if (hdfsExists(fs, prefix) == 0) { - EXPECT_ZERO(hdfsDelete(fs, prefix, 1)); + if (hdfsExists(fs, paths->prefix) == 0) { + EXPECT_ZERO(hdfsDelete(fs, paths->prefix, 1)); } - EXPECT_ZERO(hdfsCreateDirectory(fs, prefix)); - snprintf(tmp, sizeof(tmp), "%s/file", prefix); + EXPECT_ZERO(hdfsCreateDirectory(fs, paths->prefix)); - EXPECT_ZERO(doTestGetDefaultBlockSize(fs, prefix)); + EXPECT_ZERO(doTestGetDefaultBlockSize(fs, paths->prefix)); /* There should not be any file to open for reading. */ - EXPECT_NULL(hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0)); + EXPECT_NULL(hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0)); /* hdfsOpenFile should not accept mode = 3 */ - EXPECT_NULL(hdfsOpenFile(fs, tmp, 3, 0, 0, 0)); + EXPECT_NULL(hdfsOpenFile(fs, paths->file1, 3, 0, 0, 0)); - file = hdfsOpenFile(fs, tmp, O_WRONLY, 0, 0, 0); + file = hdfsOpenFile(fs, paths->file1, O_WRONLY, 0, 0, 0); EXPECT_NONNULL(file); /* TODO: implement writeFully and use it here */ - expected = strlen(prefix); - ret = hdfsWrite(fs, file, prefix, expected); + expected = strlen(paths->prefix); + ret = hdfsWrite(fs, file, paths->prefix, expected); if (ret < 0) { ret = errno; fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret); @@ -155,7 +181,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs) EXPECT_ZERO(hdfsCloseFile(fs, file)); /* Let's re-open the file for reading */ - file = hdfsOpenFile(fs, tmp, O_RDONLY, 0, 0, 0); + file = hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0); EXPECT_NONNULL(file); EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats)); @@ -180,60 +206,67 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs) errno = 0; EXPECT_INT_EQ(expected, readStats->totalBytesRead); hdfsFileFreeReadStatistics(readStats); - EXPECT_ZERO(memcmp(prefix, tmp, expected)); + EXPECT_ZERO(memcmp(paths->prefix, tmp, expected)); EXPECT_ZERO(hdfsCloseFile(fs, file)); // TODO: Non-recursive delete should fail? //EXPECT_NONZERO(hdfsDelete(fs, prefix, 0)); + EXPECT_ZERO(hdfsCopy(fs, paths->file1, fs, paths->file2)); - snprintf(tmp, sizeof(tmp), "%s/file", prefix); - EXPECT_ZERO(hdfsChown(fs, tmp, NULL, NULL)); - EXPECT_ZERO(hdfsChown(fs, tmp, NULL, "doop")); - fileInfo = hdfsGetPathInfo(fs, tmp); + EXPECT_ZERO(hdfsChown(fs, paths->file2, NULL, NULL)); + EXPECT_ZERO(hdfsChown(fs, paths->file2, NULL, "doop")); + fileInfo = hdfsGetPathInfo(fs, paths->file2); EXPECT_NONNULL(fileInfo); EXPECT_ZERO(strcmp("doop", fileInfo->mGroup)); hdfsFreeFileInfo(fileInfo, 1); - EXPECT_ZERO(hdfsChown(fs, tmp, "ha", "doop2")); - fileInfo = hdfsGetPathInfo(fs, tmp); + EXPECT_ZERO(hdfsChown(fs, paths->file2, "ha", "doop2")); + fileInfo = hdfsGetPathInfo(fs, paths->file2); EXPECT_NONNULL(fileInfo); EXPECT_ZERO(strcmp("ha", fileInfo->mOwner)); EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup)); hdfsFreeFileInfo(fileInfo, 1); - EXPECT_ZERO(hdfsChown(fs, tmp, "ha2", NULL)); - fileInfo = hdfsGetPathInfo(fs, tmp); + EXPECT_ZERO(hdfsChown(fs, paths->file2, "ha2", NULL)); + fileInfo = hdfsGetPathInfo(fs, paths->file2); EXPECT_NONNULL(fileInfo); EXPECT_ZERO(strcmp("ha2", fileInfo->mOwner)); EXPECT_ZERO(strcmp("doop2", fileInfo->mGroup)); hdfsFreeFileInfo(fileInfo, 1); - EXPECT_ZERO(hdfsDelete(fs, prefix, 1)); + snprintf(tmp, sizeof(tmp), "%s/nonexistent-file-name", paths->prefix); + EXPECT_NEGATIVE_ONE_WITH_ERRNO(hdfsChown(fs, tmp, "ha3", NULL), ENOENT); + return 0; +} + +static int testHdfsOperationsImpl(struct tlhThreadInfo *ti) +{ + hdfsFS fs = NULL; + struct tlhPaths paths; + + fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n", + ti->threadIdx); + EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL)); + EXPECT_ZERO(setupPaths(ti, &paths)); + // test some operations + EXPECT_ZERO(doTestHdfsOperations(ti, fs, &paths)); + EXPECT_ZERO(hdfsDisconnect(fs)); + // reconnect as user "foo" and verify that we get permission errors + EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, "foo")); + EXPECT_NEGATIVE_ONE_WITH_ERRNO(hdfsChown(fs, paths.file1, "ha3", NULL), EACCES); + EXPECT_ZERO(hdfsDisconnect(fs)); + // reconnect to do the final delete. + EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL)); + EXPECT_ZERO(hdfsDelete(fs, paths.prefix, 1)); + EXPECT_ZERO(hdfsDisconnect(fs)); return 0; } static void *testHdfsOperations(void *v) { struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v; - hdfsFS fs = NULL; - int ret; - - fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n", - ti->threadIdx); - ret = hdfsSingleNameNodeConnect(tlhCluster, &fs); - if (ret) { - fprintf(stderr, "testHdfsOperations(threadIdx=%d): " - "hdfsSingleNameNodeConnect failed with error %d.\n", - ti->threadIdx, ret); - ti->success = EIO; - return NULL; - } - ti->success = doTestHdfsOperations(ti, fs); - if (hdfsDisconnect(fs)) { - ret = errno; - fprintf(stderr, "hdfsDisconnect error %d\n", ret); - ti->success = ret; - } + int ret = testHdfsOperationsImpl(ti); + ti->success = ret; return NULL; } From 5dabc227657c29ea21528674d9ca1aa0e4c2bdd4 Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Tue, 3 Dec 2013 23:21:30 +0000 Subject: [PATCH 5/6] YARN-1332. In TestAMRMClient, replace assertTrue with assertEquals where possible (Sebastian Wong via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547640 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/client/api/impl/TestAMRMClient.java | 122 +++++++++--------- 2 files changed, 64 insertions(+), 61 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index adbcdd00500..6838b8e9fa6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -132,6 +132,9 @@ Release 2.4.0 - UNRELEASED YARN-1318. Promoted AdminService to an Always-On service and merged it into RMHAProtocolService. (Karthik Kambatla via vinodkv) + YARN-1332. In TestAMRMClient, replace assertTrue with assertEquals where + possible (Sebastian Wong via Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 1f7565be181..08e71c1cb84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -248,7 +248,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { matches = amClient.getMatchingRequests(priority, node, testCapability1); verifyMatches(matches, 1); storedRequest = matches.get(0).iterator().next(); - assertTrue(storedContainer1 == storedRequest); + assertEquals(storedContainer1, storedRequest); amClient.removeContainerRequest(storedContainer1); // exact matching with order maintained @@ -259,9 +259,9 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { int i = 0; for(ContainerRequest storedRequest1 : matches.get(0)) { if(i++ == 0) { - assertTrue(storedContainer4 == storedRequest1); + assertEquals(storedContainer4, storedRequest1); } else { - assertTrue(storedContainer6 == storedRequest1); + assertEquals(storedContainer6, storedRequest1); } } amClient.removeContainerRequest(storedContainer6); @@ -276,7 +276,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { assert(matches.size() == 2); // verify non-fitting containers are not returned and fitting ones are for(Collection testSet : matches) { - assertTrue(testSet.size() == 1); + assertEquals(1, testSet.size()); ContainerRequest testRequest = testSet.iterator().next(); assertTrue(testRequest != storedContainer4); assertTrue(testRequest != storedContainer5); @@ -310,8 +310,8 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { private void verifyMatches( List> matches, int matchSize) { - assertTrue(matches.size() == 1); - assertTrue(matches.get(0).size() == matchSize); + assertEquals(1, matches.size()); + assertEquals(matches.get(0).size(), matchSize); } @Test (timeout=60000) @@ -337,12 +337,12 @@ public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOExce matches = amClient.getMatchingRequests(priority, node, capability); verifyMatches(matches, 1); storedRequest = matches.get(0).iterator().next(); - assertTrue(storedContainer1 == storedRequest); + assertEquals(storedContainer1, storedRequest); // inferred match rack matches = amClient.getMatchingRequests(priority, rack, capability); verifyMatches(matches, 1); storedRequest = matches.get(0).iterator().next(); - assertTrue(storedContainer1 == storedRequest); + assertEquals(storedContainer1, storedRequest); // inferred rack match no longer valid after request is removed amClient.removeContainerRequest(storedContainer1); @@ -387,10 +387,10 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { // test addition and storage int containersRequestedAny = amClient.remoteRequestsTable.get(priority) .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); - assertTrue(containersRequestedAny == 2); + assertEquals(2, containersRequestedAny); containersRequestedAny = amClient.remoteRequestsTable.get(priority1) .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); - assertTrue(containersRequestedAny == 1); + assertEquals(1, containersRequestedAny); List> matches = amClient.getMatchingRequests(priority, node, capability); verifyMatches(matches, 2); @@ -417,7 +417,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { // test matching of containers ContainerRequest storedRequest = matches.get(0).iterator().next(); - assertTrue(storedContainer1 == storedRequest); + assertEquals(storedContainer1, storedRequest); amClient.removeContainerRequest(storedContainer1); matches = amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability); @@ -438,10 +438,10 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { && iterationsLeft-- > 0) { Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft); AllocateResponse allocResponse = amClient.allocate(0.1f); - assertTrue(amClient.ask.size() == 0); - assertTrue(amClient.release.size() == 0); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); - assertTrue(nodeCount == amClient.getClusterNodeCount()); + assertEquals(nodeCount, amClient.getClusterNodeCount()); allocatedContainerCount += allocResponse.getAllocatedContainers().size(); for(Container container : allocResponse.getAllocatedContainers()) { ContainerRequest expectedRequest = @@ -453,7 +453,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { // test correct matched container is returned verifyMatches(matches, 1); ContainerRequest matchedRequest = matches.get(0).iterator().next(); - assertTrue(matchedRequest == expectedRequest); + assertEquals(matchedRequest, expectedRequest); amClient.removeContainerRequest(matchedRequest); // assign this container, use it and release it amClient.releaseAssignedContainer(container.getId()); @@ -464,11 +464,11 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { } } - assertTrue(allocatedContainerCount == 2); + assertEquals(2, allocatedContainerCount); AllocateResponse allocResponse = amClient.allocate(0.1f); - assertTrue(amClient.release.size() == 0); - assertTrue(amClient.ask.size() == 0); - assertTrue(allocResponse.getAllocatedContainers().size() == 0); + assertEquals(0, amClient.release.size()); + assertEquals(0, amClient.ask.size()); + assertEquals(0, allocResponse.getAllocatedContainers().size()); // 0 requests left. everything got cleaned up assertTrue(amClient.remoteRequestsTable.isEmpty()); @@ -494,14 +494,14 @@ public void testAllocationWithBlacklist() throws YarnException, IOException { amClient.start(); amClient.registerApplicationMaster("Host", 10000, ""); - assertTrue(amClient.ask.size() == 0); - assertTrue(amClient.release.size() == 0); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); ContainerRequest storedContainer1 = new ContainerRequest(capability, nodes, racks, priority); amClient.addContainerRequest(storedContainer1); - assertTrue(amClient.ask.size() == 3); - assertTrue(amClient.release.size() == 0); + assertEquals(3, amClient.ask.size()); + assertEquals(0, amClient.release.size()); List localNodeBlacklist = new ArrayList(); localNodeBlacklist.add(node); @@ -512,7 +512,7 @@ public void testAllocationWithBlacklist() throws YarnException, IOException { int allocatedContainerCount = getAllocatedContainersNumber(amClient, DEFAULT_ITERATION); // the only node is in blacklist, so no allocation - assertTrue(allocatedContainerCount == 0); + assertEquals(0, allocatedContainerCount); // Remove node from blacklist, so get assigned with 2 amClient.updateBlacklist(null, localNodeBlacklist); @@ -521,7 +521,7 @@ public void testAllocationWithBlacklist() throws YarnException, IOException { amClient.addContainerRequest(storedContainer2); allocatedContainerCount = getAllocatedContainersNumber(amClient, DEFAULT_ITERATION); - assertEquals(allocatedContainerCount, 2); + assertEquals(2, allocatedContainerCount); // Test in case exception in allocate(), blacklist is kept assertTrue(amClient.blacklistAdditions.isEmpty()); @@ -538,7 +538,7 @@ public void testAllocationWithBlacklist() throws YarnException, IOException { amClient.allocate(0.1f); fail("there should be an exception here."); } catch (Exception e) { - assertEquals(amClient.blacklistAdditions.size(), 1); + assertEquals(1, amClient.blacklistAdditions.size()); } } finally { if (amClient != null && amClient.getServiceState() == STATE.STARTED) { @@ -565,16 +565,16 @@ public void testAMRMClientWithBlacklist() throws YarnException, IOException { nodeList01.add(nodes[0]); nodeList01.add(nodes[1]); amClient.updateBlacklist(nodeList01, null); - assertEquals(amClient.blacklistAdditions.size(),2); - assertEquals(amClient.blacklistRemovals.size(),0); + assertEquals(2, amClient.blacklistAdditions.size()); + assertEquals(0, amClient.blacklistRemovals.size()); // Add nodes[0] again, verify it is not added duplicated. List nodeList02 = new ArrayList(); nodeList02.add(nodes[0]); nodeList02.add(nodes[2]); amClient.updateBlacklist(nodeList02, null); - assertEquals(amClient.blacklistAdditions.size(),3); - assertEquals(amClient.blacklistRemovals.size(),0); + assertEquals(3, amClient.blacklistAdditions.size()); + assertEquals(0, amClient.blacklistRemovals.size()); // Add nodes[1] and nodes[2] to removal list, // Verify addition list remove these two nodes. @@ -582,16 +582,16 @@ public void testAMRMClientWithBlacklist() throws YarnException, IOException { nodeList12.add(nodes[1]); nodeList12.add(nodes[2]); amClient.updateBlacklist(null, nodeList12); - assertEquals(amClient.blacklistAdditions.size(),1); - assertEquals(amClient.blacklistRemovals.size(),2); + assertEquals(1, amClient.blacklistAdditions.size()); + assertEquals(2, amClient.blacklistRemovals.size()); // Add nodes[1] again to addition list, // Verify removal list will remove this node. List nodeList1 = new ArrayList(); nodeList1.add(nodes[1]); amClient.updateBlacklist(nodeList1, null); - assertEquals(amClient.blacklistAdditions.size(),2); - assertEquals(amClient.blacklistRemovals.size(),1); + assertEquals(2, amClient.blacklistAdditions.size()); + assertEquals(1, amClient.blacklistRemovals.size()); } finally { if (amClient != null && amClient.getServiceState() == STATE.STARTED) { amClient.stop(); @@ -606,10 +606,10 @@ private int getAllocatedContainersNumber( while (iterationsLeft-- > 0) { Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft); AllocateResponse allocResponse = amClient.allocate(0.1f); - assertTrue(amClient.ask.size() == 0); - assertTrue(amClient.release.size() == 0); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); - assertTrue(nodeCount == amClient.getClusterNodeCount()); + assertEquals(nodeCount, amClient.getClusterNodeCount()); allocatedContainerCount += allocResponse.getAllocatedContainers().size(); if(allocatedContainerCount == 0) { @@ -654,8 +654,8 @@ private void testAllocation(final AMRMClientImpl amClient) throws YarnException, IOException { // setup container request - assertTrue(amClient.ask.size() == 0); - assertTrue(amClient.release.size() == 0); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); amClient.addContainerRequest( new ContainerRequest(capability, nodes, racks, priority)); @@ -677,11 +677,11 @@ private void testAllocation(final AMRMClientImpl amClient) int containersRequestedAny = amClient.remoteRequestsTable.get(priority) .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); - assertTrue(containersRequestedNode == 2); - assertTrue(containersRequestedRack == 2); - assertTrue(containersRequestedAny == 2); - assertTrue(amClient.ask.size() == 3); - assertTrue(amClient.release.size() == 0); + assertEquals(2, containersRequestedNode); + assertEquals(2, containersRequestedRack); + assertEquals(2, containersRequestedAny); + assertEquals(3, amClient.ask.size()); + assertEquals(0, amClient.release.size()); // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0; @@ -695,10 +695,10 @@ private void testAllocation(final AMRMClientImpl amClient) while (allocatedContainerCount < containersRequestedAny && iterationsLeft-- > 0) { AllocateResponse allocResponse = amClient.allocate(0.1f); - assertTrue(amClient.ask.size() == 0); - assertTrue(amClient.release.size() == 0); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); - assertTrue(nodeCount == amClient.getClusterNodeCount()); + assertEquals(nodeCount, amClient.getClusterNodeCount()); allocatedContainerCount += allocResponse.getAllocatedContainers().size(); for(Container container : allocResponse.getAllocatedContainers()) { ContainerId rejectContainerId = container.getId(); @@ -724,19 +724,19 @@ private void testAllocation(final AMRMClientImpl amClient) Assert.assertTrue(receivedNMTokens.size() > 0 && receivedNMTokens.size() <= nodeCount); - assertTrue(allocatedContainerCount == containersRequestedAny); - assertTrue(amClient.release.size() == 2); - assertTrue(amClient.ask.size() == 0); + assertEquals(allocatedContainerCount, containersRequestedAny); + assertEquals(2, amClient.release.size()); + assertEquals(0, amClient.ask.size()); // need to tell the AMRMClient that we dont need these resources anymore amClient.removeContainerRequest( new ContainerRequest(capability, nodes, racks, priority)); amClient.removeContainerRequest( new ContainerRequest(capability, nodes, racks, priority)); - assertTrue(amClient.ask.size() == 3); + assertEquals(3, amClient.ask.size()); // send 0 container count request for resources that are no longer needed ResourceRequest snoopRequest = amClient.ask.iterator().next(); - assertTrue(snoopRequest.getNumContainers() == 0); + assertEquals(0, snoopRequest.getNumContainers()); // test RPC exception handling amClient.addContainerRequest(new ContainerRequest(capability, nodes, @@ -744,7 +744,7 @@ private void testAllocation(final AMRMClientImpl amClient) amClient.addContainerRequest(new ContainerRequest(capability, nodes, racks, priority)); snoopRequest = amClient.ask.iterator().next(); - assertTrue(snoopRequest.getNumContainers() == 2); + assertEquals(2, snoopRequest.getNumContainers()); ApplicationMasterProtocol realRM = amClient.rmClient; try { @@ -768,12 +768,12 @@ public AllocateResponse answer(InvocationOnMock invocation) amClient.rmClient = realRM; } - assertTrue(amClient.release.size() == 2); - assertTrue(amClient.ask.size() == 3); + assertEquals(2, amClient.release.size()); + assertEquals(3, amClient.ask.size()); snoopRequest = amClient.ask.iterator().next(); // verify that the remove request made in between makeRequest and allocate // has not been lost - assertTrue(snoopRequest.getNumContainers() == 0); + assertEquals(0, snoopRequest.getNumContainers()); iterationsLeft = 3; // do a few iterations to ensure RM is not going send new containers @@ -781,13 +781,13 @@ public AllocateResponse answer(InvocationOnMock invocation) // inform RM of rejection AllocateResponse allocResponse = amClient.allocate(0.1f); // RM did not send new containers because AM does not need any - assertTrue(allocResponse.getAllocatedContainers().size() == 0); + assertEquals(0, allocResponse.getAllocatedContainers().size()); if(allocResponse.getCompletedContainersStatuses().size() > 0) { for(ContainerStatus cStatus :allocResponse .getCompletedContainersStatuses()) { if(releases.contains(cStatus.getContainerId())) { - assertTrue(cStatus.getState() == ContainerState.COMPLETE); - assertTrue(cStatus.getExitStatus() == -100); + assertEquals(cStatus.getState(), ContainerState.COMPLETE); + assertEquals(-100, cStatus.getExitStatus()); releases.remove(cStatus.getContainerId()); } } @@ -797,8 +797,8 @@ public AllocateResponse answer(InvocationOnMock invocation) sleep(100); } } - assertTrue(amClient.ask.size() == 0); - assertTrue(amClient.release.size() == 0); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); } private void sleep(int sleepTime) { From 7fe953ed9099efc0c9b964e70c49c57338dada1b Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 4 Dec 2013 00:22:12 +0000 Subject: [PATCH 6/6] YARN-1454. Fixed test failure issue with TestRMRestart. Contributed by Karthik Kambatla. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547651 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 +++ .../hadoop/yarn/server/resourcemanager/TestRMRestart.java | 2 ++ 2 files changed, 5 insertions(+) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6838b8e9fa6..50c7cc10795 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -197,6 +197,9 @@ Release 2.4.0 - UNRELEASED YARN-895. Changed RM state-store to not crash immediately if RM restarts while the state-store is down. (Jian He via vinodkv) + YARN-1454. Fixed test failure issue with TestRMRestart. (Karthik Kambatla + via vinodkv) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 9e145668426..38f8542bde5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1247,6 +1247,8 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { // renewDate before renewing Long renewDateBeforeRenew = allTokensRM2.get(dtId1); try{ + // Sleep for one millisecond to make sure renewDataAfterRenew is greater + Thread.sleep(1); // renew recovered token rm2.getRMDTSecretManager().renewToken(token1, "renewer1"); } catch(Exception e) {