svn merge -c 1494854 from trunk for HDFS-4914. Use DFSClient.Conf instead of Configuration.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1494855 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-06-20 04:50:24 +00:00
parent 9c307dcbf4
commit 0536aec248
17 changed files with 173 additions and 178 deletions

View File

@ -178,6 +178,8 @@ Release 2.1.0-beta - UNRELEASED
HDFS-3009. Remove duplicate code in DFSClient#isLocalAddress by using HDFS-3009. Remove duplicate code in DFSClient#isLocalAddress by using
NetUtils. (Hari Mankude via suresh) NetUtils. (Hari Mankude via suresh)
HDFS-4914. Use DFSClient.Conf instead of Configuration. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -25,7 +25,6 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -40,7 +39,6 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -75,9 +73,7 @@ public class BlockReaderFactory {
* should be allowed. * should be allowed.
* @return New BlockReader instance * @return New BlockReader instance
*/ */
@SuppressWarnings("deprecation") public static BlockReader newBlockReader(DFSClient.Conf conf,
public static BlockReader newBlockReader(
Configuration conf,
String file, String file,
ExtendedBlock block, ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken, Token<BlockTokenIdentifier> blockToken,
@ -91,14 +87,11 @@ public class BlockReaderFactory {
FileInputStreamCache fisCache, FileInputStreamCache fisCache,
boolean allowShortCircuitLocalReads) boolean allowShortCircuitLocalReads)
throws IOException { throws IOException {
peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, peer.setReadTimeout(conf.socketTimeout);
HdfsServerConstants.READ_TIMEOUT));
peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT); peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
if (peer.getDomainSocket() != null) { if (peer.getDomainSocket() != null) {
if (allowShortCircuitLocalReads && if (allowShortCircuitLocalReads && !conf.useLegacyBlockReaderLocal) {
(!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
// If this is a domain socket, and short-circuit local reads are // If this is a domain socket, and short-circuit local reads are
// enabled, try to set up a BlockReaderLocal. // enabled, try to set up a BlockReaderLocal.
BlockReader reader = newShortCircuitBlockReader(conf, file, BlockReader reader = newShortCircuitBlockReader(conf, file,
@ -118,21 +111,19 @@ public class BlockReaderFactory {
// If this is a domain socket and we couldn't (or didn't want to) set // If this is a domain socket and we couldn't (or didn't want to) set
// up a BlockReaderLocal, check that we are allowed to pass data traffic // up a BlockReaderLocal, check that we are allowed to pass data traffic
// over the socket before proceeding. // over the socket before proceeding.
if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, if (!conf.domainSocketDataTraffic) {
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
throw new IOException("Because we can't do short-circuit access, " + throw new IOException("Because we can't do short-circuit access, " +
"and data traffic over domain sockets is disabled, " + "and data traffic over domain sockets is disabled, " +
"we cannot use this socket to talk to " + datanodeID); "we cannot use this socket to talk to " + datanodeID);
} }
} }
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, if (conf.useLegacyBlockReader) {
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) { @SuppressWarnings("deprecation")
return RemoteBlockReader.newBlockReader(file, RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
block, blockToken, startOffset, len, block, blockToken, startOffset, len, conf.ioBufferSize,
conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
verifyChecksum, clientName, peer, datanodeID, peerCache); verifyChecksum, clientName, peer, datanodeID, peerCache);
return reader;
} else { } else {
return RemoteBlockReader2.newBlockReader( return RemoteBlockReader2.newBlockReader(
file, block, blockToken, startOffset, len, file, block, blockToken, startOffset, len,
@ -173,7 +164,7 @@ public class BlockReaderFactory {
* @throws IOException If there was a communication error. * @throws IOException If there was a communication error.
*/ */
private static BlockReaderLocal newShortCircuitBlockReader( private static BlockReaderLocal newShortCircuitBlockReader(
Configuration conf, String file, ExtendedBlock block, DFSClient.Conf conf, String file, ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken, long startOffset, Token<BlockTokenIdentifier> blockToken, long startOffset,
long len, Peer peer, DatanodeID datanodeID, long len, Peer peer, DatanodeID datanodeID,
DomainSocketFactory domSockFactory, boolean verifyChecksum, DomainSocketFactory domSockFactory, boolean verifyChecksum,
@ -245,15 +236,14 @@ public class BlockReaderFactory {
* This block reader implements the path-based style of local reads * This block reader implements the path-based style of local reads
* first introduced in HDFS-2246. * first introduced in HDFS-2246.
*/ */
static BlockReader getLegacyBlockReaderLocal(UserGroupInformation ugi, static BlockReader getLegacyBlockReaderLocal(DFSClient dfsClient,
Configuration conf, String src, ExtendedBlock blk, String src, ExtendedBlock blk,
Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode, Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
int socketTimeout, long offsetIntoBlock, long offsetIntoBlock) throws InvalidToken, IOException {
boolean connectToDnViaHostname) throws InvalidToken, IOException {
try { try {
return BlockReaderLocalLegacy.newBlockReader(ugi, conf, src, final long length = blk.getNumBytes() - offsetIntoBlock;
blk, accessToken, chosenNode, socketTimeout, offsetIntoBlock, return BlockReaderLocalLegacy.newBlockReader(dfsClient, src, blk,
blk.getNumBytes() - offsetIntoBlock, connectToDnViaHostname); accessToken, chosenNode, offsetIntoBlock, length);
} catch (RemoteException re) { } catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class, throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class); AccessControlException.class);

View File

@ -17,10 +17,8 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import java.io.DataInputStream;
import org.apache.hadoop.conf.Configuration;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -90,13 +88,8 @@ class BlockReaderLocal implements BlockReader {
private final FileInputStreamCache fisCache; private final FileInputStreamCache fisCache;
private static int getSlowReadBufferNumChunks(Configuration conf, private static int getSlowReadBufferNumChunks(int bufSize,
int bytesPerChecksum) { int bytesPerChecksum) {
int bufSize =
conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
if (bufSize < bytesPerChecksum) { if (bufSize < bytesPerChecksum) {
throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
bufSize + ") is not large enough to hold a single chunk (" + bufSize + ") is not large enough to hold a single chunk (" +
@ -108,7 +101,7 @@ class BlockReaderLocal implements BlockReader {
return bufSize / bytesPerChecksum; return bufSize / bytesPerChecksum;
} }
public BlockReaderLocal(Configuration conf, String filename, public BlockReaderLocal(DFSClient.Conf conf, String filename,
ExtendedBlock block, long startOffset, long length, ExtendedBlock block, long startOffset, long length,
FileInputStream dataIn, FileInputStream checksumIn, FileInputStream dataIn, FileInputStream checksumIn,
DatanodeID datanodeID, boolean verifyChecksum, DatanodeID datanodeID, boolean verifyChecksum,
@ -132,13 +125,7 @@ class BlockReaderLocal implements BlockReader {
throw new IOException("Wrong version (" + version + ") of the " + throw new IOException("Wrong version (" + version + ") of the " +
"metadata file for " + filename + "."); "metadata file for " + filename + ".");
} }
if (!verifyChecksum) { this.verifyChecksum = verifyChecksum && !conf.skipShortCircuitChecksums;
this.verifyChecksum = false;
} else {
this.verifyChecksum = !conf.getBoolean(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
}
long firstChunkOffset; long firstChunkOffset;
if (this.verifyChecksum) { if (this.verifyChecksum) {
this.checksum = header.getChecksum(); this.checksum = header.getChecksum();
@ -148,7 +135,8 @@ class BlockReaderLocal implements BlockReader {
- (startOffset % checksum.getBytesPerChecksum()); - (startOffset % checksum.getBytesPerChecksum());
this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset); this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum); int chunksPerChecksumRead = getSlowReadBufferNumChunks(
conf.shortCircuitBufferSize, bytesPerChecksum);
slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
// Initially the buffers have nothing to read. // Initially the buffers have nothing to read.
@ -171,7 +159,12 @@ class BlockReaderLocal implements BlockReader {
this.dataIn.getChannel().position(firstChunkOffset); this.dataIn.getChannel().position(firstChunkOffset);
success = true; success = true;
} finally { } finally {
if (!success) { if (success) {
if (LOG.isDebugEnabled()) {
LOG.debug("Created BlockReaderLocal for file " + filename
+ " block " + block + " in datanode " + datanodeID);
}
} else {
if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff); if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff); if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
} }

View File

@ -21,7 +21,6 @@ import java.io.DataInputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Collections; import java.util.Collections;
@ -32,17 +31,15 @@ import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool; import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -70,7 +67,7 @@ import org.apache.hadoop.util.DataChecksum;
* </ul> * </ul>
*/ */
class BlockReaderLocalLegacy implements BlockReader { class BlockReaderLocalLegacy implements BlockReader {
private static final Log LOG = LogFactory.getLog(DFSClient.class); private static final Log LOG = LogFactory.getLog(BlockReaderLocalLegacy.class);
//Stores the cache and proxy for a local datanode. //Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo { private static class LocalDatanodeInfo {
@ -173,19 +170,20 @@ class BlockReaderLocalLegacy implements BlockReader {
/** /**
* The only way this object can be instantiated. * The only way this object can be instantiated.
*/ */
static BlockReaderLocalLegacy newBlockReader(UserGroupInformation ugi, static BlockReaderLocalLegacy newBlockReader(DFSClient dfsClient,
Configuration conf, String file, ExtendedBlock blk, String file, ExtendedBlock blk, Token<BlockTokenIdentifier> token,
Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout, DatanodeInfo node, long startOffset, long length)
long startOffset, long length, boolean connectToDnViaHostname)
throws IOException { throws IOException {
final DFSClient.Conf conf = dfsClient.getConf();
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort()); .getIpcPort());
// check the cache first // check the cache first
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
if (pathinfo == null) { if (pathinfo == null) {
pathinfo = getBlockPathInfo(ugi, blk, node, conf, socketTimeout, token, pathinfo = getBlockPathInfo(dfsClient.ugi, blk, node,
connectToDnViaHostname); dfsClient.getConfiguration(), dfsClient.getHdfsTimeout(), token,
conf.connectToDnViaHostname);
} }
// check to see if the file exists. It may so happen that the // check to see if the file exists. It may so happen that the
@ -197,7 +195,7 @@ class BlockReaderLocalLegacy implements BlockReader {
FileInputStream dataIn = null; FileInputStream dataIn = null;
FileInputStream checksumIn = null; FileInputStream checksumIn = null;
BlockReaderLocalLegacy localBlockReader = null; BlockReaderLocalLegacy localBlockReader = null;
boolean skipChecksumCheck = skipChecksumCheck(conf); boolean skipChecksumCheck = conf.skipShortCircuitChecksums;
try { try {
// get a local file system // get a local file system
File blkfile = new File(pathinfo.getBlockPath()); File blkfile = new File(pathinfo.getBlockPath());
@ -285,16 +283,8 @@ class BlockReaderLocalLegacy implements BlockReader {
return pathinfo; return pathinfo;
} }
private static boolean skipChecksumCheck(Configuration conf) { private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
return conf.getBoolean( int bytesPerChecksum) {
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
}
private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
if (bufferSizeBytes < bytesPerChecksum) { if (bufferSizeBytes < bytesPerChecksum) {
throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " + throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
"buffer size (" + bufferSizeBytes + ") is not large enough to hold " + "buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
@ -307,7 +297,7 @@ class BlockReaderLocalLegacy implements BlockReader {
return bufferSizeBytes / bytesPerChecksum; return bufferSizeBytes / bytesPerChecksum;
} }
private BlockReaderLocalLegacy(Configuration conf, String hdfsfile, private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset, ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
throws IOException { throws IOException {
@ -316,7 +306,7 @@ class BlockReaderLocalLegacy implements BlockReader {
dataIn, startOffset, null); dataIn, startOffset, null);
} }
private BlockReaderLocalLegacy(Configuration conf, String hdfsfile, private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset, ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
@ -333,7 +323,8 @@ class BlockReaderLocalLegacy implements BlockReader {
this.checksumIn = checksumIn; this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum); int chunksPerChecksumRead = getSlowReadBufferNumChunks(
conf.shortCircuitBufferSize, bytesPerChecksum);
slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
// Initially the buffers have nothing to read. // Initially the buffers have nothing to read.

View File

@ -178,6 +178,9 @@ public class DFSClient implements java.io.Closeable {
public static final Log LOG = LogFactory.getLog(DFSClient.class); public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
private final Configuration conf;
private final Conf dfsClientConf;
final ClientProtocol namenode; final ClientProtocol namenode;
/* The service used for delegation tokens */ /* The service used for delegation tokens */
private Text dtService; private Text dtService;
@ -188,14 +191,11 @@ public class DFSClient implements java.io.Closeable {
private volatile FsServerDefaults serverDefaults; private volatile FsServerDefaults serverDefaults;
private volatile long serverDefaultsLastUpdate; private volatile long serverDefaultsLastUpdate;
final String clientName; final String clientName;
Configuration conf;
SocketFactory socketFactory; SocketFactory socketFactory;
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure; final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats; final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
private final String authority; private final String authority;
final PeerCache peerCache; final PeerCache peerCache;
final Conf dfsClientConf;
private Random r = new Random(); private Random r = new Random();
private SocketAddress[] localInterfaceAddrs; private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey; private DataEncryptionKey encryptionKey;
@ -204,7 +204,8 @@ public class DFSClient implements java.io.Closeable {
/** /**
* DFSClient configuration * DFSClient configuration
*/ */
static class Conf { public static class Conf {
final int hdfsTimeout; // timeout value for a DFS operation.
final int maxFailoverAttempts; final int maxFailoverAttempts;
final int failoverSleepBaseMillis; final int failoverSleepBaseMillis;
final int failoverSleepMaxMillis; final int failoverSleepMaxMillis;
@ -227,18 +228,25 @@ public class DFSClient implements java.io.Closeable {
final short defaultReplication; final short defaultReplication;
final String taskId; final String taskId;
final FsPermission uMask; final FsPermission uMask;
final boolean useLegacyBlockReaderLocal;
final boolean connectToDnViaHostname; final boolean connectToDnViaHostname;
final boolean getHdfsBlocksMetadataEnabled; final boolean getHdfsBlocksMetadataEnabled;
final int getFileBlockStorageLocationsNumThreads; final int getFileBlockStorageLocationsNumThreads;
final int getFileBlockStorageLocationsTimeout; final int getFileBlockStorageLocationsTimeout;
final boolean useLegacyBlockReader;
final boolean useLegacyBlockReaderLocal;
final String domainSocketPath; final String domainSocketPath;
final boolean skipShortCircuitChecksums; final boolean skipShortCircuitChecksums;
final int shortCircuitBufferSize; final int shortCircuitBufferSize;
final boolean shortCircuitLocalReads; final boolean shortCircuitLocalReads;
final boolean domainSocketDataTraffic; final boolean domainSocketDataTraffic;
final int shortCircuitStreamsCacheSize;
final long shortCircuitStreamsCacheExpiryMs;
public Conf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf);
Conf(Configuration conf) {
maxFailoverAttempts = conf.getInt( maxFailoverAttempts = conf.getInt(
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
@ -277,19 +285,15 @@ public class DFSClient implements java.io.Closeable {
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize); 10 * defaultBlockSize);
timeWindow = conf timeWindow = conf.getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
.getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY, nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT); DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
nBlockWriteLocateFollowingRetry = conf nBlockWriteLocateFollowingRetry = conf.getInt(
.getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
uMask = FsPermission.getUMask(conf); uMask = FsPermission.getUMask(conf);
useLegacyBlockReaderLocal = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
getHdfsBlocksMetadataEnabled = conf.getBoolean( getHdfsBlocksMetadataEnabled = conf.getBoolean(
@ -301,20 +305,50 @@ public class DFSClient implements java.io.Closeable {
getFileBlockStorageLocationsTimeout = conf.getInt( getFileBlockStorageLocationsTimeout = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT, DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT); DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT); useLegacyBlockReader = conf.getBoolean(
skipShortCircuitChecksums = conf.getBoolean( DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); useLegacyBlockReaderLocal = conf.getBoolean(
shortCircuitBufferSize = conf.getInt( DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
shortCircuitLocalReads = conf.getBoolean( shortCircuitLocalReads = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
domainSocketDataTraffic = conf.getBoolean( domainSocketDataTraffic = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
domainSocketPath = conf.getTrimmed(
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
if (BlockReaderLocal.LOG.isDebugEnabled()) {
BlockReaderLocal.LOG.debug(
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
+ " = " + useLegacyBlockReaderLocal);
BlockReaderLocal.LOG.debug(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
+ " = " + shortCircuitLocalReads);
BlockReaderLocal.LOG.debug(
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
+ " = " + domainSocketDataTraffic);
BlockReaderLocal.LOG.debug(
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
+ " = " + domainSocketPath);
}
skipShortCircuitChecksums = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
shortCircuitBufferSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
shortCircuitStreamsCacheSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
shortCircuitStreamsCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
} }
private DataChecksum.Type getChecksumType(Configuration conf) { private DataChecksum.Type getChecksumType(Configuration conf) {
@ -360,10 +394,14 @@ public class DFSClient implements java.io.Closeable {
} }
} }
Conf getConf() { public Conf getConf() {
return dfsClientConf; return dfsClientConf;
} }
Configuration getConfiguration() {
return conf;
}
/** /**
* A map from file names to {@link DFSOutputStream} objects * A map from file names to {@link DFSOutputStream} objects
* that are currently being written by this client. * that are currently being written by this client.
@ -426,8 +464,6 @@ public class DFSClient implements java.io.Closeable {
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
this.ugi = UserGroupInformation.getCurrentUser(); this.ugi = UserGroupInformation.getCurrentUser();
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
@ -542,21 +578,13 @@ public class DFSClient implements java.io.Closeable {
} }
int getHdfsTimeout() { int getHdfsTimeout() {
return hdfsTimeout; return dfsClientConf.hdfsTimeout;
} }
String getClientName() { String getClientName() {
return clientName; return clientName;
} }
/**
* @return whether the client should use hostnames instead of IPs
* when connecting to DataNodes
*/
boolean connectToDnViaHostname() {
return dfsClientConf.connectToDnViaHostname;
}
void checkOpen() throws IOException { void checkOpen() throws IOException {
if (!clientRunning) { if (!clientRunning) {
IOException result = new IOException("Filesystem closed"); IOException result = new IOException("Filesystem closed");
@ -793,6 +821,7 @@ public class DFSClient implements java.io.Closeable {
* @throws IOException * @throws IOException
* @deprecated Use Token.renew instead. * @deprecated Use Token.renew instead.
*/ */
@Deprecated
public long renewDelegationToken(Token<DelegationTokenIdentifier> token) public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException { throws InvalidToken, IOException {
LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token)); LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
@ -864,6 +893,7 @@ public class DFSClient implements java.io.Closeable {
* @throws IOException * @throws IOException
* @deprecated Use Token.cancel instead. * @deprecated Use Token.cancel instead.
*/ */
@Deprecated
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException { throws InvalidToken, IOException {
LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token)); LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
@ -965,6 +995,11 @@ public class DFSClient implements java.io.Closeable {
return dfsClientConf.defaultReplication; return dfsClientConf.defaultReplication;
} }
public LocatedBlocks getLocatedBlocks(String src, long start)
throws IOException {
return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
}
/* /*
* This is just a wrapper around callGetBlockLocations, but non-static so that * This is just a wrapper around callGetBlockLocations, but non-static so that
* we can stub it out for tests. * we can stub it out for tests.
@ -1693,10 +1728,10 @@ public class DFSClient implements java.io.Closeable {
* @param socketFactory to create sockets to connect to DNs * @param socketFactory to create sockets to connect to DNs
* @param socketTimeout timeout to use when connecting and waiting for a response * @param socketTimeout timeout to use when connecting and waiting for a response
* @param encryptionKey the key needed to communicate with DNs in this cluster * @param encryptionKey the key needed to communicate with DNs in this cluster
* @param connectToDnViaHostname {@link #connectToDnViaHostname()} * @param connectToDnViaHostname whether the client should use hostnames instead of IPs
* @return The checksum * @return The checksum
*/ */
static MD5MD5CRC32FileChecksum getFileChecksum(String src, private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
String clientName, String clientName,
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout, ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)

View File

@ -72,7 +72,6 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
private final DFSClient dfsClient; private final DFSClient dfsClient;
private boolean closed = false; private boolean closed = false;
private final String src; private final String src;
private final long prefetchSize;
private BlockReader blockReader = null; private BlockReader blockReader = null;
private final boolean verifyChecksum; private final boolean verifyChecksum;
private LocatedBlocks locatedBlocks = null; private LocatedBlocks locatedBlocks = null;
@ -163,7 +162,6 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
* capped at maxBlockAcquireFailures * capped at maxBlockAcquireFailures
*/ */
private int failures = 0; private int failures = 0;
private final int timeWindow;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix /* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */ * parallel accesses to DFSInputStream (through ptreads) properly */
@ -173,8 +171,6 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
private final byte[] oneByteBuf = new byte[1]; // used for 'int read()' private final byte[] oneByteBuf = new byte[1]; // used for 'int read()'
private final int nCachedConnRetry;
void addToDeadNodes(DatanodeInfo dnInfo) { void addToDeadNodes(DatanodeInfo dnInfo) {
deadNodes.put(dnInfo, dnInfo); deadNodes.put(dnInfo, dnInfo);
} }
@ -187,15 +183,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
this.src = src; this.src = src;
this.peerCache = dfsClient.peerCache; this.peerCache = dfsClient.peerCache;
this.fileInputStreamCache = new FileInputStreamCache( this.fileInputStreamCache = new FileInputStreamCache(
dfsClient.conf.getInt(DFSConfigKeys. dfsClient.getConf().shortCircuitStreamsCacheSize,
DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
dfsClient.conf.getLong(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
prefetchSize = dfsClient.getConf().prefetchSize;
timeWindow = dfsClient.getConf().timeWindow;
nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
openInfo(); openInfo();
} }
@ -236,7 +225,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
} }
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException { private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0, prefetchSize); final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo); DFSClient.LOG.debug("newInfo = " + newInfo);
} }
@ -280,8 +269,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
ClientDatanodeProtocol cdp = null; ClientDatanodeProtocol cdp = null;
try { try {
cdp = DFSUtil.createClientDatanodeProtocolProxy( cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode,
datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout,
dfsClient.getConf().connectToDnViaHostname, locatedblock); dfsClient.getConf().connectToDnViaHostname, locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@ -389,8 +378,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
if (targetBlockIdx < 0) { // block is not cached if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
// fetch more blocks // fetch more blocks
LocatedBlocks newBlocks; final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
assert (newBlocks != null) : "Could not find target position " + offset; assert (newBlocks != null) : "Could not find target position " + offset;
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
} }
@ -413,8 +401,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
} }
// fetch blocks // fetch blocks
LocatedBlocks newBlocks; final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
if (newBlocks == null) { if (newBlocks == null) {
throw new IOException("Could not find target position " + offset); throw new IOException("Could not find target position " + offset);
} }
@ -832,7 +819,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
try { try {
DatanodeInfo chosenNode = bestNode(nodes, deadNodes); DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
final String dnAddr = final String dnAddr =
chosenNode.getXferAddr(dfsClient.connectToDnViaHostname()); chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr); DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
} }
@ -861,6 +848,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
// alleviating the request rate from the server. Similarly the 3rd retry // alleviating the request rate from the server. Similarly the 3rd retry
// will wait 6000ms grace period before retry and the waiting window is // will wait 6000ms grace period before retry and the waiting window is
// expanded to 9000ms. // expanded to 9000ms.
final int timeWindow = dfsClient.getConf().timeWindow;
double waitTime = timeWindow * failures + // grace period for the last round of attempt double waitTime = timeWindow * failures + // grace period for the last round of attempt
timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
@ -1011,7 +999,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
DFSClient.LOG.debug("got FileInputStreams for " + block + " from " + DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
"the FileInputStreamCache."); "the FileInputStreamCache.");
} }
return new BlockReaderLocal(dfsClient.conf, file, return new BlockReaderLocal(dfsClient.getConf(), file,
block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum, block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
fileInputStreamCache); fileInputStreamCache);
} }
@ -1023,9 +1011,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
DFSClient.isLocalAddress(dnAddr) && DFSClient.isLocalAddress(dnAddr) &&
(!shortCircuitForbidden())) { (!shortCircuitForbidden())) {
try { try {
return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient.ugi, return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient,
dfsClient.conf, clientName, block, blockToken, chosenNode, clientName, block, blockToken, chosenNode, startOffset);
dfsClient.hdfsTimeout, startOffset,dfsClient.connectToDnViaHostname());
} catch (IOException e) { } catch (IOException e) {
DFSClient.LOG.warn("error creating legacy BlockReaderLocal. " + DFSClient.LOG.warn("error creating legacy BlockReaderLocal. " +
"Disabling legacy local reads.", e); "Disabling legacy local reads.", e);
@ -1037,6 +1024,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
int cacheTries = 0; int cacheTries = 0;
DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory(); DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
BlockReader reader = null; BlockReader reader = null;
final int nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
for (; cacheTries < nCachedConnRetry; ++cacheTries) { for (; cacheTries < nCachedConnRetry; ++cacheTries) {
Peer peer = peerCache.get(chosenNode, true); Peer peer = peerCache.get(chosenNode, true);
if (peer == null) break; if (peer == null) break;
@ -1044,7 +1032,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
boolean allowShortCircuitLocalReads = dfsClient.getConf(). boolean allowShortCircuitLocalReads = dfsClient.getConf().
shortCircuitLocalReads && (!shortCircuitForbidden()); shortCircuitLocalReads && (!shortCircuitForbidden());
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads); allowShortCircuitLocalReads);
@ -1067,7 +1055,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
boolean allowShortCircuitLocalReads = dfsClient.getConf(). boolean allowShortCircuitLocalReads = dfsClient.getConf().
shortCircuitLocalReads && (!shortCircuitForbidden()); shortCircuitLocalReads && (!shortCircuitForbidden());
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads); allowShortCircuitLocalReads);
@ -1091,7 +1079,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
if (peer == null) break; if (peer == null) break;
try { try {
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false); dsFactory, peerCache, fileInputStreamCache, false);
return reader; return reader;
@ -1110,7 +1098,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
// Try to create a new remote peer. // Try to create a new remote peer.
Peer peer = newTcpPeer(dnAddr); Peer peer = newTcpPeer(dnAddr);
return BlockReaderFactory.newBlockReader( return BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false); dsFactory, peerCache, fileInputStreamCache, false);
} }

View File

@ -1288,7 +1288,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
*/ */
static Socket createSocketForPipeline(final DatanodeInfo first, static Socket createSocketForPipeline(final DatanodeInfo first,
final int length, final DFSClient client) throws IOException { final int length, final DFSClient client) throws IOException {
final String dnAddr = first.getXferAddr(client.connectToDnViaHostname()); final String dnAddr = first.getXferAddr(
client.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr); DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
} }
@ -1813,8 +1814,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
if (closed) { if (closed) {
return; return;
} }
streamer.setLastException(new IOException("Lease timeout of " + streamer.setLastException(new IOException("Lease timeout of "
(dfsClient.hdfsTimeout/1000) + " seconds expired.")); + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
closeThreads(true); closeThreads(true);
dfsClient.endFileLease(src); dfsClient.endFileLease(src);
} }
@ -1884,13 +1885,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
while (!fileComplete) { while (!fileComplete) {
fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last); fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last);
if (!fileComplete) { if (!fileComplete) {
final int hdfsTimeout = dfsClient.getHdfsTimeout();
if (!dfsClient.clientRunning || if (!dfsClient.clientRunning ||
(dfsClient.hdfsTimeout > 0 && (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.now())) {
localstart + dfsClient.hdfsTimeout < Time.now())) {
String msg = "Unable to close file because dfsclient " + String msg = "Unable to close file because dfsclient " +
" was unable to contact the HDFS servers." + " was unable to contact the HDFS servers." +
" clientRunning " + dfsClient.clientRunning + " clientRunning " + dfsClient.clientRunning +
" hdfsTimeout " + dfsClient.hdfsTimeout; " hdfsTimeout " + hdfsTimeout;
DFSClient.LOG.info(msg); DFSClient.LOG.info(msg);
throw new IOException(msg); throw new IOException(msg);
} }

View File

@ -23,16 +23,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
class DomainSocketFactory { class DomainSocketFactory {
public static final Log LOG = LogFactory.getLog(DomainSocketFactory.class); private static final Log LOG = BlockReaderLocal.LOG;
private final Conf conf; private final Conf conf;
enum PathStatus { enum PathStatus {
@ -51,19 +50,24 @@ class DomainSocketFactory {
public DomainSocketFactory(Conf conf) { public DomainSocketFactory(Conf conf) {
this.conf = conf; this.conf = conf;
String feature = null; final String feature;
if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) { if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) {
feature = "The short-circuit local reads feature"; feature = "The short-circuit local reads feature";
} else if (conf.domainSocketDataTraffic) { } else if (conf.domainSocketDataTraffic) {
feature = "UNIX domain socket data traffic"; feature = "UNIX domain socket data traffic";
} else {
feature = null;
} }
if (feature != null) {
if (feature == null) {
LOG.debug("Both short-circuit local reads and UNIX domain socket are disabled.");
} else {
if (conf.domainSocketPath.isEmpty()) { if (conf.domainSocketPath.isEmpty()) {
LOG.warn(feature + " is disabled because you have not set " + throw new HadoopIllegalArgumentException(feature + " is enabled but "
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY); + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
} else if (DomainSocket.getLoadingFailureReason() != null) { } else if (DomainSocket.getLoadingFailureReason() != null) {
LOG.warn(feature + " is disabled because " + LOG.warn(feature + " cannot be used because "
DomainSocket.getLoadingFailureReason()); + DomainSocket.getLoadingFailureReason());
} else { } else {
LOG.debug(feature + " is enabled."); LOG.debug(feature + " is enabled.");
} }
@ -86,8 +90,8 @@ class DomainSocketFactory {
// sockets. // sockets.
if (conf.domainSocketPath.isEmpty()) return null; if (conf.domainSocketPath.isEmpty()) return null;
// If we can't do anything with the domain socket, don't create it. // If we can't do anything with the domain socket, don't create it.
if ((conf.domainSocketDataTraffic == false) && if (!conf.domainSocketDataTraffic &&
((!conf.shortCircuitLocalReads) || conf.useLegacyBlockReaderLocal)) { (!conf.shortCircuitLocalReads || conf.useLegacyBlockReaderLocal)) {
return null; return null;
} }
// UNIX domain sockets can only be used to talk to local peers // UNIX domain sockets can only be used to talk to local peers

View File

@ -22,7 +22,6 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -463,18 +462,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
} }
} }
/**
* File name to print when accessing a block directly (from servlets)
* @param s Address of the block location
* @param poolId Block pool ID of the block
* @param blockId Block ID of the block
* @return string that has a file name for debug purposes
*/
public static String getFileName(final InetSocketAddress s,
final String poolId, final long blockId) {
return s.toString() + ":" + poolId + ":" + blockId;
}
@Override @Override
public int read(ByteBuffer buf) throws IOException { public int read(ByteBuffer buf) throws IOException {
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -198,7 +199,8 @@ public class JspHelper {
public static void streamBlockInAscii(InetSocketAddress addr, String poolId, public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp, long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
long blockSize, long offsetIntoBlock, long chunkSizeToView, long blockSize, long offsetIntoBlock, long chunkSizeToView,
JspWriter out, Configuration conf, DataEncryptionKey encryptionKey) JspWriter out, Configuration conf, DFSClient.Conf dfsConf,
DataEncryptionKey encryptionKey)
throws IOException { throws IOException {
if (chunkSizeToView == 0) return; if (chunkSizeToView == 0) return;
Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
@ -209,8 +211,7 @@ public class JspHelper {
// Use the block name for file name. // Use the block name for file name.
String file = BlockReaderFactory.getFileName(addr, poolId, blockId); String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
BlockReader blockReader = BlockReaderFactory.newBlockReader( BlockReader blockReader = BlockReaderFactory.newBlockReader(dfsConf, file,
conf, file,
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, true, offsetIntoBlock, amtToRead, true,
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
@ -218,7 +219,7 @@ public class JspHelper {
addr.getHostName(), poolId, addr.getPort(), 0, 0), null, addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
null, null, false); null, null, false);
byte[] buf = new byte[(int)amtToRead]; final byte[] buf = new byte[amtToRead];
int readOffset = 0; int readOffset = 0;
int retries = 2; int retries = 2;
while ( amtToRead > 0 ) { while ( amtToRead > 0 ) {

View File

@ -604,7 +604,8 @@ public class DatanodeJspHelper {
try { try {
JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(), JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(),
datanodePort), bpid, blockId, blockToken, genStamp, blockSize, datanodePort), bpid, blockId, blockToken, genStamp, blockSize,
startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey()); startOffset, chunkSizeToView, out, conf, dfs.getConf(),
dfs.getDataEncryptionKey());
} catch (Exception e) { } catch (Exception e) {
out.print(e); out.print(e);
} }
@ -697,7 +698,8 @@ public class DatanodeJspHelper {
out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>"); out.print("<textarea cols=\"100\" rows=\"25\" wrap=\"virtual\" style=\"width:100%\" READONLY>");
JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp, JspHelper.streamBlockInAscii(addr, poolId, blockId, accessToken, genStamp,
blockSize, startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey()); blockSize, startOffset, chunkSizeToView, out, conf, dfs.getConf(),
dfs.getDataEncryptionKey());
out.print("</textarea>"); out.print("</textarea>");
dfs.close(); dfs.close();
} }

View File

@ -559,8 +559,8 @@ public class NamenodeFsck {
String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(), String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
block.getBlockId()); block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader( blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck", file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer(). TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
getDataEncryptionKey()), getDataEncryptionKey()),
chosenNode, null, null, null, false); chosenNode, null, null, null, false);

View File

@ -150,7 +150,7 @@ public class BlockReaderTestUtil {
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
return BlockReaderFactory.newBlockReader( return BlockReaderFactory.newBlockReader(
conf, new DFSClient.Conf(conf),
targetAddr.toString()+ ":" + block.getBlockId(), block, targetAddr.toString()+ ":" + block.getBlockId(), block,
testBlock.getBlockToken(), testBlock.getBlockToken(),
offset, lenToRead, offset, lenToRead,

View File

@ -130,7 +130,7 @@ public class TestBlockReaderLocal {
test.setup(dataFile, checksum); test.setup(dataFile, checksum);
dataIn = new FileInputStream(dataFile); dataIn = new FileInputStream(dataFile);
checkIn = new FileInputStream(metaFile); checkIn = new FileInputStream(metaFile);
blockReaderLocal = new BlockReaderLocal(conf, blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf),
TEST_PATH.getName(), block, 0, -1, TEST_PATH.getName(), block, 0, -1,
dataIn, checkIn, datanodeID, checksum, null); dataIn, checkIn, datanodeID, checksum, null);
dataIn = null; dataIn = null;

View File

@ -325,7 +325,7 @@ public class TestParallelReadUtil {
testInfo.filepath = new Path("/TestParallelRead.dat." + i); testInfo.filepath = new Path("/TestParallelRead.dat." + i);
testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K); testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
testInfo.dis = dfsClient.open(testInfo.filepath.toString(), testInfo.dis = dfsClient.open(testInfo.filepath.toString(),
dfsClient.dfsClientConf.ioBufferSize, verifyChecksums); dfsClient.getConf().ioBufferSize, verifyChecksums);
for (int j = 0; j < nWorkerEach; ++j) { for (int j = 0; j < nWorkerEach; ++j) {
workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper); workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);

View File

@ -146,7 +146,7 @@ public class TestBlockTokenWithDFS {
String file = BlockReaderFactory.getFileName(targetAddr, String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId()); "test-blockpoolid", block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader( blockReader = BlockReaderFactory.newBlockReader(
conf, file, block, lblock.getBlockToken(), 0, -1, new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s), true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
nodes[0], null, null, null, false); nodes[0], null, null, null, false);

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -284,7 +285,7 @@ public class TestDataNodeVolumeFailure {
"test-blockpoolid", "test-blockpoolid",
block.getBlockId()); block.getBlockId());
BlockReader blockReader = BlockReader blockReader =
BlockReaderFactory.newBlockReader(conf, file, block, BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure", lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false); TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
blockReader.close(); blockReader.close();