diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 333a1b1546e..c2f0363fc09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -415,6 +415,9 @@ Release 2.8.0 - UNRELEASED HDFS-8102. Separate webhdfs retry configuration keys from DFSConfigKeys. (wheat9) + HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates + short-circuit related conf to ShortCircuitConf. (szetszwo) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 8f33899ea0a..5175a873bbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -81,7 +83,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { static ShortCircuitReplicaCreator createShortCircuitReplicaInfoCallback = null; - private final DFSClient.Conf conf; + private final DfsClientConf conf; /** * Injects failures into specific operations during unit tests. @@ -180,10 +182,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { */ private int remainingCacheTries; - public BlockReaderFactory(DFSClient.Conf conf) { + public BlockReaderFactory(DfsClientConf conf) { this.conf = conf; - this.failureInjector = conf.brfFailureInjector; - this.remainingCacheTries = conf.nCachedConnRetry; + this.failureInjector = conf.getShortCircuitConf().brfFailureInjector; + this.remainingCacheTries = conf.getNumCachedConnRetry(); } public BlockReaderFactory setFileName(String fileName) { @@ -317,7 +319,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { BlockReader reader = null; Preconditions.checkNotNull(configuration); - if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) { + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) { if (clientContext.getUseLegacyBlockReaderLocal()) { reader = getLegacyBlockReaderLocal(); if (reader != null) { @@ -336,7 +339,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { } } } - if (conf.domainSocketDataTraffic) { + if (scConf.isDomainSocketDataTraffic()) { reader = getRemoteBlockReaderFromDomain(); if (reader != null) { if (LOG.isTraceEnabled()) { @@ -406,8 +409,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { "for short-circuit reads."); } if (pathInfo == null) { - pathInfo = clientContext.getDomainSocketFactory(). - getPathInfo(inetSocketAddress, conf); + pathInfo = clientContext.getDomainSocketFactory() + .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); } if (!pathInfo.getPathState().getUsableForShortCircuit()) { PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " + @@ -431,7 +434,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { "BlockReaderLocal via {}", this, pathInfo.getPath()); return null; } - return new BlockReaderLocal.Builder(conf). + return new BlockReaderLocal.Builder(conf.getShortCircuitConf()). setFilename(fileName). setBlock(block). setStartOffset(startOffset). @@ -604,8 +607,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { */ private BlockReader getRemoteBlockReaderFromDomain() throws IOException { if (pathInfo == null) { - pathInfo = clientContext.getDomainSocketFactory(). - getPathInfo(inetSocketAddress, conf); + pathInfo = clientContext.getDomainSocketFactory() + .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); } if (!pathInfo.getPathState().getUsableForDataTransfer()) { PerformanceAdvisory.LOG.debug("{}: not trying to create a " + @@ -744,7 +747,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { } } DomainSocket sock = clientContext.getDomainSocketFactory(). - createSocket(pathInfo, conf.socketTimeout); + createSocket(pathInfo, conf.getSocketTimeout()); if (sock == null) return null; return new BlockReaderPeer(new DomainPeer(sock), false); } @@ -803,9 +806,9 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { @SuppressWarnings("deprecation") private BlockReader getRemoteBlockReader(Peer peer) throws IOException { - if (conf.useLegacyBlockReader) { + if (conf.getShortCircuitConf().isUseLegacyBlockReader()) { return RemoteBlockReader.newBlockReader(fileName, - block, token, startOffset, length, conf.ioBufferSize, + block, token, startOffset, length, conf.getIoBufferSize(), verifyChecksum, clientName, peer, datanode, clientContext.getPeerCache(), cachingStrategy); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index ab934413852..d913f3a2835 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -27,14 +27,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.DFSClient.Conf; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; -import org.apache.hadoop.util.DirectBufferPool; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DirectBufferPool; import org.apache.htrace.Sampler; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -74,10 +74,10 @@ class BlockReaderLocal implements BlockReader { private ExtendedBlock block; private StorageType storageType; - public Builder(Conf conf) { + public Builder(ShortCircuitConf conf) { this.maxReadahead = Integer.MAX_VALUE; - this.verifyChecksum = !conf.skipShortCircuitChecksums; - this.bufferSize = conf.shortCircuitBufferSize; + this.verifyChecksum = !conf.isSkipShortCircuitChecksums(); + this.bufferSize = conf.getShortCircuitBufferSize(); } public Builder setVerifyChecksum(boolean verifyChecksum) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index 0c9ec45644e..8df44f8e09e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -35,6 +35,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -42,12 +44,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.util.DirectBufferPool; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DirectBufferPool; import org.apache.htrace.Sampler; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -180,12 +182,13 @@ class BlockReaderLocalLegacy implements BlockReader { /** * The only way this object can be instantiated. */ - static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf, + static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf, UserGroupInformation userGroupInformation, Configuration configuration, String file, ExtendedBlock blk, Token token, DatanodeInfo node, long startOffset, long length, StorageType storageType) throws IOException { + final ShortCircuitConf scConf = conf.getShortCircuitConf(); LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node .getIpcPort()); // check the cache first @@ -195,8 +198,8 @@ class BlockReaderLocalLegacy implements BlockReader { userGroupInformation = UserGroupInformation.getCurrentUser(); } pathinfo = getBlockPathInfo(userGroupInformation, blk, node, - configuration, conf.socketTimeout, token, - conf.connectToDnViaHostname, storageType); + configuration, conf.getSocketTimeout(), token, + conf.isConnectToDnViaHostname(), storageType); } // check to see if the file exists. It may so happen that the @@ -208,8 +211,8 @@ class BlockReaderLocalLegacy implements BlockReader { FileInputStream dataIn = null; FileInputStream checksumIn = null; BlockReaderLocalLegacy localBlockReader = null; - boolean skipChecksumCheck = conf.skipShortCircuitChecksums || - storageType.isTransient(); + final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums() + || storageType.isTransient(); try { // get a local file system File blkfile = new File(pathinfo.getBlockPath()); @@ -230,11 +233,11 @@ class BlockReaderLocalLegacy implements BlockReader { new DataInputStream(checksumIn), blk); long firstChunkOffset = startOffset - (startOffset % checksum.getBytesPerChecksum()); - localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token, + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token, startOffset, length, pathinfo, checksum, true, dataIn, firstChunkOffset, checksumIn); } else { - localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token, + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token, startOffset, length, pathinfo, dataIn); } } catch (IOException e) { @@ -312,7 +315,7 @@ class BlockReaderLocalLegacy implements BlockReader { return bufferSizeBytes / bytesPerChecksum; } - private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile, + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, ExtendedBlock block, Token token, long startOffset, long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) throws IOException { @@ -321,7 +324,7 @@ class BlockReaderLocalLegacy implements BlockReader { dataIn, startOffset, null); } - private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile, + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, ExtendedBlock block, Token token, long startOffset, long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, @@ -339,8 +342,8 @@ class BlockReaderLocalLegacy implements BlockReader { this.checksumIn = checksumIn; this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); - int chunksPerChecksumRead = getSlowReadBufferNumChunks( - conf.shortCircuitBufferSize, bytesPerChecksum); + final int chunksPerChecksumRead = getSlowReadBufferNumChunks( + conf.getShortCircuitBufferSize(), bytesPerChecksum); slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); // Initially the buffers have nothing to read. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index af7c0956e88..6359def4e2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -23,13 +23,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSClient.Conf; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; import org.apache.hadoop.hdfs.util.ByteArrayManager; import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.Cache; /** * ClientContext contains context information for a client. @@ -99,59 +99,24 @@ public class ClientContext { */ private boolean printedConfWarning = false; - private ClientContext(String name, Conf conf) { + private ClientContext(String name, DfsClientConf conf) { + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + this.name = name; - this.confString = confAsString(conf); - this.shortCircuitCache = new ShortCircuitCache( - conf.shortCircuitStreamsCacheSize, - conf.shortCircuitStreamsCacheExpiryMs, - conf.shortCircuitMmapCacheSize, - conf.shortCircuitMmapCacheExpiryMs, - conf.shortCircuitMmapCacheRetryTimeout, - conf.shortCircuitCacheStaleThresholdMs, - conf.shortCircuitSharedMemoryWatcherInterruptCheckMs); - this.peerCache = - new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry); - this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs); - this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal; - this.domainSocketFactory = new DomainSocketFactory(conf); + this.confString = scConf.confAsString(); + this.shortCircuitCache = ShortCircuitCache.fromConf(scConf); + this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), + scConf.getSocketCacheExpiry()); + this.keyProviderCache = new KeyProviderCache( + scConf.getKeyProviderCacheExpiryMs()); + this.useLegacyBlockReaderLocal = scConf.isUseLegacyBlockReaderLocal(); + this.domainSocketFactory = new DomainSocketFactory(scConf); - this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf); + this.byteArrayManager = ByteArrayManager.newInstance( + conf.getWriteByteArrayManagerConf()); } - public static String confAsString(Conf conf) { - StringBuilder builder = new StringBuilder(); - builder.append("shortCircuitStreamsCacheSize = "). - append(conf.shortCircuitStreamsCacheSize). - append(", shortCircuitStreamsCacheExpiryMs = "). - append(conf.shortCircuitStreamsCacheExpiryMs). - append(", shortCircuitMmapCacheSize = "). - append(conf.shortCircuitMmapCacheSize). - append(", shortCircuitMmapCacheExpiryMs = "). - append(conf.shortCircuitMmapCacheExpiryMs). - append(", shortCircuitMmapCacheRetryTimeout = "). - append(conf.shortCircuitMmapCacheRetryTimeout). - append(", shortCircuitCacheStaleThresholdMs = "). - append(conf.shortCircuitCacheStaleThresholdMs). - append(", socketCacheCapacity = "). - append(conf.socketCacheCapacity). - append(", socketCacheExpiry = "). - append(conf.socketCacheExpiry). - append(", shortCircuitLocalReads = "). - append(conf.shortCircuitLocalReads). - append(", useLegacyBlockReaderLocal = "). - append(conf.useLegacyBlockReaderLocal). - append(", domainSocketDataTraffic = "). - append(conf.domainSocketDataTraffic). - append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = "). - append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs). - append(", keyProviderCacheExpiryMs = "). - append(conf.keyProviderCacheExpiryMs); - - return builder.toString(); - } - - public static ClientContext get(String name, Conf conf) { + public static ClientContext get(String name, DfsClientConf conf) { ClientContext context; synchronized(ClientContext.class) { context = CACHES.get(name); @@ -175,12 +140,12 @@ public class ClientContext { public static ClientContext getFromConf(Configuration conf) { return get(conf.get(DFSConfigKeys.DFS_CLIENT_CONTEXT, DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), - new DFSClient.Conf(conf)); + new DfsClientConf(conf)); } - private void printConfWarningIfNeeded(Conf conf) { + private void printConfWarningIfNeeded(DfsClientConf conf) { String existing = this.getConfString(); - String requested = confAsString(conf); + String requested = conf.getShortCircuitConf().confAsString(); if (!existing.equals(requested)) { if (!printedConfWarning) { printedConfWarning = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index d43e7de8f39..f79d160066a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -18,48 +18,11 @@ package org.apache.hadoop.hdfs; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import java.io.BufferedOutputStream; import java.io.DataInputStream; @@ -109,7 +72,6 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.CacheFlag; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -136,9 +98,9 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.AclException; @@ -195,14 +157,12 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; -import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; -import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; @@ -250,7 +210,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB private final Configuration conf; - private final Conf dfsClientConf; + private final DfsClientConf dfsClientConf; final ClientProtocol namenode; /* The service used for delegation tokens */ private Text dtService; @@ -278,307 +238,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; private final Sampler traceSampler; - /** - * DFSClient configuration - */ - public static class Conf { - final int hdfsTimeout; // timeout value for a DFS operation. - - final int maxFailoverAttempts; - final int maxRetryAttempts; - final int failoverSleepBaseMillis; - final int failoverSleepMaxMillis; - final int maxBlockAcquireFailures; - final int confTime; - final int ioBufferSize; - final ChecksumOpt defaultChecksumOpt; - final int writePacketSize; - final int writeMaxPackets; - final ByteArrayManager.Conf writeByteArrayManagerConf; - final int socketTimeout; - final int socketCacheCapacity; - final long socketCacheExpiry; - final long excludedNodesCacheExpiry; - /** Wait time window (in msec) if BlockMissingException is caught */ - final int timeWindow; - final int nCachedConnRetry; - final int nBlockWriteRetry; - final int nBlockWriteLocateFollowingRetry; - final int blockWriteLocateFollowingInitialDelayMs; - final long defaultBlockSize; - final long prefetchSize; - final short defaultReplication; - final String taskId; - final FsPermission uMask; - final boolean connectToDnViaHostname; - final boolean getHdfsBlocksMetadataEnabled; - final int getFileBlockStorageLocationsNumThreads; - final int getFileBlockStorageLocationsTimeoutMs; - final int retryTimesForGetLastBlockLength; - final int retryIntervalForGetLastBlockLength; - final long datanodeRestartTimeout; - final long dfsclientSlowIoWarningThresholdMs; - - final boolean useLegacyBlockReader; - final boolean useLegacyBlockReaderLocal; - final String domainSocketPath; - final boolean skipShortCircuitChecksums; - final int shortCircuitBufferSize; - final boolean shortCircuitLocalReads; - final boolean domainSocketDataTraffic; - final int shortCircuitStreamsCacheSize; - final long shortCircuitStreamsCacheExpiryMs; - final int shortCircuitSharedMemoryWatcherInterruptCheckMs; - - final boolean shortCircuitMmapEnabled; - final int shortCircuitMmapCacheSize; - final long shortCircuitMmapCacheExpiryMs; - final long shortCircuitMmapCacheRetryTimeout; - final long shortCircuitCacheStaleThresholdMs; - - final long keyProviderCacheExpiryMs; - public BlockReaderFactory.FailureInjector brfFailureInjector = - new BlockReaderFactory.FailureInjector(); - - public Conf(Configuration conf) { - // The hdfsTimeout is currently the same as the ipc timeout - hdfsTimeout = Client.getTimeout(conf); - maxFailoverAttempts = conf.getInt( - DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, - DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); - maxRetryAttempts = conf.getInt( - HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, - HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT); - failoverSleepBaseMillis = conf.getInt( - DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, - DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT); - failoverSleepMaxMillis = conf.getInt( - DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY, - DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT); - - maxBlockAcquireFailures = conf.getInt( - DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, - DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT); - confTime = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, - HdfsServerConstants.WRITE_TIMEOUT); - ioBufferSize = conf.getInt( - CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, - CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); - defaultChecksumOpt = getChecksumOptFromConf(conf); - socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, - HdfsServerConstants.READ_TIMEOUT); - /** dfs.write.packet.size is an internal config variable */ - writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, - DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); - writeMaxPackets = conf.getInt( - DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY, - DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT); - - final boolean byteArrayManagerEnabled = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY, - DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT); - if (!byteArrayManagerEnabled) { - writeByteArrayManagerConf = null; - } else { - final int countThreshold = conf.getInt( - DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY, - DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT); - final int countLimit = conf.getInt( - DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY, - DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT); - final long countResetTimePeriodMs = conf.getLong( - DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY, - DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT); - writeByteArrayManagerConf = new ByteArrayManager.Conf( - countThreshold, countLimit, countResetTimePeriodMs); - } - - - defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, - DFS_BLOCK_SIZE_DEFAULT); - defaultReplication = (short) conf.getInt( - DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT); - taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE"); - socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, - DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT); - socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, - DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); - excludedNodesCacheExpiry = conf.getLong( - DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL, - DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); - prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, - 10 * defaultBlockSize); - timeWindow = conf.getInt( - HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, - HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT); - nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, - DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); - nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY, - DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT); - nBlockWriteLocateFollowingRetry = conf.getInt( - DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, - DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); - blockWriteLocateFollowingInitialDelayMs = conf.getInt( - DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY, - DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT); - uMask = FsPermission.getUMask(conf); - connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, - DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); - getHdfsBlocksMetadataEnabled = conf.getBoolean( - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); - getFileBlockStorageLocationsNumThreads = conf.getInt( - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT); - getFileBlockStorageLocationsTimeoutMs = conf.getInt( - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS, - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT); - retryTimesForGetLastBlockLength = conf.getInt( - HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY, - HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); - retryIntervalForGetLastBlockLength = conf.getInt( - HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY, - HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT); - - useLegacyBlockReader = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); - useLegacyBlockReaderLocal = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); - shortCircuitLocalReads = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); - domainSocketDataTraffic = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, - DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); - domainSocketPath = conf.getTrimmed( - DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, - DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT); - - if (BlockReaderLocal.LOG.isDebugEnabled()) { - BlockReaderLocal.LOG.debug( - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL - + " = " + useLegacyBlockReaderLocal); - BlockReaderLocal.LOG.debug( - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY - + " = " + shortCircuitLocalReads); - BlockReaderLocal.LOG.debug( - DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC - + " = " + domainSocketDataTraffic); - BlockReaderLocal.LOG.debug( - DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY - + " = " + domainSocketPath); - } - - skipShortCircuitChecksums = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); - shortCircuitBufferSize = conf.getInt( - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT); - shortCircuitStreamsCacheSize = conf.getInt( - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT); - shortCircuitStreamsCacheExpiryMs = conf.getLong( - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT); - shortCircuitMmapEnabled = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED, - DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT); - shortCircuitMmapCacheSize = conf.getInt( - DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, - DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT); - shortCircuitMmapCacheExpiryMs = conf.getLong( - DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, - DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT); - shortCircuitMmapCacheRetryTimeout = conf.getLong( - DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS, - DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT); - shortCircuitCacheStaleThresholdMs = conf.getLong( - DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS, - DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT); - shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt( - DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, - DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT); - - datanodeRestartTimeout = conf.getLong( - DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, - DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000; - dfsclientSlowIoWarningThresholdMs = conf.getLong( - DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, - DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); - - keyProviderCacheExpiryMs = conf.getLong( - DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS, - DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT); - } - - public boolean isUseLegacyBlockReaderLocal() { - return useLegacyBlockReaderLocal; - } - - public String getDomainSocketPath() { - return domainSocketPath; - } - - public boolean isShortCircuitLocalReads() { - return shortCircuitLocalReads; - } - - public boolean isDomainSocketDataTraffic() { - return domainSocketDataTraffic; - } - - private DataChecksum.Type getChecksumType(Configuration conf) { - final String checksum = conf.get( - DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, - DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); - try { - return DataChecksum.Type.valueOf(checksum); - } catch(IllegalArgumentException iae) { - LOG.warn("Bad checksum type: " + checksum + ". Using default " - + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); - return DataChecksum.Type.valueOf( - DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); - } - } - - // Construct a checksum option from conf - private ChecksumOpt getChecksumOptFromConf(Configuration conf) { - DataChecksum.Type type = getChecksumType(conf); - int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, - DFS_BYTES_PER_CHECKSUM_DEFAULT); - return new ChecksumOpt(type, bytesPerChecksum); - } - - // create a DataChecksum with the default option. - private DataChecksum createChecksum() throws IOException { - return createChecksum(null); - } - - private DataChecksum createChecksum(ChecksumOpt userOpt) { - // Fill in any missing field with the default. - ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt( - defaultChecksumOpt, userOpt); - DataChecksum dataChecksum = DataChecksum.newDataChecksum( - myOpt.getChecksumType(), - myOpt.getBytesPerChecksum()); - if (dataChecksum == null) { - throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt=" - + userOpt + ", default=" + defaultChecksumOpt - + ", effective=null"); - } - return dataChecksum; - } - - @VisibleForTesting - public int getBlockWriteLocateFollowingInitialDelayMs() { - return blockWriteLocateFollowingInitialDelayMs; - } - } - - public Conf getConf() { + public DfsClientConf getConf() { return dfsClientConf; } @@ -642,10 +302,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SpanReceiverHost.getInstance(conf); traceSampler = new SamplerBuilder(TraceUtils.wrapHadoopConf(conf)).build(); // Copy only the required DFSClient configuration - this.dfsClientConf = new Conf(conf); - if (this.dfsClientConf.useLegacyBlockReaderLocal) { - LOG.debug("Using legacy short-circuit local reads."); - } + this.dfsClientConf = new DfsClientConf(conf); this.conf = conf; this.stats = stats; this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); @@ -654,7 +311,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, this.ugi = UserGroupInformation.getCurrentUser(); this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); - this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + + this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" + DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId(); int numResponseToDrop = conf.getInt( DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, @@ -778,31 +435,18 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return addr; } - /** - * Return the number of times the client should go back to the namenode - * to retrieve block locations when reading. - */ - int getMaxBlockAcquireFailures() { - return dfsClientConf.maxBlockAcquireFailures; - } - /** * Return the timeout that clients should use when writing to datanodes. * @param numNodes the number of nodes in the pipeline. */ int getDatanodeWriteTimeout(int numNodes) { - return (dfsClientConf.confTime > 0) ? - (dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0; + final int t = dfsClientConf.getDatanodeSocketWriteTimeout(); + return t > 0? t + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0; } int getDatanodeReadTimeout(int numNodes) { - return dfsClientConf.socketTimeout > 0 ? - (HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes + - dfsClientConf.socketTimeout) : 0; - } - - int getHdfsTimeout() { - return dfsClientConf.hdfsTimeout; + final int t = dfsClientConf.getSocketTimeout(); + return t > 0? HdfsServerConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0; } @VisibleForTesting @@ -991,14 +635,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } } - /** - * Get the default block size for this cluster - * @return the default block size in bytes - */ - public long getDefaultBlockSize() { - return dfsClientConf.defaultBlockSize; - } - /** * @see ClientProtocol#getPreferredBlockSize(String) */ @@ -1211,13 +847,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, namenode.reportBadBlocks(blocks); } - public short getDefaultReplication() { - return dfsClientConf.defaultReplication; - } - public LocatedBlocks getLocatedBlocks(String src, long start) throws IOException { - return getLocatedBlocks(src, start, dfsClientConf.prefetchSize); + return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize()); } /* @@ -1319,7 +951,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public BlockStorageLocation[] getBlockStorageLocations( List blockLocations) throws IOException, UnsupportedOperationException, InvalidBlockTokenException { - if (!getConf().getHdfsBlocksMetadataEnabled) { + if (!getConf().isHdfsBlocksMetadataEnabled()) { throw new UnsupportedOperationException("Datanode-side support for " + "getVolumeBlockLocations() must also be enabled in the client " + "configuration."); @@ -1356,9 +988,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, try { metadatas = BlockStorageLocationUtil. queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, - getConf().getFileBlockStorageLocationsNumThreads, - getConf().getFileBlockStorageLocationsTimeoutMs, - getConf().connectToDnViaHostname); + getConf().getFileBlockStorageLocationsNumThreads(), + getConf().getFileBlockStorageLocationsTimeoutMs(), + getConf().isConnectToDnViaHostname()); if (LOG.isTraceEnabled()) { LOG.trace("metadata returned: " + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); @@ -1512,7 +1144,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public DFSInputStream open(String src) throws IOException, UnresolvedLinkException { - return open(src, dfsClientConf.ioBufferSize, true, null); + return open(src, dfsClientConf.getIoBufferSize(), true, null); } /** @@ -1563,8 +1195,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public OutputStream create(String src, boolean overwrite) throws IOException { - return create(src, overwrite, dfsClientConf.defaultReplication, - dfsClientConf.defaultBlockSize, null); + return create(src, overwrite, dfsClientConf.getDefaultReplication(), + dfsClientConf.getDefaultBlockSize(), null); } /** @@ -1574,8 +1206,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public OutputStream create(String src, boolean overwrite, Progressable progress) throws IOException { - return create(src, overwrite, dfsClientConf.defaultReplication, - dfsClientConf.defaultBlockSize, progress); + return create(src, overwrite, dfsClientConf.getDefaultReplication(), + dfsClientConf.getDefaultBlockSize(), progress); } /** @@ -1596,7 +1228,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public OutputStream create(String src, boolean overwrite, short replication, long blockSize, Progressable progress) throws IOException { return create(src, overwrite, replication, blockSize, progress, - dfsClientConf.ioBufferSize); + dfsClientConf.getIoBufferSize()); } /** @@ -1678,6 +1310,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, progress, buffersize, checksumOpt, null); } + private FsPermission applyUMask(FsPermission permission) { + if (permission == null) { + permission = FsPermission.getFileDefault(); + } + return permission.applyUMask(dfsClientConf.getUMask()); + } + /** * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is @@ -1698,10 +1337,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException { checkOpen(); - if (permission == null) { - permission = FsPermission.getFileDefault(); - } - FsPermission masked = permission.applyUMask(dfsClientConf.uMask); + final FsPermission masked = applyUMask(permission); if(LOG.isDebugEnabled()) { LOG.debug(src + ": masked=" + masked); } @@ -1783,8 +1419,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throws IOException { TraceScope scope = getPathTraceScope("createSymlink", target); try { - FsPermission dirPerm = - FsPermission.getDefault().applyUMask(dfsClientConf.uMask); + final FsPermission dirPerm = applyUMask(null); namenode.createSymlink(target, link, dirPerm, createParent); } catch (RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, @@ -1828,7 +1463,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, new EnumSetWritable<>(flag, CreateFlag.class)); return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize, progress, blkWithStatus.getLastBlock(), - blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), + blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(null), favoredNodes); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, @@ -2253,7 +1888,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, final DatanodeInfo[] datanodes = lb.getLocations(); //try each datanode location of the block - final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout; + final int timeout = 3000*datanodes.length + dfsClientConf.getSocketTimeout(); boolean done = false; for(int j = 0; !done && j < datanodes.length; j++) { DataOutputStream out = null; @@ -2391,7 +2026,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, Socket sock = null; try { sock = socketFactory.createSocket(); - String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname); + String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname()); if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode " + dnAddr); } @@ -2424,7 +2059,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) throws IOException { - IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb); + IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb); try { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out, @@ -2979,10 +2614,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public boolean mkdirs(String src, FsPermission permission, boolean createParent) throws IOException { - if (permission == null) { - permission = FsPermission.getDefault(); - } - FsPermission masked = permission.applyUMask(dfsClientConf.uMask); + final FsPermission masked = applyUMask(permission); return primitiveMkdir(src, masked, createParent); } @@ -3004,8 +2636,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throws IOException { checkOpen(); if (absPermission == null) { - absPermission = - FsPermission.getDefault().applyUMask(dfsClientConf.uMask); + absPermission = applyUMask(null); } if(LOG.isDebugEnabled()) { @@ -3447,14 +3078,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, Peer peer = null; boolean success = false; Socket sock = null; + final int socketTimeout = dfsClientConf.getSocketTimeout(); try { sock = socketFactory.createSocket(); - NetUtils.connect(sock, addr, - getRandomLocalInterfaceAddr(), - dfsClientConf.socketTimeout); + NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout); peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, blockToken, datanodeId); - peer.setReadTimeout(dfsClientConf.socketTimeout); + peer.setReadTimeout(socketTimeout); success = true; return peer; } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 41b9d5039b4..dd0f6fed0ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -52,14 +52,15 @@ import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; @@ -265,9 +266,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * Grab the open-file info from namenode */ void openInfo() throws IOException, UnresolvedLinkException { + final DfsClientConf conf = dfsClient.getConf(); synchronized(infoLock) { lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); - int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength; + int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); while (retriesForLastBlockLength > 0) { // Getting last block length as -1 is a special case. When cluster // restarts, DNs may not report immediately. At this time partial block @@ -277,7 +279,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, DFSClient.LOG.warn("Last block locations not available. " + "Datanodes might not have reported blocks completely." + " Will retry for " + retriesForLastBlockLength + " times"); - waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength); + waitFor(conf.getRetryIntervalForGetLastBlockLength()); lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); } else { break; @@ -346,13 +348,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, assert locatedblock != null : "LocatedBlock cannot be null"; int replicaNotFoundCount = locatedblock.getLocations().length; + final DfsClientConf conf = dfsClient.getConf(); for(DatanodeInfo datanode : locatedblock.getLocations()) { ClientDatanodeProtocol cdp = null; try { cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, - dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout, - dfsClient.getConf().connectToDnViaHostname, locatedblock); + dfsClient.getConfiguration(), conf.getSocketTimeout(), + conf.isConnectToDnViaHostname(), locatedblock); final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); @@ -938,7 +941,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), deadNodes, ignoredNodes); String blockInfo = block.getBlock() + " file=" + src; - if (failures >= dfsClient.getMaxBlockAcquireFailures()) { + if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { String description = "Could not obtain block: " + blockInfo; DFSClient.LOG.warn(description + errMsg + ". Throwing a BlockMissingException"); @@ -963,7 +966,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, // alleviating the request rate from the server. Similarly the 3rd retry // will wait 6000ms grace period before retry and the waiting window is // expanded to 9000ms. - final int timeWindow = dfsClient.getConf().timeWindow; + final int timeWindow = dfsClient.getConf().getTimeWindow(); double waitTime = timeWindow * failures + // grace period for the last round of attempt timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); @@ -1012,7 +1015,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, ", ignoredNodes = " + ignoredNodes); } final String dnAddr = - chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname); + chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname()); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Connecting to datanode " + dnAddr); } @@ -1706,7 +1709,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } } ByteBuffer buffer = null; - if (dfsClient.getConf().shortCircuitMmapEnabled) { + if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) { buffer = tryReadZeroCopy(maxLength, opts); } if (buffer != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index f6733e36b4a..8cde2740a58 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -33,10 +33,11 @@ import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -211,7 +212,7 @@ public class DFSOutputStream extends FSOutputSummer this(dfsClient, src, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); - computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); + computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager); @@ -297,7 +298,7 @@ public class DFSOutputStream extends FSOutputSummer adjustPacketChunkSize(stat); streamer.setPipelineInConstruction(lastBlock); } else { - computePacketChunkSize(dfsClient.getConf().writePacketSize, + computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager); @@ -334,7 +335,8 @@ public class DFSOutputStream extends FSOutputSummer // that expected size of of a packet, then create // smaller size packet. // - computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock), + computePacketChunkSize( + Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock), bytesPerChecksum); } } @@ -445,7 +447,7 @@ public class DFSOutputStream extends FSOutputSummer if (!streamer.getAppendChunk()) { int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()), - dfsClient.getConf().writePacketSize); + dfsClient.getConf().getWritePacketSize()); computePacketChunkSize(psize, bytesPerChecksum); } } @@ -717,7 +719,7 @@ public class DFSOutputStream extends FSOutputSummer return; } streamer.setLastException(new IOException("Lease timeout of " - + (dfsClient.getHdfsTimeout()/1000) + " seconds expired.")); + + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired.")); closeThreads(true); dfsClient.endFileLease(fileId); } @@ -806,15 +808,15 @@ public class DFSOutputStream extends FSOutputSummer // be called during unit tests protected void completeFile(ExtendedBlock last) throws IOException { long localstart = Time.monotonicNow(); - long sleeptime = dfsClient.getConf(). - blockWriteLocateFollowingInitialDelayMs; + final DfsClientConf conf = dfsClient.getConf(); + long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); boolean fileComplete = false; - int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; + int retries = conf.getNumBlockWriteLocateFollowingRetry(); while (!fileComplete) { fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId); if (!fileComplete) { - final int hdfsTimeout = dfsClient.getHdfsTimeout(); + final int hdfsTimeout = conf.getHdfsTimeout(); if (!dfsClient.clientRunning || (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.monotonicNow())) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 0c6b4a38321..405f775e4a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -84,6 +85,7 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceScope; + import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -123,15 +125,15 @@ class DataStreamer extends Daemon { */ static Socket createSocketForPipeline(final DatanodeInfo first, final int length, final DFSClient client) throws IOException { - final String dnAddr = first.getXferAddr( - client.getConf().connectToDnViaHostname); + final DfsClientConf conf = client.getConf(); + final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname()); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Connecting to datanode " + dnAddr); } final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); final Socket sock = client.socketFactory.createSocket(); final int timeout = client.getDatanodeReadTimeout(length); - NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout); + NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout()); sock.setSoTimeout(timeout); sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); if(DFSClient.LOG.isDebugEnabled()) { @@ -244,7 +246,7 @@ class DataStreamer extends Daemon { this.byteArrayManager = byteArrayManage; isLazyPersistFile = isLazyPersist(stat); this.dfsclientSlowLogThresholdMs = - dfsClient.getConf().dfsclientSlowIoWarningThresholdMs; + dfsClient.getConf().getSlowIoWarningThresholdMs(); excludedNodes = initExcludedNodes(); } @@ -368,6 +370,7 @@ class DataStreamer extends Daemon { doSleep = processDatanodeError(); } + final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; synchronized (dataQueue) { // wait for a packet to be sent. long now = Time.monotonicNow(); @@ -375,8 +378,8 @@ class DataStreamer extends Daemon { && dataQueue.size() == 0 && (stage != BlockConstructionStage.DATA_STREAMING || stage == BlockConstructionStage.DATA_STREAMING && - now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) { - long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket); + now - lastPacket < halfSocketTimeout)) || doSleep ) { + long timeout = halfSocketTimeout - (now-lastPacket); timeout = timeout <= 0 ? 1000 : timeout; timeout = (stage == BlockConstructionStage.DATA_STREAMING)? timeout : 1000; @@ -627,7 +630,7 @@ class DataStreamer extends Daemon { boolean firstWait = true; try { while (!streamerClosed && dataQueue.size() + ackQueue.size() > - dfsClient.getConf().writeMaxPackets) { + dfsClient.getConf().getWriteMaxPackets()) { if (firstWait) { Span span = Trace.currentSpan(); if (span != null) { @@ -842,7 +845,7 @@ class DataStreamer extends Daemon { // the local node or the only one in the pipeline. if (PipelineAck.isRestartOOBStatus(reply) && shouldWaitForRestart(i)) { - restartDeadline = dfsClient.getConf().datanodeRestartTimeout + restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout() + Time.monotonicNow(); setRestartingNodeIndex(i); String message = "A datanode is restarting: " + targets[i]; @@ -1158,7 +1161,7 @@ class DataStreamer extends Daemon { // 4 seconds or the configured deadline period, whichever is shorter. // This is the retry interval and recovery will be retried in this // interval until timeout or success. - long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout, + long delay = Math.min(dfsClient.getConf().getDatanodeRestartTimeout(), 4000L); try { Thread.sleep(delay); @@ -1311,7 +1314,7 @@ class DataStreamer extends Daemon { LocatedBlock lb = null; DatanodeInfo[] nodes = null; StorageType[] storageTypes = null; - int count = dfsClient.getConf().nBlockWriteRetry; + int count = dfsClient.getConf().getNumBlockWriteRetry(); boolean success = false; ExtendedBlock oldBlock = block; do { @@ -1471,7 +1474,7 @@ class DataStreamer extends Daemon { } // Check whether there is a restart worth waiting for. if (checkRestart && shouldWaitForRestart(errorIndex)) { - restartDeadline = dfsClient.getConf().datanodeRestartTimeout + restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout() + Time.monotonicNow(); restartingNodeIndex.set(errorIndex); errorIndex = -1; @@ -1524,9 +1527,9 @@ class DataStreamer extends Daemon { protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { - int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; - long sleeptime = dfsClient.getConf(). - blockWriteLocateFollowingInitialDelayMs; + final DfsClientConf conf = dfsClient.getConf(); + int retries = conf.getNumBlockWriteLocateFollowingRetry(); + long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); while (true) { long localstart = Time.monotonicNow(); while (true) { @@ -1674,7 +1677,8 @@ class DataStreamer extends Daemon { private LoadingCache initExcludedNodes() { return CacheBuilder.newBuilder().expireAfterWrite( - dfsClient.getConf().excludedNodesCacheExpiry, TimeUnit.MILLISECONDS) + dfsClient.getConf().getExcludedNodesCacheExpiry(), + TimeUnit.MILLISECONDS) .removalListener(new RemovalListener() { @Override public void onRemoval( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 3edab489b3b..21f5107e753 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -160,12 +160,12 @@ public class DistributedFileSystem extends FileSystem { @Override public long getDefaultBlockSize() { - return dfs.getDefaultBlockSize(); + return dfs.getConf().getDefaultBlockSize(); } @Override public short getDefaultReplication() { - return dfs.getDefaultReplication(); + return dfs.getConf().getDefaultReplication(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java index 3e0abcecf89..511bddb4c9b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java @@ -225,8 +225,9 @@ class LeaseRenewer { dfsclients.add(dfsc); //update renewal time - if (dfsc.getHdfsTimeout() > 0) { - final long half = dfsc.getHdfsTimeout()/2; + final int hdfsTimeout = dfsc.getConf().getHdfsTimeout(); + if (hdfsTimeout > 0) { + final long half = hdfsTimeout/2; if (half < renewal) { this.renewal = half; } @@ -368,14 +369,12 @@ class LeaseRenewer { } //update renewal time - if (renewal == dfsc.getHdfsTimeout()/2) { + if (renewal == dfsc.getConf().getHdfsTimeout()/2) { long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD; for(DFSClient c : dfsclients) { - if (c.getHdfsTimeout() > 0) { - final long timeout = c.getHdfsTimeout(); - if (timeout < min) { - min = timeout; - } + final int timeout = c.getConf().getHdfsTimeout(); + if (timeout > 0 && timeout < min) { + min = timeout; } } renewal = min/2; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java index ec2223f7eb6..5a929fc14c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java @@ -40,8 +40,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -178,12 +178,12 @@ public class NameNodeProxies { UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); } else { // HA case - Conf config = new Conf(conf); + DfsClientConf config = new DfsClientConf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( - RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, - config.maxRetryAttempts, config.failoverSleepBaseMillis, - config.failoverSleepMaxMillis)); + RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(), + config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(), + config.getFailoverSleepMaxMillis())); Text dtService; if (failoverProxyProvider.useLogicalURI()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java new file mode 100644 index 00000000000..e781b1636cb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -0,0 +1,738 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Options.ChecksumOpt; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.BlockReaderFactory; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.annotations.VisibleForTesting; + +/** + * DFSClient configuration + */ +public class DfsClientConf { + + private final int hdfsTimeout; // timeout value for a DFS operation. + + private final int maxFailoverAttempts; + private final int maxRetryAttempts; + private final int failoverSleepBaseMillis; + private final int failoverSleepMaxMillis; + private final int maxBlockAcquireFailures; + private final int datanodeSocketWriteTimeout; + private final int ioBufferSize; + private final ChecksumOpt defaultChecksumOpt; + private final int writePacketSize; + private final int writeMaxPackets; + private final ByteArrayManager.Conf writeByteArrayManagerConf; + private final int socketTimeout; + private final long excludedNodesCacheExpiry; + /** Wait time window (in msec) if BlockMissingException is caught */ + private final int timeWindow; + private final int numCachedConnRetry; + private final int numBlockWriteRetry; + private final int numBlockWriteLocateFollowingRetry; + private final int blockWriteLocateFollowingInitialDelayMs; + private final long defaultBlockSize; + private final long prefetchSize; + private final short defaultReplication; + private final String taskId; + private final FsPermission uMask; + private final boolean connectToDnViaHostname; + private final boolean hdfsBlocksMetadataEnabled; + private final int fileBlockStorageLocationsNumThreads; + private final int fileBlockStorageLocationsTimeoutMs; + private final int retryTimesForGetLastBlockLength; + private final int retryIntervalForGetLastBlockLength; + private final long datanodeRestartTimeout; + private final long slowIoWarningThresholdMs; + + private final ShortCircuitConf shortCircuitConf; + + public DfsClientConf(Configuration conf) { + // The hdfsTimeout is currently the same as the ipc timeout + hdfsTimeout = Client.getTimeout(conf); + + maxFailoverAttempts = conf.getInt( + DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, + DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); + maxRetryAttempts = conf.getInt( + HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, + HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT); + failoverSleepBaseMillis = conf.getInt( + DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, + DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT); + failoverSleepMaxMillis = conf.getInt( + DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY, + DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT); + + maxBlockAcquireFailures = conf.getInt( + DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, + DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT); + datanodeSocketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, + HdfsServerConstants.WRITE_TIMEOUT); + ioBufferSize = conf.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + defaultChecksumOpt = getChecksumOptFromConf(conf); + socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, + HdfsServerConstants.READ_TIMEOUT); + /** dfs.write.packet.size is an internal config variable */ + writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, + DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); + writeMaxPackets = conf.getInt( + DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT); + + final boolean byteArrayManagerEnabled = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT); + if (!byteArrayManagerEnabled) { + writeByteArrayManagerConf = null; + } else { + final int countThreshold = conf.getInt( + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT); + final int countLimit = conf.getInt( + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT); + final long countResetTimePeriodMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY, + DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT); + writeByteArrayManagerConf = new ByteArrayManager.Conf( + countThreshold, countLimit, countResetTimePeriodMs); + } + + defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, + DFS_BLOCK_SIZE_DEFAULT); + defaultReplication = (short) conf.getInt( + DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT); + taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE"); + excludedNodesCacheExpiry = conf.getLong( + DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL, + DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); + prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, + 10 * defaultBlockSize); + timeWindow = conf.getInt( + HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, + HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT); + numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, + DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); + numBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY, + DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT); + numBlockWriteLocateFollowingRetry = conf.getInt( + DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, + DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); + blockWriteLocateFollowingInitialDelayMs = conf.getInt( + DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY, + DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT); + uMask = FsPermission.getUMask(conf); + connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, + DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + hdfsBlocksMetadataEnabled = conf.getBoolean( + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, + DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); + fileBlockStorageLocationsNumThreads = conf.getInt( + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT); + fileBlockStorageLocationsTimeoutMs = conf.getInt( + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS, + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT); + retryTimesForGetLastBlockLength = conf.getInt( + HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY, + HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); + retryIntervalForGetLastBlockLength = conf.getInt( + HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY, + HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT); + + + datanodeRestartTimeout = conf.getLong( + DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, + DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000; + slowIoWarningThresholdMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, + DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); + + shortCircuitConf = new ShortCircuitConf(conf); + } + + private DataChecksum.Type getChecksumType(Configuration conf) { + final String checksum = conf.get( + DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); + try { + return DataChecksum.Type.valueOf(checksum); + } catch(IllegalArgumentException iae) { + DFSClient.LOG.warn("Bad checksum type: " + checksum + ". Using default " + + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); + return DataChecksum.Type.valueOf( + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); + } + } + + // Construct a checksum option from conf + private ChecksumOpt getChecksumOptFromConf(Configuration conf) { + DataChecksum.Type type = getChecksumType(conf); + int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, + DFS_BYTES_PER_CHECKSUM_DEFAULT); + return new ChecksumOpt(type, bytesPerChecksum); + } + + /** create a DataChecksum with the given option. */ + public DataChecksum createChecksum(ChecksumOpt userOpt) { + // Fill in any missing field with the default. + ChecksumOpt opt = ChecksumOpt.processChecksumOpt( + defaultChecksumOpt, userOpt); + DataChecksum dataChecksum = DataChecksum.newDataChecksum( + opt.getChecksumType(), + opt.getBytesPerChecksum()); + if (dataChecksum == null) { + throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt=" + + userOpt + ", default=" + defaultChecksumOpt + + ", effective=null"); + } + return dataChecksum; + } + + @VisibleForTesting + public int getBlockWriteLocateFollowingInitialDelayMs() { + return blockWriteLocateFollowingInitialDelayMs; + } + + /** + * @return the hdfsTimeout + */ + public int getHdfsTimeout() { + return hdfsTimeout; + } + + /** + * @return the maxFailoverAttempts + */ + public int getMaxFailoverAttempts() { + return maxFailoverAttempts; + } + + /** + * @return the maxRetryAttempts + */ + public int getMaxRetryAttempts() { + return maxRetryAttempts; + } + + /** + * @return the failoverSleepBaseMillis + */ + public int getFailoverSleepBaseMillis() { + return failoverSleepBaseMillis; + } + + /** + * @return the failoverSleepMaxMillis + */ + public int getFailoverSleepMaxMillis() { + return failoverSleepMaxMillis; + } + + /** + * @return the maxBlockAcquireFailures + */ + public int getMaxBlockAcquireFailures() { + return maxBlockAcquireFailures; + } + + /** + * @return the datanodeSocketWriteTimeout + */ + public int getDatanodeSocketWriteTimeout() { + return datanodeSocketWriteTimeout; + } + + /** + * @return the ioBufferSize + */ + public int getIoBufferSize() { + return ioBufferSize; + } + + /** + * @return the defaultChecksumOpt + */ + public ChecksumOpt getDefaultChecksumOpt() { + return defaultChecksumOpt; + } + + /** + * @return the writePacketSize + */ + public int getWritePacketSize() { + return writePacketSize; + } + + /** + * @return the writeMaxPackets + */ + public int getWriteMaxPackets() { + return writeMaxPackets; + } + + /** + * @return the writeByteArrayManagerConf + */ + public ByteArrayManager.Conf getWriteByteArrayManagerConf() { + return writeByteArrayManagerConf; + } + + /** + * @return the socketTimeout + */ + public int getSocketTimeout() { + return socketTimeout; + } + + /** + * @return the excludedNodesCacheExpiry + */ + public long getExcludedNodesCacheExpiry() { + return excludedNodesCacheExpiry; + } + + /** + * @return the timeWindow + */ + public int getTimeWindow() { + return timeWindow; + } + + /** + * @return the numCachedConnRetry + */ + public int getNumCachedConnRetry() { + return numCachedConnRetry; + } + + /** + * @return the numBlockWriteRetry + */ + public int getNumBlockWriteRetry() { + return numBlockWriteRetry; + } + + /** + * @return the numBlockWriteLocateFollowingRetry + */ + public int getNumBlockWriteLocateFollowingRetry() { + return numBlockWriteLocateFollowingRetry; + } + + /** + * @return the defaultBlockSize + */ + public long getDefaultBlockSize() { + return defaultBlockSize; + } + + /** + * @return the prefetchSize + */ + public long getPrefetchSize() { + return prefetchSize; + } + + /** + * @return the defaultReplication + */ + public short getDefaultReplication() { + return defaultReplication; + } + + /** + * @return the taskId + */ + public String getTaskId() { + return taskId; + } + + /** + * @return the uMask + */ + public FsPermission getUMask() { + return uMask; + } + + /** + * @return the connectToDnViaHostname + */ + public boolean isConnectToDnViaHostname() { + return connectToDnViaHostname; + } + + /** + * @return the hdfsBlocksMetadataEnabled + */ + public boolean isHdfsBlocksMetadataEnabled() { + return hdfsBlocksMetadataEnabled; + } + + /** + * @return the fileBlockStorageLocationsNumThreads + */ + public int getFileBlockStorageLocationsNumThreads() { + return fileBlockStorageLocationsNumThreads; + } + + /** + * @return the getFileBlockStorageLocationsTimeoutMs + */ + public int getFileBlockStorageLocationsTimeoutMs() { + return fileBlockStorageLocationsTimeoutMs; + } + + /** + * @return the retryTimesForGetLastBlockLength + */ + public int getRetryTimesForGetLastBlockLength() { + return retryTimesForGetLastBlockLength; + } + + /** + * @return the retryIntervalForGetLastBlockLength + */ + public int getRetryIntervalForGetLastBlockLength() { + return retryIntervalForGetLastBlockLength; + } + + /** + * @return the datanodeRestartTimeout + */ + public long getDatanodeRestartTimeout() { + return datanodeRestartTimeout; + } + + /** + * @return the slowIoWarningThresholdMs + */ + public long getSlowIoWarningThresholdMs() { + return slowIoWarningThresholdMs; + } + + /** + * @return the shortCircuitConf + */ + public ShortCircuitConf getShortCircuitConf() { + return shortCircuitConf; + } + + public static class ShortCircuitConf { + private static final Log LOG = LogFactory.getLog(ShortCircuitConf.class); + + private final int socketCacheCapacity; + private final long socketCacheExpiry; + + private final boolean useLegacyBlockReader; + private final boolean useLegacyBlockReaderLocal; + private final String domainSocketPath; + private final boolean skipShortCircuitChecksums; + + private final int shortCircuitBufferSize; + private final boolean shortCircuitLocalReads; + private final boolean domainSocketDataTraffic; + private final int shortCircuitStreamsCacheSize; + private final long shortCircuitStreamsCacheExpiryMs; + private final int shortCircuitSharedMemoryWatcherInterruptCheckMs; + + private final boolean shortCircuitMmapEnabled; + private final int shortCircuitMmapCacheSize; + private final long shortCircuitMmapCacheExpiryMs; + private final long shortCircuitMmapCacheRetryTimeout; + private final long shortCircuitCacheStaleThresholdMs; + + private final long keyProviderCacheExpiryMs; + + @VisibleForTesting + public BlockReaderFactory.FailureInjector brfFailureInjector = + new BlockReaderFactory.FailureInjector(); + + public ShortCircuitConf(Configuration conf) { + socketCacheCapacity = conf.getInt( + DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, + DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT); + socketCacheExpiry = conf.getLong( + DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, + DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); + + useLegacyBlockReader = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, + DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); + useLegacyBlockReaderLocal = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, + DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); + shortCircuitLocalReads = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); + domainSocketDataTraffic = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, + DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); + domainSocketPath = conf.getTrimmed( + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT); + + if (LOG.isDebugEnabled()) { + LOG.debug(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL + + " = " + useLegacyBlockReaderLocal); + LOG.debug(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY + + " = " + shortCircuitLocalReads); + LOG.debug(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC + + " = " + domainSocketDataTraffic); + LOG.debug(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + + " = " + domainSocketPath); + } + + skipShortCircuitChecksums = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); + shortCircuitBufferSize = conf.getInt( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT); + shortCircuitStreamsCacheSize = conf.getInt( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT); + shortCircuitStreamsCacheExpiryMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT); + shortCircuitMmapEnabled = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED, + DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT); + shortCircuitMmapCacheSize = conf.getInt( + DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, + DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT); + shortCircuitMmapCacheExpiryMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, + DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT); + shortCircuitMmapCacheRetryTimeout = conf.getLong( + DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS, + DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT); + shortCircuitCacheStaleThresholdMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS, + DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT); + shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt( + DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, + DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT); + + keyProviderCacheExpiryMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS, + DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT); + } + + /** + * @return the socketCacheCapacity + */ + public int getSocketCacheCapacity() { + return socketCacheCapacity; + } + + /** + * @return the socketCacheExpiry + */ + public long getSocketCacheExpiry() { + return socketCacheExpiry; + } + + public boolean isUseLegacyBlockReaderLocal() { + return useLegacyBlockReaderLocal; + } + + public String getDomainSocketPath() { + return domainSocketPath; + } + + public boolean isShortCircuitLocalReads() { + return shortCircuitLocalReads; + } + + public boolean isDomainSocketDataTraffic() { + return domainSocketDataTraffic; + } + /** + * @return the useLegacyBlockReader + */ + public boolean isUseLegacyBlockReader() { + return useLegacyBlockReader; + } + + /** + * @return the skipShortCircuitChecksums + */ + public boolean isSkipShortCircuitChecksums() { + return skipShortCircuitChecksums; + } + + /** + * @return the shortCircuitBufferSize + */ + public int getShortCircuitBufferSize() { + return shortCircuitBufferSize; + } + + /** + * @return the shortCircuitStreamsCacheSize + */ + public int getShortCircuitStreamsCacheSize() { + return shortCircuitStreamsCacheSize; + } + + /** + * @return the shortCircuitStreamsCacheExpiryMs + */ + public long getShortCircuitStreamsCacheExpiryMs() { + return shortCircuitStreamsCacheExpiryMs; + } + + /** + * @return the shortCircuitSharedMemoryWatcherInterruptCheckMs + */ + public int getShortCircuitSharedMemoryWatcherInterruptCheckMs() { + return shortCircuitSharedMemoryWatcherInterruptCheckMs; + } + + /** + * @return the shortCircuitMmapEnabled + */ + public boolean isShortCircuitMmapEnabled() { + return shortCircuitMmapEnabled; + } + + /** + * @return the shortCircuitMmapCacheSize + */ + public int getShortCircuitMmapCacheSize() { + return shortCircuitMmapCacheSize; + } + + /** + * @return the shortCircuitMmapCacheExpiryMs + */ + public long getShortCircuitMmapCacheExpiryMs() { + return shortCircuitMmapCacheExpiryMs; + } + + /** + * @return the shortCircuitMmapCacheRetryTimeout + */ + public long getShortCircuitMmapCacheRetryTimeout() { + return shortCircuitMmapCacheRetryTimeout; + } + + /** + * @return the shortCircuitCacheStaleThresholdMs + */ + public long getShortCircuitCacheStaleThresholdMs() { + return shortCircuitCacheStaleThresholdMs; + } + + /** + * @return the keyProviderCacheExpiryMs + */ + public long getKeyProviderCacheExpiryMs() { + return keyProviderCacheExpiryMs; + } + + public String confAsString() { + StringBuilder builder = new StringBuilder(); + builder.append("shortCircuitStreamsCacheSize = "). + append(shortCircuitStreamsCacheSize). + append(", shortCircuitStreamsCacheExpiryMs = "). + append(shortCircuitStreamsCacheExpiryMs). + append(", shortCircuitMmapCacheSize = "). + append(shortCircuitMmapCacheSize). + append(", shortCircuitMmapCacheExpiryMs = "). + append(shortCircuitMmapCacheExpiryMs). + append(", shortCircuitMmapCacheRetryTimeout = "). + append(shortCircuitMmapCacheRetryTimeout). + append(", shortCircuitCacheStaleThresholdMs = "). + append(shortCircuitCacheStaleThresholdMs). + append(", socketCacheCapacity = "). + append(socketCacheCapacity). + append(", socketCacheExpiry = "). + append(socketCacheExpiry). + append(", shortCircuitLocalReads = "). + append(shortCircuitLocalReads). + append(", useLegacyBlockReaderLocal = "). + append(useLegacyBlockReaderLocal). + append(", domainSocketDataTraffic = "). + append(domainSocketDataTraffic). + append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = "). + append(shortCircuitSharedMemoryWatcherInterruptCheckMs). + append(", keyProviderCacheExpiryMs = "). + append(keyProviderCacheExpiryMs); + + return builder.toString(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java index 5fd31a920cc..fadb2f9159d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java @@ -26,14 +26,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.util.PerformanceAdvisory; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import org.apache.hadoop.util.PerformanceAdvisory; public class DomainSocketFactory { private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class); @@ -95,7 +95,7 @@ public class DomainSocketFactory { .expireAfterWrite(10, TimeUnit.MINUTES) .build(); - public DomainSocketFactory(Conf conf) { + public DomainSocketFactory(ShortCircuitConf conf) { final String feature; if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) { feature = "The short-circuit local reads feature"; @@ -129,7 +129,7 @@ public class DomainSocketFactory { * * @return Information about the socket path. */ - public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) { + public PathInfo getPathInfo(InetSocketAddress addr, ShortCircuitConf conf) { // If there is no domain socket path configured, we can't use domain // sockets. if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java index d1ec3b8e0d7..27a9ef22819 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java @@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -359,6 +360,17 @@ public class ShortCircuitCache implements Closeable { DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT)); } + public static ShortCircuitCache fromConf(ShortCircuitConf conf) { + return new ShortCircuitCache( + conf.getShortCircuitStreamsCacheSize(), + conf.getShortCircuitStreamsCacheExpiryMs(), + conf.getShortCircuitMmapCacheSize(), + conf.getShortCircuitMmapCacheExpiryMs(), + conf.getShortCircuitMmapCacheRetryTimeout(), + conf.getShortCircuitCacheStaleThresholdMs(), + conf.getShortCircuitSharedMemoryWatcherInterruptCheckMs()); + } + public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs, int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs, long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index 296c8d2dfbd..6d644ae48b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -42,7 +42,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.BlockReaderTestUtil; import org.apache.hadoop.hdfs.ClientContext; -import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -50,6 +49,7 @@ import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -359,7 +359,7 @@ public class TestEnhancedByteBufferAccess { fsIn.close(); fsIn = fs.open(TEST_PATH); final ShortCircuitCache cache = ClientContext.get( - CONTEXT, new DFSClient.Conf(conf)). getShortCircuitCache(); + CONTEXT, new DfsClientConf(conf)). getShortCircuitCache(); cache.accept(new CountingVisitor(0, 5, 5, 0)); results[0] = fsIn.read(null, BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); @@ -662,7 +662,7 @@ public class TestEnhancedByteBufferAccess { final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); final ShortCircuitCache cache = ClientContext.get( - CONTEXT, new DFSClient.Conf(conf)). getShortCircuitCache(); + CONTEXT, new DfsClientConf(conf)). getShortCircuitCache(); waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1); // Uncache the replica fs.removeCacheDirective(directiveId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java index 29c32f53896..ab3515e8ef3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; @@ -187,7 +188,7 @@ public class TestBlockReaderLocal { Time.now(), shm.allocAndRegisterSlot( ExtendedBlockId.fromExtendedBlock(block))); blockReaderLocal = new BlockReaderLocal.Builder( - new DFSClient.Conf(conf)). + new DfsClientConf.ShortCircuitConf(conf)). setFilename(TEST_PATH.getName()). setBlock(block). setShortCircuitReplica(replica). diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 5d95a8b79f7..1c55d5dbc17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -300,7 +300,7 @@ public class TestDFSClientRetries { NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); NamenodeProtocols spyNN = spy(preSpyNN); DFSClient client = new DFSClient(null, spyNN, conf, null); - int maxBlockAcquires = client.getMaxBlockAcquireFailures(); + int maxBlockAcquires = client.getConf().getMaxBlockAcquireFailures(); assertTrue(maxBlockAcquires > 0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index a410e74b13e..478f7e5447f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -113,7 +114,7 @@ public class TestDFSOutputStream { @Test public void testCongestionBackoff() throws IOException { - DFSClient.Conf dfsClientConf = mock(DFSClient.Conf.class); + DfsClientConf dfsClientConf = mock(DfsClientConf.class); DFSClient client = mock(DFSClient.class); when(client.getConf()).thenReturn(dfsClientConf); client.clientRunning = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java index 11cbcad5596..f091db74300 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRenewer.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; @@ -59,13 +60,13 @@ public class TestLeaseRenewer { } private DFSClient createMockClient() { + final DfsClientConf mockConf = Mockito.mock(DfsClientConf.class); + Mockito.doReturn((int)FAST_GRACE_PERIOD).when(mockConf).getHdfsTimeout(); + DFSClient mock = Mockito.mock(DFSClient.class); - Mockito.doReturn(true) - .when(mock).isClientRunning(); - Mockito.doReturn((int)FAST_GRACE_PERIOD) - .when(mock).getHdfsTimeout(); - Mockito.doReturn("myclient") - .when(mock).getClientName(); + Mockito.doReturn(true).when(mock).isClientRunning(); + Mockito.doReturn(mockConf).when(mock).getConf(); + Mockito.doReturn("myclient").when(mock).getClientName(); return mock; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java index 23e2a7a2f95..05698ec8d97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.apache.log4j.LogManager; -import org.junit.Assume; import org.junit.Ignore; import org.junit.Test; @@ -325,7 +324,7 @@ public class TestParallelReadUtil { testInfo.filepath = new Path("/TestParallelRead.dat." + i); testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K); testInfo.dis = dfsClient.open(testInfo.filepath.toString(), - dfsClient.getConf().ioBufferSize, verifyChecksums); + dfsClient.getConf().getIoBufferSize(), verifyChecksums); for (int j = 0; j < nWorkerEach; ++j) { workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index 84e5c826e7a..c280027781c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -146,7 +147,7 @@ public class TestBlockTokenWithDFS { DatanodeInfo[] nodes = lblock.getLocations(); targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); - blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)). + blockReader = new BlockReaderFactory(new DfsClientConf(conf)). setFileName(BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId())). setBlock(block). diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 41e8d7b45bb..0a90947d4df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -40,12 +40,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.ClientContext; -import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.RemotePeerFactory; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.Block; @@ -405,7 +405,7 @@ public class TestDataNodeVolumeFailure { targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr()); - BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)). + BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)). setInetSocketAddress(targetAddr). setBlock(block). setFileName(BlockReaderFactory.getFileName(targetAddr, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 7d26dee610e..e38b97b46b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -657,7 +657,7 @@ public class TestShortCircuitCache { // The second read should fail, and we should only have 1 segment and 1 slot // left. - fs.getClient().getConf().brfFailureInjector = + fs.getClient().getConf().getShortCircuitConf().brfFailureInjector = new TestCleanupFailureInjector(); try { DFSTestUtil.readFileBuffer(fs, TEST_PATH2);