From 34f08944b7c8d58f531a3f3bf3d4ee4cd3fa643a Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 17 Oct 2013 02:14:33 +0000 Subject: [PATCH] merge trunk to branch HDFS-4949 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1532952 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 211 ++++++++- .../hadoop-hdfs/src/contrib/bkjournal/pom.xml | 18 + .../org/apache/hadoop/hdfs/DFSClient.java | 23 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 20 +- .../apache/hadoop/hdfs/DFSInputStream.java | 2 +- .../apache/hadoop/hdfs/DFSOutputStream.java | 58 +-- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 43 ++ .../hadoop/hdfs/DistributedFileSystem.java | 5 +- .../java/org/apache/hadoop/hdfs/HAUtil.java | 35 +- .../apache/hadoop/hdfs/HftpFileSystem.java | 60 +-- .../apache/hadoop/hdfs/HsftpFileSystem.java | 23 +- .../apache/hadoop/hdfs/NameNodeProxies.java | 10 +- .../hadoop/hdfs/protocol/DatanodeID.java | 22 +- .../hadoop/hdfs/protocol/DatanodeInfo.java | 23 +- .../hadoop/hdfs/protocolPB/PBHelper.java | 4 +- .../DelegationTokenSecretManager.java | 19 + .../hadoop/hdfs/server/balancer/Balancer.java | 38 +- .../server/blockmanagement/BlockManager.java | 38 +- .../blockmanagement/BlockPlacementPolicy.java | 74 +-- .../BlockPlacementPolicyDefault.java | 361 ++++++--------- .../BlockPlacementPolicyWithNodeGroup.java | 78 ++-- .../server/blockmanagement/BlocksMap.java | 6 +- .../blockmanagement/DatanodeDescriptor.java | 3 +- .../blockmanagement/DatanodeManager.java | 92 ++-- .../hadoop/hdfs/server/common/JspHelper.java | 68 +-- .../datanode/BlockPoolSliceScanner.java | 16 +- .../hadoop/hdfs/server/datanode/DataNode.java | 169 ++----- .../server/datanode/DatanodeJspHelper.java | 220 ++++----- .../datanode/fsdataset/RollingLogs.java | 6 + .../fsdataset/impl/RollingLogsImpl.java | 7 + .../server/namenode/ClusterJspHelper.java | 13 +- .../hdfs/server/namenode/FSDirectory.java | 10 +- .../hdfs/server/namenode/FSNamesystem.java | 437 ++++++++++-------- .../server/namenode/FileChecksumServlets.java | 11 +- .../hdfs/server/namenode/FileDataServlet.java | 11 +- .../server/namenode/NameNodeHttpServer.java | 20 +- .../server/namenode/NameNodeRpcServer.java | 15 +- .../server/namenode/NamenodeJspHelper.java | 142 +++--- .../hdfs/server/namenode/Namesystem.java | 3 + .../server/namenode/SafeModeException.java | 5 +- .../namenode/StartupProgressServlet.java | 8 +- .../namenode/metrics/FSNamesystemMBean.java | 15 + .../protocol/DisallowedDatanodeException.java | 7 +- .../hadoop/hdfs/util/LightWeightHashSet.java | 2 +- .../org/apache/hadoop/hdfs/web/JsonUtil.java | 29 +- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 173 +++++-- .../src/main/native/libhdfs/hdfs.c | 2 +- .../hadoop-hdfs/src/main/proto/hdfs.proto | 3 +- .../src/main/resources/hdfs-default.xml | 16 + .../src/main/webapps/hdfs/corrupt_files.jsp | 10 +- .../src/main/webapps/hdfs/dfshealth.jsp | 18 +- .../src/main/webapps/hdfs/dfsnodelist.jsp | 4 +- .../src/site/apt/Federation.apt.vm | 4 +- .../images/federation-background.gif | Bin 0 -> 13420 bytes .../src/site/resources/images/federation.gif | Bin 0 -> 18355 bytes .../org/apache/hadoop/fs/TestGlobPaths.java | 143 ++++-- .../fs/TestHDFSFileContextMainOperations.java | 43 +- ...tCommand.java => TestHdfsTextCommand.java} | 2 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 73 +-- .../hadoop/hdfs/TestDFSClientRetries.java | 6 +- .../hadoop/hdfs/TestDFSOutputStream.java | 74 +++ .../org/apache/hadoop/hdfs/TestDFSShell.java | 53 ++- 
.../org/apache/hadoop/hdfs/TestDFSUtil.java | 67 ++- .../hadoop/hdfs/TestDatanodeBlockScanner.java | 39 ++ .../hadoop/hdfs/TestDatanodeRegistration.java | 23 +- .../apache/hadoop/hdfs/TestDecommission.java | 11 +- .../hdfs/TestDistributedFileSystem.java | 42 +- .../hadoop/hdfs/TestFileInputStreamCache.java | 17 +- .../hadoop/hdfs/TestHftpDelegationToken.java | 10 +- .../hadoop/hdfs/TestHftpFileSystem.java | 24 +- .../hadoop/hdfs/TestNameNodeHttpServer.java | 39 ++ .../org/apache/hadoop/hdfs/TestPeerCache.java | 29 +- .../hdfs/TestShortCircuitLocalRead.java | 60 +++ .../hadoop/hdfs/TestSnapshotCommands.java | 213 +++++++++ .../balancer/TestBalancerWithNodeGroup.java | 12 +- .../blockmanagement/TestCachedBlocksList.java | 6 +- .../TestRBWBlockInvalidation.java | 4 +- .../TestReplicationPolicy.java | 198 ++++---- .../TestReplicationPolicyWithNodeGroup.java | 161 +++---- .../hdfs/server/common/TestJspHelper.java | 171 ++++++- .../server/datanode/DataNodeTestUtils.java | 17 +- .../hdfs/server/datanode/TestDatanodeJsp.java | 17 +- .../namenode/NNThroughputBenchmark.java | 46 +- .../server/namenode/TestAddBlockRetry.java | 4 +- .../server/namenode/TestClusterJspHelper.java | 59 +++ .../TestCommitBlockSynchronization.java | 18 +- .../server/namenode/TestFSNamesystem.java | 20 +- .../namenode/TestFSNamesystemMBean.java | 72 +++ .../namenode/TestFileJournalManager.java | 29 +- .../hdfs/server/namenode/TestINodeFile.java | 56 ++- .../namenode/TestNameNodeJspHelper.java | 290 +++++++++++- .../namenode/TestNamenodeRetryCache.java | 44 +- .../namenode/TestStartupProgressServlet.java | 16 + .../server/namenode/ha/TestDNFencing.java | 4 +- .../ha/TestDelegationTokensWithHA.java | 116 ++++- .../server/namenode/ha/TestHASafeMode.java | 52 ++- .../ha/TestLossyRetryInvocationHandler.java | 57 +++ .../namenode/ha/TestRetryCacheWithHA.java | 13 +- .../namenode/metrics/TestNameNodeMetrics.java | 7 - .../snapshot/TestOpenFilesWithSnapshot.java | 113 +++++ .../snapshot/TestSnapshotDeletion.java | 67 ++- .../namenode/snapshot/TestSnapshotRename.java | 28 ++ .../hadoop/hdfs/web/TestWebHDFSForHA.java | 77 +++ .../hadoop/hdfs/web/WebHdfsTestUtil.java | 2 +- .../src/test/resources/testHDFSConf.xml | 29 +- 105 files changed, 3843 insertions(+), 1640 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images/federation-background.gif create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/site/resources/images/federation.gif rename hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/shell/{TestTextCommand.java => TestHdfsTextCommand.java} (99%) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestNameNodeHttpServer.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSnapshotCommands.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterJspHelper.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestLossyRetryInvocationHandler.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java create mode 100644 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f07d3beaba2..5ccd87bc348 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -120,8 +120,8 @@ Trunk (Unreleased) HDFS-4904. Remove JournalService. (Arpit Agarwal via cnauroth) - HDFS-4953. Enable HDFS local reads via mmap. - (Colin Patrick McCabe via wang). + HDFS-5041. Add the time of last heartbeat to dead server Web UI (Shinichi + Yamashita via brandonli) OPTIMIZATIONS @@ -247,8 +247,19 @@ Release 2.3.0 - UNRELEASED NEW FEATURES + HDFS-5122. Support failover and retry in WebHdfsFileSystem for NN HA. + (Haohui Mai via jing9) + + HDFS-4953. Enable HDFS local reads via mmap. + (Colin Patrick McCabe via wang). + + HDFS-5342. Provide more information in the FSNamesystem JMX interfaces. + (Haohui Mai via jing9) + IMPROVEMENTS + HDFS-5267. Remove volatile from LightWeightHashSet. (Junping Du via llu) + HDFS-4657. Limit the number of blocks logged by the NN after a block report to a configurable value. (Aaron T. Myers via Colin Patrick McCabe) @@ -256,9 +267,6 @@ Release 2.3.0 - UNRELEASED HDFS-4278. Log an ERROR when DFS_BLOCK_ACCESS_TOKEN_ENABLE config is disabled but security is turned on. (Kousuke Saruta via harsh) - HDFS-4817. Make HDFS advisory caching configurable on a per-file basis. - (Colin Patrick McCabe) - HDFS-5004. Add additional JMX bean for NameNode status data (Trevor Lorimer via cos) @@ -275,8 +283,46 @@ Release 2.3.0 - UNRELEASED HDFS-4879. Add "blocked ArrayList" collection to avoid CMS full GCs (Todd Lipcon via Colin Patrick McCabe) + HDFS-4096. Add snapshot information to namenode WebUI. (Haohui Mai via + jing9) + + HDFS-5188. In BlockPlacementPolicy, reduce the number of chooseTarget(..) + methods; replace HashMap with Map in parameter declarations and cleanup + some related code. (szetszwo) + + HDFS-5207. In BlockPlacementPolicy.chooseTarget(..), change the writer + and the excludedNodes parameter types respectively to Node and Set. + (Junping Du via szetszwo) + + HDFS-5240. Separate formatting from logging in the audit logger API (daryn) + + HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more + intuitive. (Contributed by Colin Patrick McCabe) + + HDFS-5260. Merge zero-copy memory-mapped HDFS client reads to trunk and + branch-2. (cnauroth) + + HDFS-4517. Cover class RemoteBlockReader with unit tests. (Vadim Bondarev + and Dennis Y via kihwal) + + HDFS-4512. Cover package org.apache.hadoop.hdfs.server.common with tests. + (Vadim Bondarev via kihwal) + + HDFS-4510. Cover classes ClusterJspHelper/NamenodeJspHelper with unit + tests. (Andrey Klochkov via kihwal) + + HDFS-5323. Remove some deadcode in BlockManager (Colin Patrick McCabe) + + HDFS-5338. Add a conf to disable hostname check in datanode registration. + (szetszwo) + + HDFS-5130. Add test for snapshot related FsShell and DFSAdmin commands. + (Binglin Chang via jing9) + OPTIMIZATIONS + HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) + BUG FIXES HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin Patrick McCabe) @@ -295,7 +341,130 @@ Release 2.3.0 - UNRELEASED HDFS-5170. BlockPlacementPolicyDefault uses the wrong classname when alerting to enable debug logging. (Andrew Wang) -Release 2.1.1-beta - UNRELEASED + HDFS-5031. BlockScanner scans the block multiple times. 
(Vinay via Arpit + Agarwal) + + HDFS-5266. ElasticByteBufferPool#Key does not implement equals. (cnauroth) + + HDFS-5352. Server#initLog() doesn't close InputStream in httpfs. (Ted Yu via + jing9) + + HDFS-5283. Under construction blocks only inside snapshots should not be + counted in safemode threshhold. (Vinay via szetszwo) + + HDFS-4376. Fix race conditions in Balancer. (Junping Du via szetszwo) + +Release 2.2.1 - UNRELEASED + + INCOMPATIBLE CHANGES + + NEW FEATURES + + IMPROVEMENTS + + HDFS-5360. Improvement of usage message of renameSnapshot and + deleteSnapshot. (Shinichi Yamashita via wang) + + OPTIMIZATIONS + + BUG FIXES + + HDFS-5307. Support both HTTP and HTTPS in jsp pages (Haohui Mai via + brandonli) + + HDFS-5291. Standby namenode after transition to active goes into safemode. + (jing9) + + HDFS-5317. Go back to DFS Home link does not work on datanode webUI + (Haohui Mai via brandonli) + + HDFS-5316. Namenode ignores the default https port (Haohui Mai via + brandonli) + + HDFS-5281. COMMIT request should not block. (brandonli) + + HDFS-5337. should do hsync for a commit request even there is no pending + writes (brandonli) + + HDFS-5335. Hive query failed with possible race in dfs output stream. + (Haohui Mai via suresh) + + HDFS-5322. HDFS delegation token not found in cache errors seen on secure HA + clusters. (jing9) + + HDFS-5329. Update FSNamesystem#getListing() to handle inode path in startAfter + token. (brandonli) + + HDFS-5330. fix readdir and readdirplus for large directories (brandonli) + + HDFS-5370. Typo in Error Message: different between range in condition + and range in error message. (Kousuke Saruta via suresh) + +Release 2.2.0 - 2013-10-13 + + INCOMPATIBLE CHANGES + + NEW FEATURES + + HDFS-4817. Make HDFS advisory caching configurable on a per-file basis. + (Colin Patrick McCabe) + + HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API. + (Haohui Mai via brandonli) + + IMPROVEMENTS + + HDFS-5246. Make Hadoop nfs server port and mount daemon port + configurable. (Jinghui Wang via brandonli) + + HDFS-5256. Use guava LoadingCache to implement DFSClientCache. (Haohui Mai + via brandonli) + + HDFS-5308. Replace HttpConfig#getSchemePrefix with implicit schemes in HDFS + JSP. (Haohui Mai via jing9) + + OPTIMIZATIONS + + BUG FIXES + + HDFS-5139. Remove redundant -R option from setrep. + + HDFS-5251. Race between the initialization of NameNode and the http + server. (Haohui Mai via suresh) + + HDFS-5258. Skip tests in TestHDFSCLI that are not applicable on Windows. + (Chuan Liu via cnauroth) + + HDFS-5186. TestFileJournalManager fails on Windows due to file handle leaks. + (Chuan Liu via cnauroth) + + HDFS-5268. NFS write commit verifier is not set in a few places (brandonli) + + HDFS-5265. Namenode fails to start when dfs.https.port is unspecified. + (Haohui Mai via jing9) + + HDFS-5255. Distcp job fails with hsftp when https is enabled in insecure + cluster. (Arpit Agarwal) + + HDFS-5279. Guard against NullPointerException in NameNode JSP pages before + initialization of FSNamesystem. (cnauroth) + + HDFS-5289. Race condition in TestRetryCacheWithHA#testCreateSymlink causes + spurious test failure. (atm) + + HDFS-5300. FSNameSystem#deleteSnapshot() should not check owner in case of + permissions disabled. (Vinay via jing9) + + HDFS-5306. Datanode https port is not available at the namenode. (Suresh + Srinivas via brandonli) + + HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened. + (Vinay via jing9) + + HDFS-5259. 
Support client which combines appended data with old data + before sends it to NFS server. (brandonli) + +Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES @@ -336,6 +505,13 @@ Release 2.1.1-beta - UNRELEASED HDFS-5085. Refactor o.a.h.nfs to support different types of authentications. (jing9) + HDFS-5067 Support symlink operations in NFS gateway. (brandonli) + + HDFS-5199 Add more debug trace for NFS READ and WRITE. (brandonli) + + HDFS-5234 Move RpcFrameDecoder out of the public API. + (Haohui Mai via brandonli) + IMPROVEMENTS HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may @@ -372,6 +548,12 @@ Release 2.1.1-beta - UNRELEASED HDFS-4680. Audit logging of delegation tokens for MR tracing. (Andrew Wang) + HDFS-5212. Refactor RpcMessage and NFS3Response to support different + types of authentication information. (jing9) + + HDFS-4971. Move IO operations out of locking in OpenFileCtx. (brandonli and + jing9) + OPTIMIZATIONS BUG FIXES @@ -440,6 +622,17 @@ Release 2.1.1-beta - UNRELEASED HDFS-5159. Secondary NameNode fails to checkpoint if error occurs downloading edits on first checkpoint. (atm) + HDFS-5192. NameNode may fail to start when + dfs.client.test.drop.namenode.response.number is set. (jing9) + + HDFS-5219. Add configuration keys for retry policy in WebHDFSFileSystem. + (Haohui Mai via jing9) + + HDFS-5231. Fix broken links in the document of HDFS Federation. (Haohui Mai + via jing9) + + HDFS-5249. Fix dumper thread which may die silently. (brandonli) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES @@ -893,6 +1086,9 @@ Release 2.1.0-beta - 2013-08-22 HDFS-5016. Deadlock in pipeline recovery causes Datanode to be marked dead. (suresh) + HDFS-5228. The RemoteIterator returned by DistributedFileSystem.listFiles + may throw NullPointerException. (szetszwo and cnauroth via szetszwo) + BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes. @@ -3483,6 +3679,9 @@ Release 0.23.10 - UNRELEASED HDFS-5010. Reduce the frequency of getCurrentUser() calls from namenode (kihwal) + HDFS-5346. 
Avoid unnecessary call to getNumLiveDataNodes() for each block + during IBR processing (Ravi Prakash via kihwal) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml index 537dee79c4d..253dba85e25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml @@ -36,7 +36,25 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> ${basedir}/../../../../../hadoop-common-project/hadoop-common/target + + + + + org.jboss.netty + netty + 3.2.4.Final + + + + + + org.jboss.netty + netty + compile + + commons-logging commons-logging diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index c37d86d7c51..f62b668175c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -525,14 +525,17 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, int numResponseToDrop = conf.getInt( DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT); + NameNodeProxies.ProxyAndInfo proxyInfo = null; if (numResponseToDrop > 0) { // This case is used for testing. LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY + " is set to " + numResponseToDrop + ", this hacked client will proactively drop responses"); - NameNodeProxies.ProxyAndInfo proxyInfo = NameNodeProxies - .createProxyWithLossyRetryHandler(conf, nameNodeUri, - ClientProtocol.class, numResponseToDrop); + proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf, + nameNodeUri, ClientProtocol.class, numResponseToDrop); + } + + if (proxyInfo != null) { this.dtService = proxyInfo.getDelegationTokenService(); this.namenode = proxyInfo.getProxy(); } else if (rpcNamenode != null) { @@ -543,9 +546,8 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, } else { Preconditions.checkArgument(nameNodeUri != null, "null URI"); - NameNodeProxies.ProxyAndInfo proxyInfo = - NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class); - + proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, + ClientProtocol.class); this.dtService = proxyInfo.getDelegationTokenService(); this.namenode = proxyInfo.getProxy(); } @@ -902,10 +904,15 @@ public Token getDelegationToken(Text renewer) assert dtService != null; Token token = namenode.getDelegationToken(renewer); - token.setService(this.dtService); - LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token)); + if (token != null) { + token.setService(this.dtService); + LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token)); + } else { + LOG.info("Cannot get delegation token from " + renewer); + } return token; + } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 5d2b14c50e9..e9eb7be119a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; /** * This class contains constants for configuration keys used @@ -198,7 +199,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false; public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive"; public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000; - + + public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check"; + public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true; + public static final String DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES = "dfs.namenode.list.cache.pools.num.responses"; public static final int DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT = 100; @@ -364,6 +368,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime"; public static final long DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L; + public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname"; + public static final Class DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class; public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max"; public static final int DFS_REPLICATION_MAX_DEFAULT = 512; public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; @@ -534,4 +540,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // Timeout to wait for block receiver and responder thread to stop public static final String DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY = "dfs.datanode.xceiver.stop.timeout.millis"; public static final long DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT = 60000; + + // WebHDFS retry policy + public static final String DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.http.client.retry.policy.enabled"; + public static final boolean DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false; + public static final String DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.http.client.retry.policy.spec"; + public static final String DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... 
+ public static final String DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY = "dfs.http.client.failover.max.attempts"; + public static final int DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT = 15; + public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY = "dfs.http.client.failover.sleep.base.millis"; + public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500; + public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis"; + public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 74fcc6f14ea..49ecb268646 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -403,7 +403,7 @@ private synchronized LocatedBlock getBlockAt(long offset, //check offset if (offset < 0 || offset >= getFileLength()) { - throw new IOException("offset < 0 || offset > getFileLength(), offset=" + throw new IOException("offset < 0 || offset >= getFileLength(), offset=" + offset + ", updatePosition=" + updatePosition + ", locatedBlocks=" + locatedBlocks); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 927c530556d..28332fa0bd3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CanSetDropBehind; @@ -85,7 +86,6 @@ import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; -import org.mortbay.log.Log; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; @@ -141,7 +141,7 @@ public class DFSOutputStream extends FSOutputSummer private long bytesCurBlock = 0; // bytes writen in current block private int packetSize = 0; // write packet size, not including the header. private int chunksPerPacket = 0; - private volatile IOException lastException = null; + private final AtomicReference lastException = new AtomicReference(); private long artificialSlowdown = 0; private long lastFlushOffset = 0; // offset when flush was invoked //persist blocks on namenode @@ -809,8 +809,8 @@ private boolean processDatanodeError() throws IOException { if (++pipelineRecoveryCount > 5) { DFSClient.LOG.warn("Error recovering pipeline for writing " + block + ". Already retried 5 times for the same packet."); - lastException = new IOException("Failing write. Tried pipeline " + - "recovery 5 times without success."); + lastException.set(new IOException("Failing write. 
Tried pipeline " + + "recovery 5 times without success.")); streamerClosed = true; return false; } @@ -1000,8 +1000,8 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { } } if (nodes.length <= 1) { - lastException = new IOException("All datanodes " + pipelineMsg - + " are bad. Aborting..."); + lastException.set(new IOException("All datanodes " + pipelineMsg + + " are bad. Aborting...")); streamerClosed = true; return false; } @@ -1016,7 +1016,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException { newnodes.length-errorIndex); nodes = newnodes; hasError = false; - lastException = null; + lastException.set(null); errorIndex = -1; } @@ -1060,7 +1060,7 @@ private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException { ExtendedBlock oldBlock = block; do { hasError = false; - lastException = null; + lastException.set(null); errorIndex = -1; success = false; @@ -1275,9 +1275,7 @@ Token getBlockToken() { } private void setLastException(IOException e) { - if (lastException == null) { - lastException = e; - } + lastException.compareAndSet(null, e); } } @@ -1309,7 +1307,7 @@ static Socket createSocketForPipeline(final DatanodeInfo first, protected void checkClosed() throws IOException { if (closed) { - IOException e = lastException; + IOException e = lastException.get(); throw e != null ? e : new ClosedChannelException(); } } @@ -1465,6 +1463,7 @@ private void queueCurrentPacket() { private void waitAndQueueCurrentPacket() throws IOException { synchronized (dataQueue) { + try { // If queue is full, then wait till we have enough space while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) { try { @@ -1483,6 +1482,8 @@ private void waitAndQueueCurrentPacket() throws IOException { } checkClosed(); queueCurrentPacket(); + } catch (ClosedChannelException e) { + } } } @@ -1726,7 +1727,7 @@ private void flushOrSync(boolean isSync, EnumSet syncFlags) DFSClient.LOG.warn("Error while syncing", e); synchronized (this) { if (!closed) { - lastException = new IOException("IOException flush:" + e); + lastException.set(new IOException("IOException flush:" + e)); closeThreads(true); } } @@ -1784,21 +1785,25 @@ private void waitForAckedSeqno(long seqno) throws IOException { if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Waiting for ack for: " + seqno); } - synchronized (dataQueue) { - while (!closed) { - checkClosed(); - if (lastAckedSeqno >= seqno) { - break; - } - try { - dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue - } catch (InterruptedException ie) { - throw new InterruptedIOException( - "Interrupted while waiting for data to be acknowledged by pipeline"); + try { + synchronized (dataQueue) { + while (!closed) { + checkClosed(); + if (lastAckedSeqno >= seqno) { + break; + } + try { + dataQueue.wait(1000); // when we receive an ack, we notify on + // dataQueue + } catch (InterruptedException ie) { + throw new InterruptedIOException( + "Interrupted while waiting for data to be acknowledged by pipeline"); + } } } + checkClosed(); + } catch (ClosedChannelException e) { } - checkClosed(); } private synchronized void start() { @@ -1844,7 +1849,7 @@ private void closeThreads(boolean force) throws IOException { @Override public synchronized void close() throws IOException { if (closed) { - IOException e = lastException; + IOException e = lastException.getAndSet(null); if (e == null) return; else @@ -1872,6 +1877,7 @@ public synchronized void close() throws IOException { closeThreads(false); 
completeFile(lastBlock); dfsClient.endFileLease(src); + } catch (ClosedChannelException e) { } finally { closed = true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index e3b61abc0bd..27c0059b791 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -38,6 +38,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.security.SecureRandom; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -593,6 +594,48 @@ public static Map> getHaNnRpcAddresses( Configuration conf) { return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); } + + /** + * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from + * the configuration. + * + * @param conf configuration + * @return list of InetSocketAddresses + */ + public static Map> getHaNnHttpAddresses( + Configuration conf) { + return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); + } + + /** + * Resolve an HDFS URL into real InetSocketAddress. It works like a DNS resolver + * when the URL points to a non-HA cluster. When the URL points to an HA + * cluster, the resolver further resolves the logical name (i.e., the authority + * in the URL) into real namenode addresses. + */ + public static InetSocketAddress[] resolve(URI uri, int schemeDefaultPort, + Configuration conf) throws IOException { + ArrayList ret = new ArrayList(); + + if (!HAUtil.isLogicalUri(conf, uri)) { + InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(), + schemeDefaultPort); + ret.add(addr); + + } else { + Map> addresses = DFSUtil + .getHaNnHttpAddresses(conf); + + for (Map addrs : addresses.values()) { + for (InetSocketAddress addr : addrs.values()) { + ret.add(addr); + } + } + } + + InetSocketAddress[] r = new InetSocketAddress[ret.size()]; + return ret.toArray(r); + } /** * Returns list of InetSocketAddress corresponding to backup node rpc diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index a51c31116ab..d2d316834e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -716,6 +716,7 @@ public FileStatus[] next(final FileSystem fs, final Path p) protected RemoteIterator listLocatedStatus(final Path p, final PathFilter filter) throws IOException { + final Path absF = fixRelativePart(p); return new RemoteIterator() { private DirectoryListing thisListing; private int i; @@ -725,7 +726,7 @@ protected RemoteIterator listLocatedStatus(final Path p, { // initializer // Fully resolve symlinks in path first to avoid additional resolution // round-trips as we fetch more batches of listings - src = getPathName(resolvePath(p)); + src = getPathName(resolvePath(absF)); // fetch the first batch of entries in the directory thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true); statistics.incrementReadOps(1); @@ -739,7 +740,7 @@ public boolean hasNext() throws IOException { while (curStat == null && hasNextNoFilter()) { LocatedFileStatus next =
((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++]) - .makeQualifiedLocated(getUri(), p); + .makeQualifiedLocated(getUri(), absF); if (filter.accept(next.getPath())) { curStat = next; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index 9674b6d6f7b..7d53fb991d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -17,15 +17,9 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; - +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -41,11 +35,17 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX; public class HAUtil { @@ -265,10 +265,15 @@ public static void cloneDelegationTokenForLogicalUri( tokenSelector.selectToken(haService, ugi.getTokens()); if (haToken != null) { for (InetSocketAddress singleNNAddr : nnAddrs) { + // this is a minor hack to prevent physical HA tokens from being + // exposed to the user via UGI.getCredentials(), otherwise these + // cloned tokens may be inadvertently propagated to jobs Token specificToken = - new Token(haToken); + new Token.PrivateToken(haToken); SecurityUtil.setTokenService(specificToken, singleNNAddr); - ugi.addToken(specificToken); + Text alias = + new Text(HA_DT_SERVICE_PREFIX + "//" + specificToken.getService()); + ugi.addToken(alias, specificToken); LOG.debug("Mapped HA service delegation token for logical URI " + haUri + " to namenode " + singleNNAddr); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java index dd5e9c6daa0..361f6a0c462 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java @@ -94,7 +94,6 @@ public class HftpFileSystem extends FileSystem private URI hftpURI; protected URI nnUri; - protected URI nnSecureUri; public static final String HFTP_TIMEZONE = "UTC"; public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; @@ -134,34 +133,33 @@ protected int getDefaultPort() { DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT); } - protected int getDefaultSecurePort() { - return 
getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, - DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT); - } - + /** + * We generate the address with one of the following ports, in + * order of preference. + * 1. Port from the hftp URI e.g. hftp://namenode:4000/ will return 4000. + * 2. Port configured via DFS_NAMENODE_HTTP_PORT_KEY + * 3. DFS_NAMENODE_HTTP_PORT_DEFAULT i.e. 50070. + * + * @param uri + * @return + */ protected InetSocketAddress getNamenodeAddr(URI uri) { // use authority so user supplied uri can override port return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort()); } - protected InetSocketAddress getNamenodeSecureAddr(URI uri) { - // must only use the host and the configured https port - return NetUtils.createSocketAddrForHost(uri.getHost(), getDefaultSecurePort()); - } - protected URI getNamenodeUri(URI uri) { - return DFSUtil.createUri("http", getNamenodeAddr(uri)); - } - - protected URI getNamenodeSecureUri(URI uri) { - return DFSUtil.createUri("http", getNamenodeSecureAddr(uri)); + return DFSUtil.createUri(getUnderlyingProtocol(), getNamenodeAddr(uri)); } + /** + * See the documentation of {@Link #getNamenodeAddr(URI)} for the logic + * behind selecting the canonical service name. + * @return + */ @Override public String getCanonicalServiceName() { - // unlike other filesystems, hftp's service is the secure port, not the - // actual port in the uri - return SecurityUtil.buildTokenService(nnSecureUri).toString(); + return SecurityUtil.buildTokenService(nnUri).toString(); } @Override @@ -187,7 +185,6 @@ public void initialize(final URI name, final Configuration conf) setConf(conf); this.ugi = UserGroupInformation.getCurrentUser(); this.nnUri = getNamenodeUri(name); - this.nnSecureUri = getNamenodeSecureUri(name); try { this.hftpURI = new URI(name.getScheme(), name.getAuthority(), null, null, null); @@ -225,7 +222,7 @@ protected void initDelegationToken() throws IOException { protected Token selectDelegationToken( UserGroupInformation ugi) { - return hftpTokenSelector.selectToken(nnSecureUri, ugi.getTokens(), getConf()); + return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf()); } @@ -234,6 +231,13 @@ public Token getRenewToken() { return renewToken; } + /** + * Return the underlying protocol that is used to talk to the namenode. + */ + protected String getUnderlyingProtocol() { + return "http"; + } + @Override public synchronized void setDelegationToken(Token token) { renewToken = token; @@ -257,7 +261,7 @@ public synchronized Token getDelegationToken(final String renewer return ugi.doAs(new PrivilegedExceptionAction>() { @Override public Token run() throws IOException { - final String nnHttpUrl = nnSecureUri.toString(); + final String nnHttpUrl = nnUri.toString(); Credentials c; try { c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer); @@ -301,7 +305,7 @@ public URI getUri() { * @throws IOException on error constructing the URL */ protected URL getNamenodeURL(String path, String query) throws IOException { - final URL url = new URL("http", nnUri.getHost(), + final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(), nnUri.getPort(), path + '?' 
+ query); if (LOG.isTraceEnabled()) { LOG.trace("url=" + url); @@ -703,17 +707,20 @@ public boolean isManaged(Token token) throws IOException { return true; } + protected String getUnderlyingProtocol() { + return "http"; + } + @SuppressWarnings("unchecked") @Override public long renew(Token token, Configuration conf) throws IOException { // update the kerberos credentials, if they are coming from a keytab UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); - // use http to renew the token InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); return DelegationTokenFetcher.renewDelegationToken - (DFSUtil.createUri("http", serviceAddr).toString(), + (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(), (Token) token); } @@ -723,10 +730,9 @@ public void cancel(Token token, Configuration conf) throws IOException { // update the kerberos credentials, if they are coming from a keytab UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); - // use http to cancel the token InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); DelegationTokenFetcher.cancelDelegationToken - (DFSUtil.createUri("http", serviceAddr).toString(), + (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(), (Token) token); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java index 6a3bdba593b..5f5c4836953 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java @@ -68,6 +68,14 @@ public String getScheme() { return "hsftp"; } + /** + * Return the underlying protocol that is used to talk to the namenode. + */ + @Override + protected String getUnderlyingProtocol() { + return "https"; + } + @Override public void initialize(URI name, Configuration conf) throws IOException { super.initialize(name, conf); @@ -134,24 +142,15 @@ private static void setupSsl(Configuration conf) throws IOException { @Override protected int getDefaultPort() { - return getDefaultSecurePort(); + return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, + DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT); } - @Override - protected InetSocketAddress getNamenodeSecureAddr(URI uri) { - return getNamenodeAddr(uri); - } - - @Override - protected URI getNamenodeUri(URI uri) { - return getNamenodeSecureUri(uri); - } - @Override protected HttpURLConnection openConnection(String path, String query) throws IOException { query = addDelegationTokenParam(query); - final URL url = new URL("https", nnUri.getHost(), + final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(), nnUri.getPort(), path + '?' 
+ query); HttpsURLConnection conn; conn = (HttpsURLConnection)connectionFactory.openConnection(url); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index 41dac6a80f6..bff0284c74f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -158,8 +158,8 @@ public static ProxyAndInfo createProxy(Configuration conf, * Generate a dummy namenode proxy instance that utilizes our hacked * {@link LossyRetryInvocationHandler}. Proxy instance generated using this * method will proactively drop RPC responses. Currently this method only - * support HA setup. IllegalStateException will be thrown if the given - * configuration is not for HA. + * supports HA setups. null will be returned if the given configuration is not + * for HA. * * @param config the configuration containing the required IPC * properties, client failover configurations, etc. @@ -168,7 +168,8 @@ public static ProxyAndInfo createProxy(Configuration conf, * @param xface the IPC interface which should be created * @param numResponseToDrop The number of responses to drop for each RPC call * @return an object containing both the proxy and the associated - * delegation token service it corresponds to + * delegation token service it corresponds to. Will return null if the + * given configuration does not support HA. + * @throws IOException if there is an error creating the proxy */ @SuppressWarnings("unchecked") @@ -204,8 +205,9 @@ public static ProxyAndInfo createProxyWithLossyRetryHandler( Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); return new ProxyAndInfo(proxy, dtService); } else { - throw new IllegalStateException("Currently creating proxy using " + + LOG.warn("Currently creating proxy using " + "LossyRetryInvocationHandler requires NN HA setup"); + return null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java index 2a0578ca93f..9a012107b6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java @@ -43,6 +43,7 @@ public class DatanodeID implements Comparable { private String storageID; // unique per cluster storageID private int xferPort; // data streaming port private int infoPort; // info server port + private int infoSecurePort; // info server port private int ipcPort; // IPC server port public DatanodeID(DatanodeID from) { @@ -51,6 +52,7 @@ public DatanodeID(DatanodeID from) { from.getStorageID(), from.getXferPort(), from.getInfoPort(), + from.getInfoSecurePort(), from.getIpcPort()); this.peerHostName = from.getPeerHostName(); } @@ -65,12 +67,13 @@ public DatanodeID(DatanodeID from) { * @param ipcPort ipc server port */ public DatanodeID(String ipAddr, String hostName, String storageID, - int xferPort, int infoPort, int ipcPort) { + int xferPort, int infoPort, int infoSecurePort, int ipcPort) { this.ipAddr = ipAddr; this.hostName = hostName; this.storageID = storageID; this.xferPort = xferPort; this.infoPort = infoPort; + this.infoSecurePort = infoSecurePort; this.ipcPort = ipcPort; } @@ -128,6 +131,13 @@ public String
getInfoAddr() { return ipAddr + ":" + infoPort; } + /** + * @return IP:infoPort string + */ + public String getInfoSecureAddr() { + return ipAddr + ":" + infoSecurePort; + } + /** * @return hostname:xferPort */ @@ -179,6 +189,13 @@ public int getInfoPort() { return infoPort; } + /** + * @return infoSecurePort (the port at which the HTTPS server bound to) + */ + public int getInfoSecurePort() { + return infoSecurePort; + } + /** * @return ipcPort (the port at which the IPC server bound to) */ @@ -218,13 +235,14 @@ public void updateRegInfo(DatanodeID nodeReg) { peerHostName = nodeReg.getPeerHostName(); xferPort = nodeReg.getXferPort(); infoPort = nodeReg.getInfoPort(); + infoSecurePort = nodeReg.getInfoSecurePort(); ipcPort = nodeReg.getIpcPort(); } /** * Compare based on data transfer address. * - * @param that + * @param that datanode to compare with * @return as specified by Comparable */ @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index dc12a924eaf..17636ed146b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.hdfs.protocol; -import static org.apache.hadoop.hdfs.DFSUtil.percent2String; - -import java.util.Date; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -32,6 +28,10 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; +import java.util.Date; + +import static org.apache.hadoop.hdfs.DFSUtil.percent2String; + /** * This class extends the primary identifier of a Datanode with ephemeral * state, eg usage information, current administrative state, and the @@ -115,20 +115,23 @@ public DatanodeInfo(DatanodeID nodeID, String location, final long blockPoolUsed, final long cacheCapacity, final long cacheUsed, final long lastUpdate, final int xceiverCount, final AdminStates adminState) { - this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(), nodeID.getXferPort(), - nodeID.getInfoPort(), nodeID.getIpcPort(), capacity, dfsUsed, remaining, - blockPoolUsed, cacheCapacity, cacheUsed, lastUpdate, xceiverCount, - location, adminState); + this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(), + nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getInfoSecurePort(), + nodeID.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed, + cacheCapacity, cacheUsed, lastUpdate, xceiverCount, location, + adminState); } /** Constructor */ public DatanodeInfo(final String ipAddr, final String hostName, - final String storageID, final int xferPort, final int infoPort, final int ipcPort, + final String storageID, final int xferPort, final int infoPort, + final int infoSecurePort, final int ipcPort, final long capacity, final long dfsUsed, final long remaining, final long blockPoolUsed, final long cacheCapacity, final long cacheUsed, final long lastUpdate, final int xceiverCount, final String networkLocation, final AdminStates adminState) { - super(ipAddr, hostName, storageID, xferPort, infoPort, ipcPort); + super(ipAddr, hostName, storageID, xferPort, infoPort, + infoSecurePort, ipcPort); this.capacity = capacity; this.dfsUsed = dfsUsed; this.remaining = 
remaining; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 4f9ce6c79aa..27586117937 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -228,7 +228,8 @@ public static NamenodeRegistration convert(NamenodeRegistrationProto reg) { // DatanodeId public static DatanodeID convert(DatanodeIDProto dn) { return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getStorageID(), - dn.getXferPort(), dn.getInfoPort(), dn.getIpcPort()); + dn.getXferPort(), dn.getInfoPort(), dn.hasInfoSecurePort() ? dn + .getInfoSecurePort() : 0, dn.getIpcPort()); } public static DatanodeIDProto convert(DatanodeID dn) { @@ -238,6 +239,7 @@ public static DatanodeIDProto convert(DatanodeID dn) { .setStorageID(dn.getStorageID()) .setXferPort(dn.getXferPort()) .setInfoPort(dn.getInfoPort()) + .setInfoSecurePort(dn.getInfoSecurePort()) .setIpcPort(dn.getIpcPort()).build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java index f233d1f5f72..b2446cbb806 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; @@ -115,6 +116,24 @@ public byte[] retrievePassword( return super.retrievePassword(identifier); } + @Override + public byte[] retriableRetrievePassword(DelegationTokenIdentifier identifier) + throws InvalidToken, StandbyException, RetriableException, IOException { + namesystem.checkOperation(OperationCategory.READ); + try { + return super.retrievePassword(identifier); + } catch (InvalidToken it) { + if (namesystem.inTransitionToActive()) { + // if the namesystem is currently in the middle of transition to + // active state, let client retry since the corresponding editlog may + // have not been applied yet + throw new RetriableException(it); + } else { + throw it; + } + } + } + /** * Returns expiry time of a token given its identifier. 
* diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 1ed0e1915f4..befdd90a79e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -506,7 +506,7 @@ private static class BalancerDatanode { final DatanodeInfo datanode; final double utilization; final long maxSize2Move; - protected long scheduledSize = 0L; + private long scheduledSize = 0L; // blocks being moved but not confirmed yet private List pendingBlocks = new ArrayList(MAX_NUM_CONCURRENT_MOVES); @@ -555,20 +555,35 @@ protected String getStorageID() { } /** Decide if still need to move more bytes */ - protected boolean hasSpaceForScheduling() { + protected synchronized boolean hasSpaceForScheduling() { return scheduledSize0 && + while(!isTimeUp && getScheduledSize()>0 && (!srcBlockList.isEmpty() || blocksToReceive>0)) { PendingBlockMove pendingBlock = chooseNextBlockToMove(); if (pendingBlock != null) { @@ -779,7 +795,7 @@ private void dispatchBlocks() { // in case no blocks can be moved for source node's task, // jump out of while-loop after 5 iterations. if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) { - scheduledSize = 0; + setScheduledSize(0); } } @@ -992,7 +1008,7 @@ private long chooseNodes() { long bytesToMove = 0L; for (Source src : sources) { - bytesToMove += src.scheduledSize; + bytesToMove += src.getScheduledSize(); } return bytesToMove; } @@ -1093,7 +1109,7 @@ private synchronized void inc( long bytes ) { bytesMoved += bytes; } - private long get() { + private synchronized long get() { return bytesMoved; } }; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 53699fb2aad..35045548c1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -74,6 +75,7 @@ import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; @@ -89,9 +91,6 @@ public class BlockManager { static final Log LOG = LogFactory.getLog(BlockManager.class); public static final Log blockLog = NameNode.blockStateChangeLog; - /** Default load factor of map */ - public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f; - private static final String QUEUE_REASON_CORRUPT_STATE = "it has the wrong state or generation stamp"; @@ -243,7 +242,8 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats, invalidateBlocks = new InvalidateBlocks(datanodeManager); // Compute the map capacity by allocating 2% of total memory - blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR); + blocksMap = new BlocksMap( + 
LightWeightGSet.computeCapacity(2.0, "BlocksMap")); blockplacement = BlockPlacementPolicy.getInstance( conf, stats, datanodeManager.getNetworkTopology()); pendingReplications = new PendingReplicationBlocks(conf.getInt( @@ -1256,22 +1256,19 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { namesystem.writeUnlock(); } - HashMap excludedNodes - = new HashMap(); + final Set excludedNodes = new HashSet(); for(ReplicationWork rw : work){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. excludedNodes.clear(); for (DatanodeDescriptor dn : rw.containingNodes) { - excludedNodes.put(dn, dn); + excludedNodes.add(dn); } // choose replication targets: NOT HOLDING THE GLOBAL LOCK // It is costly to extract the filename for which chooseTargets is called, // so for now we pass in the block collection itself. - rw.targets = blockplacement.chooseTarget(rw.bc, - rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes, - excludedNodes, rw.block.getNumBytes()); + rw.chooseTargets(blockplacement, excludedNodes); } namesystem.writeLock(); @@ -1378,12 +1375,12 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { * * @throws IOException * if the number of targets < minimum replication. - * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, - * List, boolean, HashMap, long) + * @see BlockPlacementPolicy#chooseTarget(String, int, Node, + * List, boolean, Set, long) */ public DatanodeDescriptor[] chooseTarget(final String src, final int numOfReplicas, final DatanodeDescriptor client, - final HashMap excludedNodes, + final Set excludedNodes, final long blocksize, List favoredNodes) throws IOException { List favoredDatanodeDescriptors = getDatanodeDescriptors(favoredNodes); @@ -1783,6 +1780,14 @@ private void processFirstBlockReport(final DatanodeDescriptor node, if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent( node, iblk, reportedState); + // OpenFileBlocks only inside snapshots also will be added to safemode + // threshold. 
So we need to update such blocks to safemode + // refer HDFS-5283 + BlockInfoUnderConstruction blockUC = (BlockInfoUnderConstruction) storedBlock; + if (namesystem.isInSnapshot(blockUC)) { + int numOfReplicas = blockUC.getNumExpectedLocations(); + namesystem.incrementSafeBlockCount(numOfReplicas); + } //and fall through to next clause } //add replica if appropriate @@ -3256,6 +3261,13 @@ public ReplicationWork(Block block, this.priority = priority; this.targets = null; } + + private void chooseTargets(BlockPlacementPolicy blockplacement, + Set excludedNodes) { + targets = blockplacement.chooseTarget(bc.getName(), + additionalReplRequired, srcNode, liveReplicaNodes, false, + excludedNodes, block.getNumBytes()); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 36a0b2a6c86..48f77298dbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -19,14 +19,15 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -51,25 +52,6 @@ public static class NotEnoughReplicasException extends Exception { } } - /** - * choose numOfReplicas data nodes for writer - * to re-replicate a block with size blocksize - * If not, return as many as we can. - * - * @param srcPath the file to which this chooseTargets is being invoked. - * @param numOfReplicas additional number of replicas wanted. - * @param writer the writer's machine, null if not in the cluster. - * @param chosenNodes datanodes that have been chosen as targets. - * @param blocksize size of the data to be written. - * @return array of DatanodeDescriptor instances chosen as target - * and sorted as a pipeline. - */ - abstract DatanodeDescriptor[] chooseTarget(String srcPath, - int numOfReplicas, - DatanodeDescriptor writer, - List chosenNodes, - long blocksize); - /** * choose numOfReplicas data nodes for writer * to re-replicate a block with size blocksize @@ -87,48 +69,22 @@ abstract DatanodeDescriptor[] chooseTarget(String srcPath, */ public abstract DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas, - DatanodeDescriptor writer, + Node writer, List chosenNodes, boolean returnChosenNodes, - HashMap excludedNodes, + Set excludedNodes, long blocksize); - - /** - * choose numOfReplicas data nodes for writer - * If not, return as many as we can. - * The base implemenatation extracts the pathname of the file from the - * specified srcBC, but this could be a costly operation depending on the - * file system implementation. Concrete implementations of this class should - * override this method to avoid this overhead. - * - * @param srcBC block collection of file for which chooseTarget is invoked. 
- * @param numOfReplicas additional number of replicas wanted. - * @param writer the writer's machine, null if not in the cluster. - * @param chosenNodes datanodes that have been chosen as targets. - * @param blocksize size of the data to be written. - * @return array of DatanodeDescriptor instances chosen as target - * and sorted as a pipeline. - */ - DatanodeDescriptor[] chooseTarget(BlockCollection srcBC, - int numOfReplicas, - DatanodeDescriptor writer, - List chosenNodes, - HashMap excludedNodes, - long blocksize) { - return chooseTarget(srcBC.getName(), numOfReplicas, writer, - chosenNodes, false, excludedNodes, blocksize); - } /** - * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, - * HashMap, long)} with added parameter {@code favoredDatanodes} + * Same as {@link #chooseTarget(String, int, Node, List, boolean, + * Set, long)} with added parameter {@code favoredDatanodes} * @param favoredNodes datanodes that should be favored as targets. This * is only a hint and due to cluster state, namenode may not be * able to place the blocks on these datanodes. */ DatanodeDescriptor[] chooseTarget(String src, - int numOfReplicas, DatanodeDescriptor writer, - HashMap excludedNodes, + int numOfReplicas, Node writer, + Set excludedNodes, long blocksize, List favoredNodes) { // This class does not provide the functionality of placing // a block in favored datanodes. The implementations of this class @@ -183,7 +139,7 @@ abstract protected void initialize(Configuration conf, FSClusterStats stats, /** * Get an instance of the configured Block Placement Policy based on the - * value of the configuration paramater dfs.block.replicator.classname. + * the configuration property {@link DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}. * * @param conf the configuration to be used * @param stats an object that is used to retrieve the load on the cluster @@ -193,12 +149,12 @@ abstract protected void initialize(Configuration conf, FSClusterStats stats, public static BlockPlacementPolicy getInstance(Configuration conf, FSClusterStats stats, NetworkTopology clusterMap) { - Class replicatorClass = - conf.getClass("dfs.block.replicator.classname", - BlockPlacementPolicyDefault.class, - BlockPlacementPolicy.class); - BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance( - replicatorClass, conf); + final Class replicatorClass = conf.getClass( + DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT, + BlockPlacementPolicy.class); + final BlockPlacementPolicy replicator = ReflectionUtils.newInstance( + replicatorClass, conf); replicator.initialize(conf, stats, clusterMap); return replicator; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index fbb922351bf..7fb4cf01f7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -21,8 +21,7 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -57,6 +56,14 @@ public class 
BlockPlacementPolicyDefault extends BlockPlacementPolicy { "For more information, please enable DEBUG log level on " + BlockPlacementPolicy.class.getName(); + private static final ThreadLocal debugLoggingBuilder + = new ThreadLocal() { + @Override + protected StringBuilder initialValue() { + return new StringBuilder(); + } + }; + protected boolean considerLoad; private boolean preferLocalNode = true; protected NetworkTopology clusterMap; @@ -95,40 +102,25 @@ public void initialize(Configuration conf, FSClusterStats stats, DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT); } - protected ThreadLocal threadLocalBuilder = - new ThreadLocal() { - @Override - protected StringBuilder initialValue() { - return new StringBuilder(); - } - }; - @Override public DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas, - DatanodeDescriptor writer, - List chosenNodes, - long blocksize) { - return chooseTarget(numOfReplicas, writer, chosenNodes, false, - null, blocksize); - } - - @Override - public DatanodeDescriptor[] chooseTarget(String srcPath, - int numOfReplicas, - DatanodeDescriptor writer, + Node writer, List chosenNodes, boolean returnChosenNodes, - HashMap excludedNodes, + Set excludedNodes, long blocksize) { return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes, excludedNodes, blocksize); } @Override - DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas, - DatanodeDescriptor writer, HashMap excludedNodes, - long blocksize, List favoredNodes) { + DatanodeDescriptor[] chooseTarget(String src, + int numOfReplicas, + Node writer, + Set excludedNodes, + long blocksize, + List favoredNodes) { try { if (favoredNodes == null || favoredNodes.size() == 0) { // Favored nodes not specified, fall back to regular block placement. @@ -137,8 +129,8 @@ DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas, excludedNodes, blocksize); } - HashMap favoriteAndExcludedNodes = excludedNodes == null ? - new HashMap() : new HashMap(excludedNodes); + Set favoriteAndExcludedNodes = excludedNodes == null ? + new HashSet() : new HashSet(excludedNodes); // Choose favored nodes List results = new ArrayList(); @@ -157,10 +149,10 @@ DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas, + " with favored node " + favoredNode); continue; } - favoriteAndExcludedNodes.put(target, target); + favoriteAndExcludedNodes.add(target); } - if (results.size() < numOfReplicas) { + if (results.size() < numOfReplicas) { // Not enough favored nodes, choose other nodes. numOfReplicas -= results.size(); DatanodeDescriptor[] remainingTargets = @@ -181,18 +173,18 @@ DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas, } /** This is the implementation. 
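The debugLoggingBuilder field added above is a static ThreadLocal<StringBuilder>, so each thread lazily gets and reuses its own buffer when assembling the "node not chosen" debug text. A small stand-alone sketch of that pattern, with made-up method names rather than the placement-policy code:

    public class PerThreadDebugBuffer {
        // One StringBuilder per thread, created lazily on first use.
        private static final ThreadLocal<StringBuilder> DEBUG_BUILDER =
            new ThreadLocal<StringBuilder>() {
                @Override
                protected StringBuilder initialValue() {
                    return new StringBuilder();
                }
            };

        /** Append a reason to the calling thread's buffer. */
        static void appendReason(String node, String reason) {
            DEBUG_BUILDER.get().append(node).append(": not chosen because ").append(reason);
        }

        /** Emit the buffered text and reset the buffer for the next round. */
        static String drain() {
            StringBuilder sb = DEBUG_BUILDER.get();
            String text = sb.toString();
            sb.setLength(0);
            return text;
        }

        public static void main(String[] args) {
            appendReason("dn1", "the node is too busy");
            System.out.println(drain());
        }
    }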
*/ - DatanodeDescriptor[] chooseTarget(int numOfReplicas, - DatanodeDescriptor writer, + private DatanodeDescriptor[] chooseTarget(int numOfReplicas, + Node writer, List chosenNodes, boolean returnChosenNodes, - HashMap excludedNodes, + Set excludedNodes, long blocksize) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { - return new DatanodeDescriptor[0]; + return DatanodeDescriptor.EMPTY_ARRAY; } if (excludedNodes == null) { - excludedNodes = new HashMap(); + excludedNodes = new HashSet(); } int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas); @@ -204,16 +196,15 @@ DatanodeDescriptor[] chooseTarget(int numOfReplicas, for (DatanodeDescriptor node:chosenNodes) { // add localMachine and related nodes to excludedNodes addToExcludedNodes(node, excludedNodes); - adjustExcludedNodes(excludedNodes, node); } if (!clusterMap.contains(writer)) { - writer=null; + writer = null; } boolean avoidStaleNodes = (stats != null && stats.isAvoidingStaleDataNodesForWrite()); - DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, + Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes); if (!returnChosenNodes) { results.removeAll(chosenNodes); @@ -236,10 +227,20 @@ private int[] getMaxNodesPerRack(List chosenNodes, return new int[] {numOfReplicas, maxNodesPerRack}; } - /* choose numOfReplicas from all data nodes */ - private DatanodeDescriptor chooseTarget(int numOfReplicas, - DatanodeDescriptor writer, - HashMap excludedNodes, + /** + * choose numOfReplicas from all data nodes + * @param numOfReplicas additional number of replicas wanted + * @param writer the writer's machine, could be a non-DatanodeDescriptor node + * @param excludedNodes datanodes that should not be considered as targets + * @param blocksize size of the data to be written + * @param maxNodesPerRack max nodes allowed per rack + * @param results the target nodes already chosen + * @param avoidStaleNodes avoid stale nodes in replica choosing + * @return local node of writer (not chosen node) + */ + private Node chooseTarget(int numOfReplicas, + Node writer, + Set excludedNodes, long blocksize, int maxNodesPerRack, List results, @@ -251,13 +252,13 @@ private DatanodeDescriptor chooseTarget(int numOfReplicas, int numOfResults = results.size(); boolean newBlock = (numOfResults==0); - if (writer == null && !newBlock) { + if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) { writer = results.get(0); } // Keep a copy of original excludedNodes - final HashMap oldExcludedNodes = avoidStaleNodes ? - new HashMap(excludedNodes) : null; + final Set oldExcludedNodes = avoidStaleNodes ? + new HashSet(excludedNodes) : null; try { if (numOfResults == 0) { writer = chooseLocalNode(writer, excludedNodes, blocksize, @@ -304,7 +305,7 @@ private DatanodeDescriptor chooseTarget(int numOfReplicas, // We need to additionally exclude the nodes that were added to the // result list in the successful calls to choose*() above. for (Node node : results) { - oldExcludedNodes.put(node, node); + oldExcludedNodes.add(node); } // Set numOfReplicas, since it can get out of sync with the result list // if the NotEnoughReplicasException was thrown in chooseRandom(). @@ -316,33 +317,30 @@ private DatanodeDescriptor chooseTarget(int numOfReplicas, return writer; } - /* choose localMachine as the target. + /** + * Choose localMachine as the target. 
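In the hunks above the writer parameter is widened from DatanodeDescriptor to the more general Node, and the code guards the spots that really need a datanode with an instanceof check (for example, a writer that is not a DatanodeDescriptor is replaced by the first chosen node). A stripped-down sketch of that guard using stand-in types, not the real HDFS classes:

    public class WriterGuardSketch {
        /** Stand-in for org.apache.hadoop.net.Node. */
        interface Node { String getName(); }

        /** Stand-in for DatanodeDescriptor: a Node that can also host a replica. */
        static class DatanodeNode implements Node {
            private final String name;
            DatanodeNode(String name) { this.name = name; }
            public String getName() { return name; }
        }

        /** Prefer the writer itself only when it is actually a datanode. */
        static Node pickLocalNode(Node writer, Node firstChosen) {
            if (writer instanceof DatanodeNode) {
                return writer;        // the writer can hold a replica locally
            }
            return firstChosen;       // otherwise fall back to an already chosen datanode
        }

        public static void main(String[] args) {
            Node client = () -> "client-host";        // a non-datanode writer
            Node dn = new DatanodeNode("dn1");
            System.out.println(pickLocalNode(client, dn).getName()); // dn1
            System.out.println(pickLocalNode(dn, null).getName());   // dn1
        }
    }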
* if localMachine is not available, * choose a node on the same rack * @return the chosen node */ - protected DatanodeDescriptor chooseLocalNode( - DatanodeDescriptor localMachine, - HashMap excludedNodes, + protected DatanodeDescriptor chooseLocalNode(Node localMachine, + Set excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) - throws NotEnoughReplicasException { + throws NotEnoughReplicasException { // if no local machine, randomly choose one node if (localMachine == null) return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes); - if (preferLocalNode) { + if (preferLocalNode && localMachine instanceof DatanodeDescriptor) { + DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine; // otherwise try local machine first - Node oldNode = excludedNodes.put(localMachine, localMachine); - if (oldNode == null) { // was not in the excluded list - if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false, - results, avoidStaleNodes)) { - results.add(localMachine); - // add localMachine and related nodes to excludedNode - addToExcludedNodes(localMachine, excludedNodes); - return localMachine; + if (excludedNodes.add(localMachine)) { // was not in the excluded list + if (addIfIsGoodTarget(localDatanode, excludedNodes, blocksize, + maxNodesPerRack, false, results, avoidStaleNodes) >= 0) { + return localDatanode; } } } @@ -358,26 +356,25 @@ protected DatanodeDescriptor chooseLocalNode( * @return number of new excluded nodes */ protected int addToExcludedNodes(DatanodeDescriptor localMachine, - HashMap excludedNodes) { - Node node = excludedNodes.put(localMachine, localMachine); - return node == null?1:0; + Set excludedNodes) { + return excludedNodes.add(localMachine) ? 1 : 0; } - /* choose one node from the rack that localMachine is on. + /** + * Choose one node from the rack that localMachine is on. * if no such node is available, choose one node from the rack where * a second replica is on. * if still no such node is available, choose a random node * in the cluster. * @return the chosen node */ - protected DatanodeDescriptor chooseLocalRack( - DatanodeDescriptor localMachine, - HashMap excludedNodes, + protected DatanodeDescriptor chooseLocalRack(Node localMachine, + Set excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) - throws NotEnoughReplicasException { + throws NotEnoughReplicasException { // no local machine, so choose a random machine if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, @@ -391,9 +388,7 @@ protected DatanodeDescriptor chooseLocalRack( } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; - for(Iterator iter=results.iterator(); - iter.hasNext();) { - DatanodeDescriptor nextNode = iter.next(); + for(DatanodeDescriptor nextNode : results) { if (nextNode != localMachine) { newLocal = nextNode; break; @@ -416,7 +411,8 @@ protected DatanodeDescriptor chooseLocalRack( } } - /* choose numOfReplicas nodes from the racks + /** + * Choose numOfReplicas nodes from the racks * that localMachine is NOT on. 
* if not enough nodes are available, choose the remaining ones * from the local rack @@ -424,12 +420,12 @@ protected DatanodeDescriptor chooseLocalRack( protected void chooseRemoteRack(int numOfReplicas, DatanodeDescriptor localMachine, - HashMap excludedNodes, + Set excludedNodes, long blocksize, int maxReplicasPerRack, List results, boolean avoidStaleNodes) - throws NotEnoughReplicasException { + throws NotEnoughReplicasException { int oldNumOfReplicas = results.size(); // randomly choose one node from remote racks try { @@ -443,91 +439,58 @@ protected void chooseRemoteRack(int numOfReplicas, } } - /* Randomly choose one target from nodes. - * @return the chosen node + /** + * Randomly choose one target from the given scope. + * @return the chosen node, if there is any. */ - protected DatanodeDescriptor chooseRandom( - String nodes, - HashMap excludedNodes, - long blocksize, - int maxNodesPerRack, - List results, - boolean avoidStaleNodes) - throws NotEnoughReplicasException { - int numOfAvailableNodes = - clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet()); - StringBuilder builder = null; - if (LOG.isDebugEnabled()) { - builder = threadLocalBuilder.get(); - builder.setLength(0); - builder.append("["); - } - boolean badTarget = false; - while(numOfAvailableNodes > 0) { - DatanodeDescriptor chosenNode = - (DatanodeDescriptor)(clusterMap.chooseRandom(nodes)); - - Node oldNode = excludedNodes.put(chosenNode, chosenNode); - if (oldNode == null) { // chosenNode was not in the excluded list - numOfAvailableNodes--; - if (isGoodTarget(chosenNode, blocksize, - maxNodesPerRack, results, avoidStaleNodes)) { - results.add(chosenNode); - // add chosenNode and related nodes to excludedNode - addToExcludedNodes(chosenNode, excludedNodes); - adjustExcludedNodes(excludedNodes, chosenNode); - return chosenNode; - } else { - badTarget = true; - } - } - } - - String detail = enableDebugLogging; - if (LOG.isDebugEnabled()) { - if (badTarget && builder != null) { - detail = builder.append("]").toString(); - builder.setLength(0); - } else detail = ""; - } - throw new NotEnoughReplicasException(detail); + protected DatanodeDescriptor chooseRandom(String scope, + Set excludedNodes, + long blocksize, + int maxNodesPerRack, + List results, + boolean avoidStaleNodes) + throws NotEnoughReplicasException { + return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack, + results, avoidStaleNodes); } - - /* Randomly choose numOfReplicas targets from nodes. + + /** + * Randomly choose numOfReplicas targets from the given scope. + * @return the first chosen node, if there is any. 
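The single-target chooseRandom above now simply delegates to the numOfReplicas variant, which returns the first node it managed to choose. A minimal sketch of that overload-delegation shape; the selection logic below is a placeholder, not the topology-aware choice:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class ChooseRandomSketch {
        /** Single-target convenience overload: delegate to the batch version. */
        static String chooseRandom(List<String> candidates, List<String> results) {
            return chooseRandom(1, candidates, results);
        }

        /** Choose up to n targets, append them to results, return the first chosen one. */
        static String chooseRandom(int n, List<String> candidates, List<String> results) {
            String firstChosen = null;
            Iterator<String> it = candidates.iterator();
            while (n > 0 && it.hasNext()) {
                String node = it.next();
                results.add(node);
                if (firstChosen == null) {
                    firstChosen = node;
                }
                n--;
            }
            return firstChosen;   // may be null if nothing could be chosen
        }

        public static void main(String[] args) {
            List<String> results = new ArrayList<>();
            String first = chooseRandom(Arrays.asList("dn1", "dn2", "dn3"), results);
            System.out.println(first + " " + results);   // dn1 [dn1]
        }
    }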
*/ - protected void chooseRandom(int numOfReplicas, - String nodes, - HashMap excludedNodes, + protected DatanodeDescriptor chooseRandom(int numOfReplicas, + String scope, + Set excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) - throws NotEnoughReplicasException { + throws NotEnoughReplicasException { - int numOfAvailableNodes = - clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet()); + int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes( + scope, excludedNodes); StringBuilder builder = null; if (LOG.isDebugEnabled()) { - builder = threadLocalBuilder.get(); + builder = debugLoggingBuilder.get(); builder.setLength(0); builder.append("["); } boolean badTarget = false; + DatanodeDescriptor firstChosen = null; while(numOfReplicas > 0 && numOfAvailableNodes > 0) { DatanodeDescriptor chosenNode = - (DatanodeDescriptor)(clusterMap.chooseRandom(nodes)); - Node oldNode = excludedNodes.put(chosenNode, chosenNode); - if (oldNode == null) { + (DatanodeDescriptor)clusterMap.chooseRandom(scope); + if (excludedNodes.add(chosenNode)) { //was not in the excluded list numOfAvailableNodes--; - if (isGoodTarget(chosenNode, blocksize, - maxNodesPerRack, results, avoidStaleNodes)) { + int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes, + blocksize, maxNodesPerRack, considerLoad, results, avoidStaleNodes); + if (newExcludedNodes >= 0) { numOfReplicas--; - results.add(chosenNode); - // add chosenNode and related nodes to excludedNode - int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes); + if (firstChosen == null) { + firstChosen = chosenNode; + } numOfAvailableNodes -= newExcludedNodes; - adjustExcludedNodes(excludedNodes, chosenNode); } else { badTarget = true; } @@ -544,34 +507,44 @@ protected void chooseRandom(int numOfReplicas, } throw new NotEnoughReplicasException(detail); } - } - - /** - * After choosing a node to place replica, adjust excluded nodes accordingly. - * It should do nothing here as chosenNode is already put into exlcudeNodes, - * but it can be overridden in subclass to put more related nodes into - * excludedNodes. - * - * @param excludedNodes - * @param chosenNode - */ - protected void adjustExcludedNodes(HashMap excludedNodes, - Node chosenNode) { - // do nothing here. + + return firstChosen; } - /* judge if a node is a good target. - * return true if node has enough space, - * does not have too much load, and the rack does not have too many nodes + /** + * If the given node is a good target, add it to the result list and + * update the set of excluded nodes. + * @return -1 if the given is not a good target; + * otherwise, return the number of nodes added to excludedNodes set. 
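The rewritten loop above relies on the new addIfIsGoodTarget contract: the method returns -1 when the candidate is rejected and otherwise the number of nodes newly added to excludedNodes, so the caller can keep its count of still-available nodes in sync. A compact sketch of that contract with a dummy goodness test (names and the check are illustrative):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class AddIfGoodTargetSketch {
        /** @return -1 if the node is rejected; otherwise how many nodes became excluded. */
        static int addIfIsGoodTarget(String node, Set<String> excluded, List<String> results) {
            boolean goodTarget = !node.startsWith("stale-");   // placeholder check
            if (!goodTarget) {
                return -1;
            }
            results.add(node);
            // A node-group-aware policy may exclude several related nodes here;
            // this sketch only adds the node itself (often already excluded by the caller).
            return excluded.add(node) ? 1 : 0;
        }

        public static void main(String[] args) {
            Set<String> excluded = new HashSet<>();
            List<String> results = new ArrayList<>();
            int available = 3;
            for (String n : new String[] {"dn1", "stale-dn2", "dn3"}) {
                if (excluded.add(n)) {                // candidate is now considered "tried"
                    available--;
                    int newlyExcluded = addIfIsGoodTarget(n, excluded, results);
                    if (newlyExcluded >= 0) {
                        available -= newlyExcluded;   // keep the availability count in sync
                    }
                }
            }
            System.out.println(results + " available=" + available);  // [dn1, dn3] available=0
        }
    }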
*/ - private boolean isGoodTarget(DatanodeDescriptor node, - long blockSize, int maxTargetPerRack, - List results, - boolean avoidStaleNodes) { - return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad, - results, avoidStaleNodes); + int addIfIsGoodTarget(DatanodeDescriptor node, + Set excludedNodes, + long blockSize, + int maxNodesPerRack, + boolean considerLoad, + List results, + boolean avoidStaleNodes) { + if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad, + results, avoidStaleNodes)) { + results.add(node); + // add node and related nodes to excludedNode + return addToExcludedNodes(node, excludedNodes); + } else { + return -1; + } } - + + private static void logNodeIsNotChosen(DatanodeDescriptor node, String reason) { + if (LOG.isDebugEnabled()) { + // build the error message for later use. + debugLoggingBuilder.get() + .append(node).append(": ") + .append("Node ").append(NodeBase.getPath(node)) + .append(" is not chosen because ") + .append(reason); + } + } + /** * Determine if a node is a good target. * @@ -588,28 +561,20 @@ private boolean isGoodTarget(DatanodeDescriptor node, * does not have too much load, * and the rack does not have too many nodes. */ - protected boolean isGoodTarget(DatanodeDescriptor node, + private boolean isGoodTarget(DatanodeDescriptor node, long blockSize, int maxTargetPerRack, boolean considerLoad, List results, boolean avoidStaleNodes) { // check if the node is (being) decommissed if (node.isDecommissionInProgress() || node.isDecommissioned()) { - if(LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the node is (being) decommissioned "); - } + logNodeIsNotChosen(node, "the node is (being) decommissioned "); return false; } if (avoidStaleNodes) { if (node.isStale(this.staleInterval)) { - if (LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the node is stale "); - } + logNodeIsNotChosen(node, "the node is stale "); return false; } } @@ -618,11 +583,7 @@ protected boolean isGoodTarget(DatanodeDescriptor node, (node.getBlocksScheduled() * blockSize); // check the remaining capacity of the target machine if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) { - if(LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the node does not have enough space "); - } + logNodeIsNotChosen(node, "the node does not have enough space "); return false; } @@ -634,11 +595,7 @@ protected boolean isGoodTarget(DatanodeDescriptor node, avgLoad = (double)stats.getTotalLoad()/size; } if (node.getXceiverCount() > (2.0 * avgLoad)) { - if(LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the node is too busy "); - } + logNodeIsNotChosen(node, "the node is too busy "); return false; } } @@ -646,31 +603,25 @@ protected boolean isGoodTarget(DatanodeDescriptor node, // check if the target rack has chosen too many nodes String rackname = node.getNetworkLocation(); int counter=1; - for(Iterator iter = results.iterator(); - iter.hasNext();) { - Node result = iter.next(); + for(Node result : results) { if (rackname.equals(result.getNetworkLocation())) { counter++; } } if 
(counter>maxTargetPerRack) { - if(LOG.isDebugEnabled()) { - threadLocalBuilder.get().append(node.toString()).append(": ") - .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the rack has too many chosen nodes "); - } + logNodeIsNotChosen(node, "the rack has too many chosen nodes "); return false; } return true; } - /* Return a pipeline of nodes. + /** + * Return a pipeline of nodes. * The pipeline is formed finding a shortest path that * starts from the writer and traverses all nodes * This is basically a traveling salesman problem. */ - private DatanodeDescriptor[] getPipeline( - DatanodeDescriptor writer, + private DatanodeDescriptor[] getPipeline(Node writer, DatanodeDescriptor[] nodes) { if (nodes.length==0) return nodes; @@ -709,7 +660,7 @@ public int verifyBlockPlacement(String srcPath, int minRacks) { DatanodeInfo[] locs = lBlk.getLocations(); if (locs == null) - locs = new DatanodeInfo[0]; + locs = DatanodeDescriptor.EMPTY_ARRAY; int numRacks = clusterMap.getNumOfRacks(); if(numRacks <= 1) // only one rack return 0; @@ -724,24 +675,18 @@ public int verifyBlockPlacement(String srcPath, @Override public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc, - Block block, - short replicationFactor, - Collection first, - Collection second) { + Block block, short replicationFactor, + Collection first, + Collection second) { long oldestHeartbeat = now() - heartbeatInterval * tolerateHeartbeatMultiplier; DatanodeDescriptor oldestHeartbeatNode = null; long minSpace = Long.MAX_VALUE; DatanodeDescriptor minSpaceNode = null; - // pick replica from the first Set. If first is empty, then pick replicas - // from second set. - Iterator iter = pickupReplicaSet(first, second); - // Pick the node with the oldest heartbeat or with the least free space, // if all hearbeats are within the tolerable heartbeat interval - while (iter.hasNext() ) { - DatanodeDescriptor node = iter.next(); + for(DatanodeDescriptor node : pickupReplicaSet(first, second)) { long free = node.getRemaining(); long lastHeartbeat = node.getLastUpdate(); if(lastHeartbeat < oldestHeartbeat) { @@ -762,12 +707,10 @@ public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc, * replica while second set contains remaining replica nodes. * So pick up first set if not empty. If first is empty, then pick second. */ - protected Iterator pickupReplicaSet( + protected Collection pickupReplicaSet( Collection first, Collection second) { - Iterator iter = - first.isEmpty() ? second.iterator() : first.iterator(); - return iter; + return first.isEmpty() ? 
second : first; } @VisibleForTesting diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java index e98318b9783..1b8f916dd2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java @@ -20,9 +20,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -64,8 +64,8 @@ public void initialize(Configuration conf, FSClusterStats stats, * @return the chosen node */ @Override - protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine, - HashMap excludedNodes, long blocksize, int maxNodesPerRack, + protected DatanodeDescriptor chooseLocalNode(Node localMachine, + Set excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) throws NotEnoughReplicasException { // if no local machine, randomly choose one node @@ -73,18 +73,16 @@ protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine, return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes); - // otherwise try local machine first - Node oldNode = excludedNodes.put(localMachine, localMachine); - if (oldNode == null) { // was not in the excluded list - if (isGoodTarget(localMachine, blocksize, - maxNodesPerRack, false, results, avoidStaleNodes)) { - results.add(localMachine); - // Nodes under same nodegroup should be excluded. - addNodeGroupToExcludedNodes(excludedNodes, - localMachine.getNetworkLocation()); - return localMachine; + if (localMachine instanceof DatanodeDescriptor) { + DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine; + // otherwise try local machine first + if (excludedNodes.add(localMachine)) { // was not in the excluded list + if (addIfIsGoodTarget(localDataNode, excludedNodes, blocksize, + maxNodesPerRack, false, results, avoidStaleNodes) >= 0) { + return localDataNode; + } } - } + } // try a node on local node group DatanodeDescriptor chosenNode = chooseLocalNodeGroup( @@ -98,26 +96,10 @@ protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine, blocksize, maxNodesPerRack, results, avoidStaleNodes); } - @Override - protected void adjustExcludedNodes(HashMap excludedNodes, - Node chosenNode) { - // as node-group aware implementation, it should make sure no two replica - // are placing on the same node group. - addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation()); - } - // add all nodes under specific nodegroup to excludedNodes. 
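The node-group-aware policy above drops its adjustExcludedNodes hook; excluding every node in the chosen node's group is instead folded into the addToExcludedNodes override that appears a few hunks further down, which reports how many nodes were newly excluded. An illustrative override in the same spirit, using a plain map of node groups instead of the real NetworkTopologyWithNodeGroup:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class NodeGroupExclusionSketch {
        /** Base behaviour: exclude just the chosen node. */
        static class DefaultPolicy {
            int addToExcludedNodes(String chosen, Set<String> excluded) {
                return excluded.add(chosen) ? 1 : 0;
            }
        }

        /** Node-group aware: exclude every node that shares the chosen node's group. */
        static class NodeGroupPolicy extends DefaultPolicy {
            private final Map<String, List<String>> nodeGroups;   // group -> member nodes
            NodeGroupPolicy(Map<String, List<String>> nodeGroups) { this.nodeGroups = nodeGroups; }

            @Override
            int addToExcludedNodes(String chosen, Set<String> excluded) {
                String group = chosen.split("/")[0];              // e.g. "ng1/dn1" -> "ng1"
                int newlyExcluded = 0;
                for (String member : nodeGroups.getOrDefault(group, List.of(chosen))) {
                    if (excluded.add(member)) {
                        newlyExcluded++;
                    }
                }
                return newlyExcluded;
            }
        }

        public static void main(String[] args) {
            Map<String, List<String>> groups = Map.of("ng1", List.of("ng1/dn1", "ng1/dn2"));
            Set<String> excluded = new HashSet<>();
            System.out.println(new NodeGroupPolicy(groups).addToExcludedNodes("ng1/dn1", excluded)); // 2
            System.out.println(excluded);   // both members of ng1 are now excluded
        }
    }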
- private void addNodeGroupToExcludedNodes(HashMap excludedNodes, - String nodeGroup) { - List leafNodes = clusterMap.getLeaves(nodeGroup); - for (Node node : leafNodes) { - excludedNodes.put(node, node); - } - } - @Override - protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine, - HashMap excludedNodes, long blocksize, int maxNodesPerRack, + protected DatanodeDescriptor chooseLocalRack(Node localMachine, + Set excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) throws NotEnoughReplicasException { // no local machine, so choose a random machine @@ -137,9 +119,7 @@ protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine, } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; - for(Iterator iter=results.iterator(); - iter.hasNext();) { - DatanodeDescriptor nextNode = iter.next(); + for(DatanodeDescriptor nextNode : results) { if (nextNode != localMachine) { newLocal = nextNode; break; @@ -165,7 +145,7 @@ protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine, @Override protected void chooseRemoteRack(int numOfReplicas, - DatanodeDescriptor localMachine, HashMap excludedNodes, + DatanodeDescriptor localMachine, Set excludedNodes, long blocksize, int maxReplicasPerRack, List results, boolean avoidStaleNodes) throws NotEnoughReplicasException { int oldNumOfReplicas = results.size(); @@ -191,8 +171,8 @@ protected void chooseRemoteRack(int numOfReplicas, * @return the chosen node */ private DatanodeDescriptor chooseLocalNodeGroup( - NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine, - HashMap excludedNodes, long blocksize, int maxNodesPerRack, + NetworkTopologyWithNodeGroup clusterMap, Node localMachine, + Set excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes) throws NotEnoughReplicasException { // no local machine, so choose a random machine @@ -209,9 +189,7 @@ private DatanodeDescriptor chooseLocalNodeGroup( } catch (NotEnoughReplicasException e1) { // find the second replica DatanodeDescriptor newLocal=null; - for(Iterator iter=results.iterator(); - iter.hasNext();) { - DatanodeDescriptor nextNode = iter.next(); + for(DatanodeDescriptor nextNode : results) { if (nextNode != localMachine) { newLocal = nextNode; break; @@ -248,14 +226,14 @@ protected String getRack(final DatanodeInfo cur) { * within the same nodegroup * @return number of new excluded nodes */ - protected int addToExcludedNodes(DatanodeDescriptor localMachine, - HashMap excludedNodes) { + @Override + protected int addToExcludedNodes(DatanodeDescriptor chosenNode, + Set excludedNodes) { int countOfExcludedNodes = 0; - String nodeGroupScope = localMachine.getNetworkLocation(); + String nodeGroupScope = chosenNode.getNetworkLocation(); List leafNodes = clusterMap.getLeaves(nodeGroupScope); for (Node leafNode : leafNodes) { - Node node = excludedNodes.put(leafNode, leafNode); - if (node == null) { + if (excludedNodes.add(leafNode)) { // not a existing node in excludedNodes countOfExcludedNodes++; } @@ -274,12 +252,12 @@ protected int addToExcludedNodes(DatanodeDescriptor localMachine, * If first is empty, then pick second. */ @Override - public Iterator pickupReplicaSet( + public Collection pickupReplicaSet( Collection first, Collection second) { // If no replica within same rack, return directly. 
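pickupReplicaSet now returns a Collection instead of an Iterator in both policies, which lets chooseReplicaToDelete walk the candidates with an enhanced for loop. A tiny sketch of that API shape, with plain string collections standing in for the replica sets:

    import java.util.Collection;
    import java.util.List;

    public class PickupReplicaSetSketch {
        /** Prefer the first set; fall back to the second only when the first is empty. */
        static Collection<String> pickupReplicaSet(Collection<String> first,
                                                   Collection<String> second) {
            return first.isEmpty() ? second : first;
        }

        public static void main(String[] args) {
            Collection<String> sameRack = List.of();            // no replica on the same rack
            Collection<String> others = List.of("dn3", "dn7");
            for (String candidate : pickupReplicaSet(sameRack, others)) {
                System.out.println("candidate for deletion: " + candidate);
            }
        }
    }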
if (first.isEmpty()) { - return second.iterator(); + return second; } // Split data nodes in the first set into two sets, // moreThanOne contains nodes on nodegroup with more than one replica @@ -312,9 +290,7 @@ public Iterator pickupReplicaSet( } } - Iterator iter = - moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator(); - return iter; + return moreThanOne.isEmpty()? exactlyOne : moreThanOne; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 1e454c9bc86..99dd965ef98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -57,11 +57,11 @@ public void remove() { /** Constant {@link LightWeightGSet} capacity. */ private final int capacity; - private volatile GSet blocks; + private GSet blocks; - BlocksMap(final float loadFactor) { + BlocksMap(int capacity) { // Use 2% of total memory to size the GSet capacity - this.capacity = LightWeightGSet.computeCapacity(2.0, "BlocksMap"); + this.capacity = capacity; this.blocks = new LightWeightGSet(capacity); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 7c0eb79b0b4..f7b43e4e05b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -44,7 +44,8 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class DatanodeDescriptor extends DatanodeInfo { - + public static final DatanodeDescriptor[] EMPTY_ARRAY = {}; + // Stores status of decommissioning. // If node is not decommissioning, do not use this object for anything. 
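The BlocksMap hunk above moves the capacity computation out of the constructor: the caller computes it once (2% of total memory via LightWeightGSet.computeCapacity) and injects the result, and the blocks field also loses its volatile qualifier. A rough sketch of that constructor-injection shape; the heap-fraction math below only approximates what computeCapacity does:

    import java.util.HashMap;
    import java.util.Map;

    public class CapacityInjectionSketch {
        /** A stand-in for the GSet-backed map: capacity is supplied by the caller. */
        static class BlocksMapLike {
            private final int capacity;
            private final Map<Long, String> blocks;
            BlocksMapLike(int capacity) {
                this.capacity = capacity;
                this.blocks = new HashMap<>(capacity);
            }
            int getCapacity() { return capacity; }
        }

        /** Very rough analogue of computeCapacity(percentage, name). */
        static int computeCapacity(double percentage, int bytesPerEntry) {
            long budget = (long) (Runtime.getRuntime().maxMemory() * percentage / 100.0);
            long entries = budget / bytesPerEntry;
            return (int) Math.min(entries, Integer.MAX_VALUE);
        }

        public static void main(String[] args) {
            // The caller, not the map, decides how much memory the map may use.
            BlocksMapLike map = new BlocksMapLike(computeCapacity(2.0, 64));
            System.out.println("capacity = " + map.getCapacity());
        }
    }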
public DecommissioningStatus decommissioningStatus = new DecommissioningStatus(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 00672acff50..2ecfde8c8b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -17,21 +17,9 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; -import static org.apache.hadoop.util.Time.now; - -import java.io.IOException; -import java.io.PrintWriter; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableMap; -import java.util.TreeMap; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.net.InetAddresses; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -41,13 +29,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; @@ -57,34 +40,24 @@ import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; -import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.*; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; -import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; -import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.util.CyclicIteration; import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.CachedDNSToSwitchMapping; -import org.apache.hadoop.net.DNSToSwitchMapping; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.*; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; -import 
org.apache.hadoop.net.Node; -import org.apache.hadoop.net.NodeBase; -import org.apache.hadoop.net.ScriptBasedMapping; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.IntrusiveCollection; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.net.InetAddresses; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.*; + +import static org.apache.hadoop.util.Time.now; /** * Manage datanodes, include decommission and other activities. @@ -131,6 +104,8 @@ public class DatanodeManager { private final int defaultInfoPort; + private final int defaultInfoSecurePort; + private final int defaultIpcPort; /** Read include/exclude files*/ @@ -170,6 +145,7 @@ public class DatanodeManager { */ private boolean hasClusterEverBeenMultiRack = false; + private final boolean checkIpHostnameInRegistration; /** * Whether we should tell datanodes what to cache in replies to * heartbeat messages. @@ -198,7 +174,10 @@ public class DatanodeManager { DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort(); this.defaultInfoPort = NetUtils.createSocketAddr( conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, - DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort(); + DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT)).getPort(); + this.defaultInfoSecurePort = NetUtils.createSocketAddr( + conf.get(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, + DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort(); this.defaultIpcPort = NetUtils.createSocketAddr( conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort(); @@ -241,6 +220,12 @@ public class DatanodeManager { LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" + this.blockInvalidateLimit); + this.checkIpHostnameInRegistration = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY, + DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT); + LOG.info(DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY + + "=" + checkIpHostnameInRegistration); + this.avoidStaleDataNodesForRead = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT); @@ -764,11 +749,13 @@ public void registerDatanode(DatanodeRegistration nodeReg) // Mostly called inside an RPC, update ip and peer hostname String hostname = dnAddress.getHostName(); String ip = dnAddress.getHostAddress(); - if (!isNameResolved(dnAddress)) { + if (checkIpHostnameInRegistration && !isNameResolved(dnAddress)) { // Reject registration of unresolved datanode to prevent performance // impact of repetitive DNS lookups later. 
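Registration above becomes configurable: the DNS-resolution check is applied only when DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY is enabled, and the rejection message now carries the ip/hostname pair. A self-contained sketch of that gate built only on java.net.InetAddress (the real check additionally treats local addresses as resolved, and throws DisallowedDatanodeException rather than the stand-in used here):

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class RegistrationHostnameCheck {
        private final boolean checkIpHostnameInRegistration;   // config flag

        RegistrationHostnameCheck(boolean check) {
            this.checkIpHostnameInRegistration = check;
        }

        /** Name resolution failed if the "hostname" is just the textual IP again. */
        static boolean isNameResolved(InetAddress address) {
            return !address.getHostName().equals(address.getHostAddress())
                || address.isLoopbackAddress();
        }

        void registerDatanode(InetAddress dnAddress) {
            if (checkIpHostnameInRegistration && !isNameResolved(dnAddress)) {
                // stand-in for DisallowedDatanodeException
                throw new IllegalStateException("hostname cannot be resolved (ip="
                    + dnAddress.getHostAddress() + ", hostname=" + dnAddress.getHostName() + ")");
            }
            System.out.println("registered " + dnAddress);
        }

        public static void main(String[] args) throws UnknownHostException {
            InetAddress addr = InetAddress.getByName("203.0.113.7"); // reverse lookup will likely fail
            new RegistrationHostnameCheck(false).registerDatanode(addr); // gate disabled: accepted
        }
    }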
- LOG.warn("Unresolved datanode registration from " + ip); - throw new DisallowedDatanodeException(nodeReg); + final String message = "hostname cannot be resolved (ip=" + + ip + ", hostname=" + hostname + ")"; + LOG.warn("Unresolved datanode registration: " + message); + throw new DisallowedDatanodeException(nodeReg, message); } // update node registration with the ip and hostname from rpc request nodeReg.setIpAddr(ip); @@ -897,7 +884,12 @@ nodes with its data cleared (or user can just remove the StorageID // If the network location is invalid, clear the cached mappings // so that we have a chance to re-add this DataNode with the // correct network location later. - dnsToSwitchMapping.reloadCachedMappings(); + List invalidNodeNames = new ArrayList(3); + // clear cache for nodes in IP or Hostname + invalidNodeNames.add(nodeReg.getIpAddr()); + invalidNodeNames.add(nodeReg.getHostName()); + invalidNodeNames.add(nodeReg.getPeerHostName()); + dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames); throw e; } } @@ -1133,6 +1125,7 @@ private DatanodeID parseDNFromHostsEntry(String hostLine) { // The IP:port is sufficient for listing in a report dnId = new DatanodeID(hostStr, "", "", port, DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, + DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT); } else { String ipAddr = ""; @@ -1143,6 +1136,7 @@ private DatanodeID parseDNFromHostsEntry(String hostLine) { } dnId = new DatanodeID(ipAddr, hostStr, "", port, DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT, + DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT, DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT); } return dnId; @@ -1190,7 +1184,7 @@ public List getDatanodeListForReport( new DatanodeDescriptor(new DatanodeID(entry.getIpAddress(), entry.getPrefix(), "", entry.getPort() == 0 ? defaultXferPort : entry.getPort(), - defaultInfoPort, defaultIpcPort)); + defaultInfoPort, defaultInfoSecurePort, defaultIpcPort)); dn.setLastUpdate(0); // Consider this node dead for reporting nodes.add(dn); } @@ -1209,17 +1203,17 @@ public List getDatanodeListForReport( /** * Checks if name resolution was successful for the given address. If IP * address and host name are the same, then it means name resolution has - * failed. As a special case, the loopback address is also considered + * failed. As a special case, local addresses are also considered * acceptable. This is particularly important on Windows, where 127.0.0.1 does * not resolve to "localhost". 
* * @param address InetAddress to check - * @return boolean true if name resolution successful or address is loopback + * @return boolean true if name resolution successful or address is local */ private static boolean isNameResolved(InetAddress address) { String hostname = address.getHostName(); String ip = address.getHostAddress(); - return !hostname.equals(ip) || address.isLoopbackAddress(); + return !hostname.equals(ip) || NetUtils.isLocalAddress(address); } private void setDatanodeDead(DatanodeDescriptor node) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index 0b82c12b1bd..a34f2cf217a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -18,24 +18,7 @@ package org.apache.hadoop.hdfs.server.common; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.URL; -import java.net.URLEncoder; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.TreeSet; - -import javax.servlet.ServletContext; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.jsp.JspWriter; +import com.google.common.base.Charsets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,13 +30,9 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.net.TcpPeerServer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -74,10 +53,22 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.VersionInfo; -import com.google.common.base.Charsets; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.jsp.JspWriter; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URL; +import java.net.URLEncoder; +import java.util.*; -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; @InterfaceAudience.Private public class JspHelper { @@ -112,7 +103,7 @@ public int hashCode() { return super.hashCode(); } } 
- + // compare two records based on their frequency private static class NodeRecordComparator implements Comparator { @@ -126,6 +117,27 @@ public int compare(NodeRecord o1, NodeRecord o2) { return 0; } } + + /** + * A helper class that generates the correct URL for different schema. + * + */ + public static final class Url { + public static String authority(String scheme, DatanodeID d) { + if (scheme.equals("http")) { + return d.getInfoAddr(); + } else if (scheme.equals("https")) { + return d.getInfoSecureAddr(); + } else { + throw new IllegalArgumentException("Unknown scheme:" + scheme); + } + } + + public static String url(String scheme, DatanodeID d) { + return scheme + "://" + authority(scheme, d); + } + } + public static DatanodeInfo bestNode(LocatedBlocks blks, Configuration conf) throws IOException { HashMap map = @@ -217,7 +229,7 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId, offsetIntoBlock, amtToRead, true, "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), new DatanodeID(addr.getAddress().getHostAddress(), - addr.getHostName(), poolId, addr.getPort(), 0, 0), null, + addr.getHostName(), poolId, addr.getPort(), 0, 0, 0), null, null, null, false, CachingStrategy.newDefaultStrategy()); final byte[] buf = new byte[amtToRead]; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java index be4f9577baf..32f9d9ea3e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java @@ -100,6 +100,7 @@ class BlockPoolSliceScanner { private long currentPeriodStart = Time.now(); private long bytesLeft = 0; // Bytes to scan in this period private long totalBytesToScan = 0; + private boolean isNewPeriod = true; private final LogFileHandler verificationLog; @@ -126,7 +127,10 @@ static class BlockScanInfo extends Block public int compare(BlockScanInfo left, BlockScanInfo right) { final long l = left.lastScanTime; final long r = right.lastScanTime; - return l < r? -1: l > r? 1: 0; + // compare blocks itself if scantimes are same to avoid. + // because TreeMap uses comparator if available to check existence of + // the object. + return l < r? -1: l > r? 1: left.compareTo(right); } }; @@ -148,8 +152,6 @@ public int hashCode() { public boolean equals(Object that) { if (this == that) { return true; - } else if (that == null || !(that instanceof BlockScanInfo)) { - return false; } return super.equals(that); } @@ -539,10 +541,12 @@ private boolean assignInitialVerificationTimes() { entry.genStamp)); if (info != null) { if (processedBlocks.get(entry.blockId) == null) { - updateBytesLeft(-info.getNumBytes()); + if (isNewPeriod) { + updateBytesLeft(-info.getNumBytes()); + } processedBlocks.put(entry.blockId, 1); } - if (logIterator.isPrevious()) { + if (logIterator.isLastReadFromPrevious()) { // write the log entry to current file // so that the entry is preserved for later runs. 
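The BlockScanInfo comparator change above deserves a note: the old comparator returned 0 for any two blocks with the same lastScanTime, so a TreeMap or TreeSet keyed on it could silently treat distinct blocks as duplicates; the patch breaks ties with the blocks' own ordering. A small demonstration of why the tie-break matters, using a simplified scan-info record:

    import java.util.Comparator;
    import java.util.TreeSet;

    public class ScanTimeComparatorSketch {
        record ScanInfo(long blockId, long lastScanTime) { }

        public static void main(String[] args) {
            Comparator<ScanInfo> byTimeOnly =
                Comparator.comparingLong(ScanInfo::lastScanTime);
            Comparator<ScanInfo> byTimeThenBlock =
                byTimeOnly.thenComparingLong(ScanInfo::blockId);

            ScanInfo a = new ScanInfo(1L, 100L);
            ScanInfo b = new ScanInfo(2L, 100L);   // different block, same scan time

            TreeSet<ScanInfo> broken = new TreeSet<>(byTimeOnly);
            broken.add(a);
            broken.add(b);                         // dropped: compares equal to a

            TreeSet<ScanInfo> fixed = new TreeSet<>(byTimeThenBlock);
            fixed.add(a);
            fixed.add(b);

            System.out.println(broken.size() + " vs " + fixed.size());  // 1 vs 2
        }
    }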
verificationLog.append(entry.verificationTime, entry.genStamp, @@ -557,6 +561,7 @@ private boolean assignInitialVerificationTimes() { } finally { IOUtils.closeStream(logIterator); } + isNewPeriod = false; } @@ -597,6 +602,7 @@ private synchronized void startNewPeriod() { // reset the byte counts : bytesLeft = totalBytesToScan; currentPeriodStart = Time.now(); + isNewPeriod = true; } private synchronized boolean workRemainingInCurrentPeriod() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 226eb97d5b3..fee84190486 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -18,67 +18,10 @@ package org.apache.hadoop.hdfs.server.datanode; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY; -import static org.apache.hadoop.util.ExitUtil.terminate; - -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; 
-import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.PrintStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.net.UnknownHostException; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.SocketChannel; -import java.security.PrivilegedExceptionAction; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -95,37 +38,15 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.net.DomainPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; -import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.protocol.datatransfer.*; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; -import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; -import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; -import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.protocolPB.*; +import 
org.apache.hadoop.hdfs.security.token.block.*; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; -import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; @@ -140,11 +61,7 @@ import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets; import org.apache.hadoop.hdfs.server.namenode.StreamFile; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.*; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.http.HttpServer; @@ -166,22 +83,21 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.*; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.JvmPauseMonitor; -import org.apache.hadoop.util.ServicePlugin; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.VersionInfo; import org.mortbay.util.ajax.JSON; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.protobuf.BlockingService; +import java.io.*; +import java.net.*; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.SocketChannel; +import java.security.PrivilegedExceptionAction; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.apache.hadoop.util.ExitUtil.terminate; /********************************************************** * DataNode is a class (and program) that stores a set of @@ -263,6 +179,7 @@ public static InetSocketAddress createSocketAddr(String target) { private volatile boolean heartbeatsDisabledForTests = false; private DataStorage storage = null; private HttpServer infoServer = null; + private int infoSecurePort; DataNodeMetrics metrics; private InetSocketAddress streamingAddr; @@ -386,16 +303,13 @@ private void startInfoServer(Configuration conf) throws IOException { InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf); String infoHost = infoSocAddr.getHostName(); int tmpInfoPort = infoSocAddr.getPort(); - this.infoServer = (secureResources == null) - ? 
new HttpServer.Builder().setName("datanode") - .setBindAddress(infoHost).setPort(tmpInfoPort) - .setFindPort(tmpInfoPort == 0).setConf(conf) - .setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))).build() - : new HttpServer.Builder().setName("datanode") - .setBindAddress(infoHost).setPort(tmpInfoPort) - .setFindPort(tmpInfoPort == 0).setConf(conf) - .setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))) - .setConnector(secureResources.getListener()).build(); + HttpServer.Builder builder = new HttpServer.Builder().setName("datanode") + .setBindAddress(infoHost).setPort(tmpInfoPort) + .setFindPort(tmpInfoPort == 0).setConf(conf) + .setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))); + this.infoServer = (secureResources == null) ? builder.build() : + builder.setConnector(secureResources.getListener()).build(); + LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort); if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) { boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY, @@ -409,6 +323,7 @@ private void startInfoServer(Configuration conf) throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Datanode listening for SSL on " + secInfoSocAddr); } + infoSecurePort = secInfoSocAddr.getPort(); } this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class); this.infoServer.addInternalServlet(null, "/getFileChecksum/*", @@ -796,7 +711,8 @@ DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) { } DatanodeID dnId = new DatanodeID( streamingAddr.getAddress().getHostAddress(), hostName, - getStorageId(), getXferPort(), getInfoPort(), getIpcPort()); + getStorageId(), getXferPort(), getInfoPort(), + infoSecurePort, getIpcPort()); return new DatanodeRegistration(dnId, storageInfo, new ExportedBlockKeys(), VersionInfo.getVersion()); } @@ -894,7 +810,7 @@ void shutdownBlockPool(BPOfferService bpos) { * If this is the first block pool to register, this also initializes * the datanode-scoped storage. * - * @param nsInfo the handshake response from the NN. + * @param bpos Block pool offer service * @throws IOException if the NN is inconsistent with the local storage. */ void initBlockPool(BPOfferService bpos) throws IOException { @@ -1419,15 +1335,13 @@ private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[]) int numTargets = xferTargets.length; if (numTargets > 0) { - if (LOG.isInfoEnabled()) { - StringBuilder xfersBuilder = new StringBuilder(); - for (int i = 0; i < numTargets; i++) { - xfersBuilder.append(xferTargets[i]); - xfersBuilder.append(" "); - } - LOG.info(bpReg + " Starting thread to transfer " + - block + " to " + xfersBuilder); + StringBuilder xfersBuilder = new StringBuilder(); + for (int i = 0; i < numTargets; i++) { + xfersBuilder.append(xferTargets[i]); + xfersBuilder.append(" "); } + LOG.info(bpReg + " Starting thread to transfer " + + block + " to " + xfersBuilder); new Daemon(new DataTransfer(xferTargets, block, BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start(); @@ -2351,6 +2265,13 @@ public int getInfoPort() { return infoServer.getPort(); } + /** + * @return the datanode's https port + */ + public int getInfoSecurePort() { + return infoSecurePort; + } + /** * Returned information is a JSON representation of a map with * name node host name as the key and block pool Id as the value. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java index 639468bbc75..c931698a32a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java @@ -18,10 +18,9 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.URI; import java.net.URL; import java.net.URLEncoder; import java.security.PrivilegedExceptionAction; @@ -37,9 +36,9 @@ import org.apache.commons.lang.StringEscapeUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -47,20 +46,23 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.server.common.JspHelper; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer; import org.apache.hadoop.http.HtmlQuoting; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ServletUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionInfo; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + @InterfaceAudience.Private public class DatanodeJspHelper { + private static final int PREV_BLOCK = -1; + private static final int NEXT_BLOCK = 1; + private static DFSClient getDFSClient(final UserGroupInformation user, final String addr, final Configuration conf @@ -143,10 +145,10 @@ static void generateDirectoryStructure(JspWriter out, out.print("Empty file"); } else { DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock, conf); - String fqdn = canonicalize(chosenNode.getIpAddr()); int datanodePort = chosenNode.getXferPort(); - String redirectLocation = HttpConfig.getSchemePrefix() + fqdn + ":" - + chosenNode.getInfoPort() + "/browseBlock.jsp?blockId=" + String redirectLocation = JspHelper.Url.url(req.getScheme(), + chosenNode) + + "/browseBlock.jsp?blockId=" + firstBlock.getBlock().getBlockId() + "&blockSize=" + firstBlock.getBlock().getNumBytes() + "&genstamp=" + firstBlock.getBlock().getGenerationStamp() + "&filename=" @@ -225,7 +227,7 @@ static void generateDirectoryStructure(JspWriter out, JspHelper.addTableFooter(out); } } - out.print("
Go back to DFS home"); dfs.close(); @@ -302,8 +304,7 @@ static void generateFileDetails(JspWriter out, Long.MAX_VALUE).getLocatedBlocks(); // Add the various links for looking at the file contents // URL for downloading the full file - String downloadUrl = HttpConfig.getSchemePrefix() + req.getServerName() + ":" - + req.getServerPort() + "/streamFile" + ServletUtil.encodePath(filename) + String downloadUrl = "/streamFile" + ServletUtil.encodePath(filename) + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr, true) + JspHelper.getDelegationTokenUrlParam(tokenString); out.print(""); @@ -319,8 +320,8 @@ static void generateFileDetails(JspWriter out, dfs.close(); return; } - String fqdn = canonicalize(chosenNode.getIpAddr()); - String tailUrl = HttpConfig.getSchemePrefix() + fqdn + ":" + chosenNode.getInfoPort() + + String tailUrl = "///" + JspHelper.Url.authority(req.getScheme(), chosenNode) + "/tail.jsp?filename=" + URLEncoder.encode(filename, "UTF-8") + "&namenodeInfoPort=" + namenodeInfoPort + "&chunkSizeToView=" + chunkSizeToView @@ -368,8 +369,7 @@ static void generateFileDetails(JspWriter out, for (int j = 0; j < locs.length; j++) { String datanodeAddr = locs[j].getXferAddr(); datanodePort = locs[j].getXferPort(); - fqdn = canonicalize(locs[j].getIpAddr()); - String blockUrl = HttpConfig.getSchemePrefix() + fqdn + ":" + locs[j].getInfoPort() + String blockUrl = "///" + JspHelper.Url.authority(req.getScheme(), locs[j]) + "/browseBlock.jsp?blockId=" + blockidstring + "&blockSize=" + blockSize + "&filename=" + URLEncoder.encode(filename, "UTF-8") @@ -380,7 +380,7 @@ static void generateFileDetails(JspWriter out, + JspHelper.getDelegationTokenUrlParam(tokenString) + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr); - String blockInfoUrl = HttpConfig.getSchemePrefix() + nnCanonicalName + ":" + String blockInfoUrl = "///" + nnCanonicalName + ":" + namenodeInfoPort + "/block_info_xml.jsp?blockId=" + blockidstring; out.print(" " @@ -391,7 +391,7 @@ static void generateFileDetails(JspWriter out, } out.println(""); out.print("
"); - out.print("
Go back to DFS home"); dfs.close(); @@ -491,9 +491,7 @@ static void generateFileChunks(JspWriter out, HttpServletRequest req, String parent = new File(filename).getParent(); JspHelper.printGotoForm(out, namenodeInfoPort, tokenString, parent, nnAddr); out.print("
"); - out.print("Advanced view/download options
"); out.print("
"); - // Determine the prev & next blocks - long nextStartOffset = 0; - long nextBlockSize = 0; - String nextBlockIdStr = null; - String nextGenStamp = null; - String nextHost = req.getServerName(); - int nextPort = req.getServerPort(); - int nextDatanodePort = datanodePort; - // determine data for the next link - if (startOffset + chunkSizeToView >= blockSize) { - // we have to go to the next block from this point onwards - List blocks = dfs.getNamenode().getBlockLocations(filename, 0, - Long.MAX_VALUE).getLocatedBlocks(); - for (int i = 0; i < blocks.size(); i++) { - if (blocks.get(i).getBlock().getBlockId() == blockId) { - if (i != blocks.size() - 1) { - LocatedBlock nextBlock = blocks.get(i + 1); - nextBlockIdStr = Long.toString(nextBlock.getBlock().getBlockId()); - nextGenStamp = Long.toString(nextBlock.getBlock() - .getGenerationStamp()); - nextStartOffset = 0; - nextBlockSize = nextBlock.getBlock().getNumBytes(); - DatanodeInfo d = JspHelper.bestNode(nextBlock, conf); - nextDatanodePort = d.getXferPort(); - nextHost = d.getIpAddr(); - nextPort = d.getInfoPort(); - } - } - } - } else { - // we are in the same block - nextBlockIdStr = blockId.toString(); - nextStartOffset = startOffset + chunkSizeToView; - nextBlockSize = blockSize; - nextGenStamp = genStamp.toString(); - } - String nextUrl = null; - if (nextBlockIdStr != null) { - nextUrl = HttpConfig.getSchemePrefix() + canonicalize(nextHost) + ":" + nextPort - + "/browseBlock.jsp?blockId=" + nextBlockIdStr - + "&blockSize=" + nextBlockSize - + "&startOffset=" + nextStartOffset - + "&genstamp=" + nextGenStamp - + "&filename=" + URLEncoder.encode(filename, "UTF-8") - + "&chunkSizeToView=" + chunkSizeToView - + "&datanodePort=" + nextDatanodePort - + "&namenodeInfoPort=" + namenodeInfoPort - + JspHelper.getDelegationTokenUrlParam(tokenString) - + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr); + String authority = req.getServerName() + ":" + req.getServerPort(); + String nextUrl = generateLinksForAdjacentBlock(NEXT_BLOCK, authority, + datanodePort, startOffset, chunkSizeToView, blockSize, blockId, + genStamp, dfs, filename, conf, req.getScheme(), tokenString, + namenodeInfoPort, nnAddr); + if (nextUrl != null) { out.print("View Next chunk  "); } - // determine data for the prev link - String prevBlockIdStr = null; - String prevGenStamp = null; - long prevStartOffset = 0; - long prevBlockSize = 0; - String prevHost = req.getServerName(); - int prevPort = req.getServerPort(); - int prevDatanodePort = datanodePort; - if (startOffset == 0) { - List blocks = dfs.getNamenode().getBlockLocations(filename, 0, - Long.MAX_VALUE).getLocatedBlocks(); - for (int i = 0; i < blocks.size(); i++) { - if (blocks.get(i).getBlock().getBlockId() == blockId) { - if (i != 0) { - LocatedBlock prevBlock = blocks.get(i - 1); - prevBlockIdStr = Long.toString(prevBlock.getBlock().getBlockId()); - prevGenStamp = Long.toString(prevBlock.getBlock() - .getGenerationStamp()); - prevStartOffset = prevBlock.getBlock().getNumBytes() - - chunkSizeToView; - if (prevStartOffset < 0) - prevStartOffset = 0; - prevBlockSize = prevBlock.getBlock().getNumBytes(); - DatanodeInfo d = JspHelper.bestNode(prevBlock, conf); - prevDatanodePort = d.getXferPort(); - prevHost = d.getIpAddr(); - prevPort = d.getInfoPort(); - } - } - } - } else { - // we are in the same block - prevBlockIdStr = blockId.toString(); - prevStartOffset = startOffset - chunkSizeToView; - if (prevStartOffset < 0) - prevStartOffset = 0; - prevBlockSize = blockSize; - prevGenStamp = genStamp.toString(); 
- } - String prevUrl = null; - if (prevBlockIdStr != null) { - prevUrl = HttpConfig.getSchemePrefix() + canonicalize(prevHost) + ":" + prevPort - + "/browseBlock.jsp?blockId=" + prevBlockIdStr - + "&blockSize=" + prevBlockSize - + "&startOffset=" + prevStartOffset - + "&filename=" + URLEncoder.encode(filename, "UTF-8") - + "&chunkSizeToView=" + chunkSizeToView - + "&genstamp=" + prevGenStamp - + "&datanodePort=" + prevDatanodePort - + "&namenodeInfoPort=" + namenodeInfoPort - + JspHelper.getDelegationTokenUrlParam(tokenString) - + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr); + String prevUrl = generateLinksForAdjacentBlock(PREV_BLOCK, authority, + datanodePort, startOffset, chunkSizeToView, blockSize, blockId, + genStamp, dfs, filename, conf, req.getScheme(), tokenString, + namenodeInfoPort, nnAddr); + if (prevUrl != null) { out.print("View Prev chunk  "); } + out.print("
"); out.print("