From a27f99c4a0f17bacd5f93c57b60072e6d1231cf8 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Fri, 18 Nov 2011 09:03:52 +0000 Subject: [PATCH] HDFS-2562. Refactor DN configuration variables out of DataNode class. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1203542 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/server/datanode/BlockReceiver.java | 10 +- .../hdfs/server/datanode/BlockSender.java | 6 +- .../hadoop/hdfs/server/datanode/DNConf.java | 113 ++++++++++++++++ .../hadoop/hdfs/server/datanode/DataNode.java | 127 ++++-------------- .../hdfs/server/datanode/DataXceiver.java | 50 ++++--- .../datanode/TestInterDatanodeProtocol.java | 2 +- 7 files changed, 172 insertions(+), 139 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 58d73d47939..f5584ffd356 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -14,6 +14,9 @@ Release 0.23.1 - UNRELEASED HDFS-2543. HADOOP_PREFIX cannot be overridden. (Bruno Mahé via tomwhite) + HDFS-2562. Refactor DN configuration variables out of DataNode class + (todd) + OPTIMIZATIONS HDFS-2130. Switch default checksum to CRC32C. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 9277956e1f1..4b961522d6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -185,8 +185,8 @@ class BlockReceiver implements Closeable { " while receiving block " + block + " from " + inAddr); } } - this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites(); - this.syncBehindWrites = datanode.shouldSyncBehindWrites(); + this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites; + this.syncBehindWrites = datanode.getDnConf().syncBehindWrites; final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; @@ -249,7 +249,7 @@ class BlockReceiver implements Closeable { try { if (checksumOut != null) { checksumOut.flush(); - if (datanode.syncOnClose && (cout instanceof FileOutputStream)) { + if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) { ((FileOutputStream)cout).getChannel().force(true); } checksumOut.close(); @@ -265,7 +265,7 @@ class BlockReceiver implements Closeable { try { if (out != null) { out.flush(); - if (datanode.syncOnClose && (out instanceof FileOutputStream)) { + if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) { ((FileOutputStream)out).getChannel().force(true); } out.close(); @@ -435,7 +435,7 @@ class BlockReceiver implements Closeable { * calculation in DFSClient to make the guess accurate. */ int chunkSize = bytesPerChecksum + checksumSize; - int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN + int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN + chunkSize - 1)/chunkSize; buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN + Math.max(chunksPerPacket, 1) * chunkSize); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index f4168ee1c90..cf4e8032600 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -185,8 +185,8 @@ class BlockSender implements java.io.Closeable { this.corruptChecksumOk = corruptChecksumOk; this.verifyChecksum = verifyChecksum; this.clientTraceFmt = clientTraceFmt; - this.readaheadLength = datanode.getReadaheadLength(); - this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads(); + this.readaheadLength = datanode.getDnConf().readaheadLength; + this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads; synchronized(datanode.data) { this.replica = getReplica(block, datanode); @@ -215,7 +215,7 @@ class BlockSender implements java.io.Closeable { // transferToFully() fails on 32 bit platforms for block sizes >= 2GB, // use normal transfer in those cases - this.transferToAllowed = datanode.transferToAllowed && + this.transferToAllowed = datanode.getDnConf().transferToAllowed && (!is32Bit || length <= Integer.MAX_VALUE); DataChecksum csum; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java new file mode 100644 index 00000000000..f0f4737df92 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; + +/** + * Simple class encapsulating all of the configuration that the DataNode + * loads at startup time. + */ +class DNConf { + final int socketTimeout; + final int socketWriteTimeout; + final int socketKeepaliveTimeout; + + final boolean transferToAllowed; + final boolean dropCacheBehindWrites; + final boolean syncBehindWrites; + final boolean dropCacheBehindReads; + final boolean syncOnClose; + + + final long readaheadLength; + final long heartBeatInterval; + final long blockReportInterval; + final long initialBlockReportDelay; + final int writePacketSize; + + public DNConf(Configuration conf) { + socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, + HdfsServerConstants.READ_TIMEOUT); + socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, + HdfsServerConstants.WRITE_TIMEOUT); + socketKeepaliveTimeout = conf.getInt( + DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, + DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); + + /* Based on results on different platforms, we might need set the default + * to false on some of them. */ + transferToAllowed = conf.getBoolean( + DFS_DATANODE_TRANSFERTO_ALLOWED_KEY, + DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT); + + writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, + DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); + + readaheadLength = conf.getLong( + DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, + DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + dropCacheBehindWrites = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT); + syncBehindWrites = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT); + dropCacheBehindReads = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); + + this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); + + long initBRDelay = conf.getLong( + DFS_BLOCKREPORT_INITIAL_DELAY_KEY, + DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L; + if (initBRDelay >= blockReportInterval) { + initBRDelay = 0; + DataNode.LOG.info("dfs.blockreport.initialDelay is greater than " + + "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:"); + } + initialBlockReportDelay = initBRDelay; + + heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, + DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; + + // do we need to sync block file contents to disk when blockfile is closed? + this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, + DFS_DATANODE_SYNCONCLOSE_DEFAULT); + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 09d555b0c61..5fb7f09fc17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -19,15 +19,8 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; @@ -51,17 +44,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOUR import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_TRANSFERTO_ALLOWED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.server.common.Util.now; @@ -104,7 +90,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -396,9 +381,7 @@ public class DataNode extends Configured AtomicInteger xmitsInProgress = new AtomicInteger(); Daemon dataXceiverServer = null; ThreadGroup threadGroup = null; - long blockReportInterval; - long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L; - long heartBeatInterval; + private DNConf dnConf; private boolean heartbeatsDisabledForTests = false; private DataStorage storage = null; private HttpServer infoServer = null; @@ -408,18 +391,9 @@ public class DataNode extends Configured private volatile String hostName; // Host name of this datanode private static String dnThreadName; - int socketTimeout; - int socketWriteTimeout = 0; - boolean transferToAllowed = true; - private boolean dropCacheBehindWrites = false; - private boolean syncBehindWrites = false; - private boolean dropCacheBehindReads = false; - private long readaheadLength = 0; - int writePacketSize = 0; boolean isBlockTokenEnabled; BlockPoolTokenSecretManager blockPoolTokenSecretManager; - boolean syncOnClose; public DataBlockScanner blockScanner = null; private DirectoryScanner directoryScanner = null; @@ -487,49 +461,6 @@ public class DataNode extends Configured return name; } - private void initConfig(Configuration conf) { - this.socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, - HdfsServerConstants.READ_TIMEOUT); - this.socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, - HdfsServerConstants.WRITE_TIMEOUT); - /* Based on results on different platforms, we might need set the default - * to false on some of them. */ - this.transferToAllowed = conf.getBoolean( - DFS_DATANODE_TRANSFERTO_ALLOWED_KEY, - DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT); - this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, - DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); - - this.readaheadLength = conf.getLong( - DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, - DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - this.dropCacheBehindWrites = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT); - this.syncBehindWrites = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, - DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT); - this.dropCacheBehindReads = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, - DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); - - this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, - DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); - this.initialBlockReportDelay = conf.getLong( - DFS_BLOCKREPORT_INITIAL_DELAY_KEY, - DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L; - if (this.initialBlockReportDelay >= blockReportInterval) { - this.initialBlockReportDelay = 0; - LOG.info("dfs.blockreport.initialDelay is greater than " + - "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:"); - } - this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, - DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; - - // do we need to sync block file contents to disk when blockfile is closed? - this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, - DFS_DATANODE_SYNCONCLOSE_DEFAULT); - } private void startInfoServer(Configuration conf) throws IOException { // create a servlet to serve full-file content @@ -688,7 +619,7 @@ public class DataNode extends Configured // find free port or use privileged port provided ServerSocket ss; if(secureResources == null) { - ss = (socketWriteTimeout > 0) ? + ss = (dnConf.socketWriteTimeout > 0) ? ServerSocketChannel.open().socket() : new ServerSocket(); Server.bind(ss, socAddr, 0); } else { @@ -760,11 +691,13 @@ public class DataNode extends Configured private volatile boolean shouldServiceRun = true; UpgradeManagerDatanode upgradeManager = null; private final DataNode dn; + private final DNConf dnConf; BPOfferService(InetSocketAddress nnAddr, DataNode dn) { this.dn = dn; this.bpRegistration = dn.createRegistration(); this.nnAddr = nnAddr; + this.dnConf = dn.getDnConf(); } /** @@ -866,9 +799,9 @@ public class DataNode extends Configured void scheduleBlockReport(long delay) { if (delay > 0) { // send BR after random delay lastBlockReport = System.currentTimeMillis() - - ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); + - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay))); } else { // send at next heartbeat - lastBlockReport = lastHeartbeat - dn.blockReportInterval; + lastBlockReport = lastHeartbeat - dnConf.blockReportInterval; } resetBlockReportTime = true; // reset future BRs for randomness } @@ -965,7 +898,7 @@ public class DataNode extends Configured // send block report if timer has expired. DatanodeCommand cmd = null; long startTime = now(); - if (startTime - lastBlockReport > dn.blockReportInterval) { + if (startTime - lastBlockReport > dnConf.blockReportInterval) { // Create block report long brCreateStartTime = now(); @@ -987,7 +920,7 @@ public class DataNode extends Configured // If we have sent the first block report, then wait a random // time before we start the periodic block reports. if (resetBlockReportTime) { - lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval)); + lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval)); resetBlockReportTime = false; } else { /* say the last block report was at 8:20:14. The current report @@ -997,7 +930,7 @@ public class DataNode extends Configured * 2) unexpected like 11:35:43, next report should be at 12:20:14 */ lastBlockReport += (now() - lastBlockReport) / - dn.blockReportInterval * dn.blockReportInterval; + dnConf.blockReportInterval * dnConf.blockReportInterval; } LOG.info("sent block report, processed command:" + cmd); } @@ -1059,9 +992,9 @@ public class DataNode extends Configured */ private void offerService() throws Exception { LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of " - + dn.blockReportInterval + "msec" + " Initial delay: " - + dn.initialBlockReportDelay + "msec" + "; heartBeatInterval=" - + dn.heartBeatInterval); + + dnConf.blockReportInterval + "msec" + " Initial delay: " + + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + + dnConf.heartBeatInterval); // // Now loop for a long time.... @@ -1073,7 +1006,7 @@ public class DataNode extends Configured // // Every so often, send heartbeat or block-report // - if (startTime - lastHeartbeat > dn.heartBeatInterval) { + if (startTime - lastHeartbeat > dnConf.heartBeatInterval) { // // All heartbeat messages include following info: // -- Datanode name @@ -1111,7 +1044,7 @@ public class DataNode extends Configured // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. // - long waitTime = dn.heartBeatInterval - + long waitTime = dnConf.heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat); synchronized(receivedBlockList) { if (waitTime > 0 && receivedBlockList.size() == 0) { @@ -1134,7 +1067,7 @@ public class DataNode extends Configured } LOG.warn("RemoteException in offerService", re); try { - long sleepTime = Math.min(1000, dn.heartBeatInterval); + long sleepTime = Math.min(1000, dnConf.heartBeatInterval); Thread.sleep(sleepTime); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1202,7 +1135,7 @@ public class DataNode extends Configured LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo); // random short delay - helps scatter the BR from all DNs - scheduleBlockReport(dn.initialBlockReportDelay); + scheduleBlockReport(dnConf.initialBlockReportDelay); } @@ -1412,11 +1345,11 @@ public class DataNode extends Configured this.secureResources = resources; this.dataDirs = dataDirs; this.conf = conf; + this.dnConf = new DNConf(conf); storage = new DataStorage(); // global DN settings - initConfig(conf); registerMXBean(); initDataXceiver(conf); startInfoServer(conf); @@ -1664,7 +1597,7 @@ public class DataNode extends Configured * Creates either NIO or regular depending on socketWriteTimeout. */ protected Socket newSocket() throws IOException { - return (socketWriteTimeout > 0) ? + return (dnConf.socketWriteTimeout > 0) ? SocketChannel.open().socket() : new Socket(); } @@ -2091,10 +2024,10 @@ public class DataNode extends Configured InetSocketAddress curTarget = NetUtils.createSocketAddr(targets[0].getName()); sock = newSocket(); - NetUtils.connect(sock, curTarget, socketTimeout); - sock.setSoTimeout(targets.length * socketTimeout); + NetUtils.connect(sock, curTarget, dnConf.socketTimeout); + sock.setSoTimeout(targets.length * dnConf.socketTimeout); - long writeTimeout = socketWriteTimeout + + long writeTimeout = dnConf.socketWriteTimeout + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout); out = new DataOutputStream(new BufferedOutputStream(baseStream, @@ -2537,7 +2470,7 @@ public class DataNode extends Configured DatanodeRegistration bpReg = bpos.bpRegistration; InterDatanodeProtocol datanode = bpReg.equals(id)? this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), - socketTimeout); + dnConf.socketTimeout); ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); if (info != null && info.getGenerationStamp() >= block.getGenerationStamp() && @@ -2926,20 +2859,8 @@ public class DataNode extends Configured (DataXceiverServer) this.dataXceiverServer.getRunnable(); return dxcs.balanceThrottler.getBandwidth(); } - - long getReadaheadLength() { - return readaheadLength; - } - - boolean shouldDropCacheBehindWrites() { - return dropCacheBehindWrites; - } - - boolean shouldDropCacheBehindReads() { - return dropCacheBehindReads; - } - boolean shouldSyncBehindWrites() { - return syncBehindWrites; + DNConf getDnConf() { + return dnConf; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index d6a3963c0b1..11282a5b7ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -38,7 +38,6 @@ import java.nio.channels.ClosedChannelException; import java.util.Arrays; import org.apache.commons.logging.Log; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -82,9 +81,9 @@ class DataXceiver extends Receiver implements Runnable { private final String remoteAddress; // address of remote side private final String localAddress; // local address of this daemon private final DataNode datanode; + private final DNConf dnConf; private final DataXceiverServer dataXceiverServer; - private int socketKeepaliveTimeout; private long opStartTime; //the start time of receiving an Op public DataXceiver(Socket s, DataNode datanode, @@ -95,14 +94,11 @@ class DataXceiver extends Receiver implements Runnable { this.s = s; this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); this.datanode = datanode; + this.dnConf = datanode.getDnConf(); this.dataXceiverServer = dataXceiverServer; remoteAddress = s.getRemoteSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString(); - socketKeepaliveTimeout = datanode.getConf().getInt( - DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, - DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); - if (LOG.isDebugEnabled()) { LOG.debug("Number of active connections is: " + datanode.getXceiverCount()); @@ -144,8 +140,8 @@ class DataXceiver extends Receiver implements Runnable { try { if (opsProcessed != 0) { - assert socketKeepaliveTimeout > 0; - s.setSoTimeout(socketKeepaliveTimeout); + assert dnConf.socketKeepaliveTimeout > 0; + s.setSoTimeout(dnConf.socketKeepaliveTimeout); } op = readOp(); } catch (InterruptedIOException ignored) { @@ -180,7 +176,7 @@ class DataXceiver extends Receiver implements Runnable { opStartTime = now(); processOp(op); ++opsProcessed; - } while (!s.isClosed() && socketKeepaliveTimeout > 0); + } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0); } catch (Throwable t) { LOG.error(datanode.getMachineName() + ":DataXceiver error processing " + ((op == null) ? "unknown" : op.name()) + " operation " + @@ -205,7 +201,7 @@ class DataXceiver extends Receiver implements Runnable { final long blockOffset, final long length) throws IOException { OutputStream baseStream = NetUtils.getOutputStream(s, - datanode.socketWriteTimeout); + dnConf.socketWriteTimeout); DataOutputStream out = new DataOutputStream(new BufferedOutputStream( baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); checkAccess(out, true, block, blockToken, @@ -231,13 +227,13 @@ class DataXceiver extends Receiver implements Runnable { } catch(IOException e) { String msg = "opReadBlock " + block + " received exception " + e; LOG.info(msg); - sendResponse(s, ERROR, msg, datanode.socketWriteTimeout); + sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); throw e; } // send op status writeSuccessWithChecksumInfo(blockSender, - getStreamWithTimeout(s, datanode.socketWriteTimeout)); + getStreamWithTimeout(s, dnConf.socketWriteTimeout)); long read = blockSender.sendBlock(out, baseStream, null); // send data @@ -335,7 +331,7 @@ class DataXceiver extends Receiver implements Runnable { // reply to upstream datanode or client final DataOutputStream replyOut = new DataOutputStream( new BufferedOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout), + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout), HdfsConstants.SMALL_BUFFER_SIZE)); checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE); @@ -370,9 +366,9 @@ class DataXceiver extends Receiver implements Runnable { mirrorTarget = NetUtils.createSocketAddr(mirrorNode); mirrorSock = datanode.newSocket(); try { - int timeoutValue = datanode.socketTimeout + int timeoutValue = dnConf.socketTimeout + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length); - int writeTimeout = datanode.socketWriteTimeout + + int writeTimeout = dnConf.socketWriteTimeout + (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length); NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); mirrorSock.setSoTimeout(timeoutValue); @@ -508,7 +504,7 @@ class DataXceiver extends Receiver implements Runnable { updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); final DataOutputStream out = new DataOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); try { datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); writeResponse(Status.SUCCESS, null, out); @@ -521,7 +517,7 @@ class DataXceiver extends Receiver implements Runnable { public void blockChecksum(final ExtendedBlock block, final Token blockToken) throws IOException { final DataOutputStream out = new DataOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ); updateCurrentThreadName("Reading metadata for block " + block); @@ -581,7 +577,7 @@ class DataXceiver extends Receiver implements Runnable { LOG.warn("Invalid access token in request from " + remoteAddress + " for OP_COPY_BLOCK for block " + block + " : " + e.getLocalizedMessage()); - sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout); + sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout); return; } @@ -591,7 +587,7 @@ class DataXceiver extends Receiver implements Runnable { String msg = "Not able to copy block " + block.getBlockId() + " to " + s.getRemoteSocketAddress() + " because threads quota is exceeded."; LOG.info(msg); - sendResponse(s, ERROR, msg, datanode.socketWriteTimeout); + sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); return; } @@ -606,7 +602,7 @@ class DataXceiver extends Receiver implements Runnable { // set up response stream OutputStream baseStream = NetUtils.getOutputStream( - s, datanode.socketWriteTimeout); + s, dnConf.socketWriteTimeout); reply = new DataOutputStream(new BufferedOutputStream( baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -659,7 +655,7 @@ class DataXceiver extends Receiver implements Runnable { + " for OP_REPLACE_BLOCK for block " + block + " : " + e.getLocalizedMessage()); sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", - datanode.socketWriteTimeout); + dnConf.socketWriteTimeout); return; } } @@ -668,7 +664,7 @@ class DataXceiver extends Receiver implements Runnable { String msg = "Not able to receive block " + block.getBlockId() + " from " + s.getRemoteSocketAddress() + " because threads quota is exceeded."; LOG.warn(msg); - sendResponse(s, ERROR, msg, datanode.socketWriteTimeout); + sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout); return; } @@ -684,11 +680,11 @@ class DataXceiver extends Receiver implements Runnable { InetSocketAddress proxyAddr = NetUtils.createSocketAddr( proxySource.getName()); proxySock = datanode.newSocket(); - NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout); - proxySock.setSoTimeout(datanode.socketTimeout); + NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout); + proxySock.setSoTimeout(dnConf.socketTimeout); OutputStream baseStream = NetUtils.getOutputStream(proxySock, - datanode.socketWriteTimeout); + dnConf.socketWriteTimeout); proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -750,7 +746,7 @@ class DataXceiver extends Receiver implements Runnable { // send response back try { - sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout); + sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout); } catch (IOException ioe) { LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()); } @@ -826,7 +822,7 @@ class DataXceiver extends Receiver implements Runnable { if (reply) { if (out == null) { out = new DataOutputStream( - NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); } BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java index fe85babe7f8..61b154069e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java @@ -154,7 +154,7 @@ public class TestInterDatanodeProtocol { //connect to a data node DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort()); InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy( - datanodeinfo[0], conf, datanode.socketTimeout); + datanodeinfo[0], conf, datanode.getDnConf().socketTimeout); assertTrue(datanode != null); //stop block scanner, so we could compare lastScanTime