HDFS-2092. Remove some object references to Configuration in DFSClient. Contributed by Bharath Mundlapudi

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1139097 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-06-23 22:24:59 +00:00
parent 42863b9baf
commit fd9997989c
4 changed files with 113 additions and 72 deletions

View File

@ -523,6 +523,9 @@ Trunk (unreleased changes)
HDFS-2100. Improve TestStorageRestore. (atm)
HDFS-2092. Remove some object references to Configuration in DFSClient.
(Bharath Mundlapudi via szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -128,17 +128,85 @@ public class DFSClient implements FSConstants, java.io.Closeable {
static Random r = new Random();
final String clientName;
Configuration conf;
long defaultBlockSize;
private short defaultReplication;
SocketFactory socketFactory;
int socketTimeout;
final int writePacketSize;
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
final LeaseRenewer leaserenewer;
final SocketCache socketCache;
final Conf dfsClientConf;
/**
* DFSClient configuration
*/
static class Conf {
final int maxBlockAcquireFailures;
final int confTime;
final int ioBufferSize;
final int bytesPerChecksum;
final int writePacketSize;
final int socketTimeout;
final int socketCacheCapacity;
/** Wait time window (in msec) if BlockMissingException is caught */
final int timeWindow;
final int nCachedConnRetry;
final int nBlockWriteRetry;
final int nBlockWriteLocateFollowingRetry;
final long defaultBlockSize;
final long prefetchSize;
final short defaultReplication;
final String taskId;
Conf(Configuration conf) {
maxBlockAcquireFailures = conf.getInt(
DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
confTime = conf.getInt(
DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsConstants.WRITE_TIMEOUT);
ioBufferSize = conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
bytesPerChecksum = conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT);
/** dfs.write.packet.size is an internal config variable */
writePacketSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt(
DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
socketCacheCapacity = conf.getInt(
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
prefetchSize = conf.getLong(
DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
timeWindow = conf
.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
nCachedConnRetry = conf.getInt(
DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
nBlockWriteRetry = conf.getInt(
DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
nBlockWriteLocateFollowingRetry = conf
.getInt(
DFSConfigKeys
.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
DFSConfigKeys
.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
}
}
Conf getConf() {
return dfsClientConf;
}
/**
* A map from file names to {@link DFSOutputStream} objects
@ -257,16 +325,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
// Copy only the required DFSClient configuration
this.dfsClientConf = new Conf(conf);
this.conf = conf;
this.stats = stats;
this.socketTimeout =
conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT);
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize =
conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
// The hdfsTimeout is currently the same as the ipc timeout
@ -275,19 +338,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
final String authority = nameNodeAddr == null? "null":
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
String taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
this.clientName = leaserenewer.getClientName(taskId);
defaultBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
defaultReplication = (short)
conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
this.socketCache = new SocketCache(
conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT));
this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
this.namenode = createNamenode(this.rpcNamenode);
@ -306,8 +358,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
* to retrieve block locations when reading.
*/
int getMaxBlockAcquireFailures() {
return conf.getInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
return dfsClientConf.maxBlockAcquireFailures;
}
/**
@ -315,18 +366,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
* @param numNodes the number of nodes in the pipeline.
*/
int getDatanodeWriteTimeout(int numNodes) {
int confTime =
conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsConstants.WRITE_TIMEOUT);
return (confTime > 0) ?
(confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
return (dfsClientConf.confTime > 0) ?
(dfsClientConf.confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
}
int getDatanodeReadTimeout(int numNodes) {
return socketTimeout > 0 ?
return dfsClientConf.socketTimeout > 0 ?
(HdfsConstants.READ_TIMEOUT_EXTENSION * numNodes +
socketTimeout) : 0;
dfsClientConf.socketTimeout) : 0;
}
int getHdfsTimeout() {
@ -430,7 +477,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
* @return the default block size in bytes
*/
public long getDefaultBlockSize() {
return defaultBlockSize;
return dfsClientConf.defaultBlockSize;
}
/**
@ -528,7 +575,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
}
public short getDefaultReplication() {
return defaultReplication;
return dfsClientConf.defaultReplication;
}
/**
@ -583,7 +630,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
public DFSInputStream open(String src)
throws IOException, UnresolvedLinkException {
return open(src, conf.getInt("io.file.buffer.size", 4096), true, null);
return open(src, dfsClientConf.ioBufferSize, true, null);
}
/**
@ -629,7 +676,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
*/
public OutputStream create(String src, boolean overwrite)
throws IOException {
return create(src, overwrite, defaultReplication, defaultBlockSize, null);
return create(src, overwrite, dfsClientConf.defaultReplication,
dfsClientConf.defaultBlockSize, null);
}
/**
@ -639,7 +687,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
public OutputStream create(String src,
boolean overwrite,
Progressable progress) throws IOException {
return create(src, overwrite, defaultReplication, defaultBlockSize, progress);
return create(src, overwrite, dfsClientConf.defaultReplication,
dfsClientConf.defaultBlockSize, progress);
}
/**
@ -660,7 +709,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
public OutputStream create(String src, boolean overwrite, short replication,
long blockSize, Progressable progress) throws IOException {
return create(src, overwrite, replication, blockSize, progress,
conf.getInt("io.file.buffer.size", 4096));
dfsClientConf.ioBufferSize);
}
/**
@ -744,10 +793,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
final DFSOutputStream result = new DFSOutputStream(this, src, masked,
flag, createParent, replication, blockSize, progress, buffersize,
conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
createParent, replication, blockSize, progress, buffersize,
dfsClientConf.bytesPerChecksum);
leaserenewer.put(src, result, this);
return result;
}
@ -851,8 +899,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
UnresolvedPathException.class);
}
return new DFSOutputStream(this, src, buffersize, progress,
lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
lastBlock, stat, dfsClientConf.bytesPerChecksum);
}
/**
@ -1061,7 +1108,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
return getFileChecksum(src, namenode, socketFactory, socketTimeout);
return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);
}
/**

View File

@ -83,7 +83,7 @@ public class DFSInputStream extends FSInputStream {
* capped at maxBlockAcquireFailures
*/
private int failures = 0;
private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
private int timeWindow;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
@ -106,13 +106,9 @@ public class DFSInputStream extends FSInputStream {
this.buffersize = buffersize;
this.src = src;
this.socketCache = dfsClient.socketCache;
prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * dfsClient.defaultBlockSize);
timeWindow = this.dfsClient.conf.getInt(
DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
nCachedConnRetry = this.dfsClient.conf.getInt(
DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
prefetchSize = dfsClient.getConf().prefetchSize;
timeWindow = dfsClient.getConf().timeWindow;
nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
openInfo();
}
@ -163,7 +159,7 @@ public class DFSInputStream extends FSInputStream {
try {
cdp = DFSClient.createClientDatanodeProtocolProxy(
datanode, dfsClient.conf, dfsClient.socketTimeout, locatedblock);
datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@ -771,8 +767,8 @@ public class DFSInputStream extends FSInputStream {
// disaster.
sock.setTcpNoDelay(true);
NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
sock.setSoTimeout(dfsClient.socketTimeout);
NetUtils.connect(sock, dnAddr, dfsClient.getConf().socketTimeout);
sock.setSoTimeout(dfsClient.getConf().socketTimeout);
}
try {

View File

@ -103,7 +103,6 @@ import org.apache.hadoop.util.StringUtils;
****************************************************************/
class DFSOutputStream extends FSOutputSummer implements Syncable {
private final DFSClient dfsClient;
private Configuration conf;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private Socket s;
// closed is accessed by different threads under different locks.
@ -355,7 +354,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
// that expected size of of a packet, then create
// smaller size packet.
//
computePacketChunkSize(Math.min(dfsClient.writePacketSize, freeInLastBlock),
computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
bytesPerChecksum);
}
@ -426,8 +425,8 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
&& dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING &&
now - lastPacket < dfsClient.socketTimeout/2)) || doSleep ) {
long timeout = dfsClient.socketTimeout/2 - (now-lastPacket);
now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
@ -953,8 +952,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
int count = conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
do {
hasError = false;
@ -1079,9 +1077,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
private LocatedBlock locateFollowingBlock(long start,
DatanodeInfo[] excludedNodes)
throws IOException, UnresolvedLinkException {
int retries =
conf.getInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
@ -1201,7 +1197,6 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
int bytesPerChecksum, short replication) throws IOException {
super(new PureJavaCrc32(), bytesPerChecksum, 4);
this.dfsClient = dfsClient;
this.conf = dfsClient.conf;
this.src = src;
this.blockSize = blockSize;
this.blockReplication = replication;
@ -1232,7 +1227,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
throws IOException {
this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
try {
dfsClient.namenode.create(
@ -1269,7 +1264,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
} else {
computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
streamer = new DataStreamer();
}
streamer.start();
@ -1385,7 +1380,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
}
if (!appendChunk) {
int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.writePacketSize);
int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
}
//