diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d6ffe9e2f18..4470a3901e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -178,6 +178,8 @@ Release 2.1.0-beta - UNRELEASED HDFS-3009. Remove duplicate code in DFSClient#isLocalAddress by using NetUtils. (Hari Mankude via suresh) + HDFS-4914. Use DFSClient.Conf instead of Configuration. (szetszwo) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index a67b3892f2a..197ab23b29f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -40,7 +39,6 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; @@ -75,9 +73,7 @@ public class BlockReaderFactory { * should be allowed. * @return New BlockReader instance */ - @SuppressWarnings("deprecation") - public static BlockReader newBlockReader( - Configuration conf, + public static BlockReader newBlockReader(DFSClient.Conf conf, String file, ExtendedBlock block, Token blockToken, @@ -91,14 +87,11 @@ public class BlockReaderFactory { FileInputStreamCache fisCache, boolean allowShortCircuitLocalReads) throws IOException { - peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, - HdfsServerConstants.READ_TIMEOUT)); + peer.setReadTimeout(conf.socketTimeout); peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT); if (peer.getDomainSocket() != null) { - if (allowShortCircuitLocalReads && - (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) { + if (allowShortCircuitLocalReads && !conf.useLegacyBlockReaderLocal) { // If this is a domain socket, and short-circuit local reads are // enabled, try to set up a BlockReaderLocal. BlockReader reader = newShortCircuitBlockReader(conf, file, @@ -118,21 +111,19 @@ public class BlockReaderFactory { // If this is a domain socket and we couldn't (or didn't want to) set // up a BlockReaderLocal, check that we are allowed to pass data traffic // over the socket before proceeding. - if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, - DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) { + if (!conf.domainSocketDataTraffic) { throw new IOException("Because we can't do short-circuit access, " + "and data traffic over domain sockets is disabled, " + "we cannot use this socket to talk to " + datanodeID); } } - if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) { - return RemoteBlockReader.newBlockReader(file, - block, blockToken, startOffset, len, - conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, - DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT), + if (conf.useLegacyBlockReader) { + @SuppressWarnings("deprecation") + RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file, + block, blockToken, startOffset, len, conf.ioBufferSize, verifyChecksum, clientName, peer, datanodeID, peerCache); + return reader; } else { return RemoteBlockReader2.newBlockReader( file, block, blockToken, startOffset, len, @@ -173,7 +164,7 @@ public class BlockReaderFactory { * @throws IOException If there was a communication error. */ private static BlockReaderLocal newShortCircuitBlockReader( - Configuration conf, String file, ExtendedBlock block, + DFSClient.Conf conf, String file, ExtendedBlock block, Token blockToken, long startOffset, long len, Peer peer, DatanodeID datanodeID, DomainSocketFactory domSockFactory, boolean verifyChecksum, @@ -245,15 +236,14 @@ public class BlockReaderFactory { * This block reader implements the path-based style of local reads * first introduced in HDFS-2246. */ - static BlockReader getLegacyBlockReaderLocal(UserGroupInformation ugi, - Configuration conf, String src, ExtendedBlock blk, + static BlockReader getLegacyBlockReaderLocal(DFSClient dfsClient, + String src, ExtendedBlock blk, Token accessToken, DatanodeInfo chosenNode, - int socketTimeout, long offsetIntoBlock, - boolean connectToDnViaHostname) throws InvalidToken, IOException { + long offsetIntoBlock) throws InvalidToken, IOException { try { - return BlockReaderLocalLegacy.newBlockReader(ugi, conf, src, - blk, accessToken, chosenNode, socketTimeout, offsetIntoBlock, - blk.getNumBytes() - offsetIntoBlock, connectToDnViaHostname); + final long length = blk.getNumBytes() - offsetIntoBlock; + return BlockReaderLocalLegacy.newBlockReader(dfsClient, src, blk, + accessToken, chosenNode, offsetIntoBlock, length); } catch (RemoteException re) { throw re.unwrapRemoteException(InvalidToken.class, AccessControlException.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index 86ac6cb3573..c1cb0b3db3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.hdfs; -import java.io.DataInputStream; -import org.apache.hadoop.conf.Configuration; - import java.io.BufferedInputStream; +import java.io.DataInputStream; import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -90,13 +88,8 @@ class BlockReaderLocal implements BlockReader { private final FileInputStreamCache fisCache; - private static int getSlowReadBufferNumChunks(Configuration conf, + private static int getSlowReadBufferNumChunks(int bufSize, int bytesPerChecksum) { - - int bufSize = - conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT); - if (bufSize < bytesPerChecksum) { throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufSize + ") is not large enough to hold a single chunk (" + @@ -108,7 +101,7 @@ class BlockReaderLocal implements BlockReader { return bufSize / bytesPerChecksum; } - public BlockReaderLocal(Configuration conf, String filename, + public BlockReaderLocal(DFSClient.Conf conf, String filename, ExtendedBlock block, long startOffset, long length, FileInputStream dataIn, FileInputStream checksumIn, DatanodeID datanodeID, boolean verifyChecksum, @@ -132,13 +125,7 @@ class BlockReaderLocal implements BlockReader { throw new IOException("Wrong version (" + version + ") of the " + "metadata file for " + filename + "."); } - if (!verifyChecksum) { - this.verifyChecksum = false; - } else { - this.verifyChecksum = !conf.getBoolean(DFSConfigKeys. - DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); - } + this.verifyChecksum = verifyChecksum && !conf.skipShortCircuitChecksums; long firstChunkOffset; if (this.verifyChecksum) { this.checksum = header.getChecksum(); @@ -148,7 +135,8 @@ class BlockReaderLocal implements BlockReader { - (startOffset % checksum.getBytesPerChecksum()); this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset); - int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum); + int chunksPerChecksumRead = getSlowReadBufferNumChunks( + conf.shortCircuitBufferSize, bytesPerChecksum); slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); // Initially the buffers have nothing to read. @@ -171,7 +159,12 @@ class BlockReaderLocal implements BlockReader { this.dataIn.getChannel().position(firstChunkOffset); success = true; } finally { - if (!success) { + if (success) { + if (LOG.isDebugEnabled()) { + LOG.debug("Created BlockReaderLocal for file " + filename + + " block " + block + " in datanode " + datanodeID); + } + } else { if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff); if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index acee9de49ae..aeb6279bead 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -21,7 +21,6 @@ import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.net.Socket; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.Collections; @@ -32,17 +31,15 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.util.DirectBufferPool; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -70,7 +67,7 @@ import org.apache.hadoop.util.DataChecksum; * */ class BlockReaderLocalLegacy implements BlockReader { - private static final Log LOG = LogFactory.getLog(DFSClient.class); + private static final Log LOG = LogFactory.getLog(BlockReaderLocalLegacy.class); //Stores the cache and proxy for a local datanode. private static class LocalDatanodeInfo { @@ -173,19 +170,20 @@ class BlockReaderLocalLegacy implements BlockReader { /** * The only way this object can be instantiated. */ - static BlockReaderLocalLegacy newBlockReader(UserGroupInformation ugi, - Configuration conf, String file, ExtendedBlock blk, - Token token, DatanodeInfo node, int socketTimeout, - long startOffset, long length, boolean connectToDnViaHostname) + static BlockReaderLocalLegacy newBlockReader(DFSClient dfsClient, + String file, ExtendedBlock blk, Token token, + DatanodeInfo node, long startOffset, long length) throws IOException { + final DFSClient.Conf conf = dfsClient.getConf(); LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node .getIpcPort()); // check the cache first BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); if (pathinfo == null) { - pathinfo = getBlockPathInfo(ugi, blk, node, conf, socketTimeout, token, - connectToDnViaHostname); + pathinfo = getBlockPathInfo(dfsClient.ugi, blk, node, + dfsClient.getConfiguration(), dfsClient.getHdfsTimeout(), token, + conf.connectToDnViaHostname); } // check to see if the file exists. It may so happen that the @@ -197,7 +195,7 @@ class BlockReaderLocalLegacy implements BlockReader { FileInputStream dataIn = null; FileInputStream checksumIn = null; BlockReaderLocalLegacy localBlockReader = null; - boolean skipChecksumCheck = skipChecksumCheck(conf); + boolean skipChecksumCheck = conf.skipShortCircuitChecksums; try { // get a local file system File blkfile = new File(pathinfo.getBlockPath()); @@ -285,16 +283,8 @@ class BlockReaderLocalLegacy implements BlockReader { return pathinfo; } - private static boolean skipChecksumCheck(Configuration conf) { - return conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); - } - - private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) { - int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT); - + private static int getSlowReadBufferNumChunks(int bufferSizeBytes, + int bytesPerChecksum) { if (bufferSizeBytes < bytesPerChecksum) { throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " + "buffer size (" + bufferSizeBytes + ") is not large enough to hold " + @@ -307,7 +297,7 @@ class BlockReaderLocalLegacy implements BlockReader { return bufferSizeBytes / bytesPerChecksum; } - private BlockReaderLocalLegacy(Configuration conf, String hdfsfile, + private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile, ExtendedBlock block, Token token, long startOffset, long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) throws IOException { @@ -316,7 +306,7 @@ class BlockReaderLocalLegacy implements BlockReader { dataIn, startOffset, null); } - private BlockReaderLocalLegacy(Configuration conf, String hdfsfile, + private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile, ExtendedBlock block, Token token, long startOffset, long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, @@ -333,7 +323,8 @@ class BlockReaderLocalLegacy implements BlockReader { this.checksumIn = checksumIn; this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); - int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum); + int chunksPerChecksumRead = getSlowReadBufferNumChunks( + conf.shortCircuitBufferSize, bytesPerChecksum); slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); // Initially the buffers have nothing to read. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index c1469ba7abf..cbb6d71c80e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -178,6 +178,9 @@ public class DFSClient implements java.io.Closeable { public static final Log LOG = LogFactory.getLog(DFSClient.class); public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB + + private final Configuration conf; + private final Conf dfsClientConf; final ClientProtocol namenode; /* The service used for delegation tokens */ private Text dtService; @@ -188,14 +191,11 @@ public class DFSClient implements java.io.Closeable { private volatile FsServerDefaults serverDefaults; private volatile long serverDefaultsLastUpdate; final String clientName; - Configuration conf; SocketFactory socketFactory; final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure; final FileSystem.Statistics stats; - final int hdfsTimeout; // timeout value for a DFS operation. private final String authority; final PeerCache peerCache; - final Conf dfsClientConf; private Random r = new Random(); private SocketAddress[] localInterfaceAddrs; private DataEncryptionKey encryptionKey; @@ -204,7 +204,8 @@ public class DFSClient implements java.io.Closeable { /** * DFSClient configuration */ - static class Conf { + public static class Conf { + final int hdfsTimeout; // timeout value for a DFS operation. final int maxFailoverAttempts; final int failoverSleepBaseMillis; final int failoverSleepMaxMillis; @@ -227,18 +228,25 @@ public class DFSClient implements java.io.Closeable { final short defaultReplication; final String taskId; final FsPermission uMask; - final boolean useLegacyBlockReaderLocal; final boolean connectToDnViaHostname; final boolean getHdfsBlocksMetadataEnabled; final int getFileBlockStorageLocationsNumThreads; final int getFileBlockStorageLocationsTimeout; + + final boolean useLegacyBlockReader; + final boolean useLegacyBlockReaderLocal; final String domainSocketPath; final boolean skipShortCircuitChecksums; final int shortCircuitBufferSize; final boolean shortCircuitLocalReads; final boolean domainSocketDataTraffic; + final int shortCircuitStreamsCacheSize; + final long shortCircuitStreamsCacheExpiryMs; + + public Conf(Configuration conf) { + // The hdfsTimeout is currently the same as the ipc timeout + hdfsTimeout = Client.getTimeout(conf); - Conf(Configuration conf) { maxFailoverAttempts = conf.getInt( DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); @@ -277,19 +285,15 @@ public class DFSClient implements java.io.Closeable { DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 10 * defaultBlockSize); - timeWindow = conf - .getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000); + timeWindow = conf.getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000); nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY, DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT); - nBlockWriteLocateFollowingRetry = conf - .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, - DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); + nBlockWriteLocateFollowingRetry = conf.getInt( + DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, + DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); uMask = FsPermission.getUMask(conf); - useLegacyBlockReaderLocal = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); getHdfsBlocksMetadataEnabled = conf.getBoolean( @@ -301,20 +305,50 @@ public class DFSClient implements java.io.Closeable { getFileBlockStorageLocationsTimeout = conf.getInt( DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT, DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT); - domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, + + useLegacyBlockReader = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, + DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); + useLegacyBlockReaderLocal = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, + DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); + shortCircuitLocalReads = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); + domainSocketDataTraffic = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, + DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); + domainSocketPath = conf.getTrimmed( + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT); + + if (BlockReaderLocal.LOG.isDebugEnabled()) { + BlockReaderLocal.LOG.debug( + DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL + + " = " + useLegacyBlockReaderLocal); + BlockReaderLocal.LOG.debug( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY + + " = " + shortCircuitLocalReads); + BlockReaderLocal.LOG.debug( + DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC + + " = " + domainSocketDataTraffic); + BlockReaderLocal.LOG.debug( + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + + " = " + domainSocketPath); + } + skipShortCircuitChecksums = conf.getBoolean( DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); shortCircuitBufferSize = conf.getInt( DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT); - shortCircuitLocalReads = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); - domainSocketDataTraffic = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, - DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); + shortCircuitStreamsCacheSize = conf.getInt( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT); + shortCircuitStreamsCacheExpiryMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT); } private DataChecksum.Type getChecksumType(Configuration conf) { @@ -360,10 +394,14 @@ public class DFSClient implements java.io.Closeable { } } - Conf getConf() { + public Conf getConf() { return dfsClientConf; } + Configuration getConfiguration() { + return conf; + } + /** * A map from file names to {@link DFSOutputStream} objects * that are currently being written by this client. @@ -426,8 +464,6 @@ public class DFSClient implements java.io.Closeable { this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); - // The hdfsTimeout is currently the same as the ipc timeout - this.hdfsTimeout = Client.getTimeout(conf); this.ugi = UserGroupInformation.getCurrentUser(); this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); @@ -542,21 +578,13 @@ public class DFSClient implements java.io.Closeable { } int getHdfsTimeout() { - return hdfsTimeout; + return dfsClientConf.hdfsTimeout; } String getClientName() { return clientName; } - /** - * @return whether the client should use hostnames instead of IPs - * when connecting to DataNodes - */ - boolean connectToDnViaHostname() { - return dfsClientConf.connectToDnViaHostname; - } - void checkOpen() throws IOException { if (!clientRunning) { IOException result = new IOException("Filesystem closed"); @@ -793,6 +821,7 @@ public class DFSClient implements java.io.Closeable { * @throws IOException * @deprecated Use Token.renew instead. */ + @Deprecated public long renewDelegationToken(Token token) throws InvalidToken, IOException { LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token)); @@ -864,6 +893,7 @@ public class DFSClient implements java.io.Closeable { * @throws IOException * @deprecated Use Token.cancel instead. */ + @Deprecated public void cancelDelegationToken(Token token) throws InvalidToken, IOException { LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token)); @@ -965,6 +995,11 @@ public class DFSClient implements java.io.Closeable { return dfsClientConf.defaultReplication; } + public LocatedBlocks getLocatedBlocks(String src, long start) + throws IOException { + return getLocatedBlocks(src, start, dfsClientConf.prefetchSize); + } + /* * This is just a wrapper around callGetBlockLocations, but non-static so that * we can stub it out for tests. @@ -1693,10 +1728,10 @@ public class DFSClient implements java.io.Closeable { * @param socketFactory to create sockets to connect to DNs * @param socketTimeout timeout to use when connecting and waiting for a response * @param encryptionKey the key needed to communicate with DNs in this cluster - * @param connectToDnViaHostname {@link #connectToDnViaHostname()} + * @param connectToDnViaHostname whether the client should use hostnames instead of IPs * @return The checksum */ - static MD5MD5CRC32FileChecksum getFileChecksum(String src, + private static MD5MD5CRC32FileChecksum getFileChecksum(String src, String clientName, ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout, DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 56fc97bf045..c025ee8d46b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -72,7 +72,6 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable private final DFSClient dfsClient; private boolean closed = false; private final String src; - private final long prefetchSize; private BlockReader blockReader = null; private final boolean verifyChecksum; private LocatedBlocks locatedBlocks = null; @@ -163,7 +162,6 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable * capped at maxBlockAcquireFailures */ private int failures = 0; - private final int timeWindow; /* XXX Use of CocurrentHashMap is temp fix. Need to fix * parallel accesses to DFSInputStream (through ptreads) properly */ @@ -173,8 +171,6 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable private final byte[] oneByteBuf = new byte[1]; // used for 'int read()' - private final int nCachedConnRetry; - void addToDeadNodes(DatanodeInfo dnInfo) { deadNodes.put(dnInfo, dnInfo); } @@ -187,15 +183,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable this.src = src; this.peerCache = dfsClient.peerCache; this.fileInputStreamCache = new FileInputStreamCache( - dfsClient.conf.getInt(DFSConfigKeys. - DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT), - dfsClient.conf.getLong(DFSConfigKeys. - DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT)); - prefetchSize = dfsClient.getConf().prefetchSize; - timeWindow = dfsClient.getConf().timeWindow; - nCachedConnRetry = dfsClient.getConf().nCachedConnRetry; + dfsClient.getConf().shortCircuitStreamsCacheSize, + dfsClient.getConf().shortCircuitStreamsCacheExpiryMs); openInfo(); } @@ -236,7 +225,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable } private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException { - LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0, prefetchSize); + final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("newInfo = " + newInfo); } @@ -280,8 +269,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable ClientDatanodeProtocol cdp = null; try { - cdp = DFSUtil.createClientDatanodeProtocolProxy( - datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, + cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, + dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout, dfsClient.getConf().connectToDnViaHostname, locatedblock); final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); @@ -389,8 +378,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable if (targetBlockIdx < 0) { // block is not cached targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); // fetch more blocks - LocatedBlocks newBlocks; - newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize); + final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset); assert (newBlocks != null) : "Could not find target position " + offset; locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); } @@ -413,8 +401,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); } // fetch blocks - LocatedBlocks newBlocks; - newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize); + final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset); if (newBlocks == null) { throw new IOException("Could not find target position " + offset); } @@ -832,7 +819,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable try { DatanodeInfo chosenNode = bestNode(nodes, deadNodes); final String dnAddr = - chosenNode.getXferAddr(dfsClient.connectToDnViaHostname()); + chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Connecting to datanode " + dnAddr); } @@ -861,6 +848,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable // alleviating the request rate from the server. Similarly the 3rd retry // will wait 6000ms grace period before retry and the waiting window is // expanded to 9000ms. + final int timeWindow = dfsClient.getConf().timeWindow; double waitTime = timeWindow * failures + // grace period for the last round of attempt timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); @@ -1011,7 +999,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable DFSClient.LOG.debug("got FileInputStreams for " + block + " from " + "the FileInputStreamCache."); } - return new BlockReaderLocal(dfsClient.conf, file, + return new BlockReaderLocal(dfsClient.getConf(), file, block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum, fileInputStreamCache); } @@ -1023,9 +1011,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable DFSClient.isLocalAddress(dnAddr) && (!shortCircuitForbidden())) { try { - return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient.ugi, - dfsClient.conf, clientName, block, blockToken, chosenNode, - dfsClient.hdfsTimeout, startOffset,dfsClient.connectToDnViaHostname()); + return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient, + clientName, block, blockToken, chosenNode, startOffset); } catch (IOException e) { DFSClient.LOG.warn("error creating legacy BlockReaderLocal. " + "Disabling legacy local reads.", e); @@ -1037,6 +1024,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable int cacheTries = 0; DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory(); BlockReader reader = null; + final int nCachedConnRetry = dfsClient.getConf().nCachedConnRetry; for (; cacheTries < nCachedConnRetry; ++cacheTries) { Peer peer = peerCache.get(chosenNode, true); if (peer == null) break; @@ -1044,7 +1032,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable boolean allowShortCircuitLocalReads = dfsClient.getConf(). shortCircuitLocalReads && (!shortCircuitForbidden()); reader = BlockReaderFactory.newBlockReader( - dfsClient.conf, file, block, blockToken, startOffset, + dfsClient.getConf(), file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, dsFactory, peerCache, fileInputStreamCache, allowShortCircuitLocalReads); @@ -1067,7 +1055,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable boolean allowShortCircuitLocalReads = dfsClient.getConf(). shortCircuitLocalReads && (!shortCircuitForbidden()); reader = BlockReaderFactory.newBlockReader( - dfsClient.conf, file, block, blockToken, startOffset, + dfsClient.getConf(), file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, dsFactory, peerCache, fileInputStreamCache, allowShortCircuitLocalReads); @@ -1091,7 +1079,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable if (peer == null) break; try { reader = BlockReaderFactory.newBlockReader( - dfsClient.conf, file, block, blockToken, startOffset, + dfsClient.getConf(), file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, dsFactory, peerCache, fileInputStreamCache, false); return reader; @@ -1110,7 +1098,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable // Try to create a new remote peer. Peer peer = newTcpPeer(dnAddr); return BlockReaderFactory.newBlockReader( - dfsClient.conf, file, block, blockToken, startOffset, + dfsClient.getConf(), file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, dsFactory, peerCache, fileInputStreamCache, false); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 079ff7ea4ed..e58a5730510 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -1288,7 +1288,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { */ static Socket createSocketForPipeline(final DatanodeInfo first, final int length, final DFSClient client) throws IOException { - final String dnAddr = first.getXferAddr(client.connectToDnViaHostname()); + final String dnAddr = first.getXferAddr( + client.getConf().connectToDnViaHostname); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Connecting to datanode " + dnAddr); } @@ -1813,8 +1814,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { if (closed) { return; } - streamer.setLastException(new IOException("Lease timeout of " + - (dfsClient.hdfsTimeout/1000) + " seconds expired.")); + streamer.setLastException(new IOException("Lease timeout of " + + (dfsClient.getHdfsTimeout()/1000) + " seconds expired.")); closeThreads(true); dfsClient.endFileLease(src); } @@ -1884,13 +1885,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { while (!fileComplete) { fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last); if (!fileComplete) { + final int hdfsTimeout = dfsClient.getHdfsTimeout(); if (!dfsClient.clientRunning || - (dfsClient.hdfsTimeout > 0 && - localstart + dfsClient.hdfsTimeout < Time.now())) { + (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.now())) { String msg = "Unable to close file because dfsclient " + " was unable to contact the HDFS servers." + " clientRunning " + dfsClient.clientRunning + - " hdfsTimeout " + dfsClient.hdfsTimeout; + " hdfsTimeout " + hdfsTimeout; DFSClient.LOG.info(msg); throw new IOException(msg); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java index ebcb84a810e..d420a83d6fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java @@ -23,16 +23,15 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.hdfs.DFSClient.Conf; - import org.apache.hadoop.net.unix.DomainSocket; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; class DomainSocketFactory { - public static final Log LOG = LogFactory.getLog(DomainSocketFactory.class); + private static final Log LOG = BlockReaderLocal.LOG; private final Conf conf; enum PathStatus { @@ -51,21 +50,26 @@ class DomainSocketFactory { public DomainSocketFactory(Conf conf) { this.conf = conf; - String feature = null; + final String feature; if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) { feature = "The short-circuit local reads feature"; } else if (conf.domainSocketDataTraffic) { feature = "UNIX domain socket data traffic"; + } else { + feature = null; } - if (feature != null) { + + if (feature == null) { + LOG.debug("Both short-circuit local reads and UNIX domain socket are disabled."); + } else { if (conf.domainSocketPath.isEmpty()) { - LOG.warn(feature + " is disabled because you have not set " + - DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY); + throw new HadoopIllegalArgumentException(feature + " is enabled but " + + DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set."); } else if (DomainSocket.getLoadingFailureReason() != null) { - LOG.warn(feature + " is disabled because " + - DomainSocket.getLoadingFailureReason()); + LOG.warn(feature + " cannot be used because " + + DomainSocket.getLoadingFailureReason()); } else { - LOG.debug(feature + "is enabled."); + LOG.debug(feature + " is enabled."); } } } @@ -86,8 +90,8 @@ class DomainSocketFactory { // sockets. if (conf.domainSocketPath.isEmpty()) return null; // If we can't do anything with the domain socket, don't create it. - if ((conf.domainSocketDataTraffic == false) && - ((!conf.shortCircuitLocalReads) || conf.useLegacyBlockReaderLocal)) { + if (!conf.domainSocketDataTraffic && + (!conf.shortCircuitLocalReads || conf.useLegacyBlockReaderLocal)) { return null; } // UNIX domain sockets can only be used to talk to local peers diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 9476b80a376..61ce3eab157 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -22,7 +22,6 @@ import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; @@ -462,18 +461,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { peer.getRemoteAddressString() + ": " + e.getMessage()); } } - - /** - * File name to print when accessing a block directly (from servlets) - * @param s Address of the block location - * @param poolId Block pool ID of the block - * @param blockId Block ID of the block - * @return string that has a file name for debug purposes - */ - public static String getFileName(final InetSocketAddress s, - final String poolId, final long blockId) { - return s.toString() + ":" + poolId + ":" + blockId; - } @Override public int read(ByteBuffer buf) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index 24544e656a2..d0b88a04977 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.BlockReaderFactory; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -198,7 +199,8 @@ public class JspHelper { public static void streamBlockInAscii(InetSocketAddress addr, String poolId, long blockId, Token blockToken, long genStamp, long blockSize, long offsetIntoBlock, long chunkSizeToView, - JspWriter out, Configuration conf, DataEncryptionKey encryptionKey) + JspWriter out, Configuration conf, DFSClient.Conf dfsConf, + DataEncryptionKey encryptionKey) throws IOException { if (chunkSizeToView == 0) return; Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); @@ -209,8 +211,7 @@ public class JspHelper { // Use the block name for file name. String file = BlockReaderFactory.getFileName(addr, poolId, blockId); - BlockReader blockReader = BlockReaderFactory.newBlockReader( - conf, file, + BlockReader blockReader = BlockReaderFactory.newBlockReader(dfsConf, file, new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, offsetIntoBlock, amtToRead, true, "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), @@ -218,7 +219,7 @@ public class JspHelper { addr.getHostName(), poolId, addr.getPort(), 0, 0), null, null, null, false); - byte[] buf = new byte[(int)amtToRead]; + final byte[] buf = new byte[amtToRead]; int readOffset = 0; int retries = 2; while ( amtToRead > 0 ) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java index db2b92076b6..80732f0d304 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java @@ -604,7 +604,8 @@ public class DatanodeJspHelper { try { JspHelper.streamBlockInAscii(new InetSocketAddress(req.getServerName(), datanodePort), bpid, blockId, blockToken, genStamp, blockSize, - startOffset, chunkSizeToView, out, conf, dfs.getDataEncryptionKey()); + startOffset, chunkSizeToView, out, conf, dfs.getConf(), + dfs.getDataEncryptionKey()); } catch (Exception e) { out.print(e); } @@ -697,7 +698,8 @@ public class DatanodeJspHelper { out.print(""); dfs.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 26103fc2e22..d665c48c04b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -559,8 +559,8 @@ public class NamenodeFsck { String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(), block.getBlockId()); - blockReader = BlockReaderFactory.newBlockReader( - conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck", + blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(), + file, block, lblock.getBlockToken(), 0, -1, true, "fsck", TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer(). getDataEncryptionKey()), chosenNode, null, null, null, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java index 35381634527..5373d5c9827 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java @@ -150,7 +150,7 @@ public class BlockReaderTestUtil { sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); return BlockReaderFactory.newBlockReader( - conf, + new DFSClient.Conf(conf), targetAddr.toString()+ ":" + block.getBlockId(), block, testBlock.getBlockToken(), offset, lenToRead, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java index 80cffa048ec..057b79fd114 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java @@ -130,7 +130,7 @@ public class TestBlockReaderLocal { test.setup(dataFile, checksum); dataIn = new FileInputStream(dataFile); checkIn = new FileInputStream(metaFile); - blockReaderLocal = new BlockReaderLocal(conf, + blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf), TEST_PATH.getName(), block, 0, -1, dataIn, checkIn, datanodeID, checksum, null); dataIn = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java index 5965fec2874..0163d956ac1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java @@ -325,7 +325,7 @@ public class TestParallelReadUtil { testInfo.filepath = new Path("/TestParallelRead.dat." + i); testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K); testInfo.dis = dfsClient.open(testInfo.filepath.toString(), - dfsClient.dfsClientConf.ioBufferSize, verifyChecksums); + dfsClient.getConf().ioBufferSize, verifyChecksums); for (int j = 0; j < nWorkerEach; ++j) { workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index 689852e3fcf..61fa0ecc680 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -146,7 +146,7 @@ public class TestBlockTokenWithDFS { String file = BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId()); blockReader = BlockReaderFactory.newBlockReader( - conf, file, block, lblock.getBlockToken(), 0, -1, + new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1, true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s), nodes[0], null, null, null, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 0a69c072399..cd30e2779e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.BlockReaderFactory; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -284,7 +285,7 @@ public class TestDataNodeVolumeFailure { "test-blockpoolid", block.getBlockId()); BlockReader blockReader = - BlockReaderFactory.newBlockReader(conf, file, block, + BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure", TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false); blockReader.close();