HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates short-circuit related conf to ShortCircuitConf.

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-04-10 14:48:45 -07:00
parent 3d17c50176
commit 1113aca7f8
24 changed files with 929 additions and 564 deletions

View File

@ -97,6 +97,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8102. Separate webhdfs retry configuration keys from DFSConfigKeys. HDFS-8102. Separate webhdfs retry configuration keys from DFSConfigKeys.
(wheat9) (wheat9)
HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates
short-circuit related conf to ShortCircuitConf. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -81,7 +83,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
static ShortCircuitReplicaCreator static ShortCircuitReplicaCreator
createShortCircuitReplicaInfoCallback = null; createShortCircuitReplicaInfoCallback = null;
private final DFSClient.Conf conf; private final DfsClientConf conf;
/** /**
* Injects failures into specific operations during unit tests. * Injects failures into specific operations during unit tests.
@ -180,10 +182,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
*/ */
private int remainingCacheTries; private int remainingCacheTries;
public BlockReaderFactory(DFSClient.Conf conf) { public BlockReaderFactory(DfsClientConf conf) {
this.conf = conf; this.conf = conf;
this.failureInjector = conf.brfFailureInjector; this.failureInjector = conf.getShortCircuitConf().brfFailureInjector;
this.remainingCacheTries = conf.nCachedConnRetry; this.remainingCacheTries = conf.getNumCachedConnRetry();
} }
public BlockReaderFactory setFileName(String fileName) { public BlockReaderFactory setFileName(String fileName) {
@ -317,7 +319,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
BlockReader reader = null; BlockReader reader = null;
Preconditions.checkNotNull(configuration); Preconditions.checkNotNull(configuration);
if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) { final ShortCircuitConf scConf = conf.getShortCircuitConf();
if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
if (clientContext.getUseLegacyBlockReaderLocal()) { if (clientContext.getUseLegacyBlockReaderLocal()) {
reader = getLegacyBlockReaderLocal(); reader = getLegacyBlockReaderLocal();
if (reader != null) { if (reader != null) {
@ -336,7 +339,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
} }
} }
} }
if (conf.domainSocketDataTraffic) { if (scConf.isDomainSocketDataTraffic()) {
reader = getRemoteBlockReaderFromDomain(); reader = getRemoteBlockReaderFromDomain();
if (reader != null) { if (reader != null) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -406,8 +409,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
"for short-circuit reads."); "for short-circuit reads.");
} }
if (pathInfo == null) { if (pathInfo == null) {
pathInfo = clientContext.getDomainSocketFactory(). pathInfo = clientContext.getDomainSocketFactory()
getPathInfo(inetSocketAddress, conf); .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
} }
if (!pathInfo.getPathState().getUsableForShortCircuit()) { if (!pathInfo.getPathState().getUsableForShortCircuit()) {
PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " + PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
@ -431,7 +434,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
"BlockReaderLocal via {}", this, pathInfo.getPath()); "BlockReaderLocal via {}", this, pathInfo.getPath());
return null; return null;
} }
return new BlockReaderLocal.Builder(conf). return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
setFilename(fileName). setFilename(fileName).
setBlock(block). setBlock(block).
setStartOffset(startOffset). setStartOffset(startOffset).
@ -604,8 +607,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
*/ */
private BlockReader getRemoteBlockReaderFromDomain() throws IOException { private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
if (pathInfo == null) { if (pathInfo == null) {
pathInfo = clientContext.getDomainSocketFactory(). pathInfo = clientContext.getDomainSocketFactory()
getPathInfo(inetSocketAddress, conf); .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
} }
if (!pathInfo.getPathState().getUsableForDataTransfer()) { if (!pathInfo.getPathState().getUsableForDataTransfer()) {
PerformanceAdvisory.LOG.debug("{}: not trying to create a " + PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
@ -744,7 +747,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
} }
} }
DomainSocket sock = clientContext.getDomainSocketFactory(). DomainSocket sock = clientContext.getDomainSocketFactory().
createSocket(pathInfo, conf.socketTimeout); createSocket(pathInfo, conf.getSocketTimeout());
if (sock == null) return null; if (sock == null) return null;
return new BlockReaderPeer(new DomainPeer(sock), false); return new BlockReaderPeer(new DomainPeer(sock), false);
} }
@ -803,9 +806,9 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private BlockReader getRemoteBlockReader(Peer peer) throws IOException { private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
if (conf.useLegacyBlockReader) { if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
return RemoteBlockReader.newBlockReader(fileName, return RemoteBlockReader.newBlockReader(fileName,
block, token, startOffset, length, conf.ioBufferSize, block, token, startOffset, length, conf.getIoBufferSize(),
verifyChecksum, clientName, peer, datanode, verifyChecksum, clientName, peer, datanode,
clientContext.getPeerCache(), cachingStrategy); clientContext.getPeerCache(), cachingStrategy);
} else { } else {

View File

@ -27,14 +27,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.htrace.Sampler; import org.apache.htrace.Sampler;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
@ -74,10 +74,10 @@ class BlockReaderLocal implements BlockReader {
private ExtendedBlock block; private ExtendedBlock block;
private StorageType storageType; private StorageType storageType;
public Builder(Conf conf) { public Builder(ShortCircuitConf conf) {
this.maxReadahead = Integer.MAX_VALUE; this.maxReadahead = Integer.MAX_VALUE;
this.verifyChecksum = !conf.skipShortCircuitChecksums; this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
this.bufferSize = conf.shortCircuitBufferSize; this.bufferSize = conf.getShortCircuitBufferSize();
} }
public Builder setVerifyChecksum(boolean verifyChecksum) { public Builder setVerifyChecksum(boolean verifyChecksum) {

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -42,12 +44,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.htrace.Sampler; import org.apache.htrace.Sampler;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
@ -180,12 +182,13 @@ class BlockReaderLocalLegacy implements BlockReader {
/** /**
* The only way this object can be instantiated. * The only way this object can be instantiated.
*/ */
static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf, static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
UserGroupInformation userGroupInformation, UserGroupInformation userGroupInformation,
Configuration configuration, String file, ExtendedBlock blk, Configuration configuration, String file, ExtendedBlock blk,
Token<BlockTokenIdentifier> token, DatanodeInfo node, Token<BlockTokenIdentifier> token, DatanodeInfo node,
long startOffset, long length, StorageType storageType) long startOffset, long length, StorageType storageType)
throws IOException { throws IOException {
final ShortCircuitConf scConf = conf.getShortCircuitConf();
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort()); .getIpcPort());
// check the cache first // check the cache first
@ -195,8 +198,8 @@ class BlockReaderLocalLegacy implements BlockReader {
userGroupInformation = UserGroupInformation.getCurrentUser(); userGroupInformation = UserGroupInformation.getCurrentUser();
} }
pathinfo = getBlockPathInfo(userGroupInformation, blk, node, pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
configuration, conf.socketTimeout, token, configuration, conf.getSocketTimeout(), token,
conf.connectToDnViaHostname, storageType); conf.isConnectToDnViaHostname(), storageType);
} }
// check to see if the file exists. It may so happen that the // check to see if the file exists. It may so happen that the
@ -208,8 +211,8 @@ class BlockReaderLocalLegacy implements BlockReader {
FileInputStream dataIn = null; FileInputStream dataIn = null;
FileInputStream checksumIn = null; FileInputStream checksumIn = null;
BlockReaderLocalLegacy localBlockReader = null; BlockReaderLocalLegacy localBlockReader = null;
boolean skipChecksumCheck = conf.skipShortCircuitChecksums || final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums()
storageType.isTransient(); || storageType.isTransient();
try { try {
// get a local file system // get a local file system
File blkfile = new File(pathinfo.getBlockPath()); File blkfile = new File(pathinfo.getBlockPath());
@ -230,11 +233,11 @@ class BlockReaderLocalLegacy implements BlockReader {
new DataInputStream(checksumIn), blk); new DataInputStream(checksumIn), blk);
long firstChunkOffset = startOffset long firstChunkOffset = startOffset
- (startOffset % checksum.getBytesPerChecksum()); - (startOffset % checksum.getBytesPerChecksum());
localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token, localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
startOffset, length, pathinfo, checksum, true, dataIn, startOffset, length, pathinfo, checksum, true, dataIn,
firstChunkOffset, checksumIn); firstChunkOffset, checksumIn);
} else { } else {
localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token, localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
startOffset, length, pathinfo, dataIn); startOffset, length, pathinfo, dataIn);
} }
} catch (IOException e) { } catch (IOException e) {
@ -312,7 +315,7 @@ class BlockReaderLocalLegacy implements BlockReader {
return bufferSizeBytes / bytesPerChecksum; return bufferSizeBytes / bytesPerChecksum;
} }
private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile, private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset, ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
throws IOException { throws IOException {
@ -321,7 +324,7 @@ class BlockReaderLocalLegacy implements BlockReader {
dataIn, startOffset, null); dataIn, startOffset, null);
} }
private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile, private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset, ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
@ -339,8 +342,8 @@ class BlockReaderLocalLegacy implements BlockReader {
this.checksumIn = checksumIn; this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
int chunksPerChecksumRead = getSlowReadBufferNumChunks( final int chunksPerChecksumRead = getSlowReadBufferNumChunks(
conf.shortCircuitBufferSize, bytesPerChecksum); conf.getShortCircuitBufferSize(), bytesPerChecksum);
slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
// Initially the buffers have nothing to read. // Initially the buffers have nothing to read.

View File

@ -23,13 +23,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.hdfs.util.ByteArrayManager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
/** /**
* ClientContext contains context information for a client. * ClientContext contains context information for a client.
@ -99,59 +99,24 @@ public class ClientContext {
*/ */
private boolean printedConfWarning = false; private boolean printedConfWarning = false;
private ClientContext(String name, Conf conf) { private ClientContext(String name, DfsClientConf conf) {
final ShortCircuitConf scConf = conf.getShortCircuitConf();
this.name = name; this.name = name;
this.confString = confAsString(conf); this.confString = scConf.confAsString();
this.shortCircuitCache = new ShortCircuitCache( this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
conf.shortCircuitStreamsCacheSize, this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
conf.shortCircuitStreamsCacheExpiryMs, scConf.getSocketCacheExpiry());
conf.shortCircuitMmapCacheSize, this.keyProviderCache = new KeyProviderCache(
conf.shortCircuitMmapCacheExpiryMs, scConf.getKeyProviderCacheExpiryMs());
conf.shortCircuitMmapCacheRetryTimeout, this.useLegacyBlockReaderLocal = scConf.isUseLegacyBlockReaderLocal();
conf.shortCircuitCacheStaleThresholdMs, this.domainSocketFactory = new DomainSocketFactory(scConf);
conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
this.peerCache =
new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs);
this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
this.domainSocketFactory = new DomainSocketFactory(conf);
this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf); this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf());
} }
public static String confAsString(Conf conf) { public static ClientContext get(String name, DfsClientConf conf) {
StringBuilder builder = new StringBuilder();
builder.append("shortCircuitStreamsCacheSize = ").
append(conf.shortCircuitStreamsCacheSize).
append(", shortCircuitStreamsCacheExpiryMs = ").
append(conf.shortCircuitStreamsCacheExpiryMs).
append(", shortCircuitMmapCacheSize = ").
append(conf.shortCircuitMmapCacheSize).
append(", shortCircuitMmapCacheExpiryMs = ").
append(conf.shortCircuitMmapCacheExpiryMs).
append(", shortCircuitMmapCacheRetryTimeout = ").
append(conf.shortCircuitMmapCacheRetryTimeout).
append(", shortCircuitCacheStaleThresholdMs = ").
append(conf.shortCircuitCacheStaleThresholdMs).
append(", socketCacheCapacity = ").
append(conf.socketCacheCapacity).
append(", socketCacheExpiry = ").
append(conf.socketCacheExpiry).
append(", shortCircuitLocalReads = ").
append(conf.shortCircuitLocalReads).
append(", useLegacyBlockReaderLocal = ").
append(conf.useLegacyBlockReaderLocal).
append(", domainSocketDataTraffic = ").
append(conf.domainSocketDataTraffic).
append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs).
append(", keyProviderCacheExpiryMs = ").
append(conf.keyProviderCacheExpiryMs);
return builder.toString();
}
public static ClientContext get(String name, Conf conf) {
ClientContext context; ClientContext context;
synchronized(ClientContext.class) { synchronized(ClientContext.class) {
context = CACHES.get(name); context = CACHES.get(name);
@ -175,12 +140,12 @@ public class ClientContext {
public static ClientContext getFromConf(Configuration conf) { public static ClientContext getFromConf(Configuration conf) {
return get(conf.get(DFSConfigKeys.DFS_CLIENT_CONTEXT, return get(conf.get(DFSConfigKeys.DFS_CLIENT_CONTEXT,
DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
new DFSClient.Conf(conf)); new DfsClientConf(conf));
} }
private void printConfWarningIfNeeded(Conf conf) { private void printConfWarningIfNeeded(DfsClientConf conf) {
String existing = this.getConfString(); String existing = this.getConfString();
String requested = confAsString(conf); String requested = conf.getShortCircuitConf().confAsString();
if (!existing.equals(requested)) { if (!existing.equals(requested)) {
if (!printedConfWarning) { if (!printedConfWarning) {
printedConfWarning = true; printedConfWarning = true;

View File

@ -18,48 +18,11 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -110,7 +73,6 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -137,9 +99,9 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AclException;
@ -196,14 +158,12 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.ipc.RpcInvocationHandler;
@ -252,7 +212,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
private final Configuration conf; private final Configuration conf;
private final Conf dfsClientConf; private final DfsClientConf dfsClientConf;
final ClientProtocol namenode; final ClientProtocol namenode;
/* The service used for delegation tokens */ /* The service used for delegation tokens */
private Text dtService; private Text dtService;
@ -280,307 +240,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
private final Sampler<?> traceSampler; private final Sampler<?> traceSampler;
/** public DfsClientConf getConf() {
* DFSClient configuration
*/
public static class Conf {
final int hdfsTimeout; // timeout value for a DFS operation.
final int maxFailoverAttempts;
final int maxRetryAttempts;
final int failoverSleepBaseMillis;
final int failoverSleepMaxMillis;
final int maxBlockAcquireFailures;
final int confTime;
final int ioBufferSize;
final ChecksumOpt defaultChecksumOpt;
final int writePacketSize;
final int writeMaxPackets;
final ByteArrayManager.Conf writeByteArrayManagerConf;
final int socketTimeout;
final int socketCacheCapacity;
final long socketCacheExpiry;
final long excludedNodesCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught */
final int timeWindow;
final int nCachedConnRetry;
final int nBlockWriteRetry;
final int nBlockWriteLocateFollowingRetry;
final int blockWriteLocateFollowingInitialDelayMs;
final long defaultBlockSize;
final long prefetchSize;
final short defaultReplication;
final String taskId;
final FsPermission uMask;
final boolean connectToDnViaHostname;
final boolean getHdfsBlocksMetadataEnabled;
final int getFileBlockStorageLocationsNumThreads;
final int getFileBlockStorageLocationsTimeoutMs;
final int retryTimesForGetLastBlockLength;
final int retryIntervalForGetLastBlockLength;
final long datanodeRestartTimeout;
final long dfsclientSlowIoWarningThresholdMs;
final boolean useLegacyBlockReader;
final boolean useLegacyBlockReaderLocal;
final String domainSocketPath;
final boolean skipShortCircuitChecksums;
final int shortCircuitBufferSize;
final boolean shortCircuitLocalReads;
final boolean domainSocketDataTraffic;
final int shortCircuitStreamsCacheSize;
final long shortCircuitStreamsCacheExpiryMs;
final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
final boolean shortCircuitMmapEnabled;
final int shortCircuitMmapCacheSize;
final long shortCircuitMmapCacheExpiryMs;
final long shortCircuitMmapCacheRetryTimeout;
final long shortCircuitCacheStaleThresholdMs;
final long keyProviderCacheExpiryMs;
public BlockReaderFactory.FailureInjector brfFailureInjector =
new BlockReaderFactory.FailureInjector();
public Conf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf);
maxFailoverAttempts = conf.getInt(
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
maxRetryAttempts = conf.getInt(
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
failoverSleepBaseMillis = conf.getInt(
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
failoverSleepMaxMillis = conf.getInt(
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
maxBlockAcquireFailures = conf.getInt(
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
confTime = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsServerConstants.WRITE_TIMEOUT);
ioBufferSize = conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
defaultChecksumOpt = getChecksumOptFromConf(conf);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
/** dfs.write.packet.size is an internal config variable */
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
writeMaxPackets = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
final boolean byteArrayManagerEnabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
writeByteArrayManagerConf = null;
} else {
final int countThreshold = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
writeByteArrayManagerConf = new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
}
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
DFS_BLOCK_SIZE_DEFAULT);
defaultReplication = (short) conf.getInt(
DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
excludedNodesCacheExpiry = conf.getLong(
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
timeWindow = conf.getInt(
HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY,
HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT);
nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
nBlockWriteLocateFollowingRetry = conf.getInt(
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
blockWriteLocateFollowingInitialDelayMs = conf.getInt(
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT);
uMask = FsPermission.getUMask(conf);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
getHdfsBlocksMetadataEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
getFileBlockStorageLocationsNumThreads = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
getFileBlockStorageLocationsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
retryTimesForGetLastBlockLength = conf.getInt(
HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
retryIntervalForGetLastBlockLength = conf.getInt(
HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
useLegacyBlockReader = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
useLegacyBlockReaderLocal = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
shortCircuitLocalReads = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
domainSocketDataTraffic = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
domainSocketPath = conf.getTrimmed(
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
if (BlockReaderLocal.LOG.isDebugEnabled()) {
BlockReaderLocal.LOG.debug(
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
+ " = " + useLegacyBlockReaderLocal);
BlockReaderLocal.LOG.debug(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
+ " = " + shortCircuitLocalReads);
BlockReaderLocal.LOG.debug(
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
+ " = " + domainSocketDataTraffic);
BlockReaderLocal.LOG.debug(
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
+ " = " + domainSocketPath);
}
skipShortCircuitChecksums = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
shortCircuitBufferSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
shortCircuitStreamsCacheSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
shortCircuitStreamsCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
shortCircuitMmapEnabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED,
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT);
shortCircuitMmapCacheSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
shortCircuitMmapCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
shortCircuitMmapCacheRetryTimeout = conf.getLong(
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT);
shortCircuitCacheStaleThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
datanodeRestartTimeout = conf.getLong(
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
dfsclientSlowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
keyProviderCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
}
public boolean isUseLegacyBlockReaderLocal() {
return useLegacyBlockReaderLocal;
}
public String getDomainSocketPath() {
return domainSocketPath;
}
public boolean isShortCircuitLocalReads() {
return shortCircuitLocalReads;
}
public boolean isDomainSocketDataTraffic() {
return domainSocketDataTraffic;
}
private DataChecksum.Type getChecksumType(Configuration conf) {
final String checksum = conf.get(
DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
try {
return DataChecksum.Type.valueOf(checksum);
} catch(IllegalArgumentException iae) {
LOG.warn("Bad checksum type: " + checksum + ". Using default "
+ DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
return DataChecksum.Type.valueOf(
DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
}
}
// Construct a checksum option from conf
private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
DataChecksum.Type type = getChecksumType(conf);
int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
DFS_BYTES_PER_CHECKSUM_DEFAULT);
return new ChecksumOpt(type, bytesPerChecksum);
}
// create a DataChecksum with the default option.
private DataChecksum createChecksum() throws IOException {
return createChecksum(null);
}
private DataChecksum createChecksum(ChecksumOpt userOpt) {
// Fill in any missing field with the default.
ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
defaultChecksumOpt, userOpt);
DataChecksum dataChecksum = DataChecksum.newDataChecksum(
myOpt.getChecksumType(),
myOpt.getBytesPerChecksum());
if (dataChecksum == null) {
throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
+ userOpt + ", default=" + defaultChecksumOpt
+ ", effective=null");
}
return dataChecksum;
}
@VisibleForTesting
public int getBlockWriteLocateFollowingInitialDelayMs() {
return blockWriteLocateFollowingInitialDelayMs;
}
}
public Conf getConf() {
return dfsClientConf; return dfsClientConf;
} }
@ -644,10 +304,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SpanReceiverHost.getInstance(conf); SpanReceiverHost.getInstance(conf);
traceSampler = new SamplerBuilder(TraceUtils.wrapHadoopConf(conf)).build(); traceSampler = new SamplerBuilder(TraceUtils.wrapHadoopConf(conf)).build();
// Copy only the required DFSClient configuration // Copy only the required DFSClient configuration
this.dfsClientConf = new Conf(conf); this.dfsClientConf = new DfsClientConf(conf);
if (this.dfsClientConf.useLegacyBlockReaderLocal) {
LOG.debug("Using legacy short-circuit local reads.");
}
this.conf = conf; this.conf = conf;
this.stats = stats; this.stats = stats;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
@ -656,7 +313,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.ugi = UserGroupInformation.getCurrentUser(); this.ugi = UserGroupInformation.getCurrentUser();
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId(); DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
int numResponseToDrop = conf.getInt( int numResponseToDrop = conf.getInt(
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
@ -780,31 +437,18 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return addr; return addr;
} }
/**
* Return the number of times the client should go back to the namenode
* to retrieve block locations when reading.
*/
int getMaxBlockAcquireFailures() {
return dfsClientConf.maxBlockAcquireFailures;
}
/** /**
* Return the timeout that clients should use when writing to datanodes. * Return the timeout that clients should use when writing to datanodes.
* @param numNodes the number of nodes in the pipeline. * @param numNodes the number of nodes in the pipeline.
*/ */
int getDatanodeWriteTimeout(int numNodes) { int getDatanodeWriteTimeout(int numNodes) {
return (dfsClientConf.confTime > 0) ? final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
(dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0; return t > 0? t + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
} }
int getDatanodeReadTimeout(int numNodes) { int getDatanodeReadTimeout(int numNodes) {
return dfsClientConf.socketTimeout > 0 ? final int t = dfsClientConf.getSocketTimeout();
(HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes + return t > 0? HdfsServerConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
dfsClientConf.socketTimeout) : 0;
}
int getHdfsTimeout() {
return dfsClientConf.hdfsTimeout;
} }
@VisibleForTesting @VisibleForTesting
@ -993,14 +637,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
} }
/**
* Get the default block size for this cluster
* @return the default block size in bytes
*/
public long getDefaultBlockSize() {
return dfsClientConf.defaultBlockSize;
}
/** /**
* @see ClientProtocol#getPreferredBlockSize(String) * @see ClientProtocol#getPreferredBlockSize(String)
*/ */
@ -1213,13 +849,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
namenode.reportBadBlocks(blocks); namenode.reportBadBlocks(blocks);
} }
public short getDefaultReplication() {
return dfsClientConf.defaultReplication;
}
public LocatedBlocks getLocatedBlocks(String src, long start) public LocatedBlocks getLocatedBlocks(String src, long start)
throws IOException { throws IOException {
return getLocatedBlocks(src, start, dfsClientConf.prefetchSize); return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
} }
/* /*
@ -1321,7 +953,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public BlockStorageLocation[] getBlockStorageLocations( public BlockStorageLocation[] getBlockStorageLocations(
List<BlockLocation> blockLocations) throws IOException, List<BlockLocation> blockLocations) throws IOException,
UnsupportedOperationException, InvalidBlockTokenException { UnsupportedOperationException, InvalidBlockTokenException {
if (!getConf().getHdfsBlocksMetadataEnabled) { if (!getConf().isHdfsBlocksMetadataEnabled()) {
throw new UnsupportedOperationException("Datanode-side support for " + throw new UnsupportedOperationException("Datanode-side support for " +
"getVolumeBlockLocations() must also be enabled in the client " + "getVolumeBlockLocations() must also be enabled in the client " +
"configuration."); "configuration.");
@ -1358,9 +990,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
try { try {
metadatas = BlockStorageLocationUtil. metadatas = BlockStorageLocationUtil.
queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
getConf().getFileBlockStorageLocationsNumThreads, getConf().getFileBlockStorageLocationsNumThreads(),
getConf().getFileBlockStorageLocationsTimeoutMs, getConf().getFileBlockStorageLocationsTimeoutMs(),
getConf().connectToDnViaHostname); getConf().isConnectToDnViaHostname());
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("metadata returned: " LOG.trace("metadata returned: "
+ Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
@ -1514,7 +1146,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public DFSInputStream open(String src) public DFSInputStream open(String src)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
return open(src, dfsClientConf.ioBufferSize, true, null); return open(src, dfsClientConf.getIoBufferSize(), true, null);
} }
/** /**
@ -1565,8 +1197,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/ */
public OutputStream create(String src, boolean overwrite) public OutputStream create(String src, boolean overwrite)
throws IOException { throws IOException {
return create(src, overwrite, dfsClientConf.defaultReplication, return create(src, overwrite, dfsClientConf.getDefaultReplication(),
dfsClientConf.defaultBlockSize, null); dfsClientConf.getDefaultBlockSize(), null);
} }
/** /**
@ -1576,8 +1208,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public OutputStream create(String src, public OutputStream create(String src,
boolean overwrite, boolean overwrite,
Progressable progress) throws IOException { Progressable progress) throws IOException {
return create(src, overwrite, dfsClientConf.defaultReplication, return create(src, overwrite, dfsClientConf.getDefaultReplication(),
dfsClientConf.defaultBlockSize, progress); dfsClientConf.getDefaultBlockSize(), progress);
} }
/** /**
@ -1598,7 +1230,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public OutputStream create(String src, boolean overwrite, short replication, public OutputStream create(String src, boolean overwrite, short replication,
long blockSize, Progressable progress) throws IOException { long blockSize, Progressable progress) throws IOException {
return create(src, overwrite, replication, blockSize, progress, return create(src, overwrite, replication, blockSize, progress,
dfsClientConf.ioBufferSize); dfsClientConf.getIoBufferSize());
} }
/** /**
@ -1680,6 +1312,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
progress, buffersize, checksumOpt, null); progress, buffersize, checksumOpt, null);
} }
private FsPermission applyUMask(FsPermission permission) {
if (permission == null) {
permission = FsPermission.getFileDefault();
}
return permission.applyUMask(dfsClientConf.getUMask());
}
/** /**
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
* Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
@ -1700,10 +1339,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
ChecksumOpt checksumOpt, ChecksumOpt checksumOpt,
InetSocketAddress[] favoredNodes) throws IOException { InetSocketAddress[] favoredNodes) throws IOException {
checkOpen(); checkOpen();
if (permission == null) { final FsPermission masked = applyUMask(permission);
permission = FsPermission.getFileDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked); LOG.debug(src + ": masked=" + masked);
} }
@ -1785,8 +1421,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throws IOException { throws IOException {
TraceScope scope = getPathTraceScope("createSymlink", target); TraceScope scope = getPathTraceScope("createSymlink", target);
try { try {
FsPermission dirPerm = final FsPermission dirPerm = applyUMask(null);
FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
namenode.createSymlink(target, link, dirPerm, createParent); namenode.createSymlink(target, link, dirPerm, createParent);
} catch (RemoteException re) { } catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class, throw re.unwrapRemoteException(AccessControlException.class,
@ -1830,7 +1465,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
new EnumSetWritable<>(flag, CreateFlag.class)); new EnumSetWritable<>(flag, CreateFlag.class));
return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize, return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
progress, blkWithStatus.getLastBlock(), progress, blkWithStatus.getLastBlock(),
blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(null),
favoredNodes); favoredNodes);
} catch(RemoteException re) { } catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class, throw re.unwrapRemoteException(AccessControlException.class,
@ -2252,7 +1887,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final DatanodeInfo[] datanodes = lb.getLocations(); final DatanodeInfo[] datanodes = lb.getLocations();
//try each datanode location of the block //try each datanode location of the block
final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout; final int timeout = 3000*datanodes.length + dfsClientConf.getSocketTimeout();
boolean done = false; boolean done = false;
for(int j = 0; !done && j < datanodes.length; j++) { for(int j = 0; !done && j < datanodes.length; j++) {
DataOutputStream out = null; DataOutputStream out = null;
@ -2390,7 +2025,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
Socket sock = null; Socket sock = null;
try { try {
sock = socketFactory.createSocket(); sock = socketFactory.createSocket();
String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname); String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr); LOG.debug("Connecting to datanode " + dnAddr);
} }
@ -2423,7 +2058,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/ */
private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
throws IOException { throws IOException {
IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb); IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
try { try {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
@ -2978,10 +2613,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/ */
public boolean mkdirs(String src, FsPermission permission, public boolean mkdirs(String src, FsPermission permission,
boolean createParent) throws IOException { boolean createParent) throws IOException {
if (permission == null) { final FsPermission masked = applyUMask(permission);
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
return primitiveMkdir(src, masked, createParent); return primitiveMkdir(src, masked, createParent);
} }
@ -3003,8 +2635,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throws IOException { throws IOException {
checkOpen(); checkOpen();
if (absPermission == null) { if (absPermission == null) {
absPermission = absPermission = applyUMask(null);
FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
} }
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
@ -3446,14 +3077,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
Peer peer = null; Peer peer = null;
boolean success = false; boolean success = false;
Socket sock = null; Socket sock = null;
final int socketTimeout = dfsClientConf.getSocketTimeout();
try { try {
sock = socketFactory.createSocket(); sock = socketFactory.createSocket();
NetUtils.connect(sock, addr, NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout);
getRandomLocalInterfaceAddr(),
dfsClientConf.socketTimeout);
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
blockToken, datanodeId); blockToken, datanodeId);
peer.setReadTimeout(dfsClientConf.socketTimeout); peer.setReadTimeout(socketTimeout);
success = true; success = true;
return peer; return peer;
} finally { } finally {

View File

@ -52,14 +52,15 @@ import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
@ -265,9 +266,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
* Grab the open-file info from namenode * Grab the open-file info from namenode
*/ */
void openInfo() throws IOException, UnresolvedLinkException { void openInfo() throws IOException, UnresolvedLinkException {
final DfsClientConf conf = dfsClient.getConf();
synchronized(infoLock) { synchronized(infoLock) {
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength; int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
while (retriesForLastBlockLength > 0) { while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster // Getting last block length as -1 is a special case. When cluster
// restarts, DNs may not report immediately. At this time partial block // restarts, DNs may not report immediately. At this time partial block
@ -277,7 +279,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DFSClient.LOG.warn("Last block locations not available. " DFSClient.LOG.warn("Last block locations not available. "
+ "Datanodes might not have reported blocks completely." + "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times"); + " Will retry for " + retriesForLastBlockLength + " times");
waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength); waitFor(conf.getRetryIntervalForGetLastBlockLength());
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
} else { } else {
break; break;
@ -346,13 +348,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
assert locatedblock != null : "LocatedBlock cannot be null"; assert locatedblock != null : "LocatedBlock cannot be null";
int replicaNotFoundCount = locatedblock.getLocations().length; int replicaNotFoundCount = locatedblock.getLocations().length;
final DfsClientConf conf = dfsClient.getConf();
for(DatanodeInfo datanode : locatedblock.getLocations()) { for(DatanodeInfo datanode : locatedblock.getLocations()) {
ClientDatanodeProtocol cdp = null; ClientDatanodeProtocol cdp = null;
try { try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode,
dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout, dfsClient.getConfiguration(), conf.getSocketTimeout(),
dfsClient.getConf().connectToDnViaHostname, locatedblock); conf.isConnectToDnViaHostname(), locatedblock);
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@ -938,7 +941,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
deadNodes, ignoredNodes); deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src; String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getMaxBlockAcquireFailures()) { if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo; String description = "Could not obtain block: " + blockInfo;
DFSClient.LOG.warn(description + errMsg DFSClient.LOG.warn(description + errMsg
+ ". Throwing a BlockMissingException"); + ". Throwing a BlockMissingException");
@ -963,7 +966,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// alleviating the request rate from the server. Similarly the 3rd retry // alleviating the request rate from the server. Similarly the 3rd retry
// will wait 6000ms grace period before retry and the waiting window is // will wait 6000ms grace period before retry and the waiting window is
// expanded to 9000ms. // expanded to 9000ms.
final int timeWindow = dfsClient.getConf().timeWindow; final int timeWindow = dfsClient.getConf().getTimeWindow();
double waitTime = timeWindow * failures + // grace period for the last round of attempt double waitTime = timeWindow * failures + // grace period for the last round of attempt
timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
@ -1012,7 +1015,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
", ignoredNodes = " + ignoredNodes); ", ignoredNodes = " + ignoredNodes);
} }
final String dnAddr = final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname); chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr); DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
} }
@ -1706,7 +1709,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} }
} }
ByteBuffer buffer = null; ByteBuffer buffer = null;
if (dfsClient.getConf().shortCircuitMmapEnabled) { if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) {
buffer = tryReadZeroCopy(maxLength, opts); buffer = tryReadZeroCopy(maxLength, opts);
} }
if (buffer != null) { if (buffer != null) {

View File

@ -33,10 +33,11 @@ import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -211,7 +212,7 @@ public class DFSOutputStream extends FSOutputSummer
this(dfsClient, src, progress, stat, checksum); this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
cachingStrategy, byteArrayManager); cachingStrategy, byteArrayManager);
@ -297,7 +298,7 @@ public class DFSOutputStream extends FSOutputSummer
adjustPacketChunkSize(stat); adjustPacketChunkSize(stat);
streamer.setPipelineInConstruction(lastBlock); streamer.setPipelineInConstruction(lastBlock);
} else { } else {
computePacketChunkSize(dfsClient.getConf().writePacketSize, computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum); bytesPerChecksum);
streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null, streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager); dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager);
@ -334,7 +335,8 @@ public class DFSOutputStream extends FSOutputSummer
// that expected size of of a packet, then create // that expected size of of a packet, then create
// smaller size packet. // smaller size packet.
// //
computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock), computePacketChunkSize(
Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock),
bytesPerChecksum); bytesPerChecksum);
} }
} }
@ -445,7 +447,7 @@ public class DFSOutputStream extends FSOutputSummer
if (!streamer.getAppendChunk()) { if (!streamer.getAppendChunk()) {
int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()), int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()),
dfsClient.getConf().writePacketSize); dfsClient.getConf().getWritePacketSize());
computePacketChunkSize(psize, bytesPerChecksum); computePacketChunkSize(psize, bytesPerChecksum);
} }
} }
@ -722,7 +724,7 @@ public class DFSOutputStream extends FSOutputSummer
return; return;
} }
streamer.setLastException(new IOException("Lease timeout of " streamer.setLastException(new IOException("Lease timeout of "
+ (dfsClient.getHdfsTimeout()/1000) + " seconds expired.")); + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
closeThreads(true); closeThreads(true);
dfsClient.endFileLease(fileId); dfsClient.endFileLease(fileId);
} }
@ -811,15 +813,15 @@ public class DFSOutputStream extends FSOutputSummer
// be called during unit tests // be called during unit tests
protected void completeFile(ExtendedBlock last) throws IOException { protected void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.monotonicNow(); long localstart = Time.monotonicNow();
long sleeptime = dfsClient.getConf(). final DfsClientConf conf = dfsClient.getConf();
blockWriteLocateFollowingInitialDelayMs; long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
boolean fileComplete = false; boolean fileComplete = false;
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; int retries = conf.getNumBlockWriteLocateFollowingRetry();
while (!fileComplete) { while (!fileComplete) {
fileComplete = fileComplete =
dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId); dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
if (!fileComplete) { if (!fileComplete) {
final int hdfsTimeout = dfsClient.getHdfsTimeout(); final int hdfsTimeout = conf.getHdfsTimeout();
if (!dfsClient.clientRunning if (!dfsClient.clientRunning
|| (hdfsTimeout > 0 || (hdfsTimeout > 0
&& localstart + hdfsTimeout < Time.monotonicNow())) { && localstart + hdfsTimeout < Time.monotonicNow())) {

View File

@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -84,6 +85,7 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
@ -123,15 +125,15 @@ class DataStreamer extends Daemon {
*/ */
static Socket createSocketForPipeline(final DatanodeInfo first, static Socket createSocketForPipeline(final DatanodeInfo first,
final int length, final DFSClient client) throws IOException { final int length, final DFSClient client) throws IOException {
final String dnAddr = first.getXferAddr( final DfsClientConf conf = client.getConf();
client.getConf().connectToDnViaHostname); final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr); DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
} }
final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
final Socket sock = client.socketFactory.createSocket(); final Socket sock = client.socketFactory.createSocket();
final int timeout = client.getDatanodeReadTimeout(length); final int timeout = client.getDatanodeReadTimeout(length);
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout); NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
sock.setSoTimeout(timeout); sock.setSoTimeout(timeout);
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
if(DFSClient.LOG.isDebugEnabled()) { if(DFSClient.LOG.isDebugEnabled()) {
@ -244,7 +246,7 @@ class DataStreamer extends Daemon {
this.byteArrayManager = byteArrayManage; this.byteArrayManager = byteArrayManage;
isLazyPersistFile = isLazyPersist(stat); isLazyPersistFile = isLazyPersist(stat);
this.dfsclientSlowLogThresholdMs = this.dfsclientSlowLogThresholdMs =
dfsClient.getConf().dfsclientSlowIoWarningThresholdMs; dfsClient.getConf().getSlowIoWarningThresholdMs();
excludedNodes = initExcludedNodes(); excludedNodes = initExcludedNodes();
} }
@ -368,6 +370,7 @@ class DataStreamer extends Daemon {
doSleep = processDatanodeError(); doSleep = processDatanodeError();
} }
final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) { synchronized (dataQueue) {
// wait for a packet to be sent. // wait for a packet to be sent.
long now = Time.monotonicNow(); long now = Time.monotonicNow();
@ -375,8 +378,8 @@ class DataStreamer extends Daemon {
&& dataQueue.size() == 0 && && dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING || (stage != BlockConstructionStage.DATA_STREAMING ||
stage == BlockConstructionStage.DATA_STREAMING && stage == BlockConstructionStage.DATA_STREAMING &&
now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) { now - lastPacket < halfSocketTimeout)) || doSleep ) {
long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket); long timeout = halfSocketTimeout - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout; timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)? timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000; timeout : 1000;
@ -627,7 +630,7 @@ class DataStreamer extends Daemon {
boolean firstWait = true; boolean firstWait = true;
try { try {
while (!streamerClosed && dataQueue.size() + ackQueue.size() > while (!streamerClosed && dataQueue.size() + ackQueue.size() >
dfsClient.getConf().writeMaxPackets) { dfsClient.getConf().getWriteMaxPackets()) {
if (firstWait) { if (firstWait) {
Span span = Trace.currentSpan(); Span span = Trace.currentSpan();
if (span != null) { if (span != null) {
@ -842,7 +845,7 @@ class DataStreamer extends Daemon {
// the local node or the only one in the pipeline. // the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) && if (PipelineAck.isRestartOOBStatus(reply) &&
shouldWaitForRestart(i)) { shouldWaitForRestart(i)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
+ Time.monotonicNow(); + Time.monotonicNow();
setRestartingNodeIndex(i); setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i]; String message = "A datanode is restarting: " + targets[i];
@ -1158,7 +1161,7 @@ class DataStreamer extends Daemon {
// 4 seconds or the configured deadline period, whichever is shorter. // 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this // This is the retry interval and recovery will be retried in this
// interval until timeout or success. // interval until timeout or success.
long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout, long delay = Math.min(dfsClient.getConf().getDatanodeRestartTimeout(),
4000L); 4000L);
try { try {
Thread.sleep(delay); Thread.sleep(delay);
@ -1311,7 +1314,7 @@ class DataStreamer extends Daemon {
LocatedBlock lb = null; LocatedBlock lb = null;
DatanodeInfo[] nodes = null; DatanodeInfo[] nodes = null;
StorageType[] storageTypes = null; StorageType[] storageTypes = null;
int count = dfsClient.getConf().nBlockWriteRetry; int count = dfsClient.getConf().getNumBlockWriteRetry();
boolean success = false; boolean success = false;
ExtendedBlock oldBlock = block; ExtendedBlock oldBlock = block;
do { do {
@ -1471,7 +1474,7 @@ class DataStreamer extends Daemon {
} }
// Check whether there is a restart worth waiting for. // Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(errorIndex)) { if (checkRestart && shouldWaitForRestart(errorIndex)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
+ Time.monotonicNow(); + Time.monotonicNow();
restartingNodeIndex.set(errorIndex); restartingNodeIndex.set(errorIndex);
errorIndex = -1; errorIndex = -1;
@ -1524,9 +1527,9 @@ class DataStreamer extends Daemon {
protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException { throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; final DfsClientConf conf = dfsClient.getConf();
long sleeptime = dfsClient.getConf(). int retries = conf.getNumBlockWriteLocateFollowingRetry();
blockWriteLocateFollowingInitialDelayMs; long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
while (true) { while (true) {
long localstart = Time.monotonicNow(); long localstart = Time.monotonicNow();
while (true) { while (true) {
@ -1674,7 +1677,8 @@ class DataStreamer extends Daemon {
private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() { private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() {
return CacheBuilder.newBuilder().expireAfterWrite( return CacheBuilder.newBuilder().expireAfterWrite(
dfsClient.getConf().excludedNodesCacheExpiry, TimeUnit.MILLISECONDS) dfsClient.getConf().getExcludedNodesCacheExpiry(),
TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() { .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
@Override @Override
public void onRemoval( public void onRemoval(

View File

@ -159,12 +159,12 @@ public class DistributedFileSystem extends FileSystem {
@Override @Override
public long getDefaultBlockSize() { public long getDefaultBlockSize() {
return dfs.getDefaultBlockSize(); return dfs.getConf().getDefaultBlockSize();
} }
@Override @Override
public short getDefaultReplication() { public short getDefaultReplication() {
return dfs.getDefaultReplication(); return dfs.getConf().getDefaultReplication();
} }
@Override @Override

View File

@ -225,8 +225,9 @@ class LeaseRenewer {
dfsclients.add(dfsc); dfsclients.add(dfsc);
//update renewal time //update renewal time
if (dfsc.getHdfsTimeout() > 0) { final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
final long half = dfsc.getHdfsTimeout()/2; if (hdfsTimeout > 0) {
final long half = hdfsTimeout/2;
if (half < renewal) { if (half < renewal) {
this.renewal = half; this.renewal = half;
} }
@ -368,16 +369,14 @@ class LeaseRenewer {
} }
//update renewal time //update renewal time
if (renewal == dfsc.getHdfsTimeout()/2) { if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD; long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
for(DFSClient c : dfsclients) { for(DFSClient c : dfsclients) {
if (c.getHdfsTimeout() > 0) { final int timeout = c.getConf().getHdfsTimeout();
final long timeout = c.getHdfsTimeout(); if (timeout > 0 && timeout < min) {
if (timeout < min) {
min = timeout; min = timeout;
} }
} }
}
renewal = min/2; renewal = min/2;
} }
} }

View File

@ -40,8 +40,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -178,12 +178,12 @@ public class NameNodeProxies {
UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
} else { } else {
// HA case // HA case
Conf config = new Conf(conf); DfsClientConf config = new DfsClientConf(conf);
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException( RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
config.maxRetryAttempts, config.failoverSleepBaseMillis, config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
config.failoverSleepMaxMillis)); config.getFailoverSleepMaxMillis()));
Text dtService; Text dtService;
if (failoverProxyProvider.useLogicalURI()) { if (failoverProxyProvider.useLogicalURI()) {

View File

@ -0,0 +1,738 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client.impl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
/**
* DFSClient configuration
*/
public class DfsClientConf {
private final int hdfsTimeout; // timeout value for a DFS operation.
private final int maxFailoverAttempts;
private final int maxRetryAttempts;
private final int failoverSleepBaseMillis;
private final int failoverSleepMaxMillis;
private final int maxBlockAcquireFailures;
private final int datanodeSocketWriteTimeout;
private final int ioBufferSize;
private final ChecksumOpt defaultChecksumOpt;
private final int writePacketSize;
private final int writeMaxPackets;
private final ByteArrayManager.Conf writeByteArrayManagerConf;
private final int socketTimeout;
private final long excludedNodesCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught */
private final int timeWindow;
private final int numCachedConnRetry;
private final int numBlockWriteRetry;
private final int numBlockWriteLocateFollowingRetry;
private final int blockWriteLocateFollowingInitialDelayMs;
private final long defaultBlockSize;
private final long prefetchSize;
private final short defaultReplication;
private final String taskId;
private final FsPermission uMask;
private final boolean connectToDnViaHostname;
private final boolean hdfsBlocksMetadataEnabled;
private final int fileBlockStorageLocationsNumThreads;
private final int fileBlockStorageLocationsTimeoutMs;
private final int retryTimesForGetLastBlockLength;
private final int retryIntervalForGetLastBlockLength;
private final long datanodeRestartTimeout;
private final long slowIoWarningThresholdMs;
private final ShortCircuitConf shortCircuitConf;
public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf);
maxFailoverAttempts = conf.getInt(
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
maxRetryAttempts = conf.getInt(
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
failoverSleepBaseMillis = conf.getInt(
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
failoverSleepMaxMillis = conf.getInt(
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
maxBlockAcquireFailures = conf.getInt(
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
datanodeSocketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsServerConstants.WRITE_TIMEOUT);
ioBufferSize = conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
defaultChecksumOpt = getChecksumOptFromConf(conf);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
/** dfs.write.packet.size is an internal config variable */
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
writeMaxPackets = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
final boolean byteArrayManagerEnabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
writeByteArrayManagerConf = null;
} else {
final int countThreshold = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
writeByteArrayManagerConf = new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
}
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
DFS_BLOCK_SIZE_DEFAULT);
defaultReplication = (short) conf.getInt(
DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
excludedNodesCacheExpiry = conf.getLong(
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
timeWindow = conf.getInt(
HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY,
HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT);
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
numBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
numBlockWriteLocateFollowingRetry = conf.getInt(
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
blockWriteLocateFollowingInitialDelayMs = conf.getInt(
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT);
uMask = FsPermission.getUMask(conf);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
hdfsBlocksMetadataEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
fileBlockStorageLocationsNumThreads = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
fileBlockStorageLocationsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
retryTimesForGetLastBlockLength = conf.getInt(
HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
retryIntervalForGetLastBlockLength = conf.getInt(
HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
datanodeRestartTimeout = conf.getLong(
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
slowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
shortCircuitConf = new ShortCircuitConf(conf);
}
private DataChecksum.Type getChecksumType(Configuration conf) {
final String checksum = conf.get(
DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
try {
return DataChecksum.Type.valueOf(checksum);
} catch(IllegalArgumentException iae) {
DFSClient.LOG.warn("Bad checksum type: " + checksum + ". Using default "
+ DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
return DataChecksum.Type.valueOf(
DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
}
}
// Construct a checksum option from conf
private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
DataChecksum.Type type = getChecksumType(conf);
int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
DFS_BYTES_PER_CHECKSUM_DEFAULT);
return new ChecksumOpt(type, bytesPerChecksum);
}
/** create a DataChecksum with the given option. */
public DataChecksum createChecksum(ChecksumOpt userOpt) {
// Fill in any missing field with the default.
ChecksumOpt opt = ChecksumOpt.processChecksumOpt(
defaultChecksumOpt, userOpt);
DataChecksum dataChecksum = DataChecksum.newDataChecksum(
opt.getChecksumType(),
opt.getBytesPerChecksum());
if (dataChecksum == null) {
throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
+ userOpt + ", default=" + defaultChecksumOpt
+ ", effective=null");
}
return dataChecksum;
}
@VisibleForTesting
public int getBlockWriteLocateFollowingInitialDelayMs() {
return blockWriteLocateFollowingInitialDelayMs;
}
/**
* @return the hdfsTimeout
*/
public int getHdfsTimeout() {
return hdfsTimeout;
}
/**
* @return the maxFailoverAttempts
*/
public int getMaxFailoverAttempts() {
return maxFailoverAttempts;
}
/**
* @return the maxRetryAttempts
*/
public int getMaxRetryAttempts() {
return maxRetryAttempts;
}
/**
* @return the failoverSleepBaseMillis
*/
public int getFailoverSleepBaseMillis() {
return failoverSleepBaseMillis;
}
/**
* @return the failoverSleepMaxMillis
*/
public int getFailoverSleepMaxMillis() {
return failoverSleepMaxMillis;
}
/**
* @return the maxBlockAcquireFailures
*/
public int getMaxBlockAcquireFailures() {
return maxBlockAcquireFailures;
}
/**
* @return the datanodeSocketWriteTimeout
*/
public int getDatanodeSocketWriteTimeout() {
return datanodeSocketWriteTimeout;
}
/**
* @return the ioBufferSize
*/
public int getIoBufferSize() {
return ioBufferSize;
}
/**
* @return the defaultChecksumOpt
*/
public ChecksumOpt getDefaultChecksumOpt() {
return defaultChecksumOpt;
}
/**
* @return the writePacketSize
*/
public int getWritePacketSize() {
return writePacketSize;
}
/**
* @return the writeMaxPackets
*/
public int getWriteMaxPackets() {
return writeMaxPackets;
}
/**
* @return the writeByteArrayManagerConf
*/
public ByteArrayManager.Conf getWriteByteArrayManagerConf() {
return writeByteArrayManagerConf;
}
/**
* @return the socketTimeout
*/
public int getSocketTimeout() {
return socketTimeout;
}
/**
* @return the excludedNodesCacheExpiry
*/
public long getExcludedNodesCacheExpiry() {
return excludedNodesCacheExpiry;
}
/**
* @return the timeWindow
*/
public int getTimeWindow() {
return timeWindow;
}
/**
* @return the numCachedConnRetry
*/
public int getNumCachedConnRetry() {
return numCachedConnRetry;
}
/**
* @return the numBlockWriteRetry
*/
public int getNumBlockWriteRetry() {
return numBlockWriteRetry;
}
/**
* @return the numBlockWriteLocateFollowingRetry
*/
public int getNumBlockWriteLocateFollowingRetry() {
return numBlockWriteLocateFollowingRetry;
}
/**
* @return the defaultBlockSize
*/
public long getDefaultBlockSize() {
return defaultBlockSize;
}
/**
* @return the prefetchSize
*/
public long getPrefetchSize() {
return prefetchSize;
}
/**
* @return the defaultReplication
*/
public short getDefaultReplication() {
return defaultReplication;
}
/**
* @return the taskId
*/
public String getTaskId() {
return taskId;
}
/**
* @return the uMask
*/
public FsPermission getUMask() {
return uMask;
}
/**
* @return the connectToDnViaHostname
*/
public boolean isConnectToDnViaHostname() {
return connectToDnViaHostname;
}
/**
* @return the hdfsBlocksMetadataEnabled
*/
public boolean isHdfsBlocksMetadataEnabled() {
return hdfsBlocksMetadataEnabled;
}
/**
* @return the fileBlockStorageLocationsNumThreads
*/
public int getFileBlockStorageLocationsNumThreads() {
return fileBlockStorageLocationsNumThreads;
}
/**
* @return the getFileBlockStorageLocationsTimeoutMs
*/
public int getFileBlockStorageLocationsTimeoutMs() {
return fileBlockStorageLocationsTimeoutMs;
}
/**
* @return the retryTimesForGetLastBlockLength
*/
public int getRetryTimesForGetLastBlockLength() {
return retryTimesForGetLastBlockLength;
}
/**
* @return the retryIntervalForGetLastBlockLength
*/
public int getRetryIntervalForGetLastBlockLength() {
return retryIntervalForGetLastBlockLength;
}
/**
* @return the datanodeRestartTimeout
*/
public long getDatanodeRestartTimeout() {
return datanodeRestartTimeout;
}
/**
* @return the slowIoWarningThresholdMs
*/
public long getSlowIoWarningThresholdMs() {
return slowIoWarningThresholdMs;
}
/**
* @return the shortCircuitConf
*/
public ShortCircuitConf getShortCircuitConf() {
return shortCircuitConf;
}
public static class ShortCircuitConf {
private static final Log LOG = LogFactory.getLog(ShortCircuitConf.class);
private final int socketCacheCapacity;
private final long socketCacheExpiry;
private final boolean useLegacyBlockReader;
private final boolean useLegacyBlockReaderLocal;
private final String domainSocketPath;
private final boolean skipShortCircuitChecksums;
private final int shortCircuitBufferSize;
private final boolean shortCircuitLocalReads;
private final boolean domainSocketDataTraffic;
private final int shortCircuitStreamsCacheSize;
private final long shortCircuitStreamsCacheExpiryMs;
private final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
private final boolean shortCircuitMmapEnabled;
private final int shortCircuitMmapCacheSize;
private final long shortCircuitMmapCacheExpiryMs;
private final long shortCircuitMmapCacheRetryTimeout;
private final long shortCircuitCacheStaleThresholdMs;
private final long keyProviderCacheExpiryMs;
@VisibleForTesting
public BlockReaderFactory.FailureInjector brfFailureInjector =
new BlockReaderFactory.FailureInjector();
public ShortCircuitConf(Configuration conf) {
socketCacheCapacity = conf.getInt(
DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
socketCacheExpiry = conf.getLong(
DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
useLegacyBlockReader = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
useLegacyBlockReaderLocal = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
shortCircuitLocalReads = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
domainSocketDataTraffic = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
domainSocketPath = conf.getTrimmed(
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
+ " = " + useLegacyBlockReaderLocal);
LOG.debug(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
+ " = " + shortCircuitLocalReads);
LOG.debug(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
+ " = " + domainSocketDataTraffic);
LOG.debug(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
+ " = " + domainSocketPath);
}
skipShortCircuitChecksums = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
shortCircuitBufferSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
shortCircuitStreamsCacheSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
shortCircuitStreamsCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
shortCircuitMmapEnabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED,
DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT);
shortCircuitMmapCacheSize = conf.getInt(
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
shortCircuitMmapCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
shortCircuitMmapCacheRetryTimeout = conf.getLong(
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT);
shortCircuitCacheStaleThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
keyProviderCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
}
/**
* @return the socketCacheCapacity
*/
public int getSocketCacheCapacity() {
return socketCacheCapacity;
}
/**
* @return the socketCacheExpiry
*/
public long getSocketCacheExpiry() {
return socketCacheExpiry;
}
public boolean isUseLegacyBlockReaderLocal() {
return useLegacyBlockReaderLocal;
}
public String getDomainSocketPath() {
return domainSocketPath;
}
public boolean isShortCircuitLocalReads() {
return shortCircuitLocalReads;
}
public boolean isDomainSocketDataTraffic() {
return domainSocketDataTraffic;
}
/**
* @return the useLegacyBlockReader
*/
public boolean isUseLegacyBlockReader() {
return useLegacyBlockReader;
}
/**
* @return the skipShortCircuitChecksums
*/
public boolean isSkipShortCircuitChecksums() {
return skipShortCircuitChecksums;
}
/**
* @return the shortCircuitBufferSize
*/
public int getShortCircuitBufferSize() {
return shortCircuitBufferSize;
}
/**
* @return the shortCircuitStreamsCacheSize
*/
public int getShortCircuitStreamsCacheSize() {
return shortCircuitStreamsCacheSize;
}
/**
* @return the shortCircuitStreamsCacheExpiryMs
*/
public long getShortCircuitStreamsCacheExpiryMs() {
return shortCircuitStreamsCacheExpiryMs;
}
/**
* @return the shortCircuitSharedMemoryWatcherInterruptCheckMs
*/
public int getShortCircuitSharedMemoryWatcherInterruptCheckMs() {
return shortCircuitSharedMemoryWatcherInterruptCheckMs;
}
/**
* @return the shortCircuitMmapEnabled
*/
public boolean isShortCircuitMmapEnabled() {
return shortCircuitMmapEnabled;
}
/**
* @return the shortCircuitMmapCacheSize
*/
public int getShortCircuitMmapCacheSize() {
return shortCircuitMmapCacheSize;
}
/**
* @return the shortCircuitMmapCacheExpiryMs
*/
public long getShortCircuitMmapCacheExpiryMs() {
return shortCircuitMmapCacheExpiryMs;
}
/**
* @return the shortCircuitMmapCacheRetryTimeout
*/
public long getShortCircuitMmapCacheRetryTimeout() {
return shortCircuitMmapCacheRetryTimeout;
}
/**
* @return the shortCircuitCacheStaleThresholdMs
*/
public long getShortCircuitCacheStaleThresholdMs() {
return shortCircuitCacheStaleThresholdMs;
}
/**
* @return the keyProviderCacheExpiryMs
*/
public long getKeyProviderCacheExpiryMs() {
return keyProviderCacheExpiryMs;
}
public String confAsString() {
StringBuilder builder = new StringBuilder();
builder.append("shortCircuitStreamsCacheSize = ").
append(shortCircuitStreamsCacheSize).
append(", shortCircuitStreamsCacheExpiryMs = ").
append(shortCircuitStreamsCacheExpiryMs).
append(", shortCircuitMmapCacheSize = ").
append(shortCircuitMmapCacheSize).
append(", shortCircuitMmapCacheExpiryMs = ").
append(shortCircuitMmapCacheExpiryMs).
append(", shortCircuitMmapCacheRetryTimeout = ").
append(shortCircuitMmapCacheRetryTimeout).
append(", shortCircuitCacheStaleThresholdMs = ").
append(shortCircuitCacheStaleThresholdMs).
append(", socketCacheCapacity = ").
append(socketCacheCapacity).
append(", socketCacheExpiry = ").
append(socketCacheExpiry).
append(", shortCircuitLocalReads = ").
append(shortCircuitLocalReads).
append(", useLegacyBlockReaderLocal = ").
append(useLegacyBlockReaderLocal).
append(", domainSocketDataTraffic = ").
append(domainSocketDataTraffic).
append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
append(shortCircuitSharedMemoryWatcherInterruptCheckMs).
append(", keyProviderCacheExpiryMs = ").
append(keyProviderCacheExpiryMs);
return builder.toString();
}
}
}

View File

@ -26,14 +26,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.util.PerformanceAdvisory;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.util.PerformanceAdvisory;
public class DomainSocketFactory { public class DomainSocketFactory {
private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class); private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
@ -95,7 +95,7 @@ public class DomainSocketFactory {
.expireAfterWrite(10, TimeUnit.MINUTES) .expireAfterWrite(10, TimeUnit.MINUTES)
.build(); .build();
public DomainSocketFactory(Conf conf) { public DomainSocketFactory(ShortCircuitConf conf) {
final String feature; final String feature;
if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) { if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) {
feature = "The short-circuit local reads feature"; feature = "The short-circuit local reads feature";
@ -129,7 +129,7 @@ public class DomainSocketFactory {
* *
* @return Information about the socket path. * @return Information about the socket path.
*/ */
public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) { public PathInfo getPathInfo(InetSocketAddress addr, ShortCircuitConf conf) {
// If there is no domain socket path configured, we can't use domain // If there is no domain socket path configured, we can't use domain
// sockets. // sockets.
if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED; if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED;

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -359,6 +360,17 @@ public class ShortCircuitCache implements Closeable {
DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT)); DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT));
} }
public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
return new ShortCircuitCache(
conf.getShortCircuitStreamsCacheSize(),
conf.getShortCircuitStreamsCacheExpiryMs(),
conf.getShortCircuitMmapCacheSize(),
conf.getShortCircuitMmapCacheExpiryMs(),
conf.getShortCircuitMmapCacheRetryTimeout(),
conf.getShortCircuitCacheStaleThresholdMs(),
conf.getShortCircuitSharedMemoryWatcherInterruptCheckMs());
}
public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs, public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs, int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) { long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) {

View File

@ -42,7 +42,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.BlockReaderTestUtil; import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.ClientContext; import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -50,6 +49,7 @@ import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -359,7 +359,7 @@ public class TestEnhancedByteBufferAccess {
fsIn.close(); fsIn.close();
fsIn = fs.open(TEST_PATH); fsIn = fs.open(TEST_PATH);
final ShortCircuitCache cache = ClientContext.get( final ShortCircuitCache cache = ClientContext.get(
CONTEXT, new DFSClient.Conf(conf)). getShortCircuitCache(); CONTEXT, new DfsClientConf(conf)). getShortCircuitCache();
cache.accept(new CountingVisitor(0, 5, 5, 0)); cache.accept(new CountingVisitor(0, 5, 5, 0));
results[0] = fsIn.read(null, BLOCK_SIZE, results[0] = fsIn.read(null, BLOCK_SIZE,
EnumSet.of(ReadOption.SKIP_CHECKSUMS)); EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@ -662,7 +662,7 @@ public class TestEnhancedByteBufferAccess {
final ExtendedBlock firstBlock = final ExtendedBlock firstBlock =
DFSTestUtil.getFirstBlock(fs, TEST_PATH); DFSTestUtil.getFirstBlock(fs, TEST_PATH);
final ShortCircuitCache cache = ClientContext.get( final ShortCircuitCache cache = ClientContext.get(
CONTEXT, new DFSClient.Conf(conf)). getShortCircuitCache(); CONTEXT, new DfsClientConf(conf)). getShortCircuitCache();
waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1); waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
// Uncache the replica // Uncache the replica
fs.removeCacheDirective(directiveId); fs.removeCacheDirective(directiveId);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
@ -187,7 +188,7 @@ public class TestBlockReaderLocal {
Time.now(), shm.allocAndRegisterSlot( Time.now(), shm.allocAndRegisterSlot(
ExtendedBlockId.fromExtendedBlock(block))); ExtendedBlockId.fromExtendedBlock(block)));
blockReaderLocal = new BlockReaderLocal.Builder( blockReaderLocal = new BlockReaderLocal.Builder(
new DFSClient.Conf(conf)). new DfsClientConf.ShortCircuitConf(conf)).
setFilename(TEST_PATH.getName()). setFilename(TEST_PATH.getName()).
setBlock(block). setBlock(block).
setShortCircuitReplica(replica). setShortCircuitReplica(replica).

View File

@ -300,7 +300,7 @@ public class TestDFSClientRetries {
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
NamenodeProtocols spyNN = spy(preSpyNN); NamenodeProtocols spyNN = spy(preSpyNN);
DFSClient client = new DFSClient(null, spyNN, conf, null); DFSClient client = new DFSClient(null, spyNN, conf, null);
int maxBlockAcquires = client.getMaxBlockAcquireFailures(); int maxBlockAcquires = client.getConf().getMaxBlockAcquireFailures();
assertTrue(maxBlockAcquires > 0); assertTrue(maxBlockAcquires > 0);

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -113,7 +114,7 @@ public class TestDFSOutputStream {
@Test @Test
public void testCongestionBackoff() throws IOException { public void testCongestionBackoff() throws IOException {
DFSClient.Conf dfsClientConf = mock(DFSClient.Conf.class); DfsClientConf dfsClientConf = mock(DfsClientConf.class);
DFSClient client = mock(DFSClient.class); DFSClient client = mock(DFSClient.class);
when(client.getConf()).thenReturn(dfsClientConf); when(client.getConf()).thenReturn(dfsClientConf);
client.clientRunning = true; client.clientRunning = true;

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -59,13 +60,13 @@ public class TestLeaseRenewer {
} }
private DFSClient createMockClient() { private DFSClient createMockClient() {
final DfsClientConf mockConf = Mockito.mock(DfsClientConf.class);
Mockito.doReturn((int)FAST_GRACE_PERIOD).when(mockConf).getHdfsTimeout();
DFSClient mock = Mockito.mock(DFSClient.class); DFSClient mock = Mockito.mock(DFSClient.class);
Mockito.doReturn(true) Mockito.doReturn(true).when(mock).isClientRunning();
.when(mock).isClientRunning(); Mockito.doReturn(mockConf).when(mock).getConf();
Mockito.doReturn((int)FAST_GRACE_PERIOD) Mockito.doReturn("myclient").when(mock).getClientName();
.when(mock).getHdfsTimeout();
Mockito.doReturn("myclient")
.when(mock).getClientName();
return mock; return mock;
} }

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.junit.Assume;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -325,7 +324,7 @@ public class TestParallelReadUtil {
testInfo.filepath = new Path("/TestParallelRead.dat." + i); testInfo.filepath = new Path("/TestParallelRead.dat." + i);
testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K); testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
testInfo.dis = dfsClient.open(testInfo.filepath.toString(), testInfo.dis = dfsClient.open(testInfo.filepath.toString(),
dfsClient.getConf().ioBufferSize, verifyChecksums); dfsClient.getConf().getIoBufferSize(), verifyChecksums);
for (int j = 0; j < nWorkerEach; ++j) { for (int j = 0; j < nWorkerEach; ++j) {
workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper); workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -146,7 +147,7 @@ public class TestBlockTokenWithDFS {
DatanodeInfo[] nodes = lblock.getLocations(); DatanodeInfo[] nodes = lblock.getLocations();
targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)). blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
setFileName(BlockReaderFactory.getFileName(targetAddr, setFileName(BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId())). "test-blockpoolid", block.getBlockId())).
setBlock(block). setBlock(block).

View File

@ -40,12 +40,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.ClientContext; import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -405,7 +405,7 @@ public class TestDataNodeVolumeFailure {
targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr()); targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)). BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
setInetSocketAddress(targetAddr). setInetSocketAddress(targetAddr).
setBlock(block). setBlock(block).
setFileName(BlockReaderFactory.getFileName(targetAddr, setFileName(BlockReaderFactory.getFileName(targetAddr,

View File

@ -657,7 +657,7 @@ public class TestShortCircuitCache {
// The second read should fail, and we should only have 1 segment and 1 slot // The second read should fail, and we should only have 1 segment and 1 slot
// left. // left.
fs.getClient().getConf().brfFailureInjector = fs.getClient().getConf().getShortCircuitConf().brfFailureInjector =
new TestCleanupFailureInjector(); new TestCleanupFailureInjector();
try { try {
DFSTestUtil.readFileBuffer(fs, TEST_PATH2); DFSTestUtil.readFileBuffer(fs, TEST_PATH2);