From fd9997989c1f1c6f806c57a806e7225ca599fc0c Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Thu, 23 Jun 2011 22:24:59 +0000
Subject: [PATCH] HDFS-2092. Remove some object references to Configuration in
 DFSClient.  Contributed by Bharath Mundlapudi

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1139097 13f79535-47bb-0310-9956-ffa450edef68
---
 hdfs/CHANGES.txt                            |   3 +
 .../org/apache/hadoop/hdfs/DFSClient.java   | 143 ++++++++++++------
 .../apache/hadoop/hdfs/DFSInputStream.java  |  18 +--
 .../apache/hadoop/hdfs/DFSOutputStream.java |  21 +--
 4 files changed, 113 insertions(+), 72 deletions(-)

diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt
index 016ab754b8e..a0274918efe 100644
--- a/hdfs/CHANGES.txt
+++ b/hdfs/CHANGES.txt
@@ -523,6 +523,9 @@ Trunk (unreleased changes)
 
     HDFS-2100. Improve TestStorageRestore. (atm)
 
+    HDFS-2092. Remove some object references to Configuration in DFSClient.
+    (Bharath Mundlapudi via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java b/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
index f88171893f6..a29cf88b8af 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -128,17 +128,85 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   static Random r = new Random();
   final String clientName;
   Configuration conf;
-  long defaultBlockSize;
-  private short defaultReplication;
   SocketFactory socketFactory;
-  int socketTimeout;
-  final int writePacketSize;
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
   final LeaseRenewer leaserenewer;
-
   final SocketCache socketCache;
+  final Conf dfsClientConf;
+
+  /**
+   * DFSClient configuration
+   */
+  static class Conf {
+    final int maxBlockAcquireFailures;
+    final int confTime;
+    final int ioBufferSize;
+    final int bytesPerChecksum;
+    final int writePacketSize;
+    final int socketTimeout;
+    final int socketCacheCapacity;
+    /** Wait time window (in msec) if BlockMissingException is caught */
+    final int timeWindow;
+    final int nCachedConnRetry;
+    final int nBlockWriteRetry;
+    final int nBlockWriteLocateFollowingRetry;
+    final long defaultBlockSize;
+    final long prefetchSize;
+    final short defaultReplication;
+    final String taskId;
+
+    Conf(Configuration conf) {
+      maxBlockAcquireFailures = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
+          DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+      confTime = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+          HdfsConstants.WRITE_TIMEOUT);
+      ioBufferSize = conf.getInt(
+          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+      bytesPerChecksum = conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+          DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+          HdfsConstants.READ_TIMEOUT);
+      /** dfs.write.packet.size is an internal config variable */
+      writePacketSize = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+      defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+          DEFAULT_BLOCK_SIZE);
+      defaultReplication = (short) conf.getInt(
+          DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
+      socketCacheCapacity = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+          DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+      prefetchSize = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
+          10 * defaultBlockSize);
+      timeWindow = conf
+          .getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
+      nCachedConnRetry = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+          DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+      nBlockWriteRetry = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
+          DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
+      nBlockWriteLocateFollowingRetry = conf
+          .getInt(
+              DFSConfigKeys
+                  .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
+              DFSConfigKeys
+                  .DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+    }
+  }
+
+  Conf getConf() {
+    return dfsClientConf;
+  }
 
   /**
    * A map from file names to {@link DFSOutputStream} objects
@@ -257,16 +325,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
       Configuration conf, FileSystem.Statistics stats)
     throws IOException {
+    // Copy only the required DFSClient configuration
+    this.dfsClientConf = new Conf(conf);
     this.conf = conf;
     this.stats = stats;
-    this.socketTimeout = 
-        conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
-            HdfsConstants.READ_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
-    // dfs.write.packet.size is an internal config variable
-    this.writePacketSize = 
-        conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
-            DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
 
     // The hdfsTimeout is currently the same as the ipc timeout
@@ -275,19 +338,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     final String authority = nameNodeAddr == null? "null":
         nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
     this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
-
-    String taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
-    this.clientName = leaserenewer.getClientName(taskId);
-
-    defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
-    defaultReplication = (short) 
-        conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
-            DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-
-    this.socketCache = new SocketCache(
-        conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
-            DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT));
-
+    this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
+    this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
     if (nameNodeAddr != null && rpcNamenode == null) {
       this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
       this.namenode = createNamenode(this.rpcNamenode);
@@ -306,8 +358,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    * to retrieve block locations when reading.
    */
   int getMaxBlockAcquireFailures() {
-    return conf.getInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
-        DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
+    return dfsClientConf.maxBlockAcquireFailures;
   }
 
   /**
@@ -315,18 +366,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    * @param numNodes the number of nodes in the pipeline.
    */
   int getDatanodeWriteTimeout(int numNodes) {
-    int confTime =
-        conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
-            HdfsConstants.WRITE_TIMEOUT);
-
-    return (confTime > 0) ?
-        (confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
+    return (dfsClientConf.confTime > 0) ?
+        (dfsClientConf.confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
   }
 
   int getDatanodeReadTimeout(int numNodes) {
-    return socketTimeout > 0 ?
+    return dfsClientConf.socketTimeout > 0 ?
         (HdfsConstants.READ_TIMEOUT_EXTENSION * numNodes +
-            socketTimeout) : 0;
+            dfsClientConf.socketTimeout) : 0;
   }
 
   int getHdfsTimeout() {
@@ -430,7 +477,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    * @return the default block size in bytes
    */
   public long getDefaultBlockSize() {
-    return defaultBlockSize;
+    return dfsClientConf.defaultBlockSize;
   }
 
   /**
@@ -528,7 +575,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   }
 
   public short getDefaultReplication() {
-    return defaultReplication;
+    return dfsClientConf.defaultReplication;
   }
 
   /**
@@ -583,7 +630,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
   public DFSInputStream open(String src) 
       throws IOException, UnresolvedLinkException {
-    return open(src, conf.getInt("io.file.buffer.size", 4096), true, null);
+    return open(src, dfsClientConf.ioBufferSize, true, null);
   }
 
   /**
@@ -629,7 +676,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    */
   public OutputStream create(String src, boolean overwrite) 
       throws IOException {
-    return create(src, overwrite, defaultReplication, defaultBlockSize, null);
+    return create(src, overwrite, dfsClientConf.defaultReplication,
+        dfsClientConf.defaultBlockSize, null);
   }
 
   /**
@@ -639,7 +687,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   public OutputStream create(String src, 
                              boolean overwrite,
                              Progressable progress) throws IOException {
-    return create(src, overwrite, defaultReplication, defaultBlockSize, progress);
+    return create(src, overwrite, dfsClientConf.defaultReplication,
+        dfsClientConf.defaultBlockSize, progress);
  }
 
   /**
@@ -660,7 +709,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   public OutputStream create(String src, boolean overwrite, short replication,
       long blockSize, Progressable progress) throws IOException {
     return create(src, overwrite, replication, blockSize, progress,
-        conf.getInt("io.file.buffer.size", 4096));
+        dfsClientConf.ioBufferSize);
   }
 
   /**
@@ -744,10 +793,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     if(LOG.isDebugEnabled()) {
       LOG.debug(src + ": masked=" + masked);
     }
-    final DFSOutputStream result = new DFSOutputStream(this, src, masked,
-        flag, createParent, replication, blockSize, progress, buffersize,
-        conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
-            DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+    final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
+        createParent, replication, blockSize, progress, buffersize,
+        dfsClientConf.bytesPerChecksum);
     leaserenewer.put(src, result, this);
     return result;
   }
@@ -851,8 +899,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                                      UnresolvedPathException.class);
     }
     return new DFSOutputStream(this, src, buffersize, progress,
-        lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
-            DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+        lastBlock, stat, dfsClientConf.bytesPerChecksum);
   }
 
   /**
@@ -1061,7 +1108,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    */
   public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
     checkOpen();
-    return getFileChecksum(src, namenode, socketFactory, socketTimeout);
+    return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);
   }
 
   /**
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 49707aea838..87365747795 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -83,7 +83,7 @@ public class DFSInputStream extends FSInputStream {
    * capped at maxBlockAcquireFailures
    */
   private int failures = 0;
-  private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
+  private int timeWindow;
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
    * parallel accesses to DFSInputStream (through ptreads) properly */
@@ -106,13 +106,9 @@ public class DFSInputStream extends FSInputStream {
     this.buffersize = buffersize;
     this.src = src;
     this.socketCache = dfsClient.socketCache;
-    prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
-        10 * dfsClient.defaultBlockSize);
-    timeWindow = this.dfsClient.conf.getInt(
-        DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
-    nCachedConnRetry = this.dfsClient.conf.getInt(
-        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
-        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
+    prefetchSize = dfsClient.getConf().prefetchSize;
+    timeWindow = dfsClient.getConf().timeWindow;
+    nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
     openInfo();
   }
 
@@ -163,7 +159,7 @@ public class DFSInputStream extends FSInputStream {
       try {
         cdp = DFSClient.createClientDatanodeProtocolProxy(
-            datanode, dfsClient.conf, dfsClient.socketTimeout, locatedblock);
+            datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
 
         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -771,8 +767,8 @@ public class DFSInputStream extends FSInputStream {
       // disaster.
       sock.setTcpNoDelay(true);
 
-      NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
-      sock.setSoTimeout(dfsClient.socketTimeout);
+      NetUtils.connect(sock, dnAddr, dfsClient.getConf().socketTimeout);
+      sock.setSoTimeout(dfsClient.getConf().socketTimeout);
     }
 
     try {
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index b6e5cc23a98..0d120da33e0 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -103,7 +103,6 @@ import org.apache.hadoop.util.StringUtils;
  ****************************************************************/
 class DFSOutputStream extends FSOutputSummer implements Syncable {
   private final DFSClient dfsClient;
-  private Configuration conf;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
   private Socket s;
   // closed is accessed by different threads under different locks.
@@ -355,7 +354,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       // that expected size of of a packet, then create 
       // smaller size packet.
       //
-      computePacketChunkSize(Math.min(dfsClient.writePacketSize, freeInLastBlock), 
+      computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock), 
           bytesPerChecksum);
     }
 
@@ -426,8 +425,8 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
             && dataQueue.size() == 0 && 
             (stage != BlockConstructionStage.DATA_STREAMING || 
              stage == BlockConstructionStage.DATA_STREAMING && 
-             now - lastPacket < dfsClient.socketTimeout/2)) || doSleep ) {
-            long timeout = dfsClient.socketTimeout/2 - (now-lastPacket);
+             now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
+            long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
             timeout = timeout <= 0 ? 1000 : timeout;
             timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                timeout : 1000;
@@ -953,8 +952,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
     LocatedBlock lb = null;
     DatanodeInfo[] nodes = null;
-    int count = conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
-        DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
+    int count = dfsClient.getConf().nBlockWriteRetry;
     boolean success = false;
     do {
       hasError = false;
@@ -1079,9 +1077,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   private LocatedBlock locateFollowingBlock(long start,
       DatanodeInfo[] excludedNodes) throws IOException, UnresolvedLinkException {
-    int retries = 
-        conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
-            DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
+    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
     long sleeptime = 400;
     while (true) {
       long localstart = System.currentTimeMillis();
@@ -1201,7 +1197,6 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       int bytesPerChecksum, short replication) throws IOException {
     super(new PureJavaCrc32(), bytesPerChecksum, 4);
     this.dfsClient = dfsClient;
-    this.conf = dfsClient.conf;
     this.src = src;
     this.blockSize = blockSize;
     this.blockReplication = replication;
@@ -1232,7 +1227,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       throws IOException {
     this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
 
-    computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
 
     try {
       dfsClient.namenode.create(
@@ -1269,7 +1264,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       bytesCurBlock = lastBlock.getBlockSize();
       streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
     } else {
-      computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
+      computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
       streamer = new DataStreamer();
     }
     streamer.start();
@@ -1385,7 +1380,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
     }
 
     if (!appendChunk) {
-      int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.writePacketSize);
+      int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
       computePacketChunkSize(psize, bytesPerChecksum);
     }
     //
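
The pattern this patch applies is simple: read every tunable out of the mutable Configuration once, at client construction time, parse it into final fields of an immutable holder (DFSClient.Conf), and have the streams consult the holder via getConf() instead of re-querying the Configuration. The following stand-alone sketch illustrates that pattern outside Hadoop; the class name, field names, and property keys are invented for illustration and are not part of this patch or of the HDFS API.

  // Minimal sketch of the "immutable configuration snapshot" pattern,
  // using java.util.Properties as a stand-in for a mutable Configuration.
  import java.util.Properties;

  public class ConfigSnapshotDemo {

    /** Immutable snapshot of client tunables, analogous to DFSClient.Conf. */
    static final class Conf {
      final int socketTimeout;
      final int writePacketSize;
      final long defaultBlockSize;

      Conf(Properties props) {
        // Each value is parsed exactly once; later mutations of props
        // are never observed through this object.
        socketTimeout = Integer.parseInt(
            props.getProperty("client.socket.timeout", "60000"));
        writePacketSize = Integer.parseInt(
            props.getProperty("client.write.packet.size", "65536"));
        defaultBlockSize = Long.parseLong(
            props.getProperty("block.size", String.valueOf(64L << 20)));
      }
    }

    private final Conf conf;

    ConfigSnapshotDemo(Properties props) {
      this.conf = new Conf(props);  // copy only the required values
    }

    Conf getConf() {
      return conf;
    }

    public static void main(String[] args) {
      Properties props = new Properties();
      props.setProperty("client.socket.timeout", "30000");
      ConfigSnapshotDemo client = new ConfigSnapshotDemo(props);

      // The snapshot is fixed at construction time:
      props.setProperty("client.socket.timeout", "1");
      System.out.println(client.getConf().socketTimeout);  // prints 30000
    }
  }

Because every field is final and parsed up front, repeated reads become cheap field accesses, and the client's behavior cannot drift if the underlying Configuration is mutated after construction, which is the effect the patch achieves for DFSClient, DFSInputStream, and DFSOutputStream.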