From 32052a1e3a8007b5348dc42415861aeb859ebc5a Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Fri, 4 Jan 2013 06:59:12 +0000
Subject: [PATCH] HDFS-4352. Encapsulate arguments to BlockReaderFactory in a
 class. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1428729 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../hadoop/hdfs/BlockReaderFactory.java       | 178 +++++++++++++-----
 .../org/apache/hadoop/hdfs/DFSClient.java     |   5 +-
 .../apache/hadoop/hdfs/DFSInputStream.java    |  18 +-
 .../apache/hadoop/hdfs/RemoteBlockReader.java |  41 ++--
 .../hadoop/hdfs/RemoteBlockReader2.java       |  47 ++---
 .../hadoop/hdfs/server/common/JspHelper.java  |  17 +-
 .../hdfs/server/namenode/NamenodeFsck.java    |  13 +-
 .../hadoop/hdfs/BlockReaderTestUtil.java      |  15 +-
 .../TestBlockTokenWithDFS.java                |   8 +-
 .../datanode/TestDataNodeVolumeFailure.java   |  16 +-
 11 files changed, 219 insertions(+), 142 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3c4e53f0f5d..8069d65f4f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -177,6 +177,9 @@ Trunk (Unreleased)
     HDFS-4346. Add SequentialNumber as a base class for INodeId and
     GenerationStamp. (szetszwo)
 
+    HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class
+    (Colin Patrick McCabe via todd)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index c71f1ced6a6..26ae5fb9258 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -22,7 +22,6 @@ import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
@@ -40,71 +39,150 @@
 @InterfaceAudience.Private
 public class BlockReaderFactory {
   /**
-   * @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String)
+   * Parameters for creating a BlockReader.
+   *
+   * Before you add something here: think about whether it's already
+   * included in Conf (or should be).
    */
-  public static BlockReader newBlockReader(
-      Configuration conf,
-      Socket sock, String file,
-      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len, DataEncryptionKey encryptionKey)
-      throws IOException {
-    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
-        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
-    return newBlockReader(new Conf(conf),
-        sock, file, block, blockToken, startOffset,
-        len, bufferSize, true, "", encryptionKey, null);
+  @InterfaceAudience.Private
+  public static class Params {
+    private final Conf conf;
+    private Socket socket = null;
+    private String file = null;
+    private ExtendedBlock block = null;
+    private Token<BlockTokenIdentifier> blockToken = null;
+    private long startOffset = 0;
+    private long len = -1;
+    private int bufferSize;
+    private boolean verifyChecksum = true;
+    private boolean shortCircuitLocalReads = false;
+    private String clientName = "";
+    private DataEncryptionKey encryptionKey = null;
+    private IOStreamPair ioStreamPair = null;
+
+    public Params(Conf conf) {
+      this.conf = conf;
+      this.bufferSize = conf.ioBufferSize;
+    }
+    public Conf getConf() {
+      return conf;
+    }
+    public Socket getSocket() {
+      return socket;
+    }
+    public Params setSocket(Socket socket) {
+      this.socket = socket;
+      return this;
+    }
+    public String getFile() {
+      return file;
+    }
+    public Params setFile(String file) {
+      this.file = file;
+      return this;
+    }
+    public ExtendedBlock getBlock() {
+      return block;
+    }
+    public Params setBlock(ExtendedBlock block) {
+      this.block = block;
+      return this;
+    }
+    public Token<BlockTokenIdentifier> getBlockToken() {
+      return blockToken;
+    }
+    public Params setBlockToken(Token<BlockTokenIdentifier> blockToken) {
+      this.blockToken = blockToken;
+      return this;
+    }
+    public long getStartOffset() {
+      return startOffset;
+    }
+    public Params setStartOffset(long startOffset) {
+      this.startOffset = startOffset;
+      return this;
+    }
+    public long getLen() {
+      return len;
+    }
+    public Params setLen(long len) {
+      this.len = len;
+      return this;
+    }
+    public int getBufferSize() {
+      return bufferSize;
+    }
+    public Params setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+    public boolean getVerifyChecksum() {
+      return verifyChecksum;
+    }
+    public Params setVerifyChecksum(boolean verifyChecksum) {
+      this.verifyChecksum = verifyChecksum;
+      return this;
+    }
+    public boolean getShortCircuitLocalReads() {
+      return shortCircuitLocalReads;
+    }
+    public Params setShortCircuitLocalReads(boolean on) {
+      this.shortCircuitLocalReads = on;
+      return this;
+    }
+    public String getClientName() {
+      return clientName;
+    }
+    public Params setClientName(String clientName) {
+      this.clientName = clientName;
+      return this;
+    }
+    public Params setEncryptionKey(DataEncryptionKey encryptionKey) {
+      this.encryptionKey = encryptionKey;
+      return this;
+    }
+    public DataEncryptionKey getEncryptionKey() {
+      return encryptionKey;
+    }
+    public IOStreamPair getIoStreamPair() {
+      return ioStreamPair;
+    }
+    public Params setIoStreamPair(IOStreamPair ioStreamPair) {
+      this.ioStreamPair = ioStreamPair;
+      return this;
+    }
   }
 
   /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
    *
-   * @param conf the DFSClient configuration
-   * @param sock An established Socket to the DN. The BlockReader will not close it normally
-   * @param file File location
-   * @param block The block object
-   * @param blockToken The block token for security
-   * @param startOffset The read offset, relative to block head
-   * @param len The number of bytes to read
-   * @param bufferSize The IO buffer size (not the client buffer size)
-   * @param verifyChecksum Whether to verify checksum
-   * @param clientName Client name
-   * @return New BlockReader instance, or null on error.
+   * @param params The parameters
+   *
+   * @return New BlockReader instance
+   * @throws IOException If there was an error creating the BlockReader
    */
   @SuppressWarnings("deprecation")
-  public static BlockReader newBlockReader(
-      Conf conf,
-      Socket sock, String file,
-      ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len,
-      int bufferSize, boolean verifyChecksum,
-      String clientName,
-      DataEncryptionKey encryptionKey,
-      IOStreamPair ioStreams)
-      throws IOException {
-
-    if (conf.useLegacyBlockReader) {
-      if (encryptionKey != null) {
+  public static BlockReader newBlockReader(Params params) throws IOException {
+    if (params.getConf().useLegacyBlockReader) {
+      if (params.getEncryptionKey() != null) {
         throw new RuntimeException("Encryption is not supported with the legacy block reader.");
       }
-      return RemoteBlockReader.newBlockReader(
-          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+      return RemoteBlockReader.newBlockReader(params);
     } else {
-      if (ioStreams == null) {
-        ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
-            NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
-        if (encryptionKey != null) {
+      Socket sock = params.getSocket();
+      if (params.getIoStreamPair() == null) {
+        params.setIoStreamPair(new IOStreamPair(NetUtils.getInputStream(sock),
+            NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
+        if (params.getEncryptionKey() != null) {
           IOStreamPair encryptedStreams =
               DataTransferEncryptor.getEncryptedStreams(
-                  ioStreams.out, ioStreams.in, encryptionKey);
-          ioStreams = encryptedStreams;
+                  params.getIoStreamPair().out, params.getIoStreamPair().in,
+                  params.getEncryptionKey());
+          params.setIoStreamPair(encryptedStreams);
         }
       }
-
-      return RemoteBlockReader2.newBlockReader(
-          sock, file, block, blockToken, startOffset, len, bufferSize,
-          verifyChecksum, clientName, encryptionKey, ioStreams);
+      return RemoteBlockReader2.newBlockReader(params);
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 225a9e75dfe..ba31f9288c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -200,7 +200,8 @@ public class DFSClient implements java.io.Closeable {
   /**
    * DFSClient configuration
    */
-  static class Conf {
+  @InterfaceAudience.Private
+  public static class Conf {
     final int maxFailoverAttempts;
     final int failoverSleepBaseMillis;
     final int failoverSleepMaxMillis;
@@ -228,7 +229,7 @@ static class Conf {
     final int getFileBlockStorageLocationsNumThreads;
     final int getFileBlockStorageLocationsTimeout;
 
-    Conf(Configuration conf) {
+    public Conf(Configuration conf) {
       maxFailoverAttempts = conf.getInt(
           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
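For readers unfamiliar with the pattern, here is a minimal sketch (not part of the patch; variable names are hypothetical) contrasting the old positional overload with the equivalent fluent construction the factory now accepts:

    // Hypothetical call site, for illustration only.
    // Before HDFS-4352: up to twelve positional arguments; adjacent longs
    // and booleans are easy to transpose with no compiler error.
    //   BlockReader reader = BlockReaderFactory.newBlockReader(conf, sock,
    //       file, block, blockToken, startOffset, len, bufferSize, true,
    //       "", encryptionKey, null);

    // After HDFS-4352: each value is named at the call site, argument
    // order is irrelevant, and unset fields keep the defaults that
    // Params declares.
    BlockReader reader = BlockReaderFactory.newBlockReader(
        new BlockReaderFactory.Params(new DFSClient.Conf(conf)).
            setSocket(sock).
            setFile(file).
            setBlock(block).
            setBlockToken(blockToken).
            setStartOffset(startOffset).
            setLen(len));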
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 1e986cd1350..e403a57b0ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -934,15 +934,15 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
 
     try {
       // The OP_READ_BLOCK request is sent as we make the BlockReader
-      BlockReader reader =
-          BlockReaderFactory.newBlockReader(dfsClient.getConf(),
-                                            sock, file, block,
-                                            blockToken,
-                                            startOffset, len,
-                                            bufferSize, verifyChecksum,
-                                            clientName,
-                                            dfsClient.getDataEncryptionKey(),
-                                            sockAndStreams == null ? null : sockAndStreams.ioStreams);
+      BlockReader reader = BlockReaderFactory.
+          newBlockReader(new BlockReaderFactory.Params(dfsClient.getConf()).
+              setFile(file).setBlock(block).setBlockToken(blockToken).
+              setStartOffset(startOffset).setLen(len).
+              setBufferSize(bufferSize).setVerifyChecksum(verifyChecksum).
+              setClientName(clientName).
+              setEncryptionKey(dfsClient.getDataEncryptionKey()).
+              setIoStreamPair(sockAndStreams == null ? null : sockAndStreams.ioStreams).
+              setSocket(sock));
       return reader;
     } catch (IOException ex) {
       // Our socket is no good.
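The single-expression construction above works because every Params setter returns this. A minimal, self-contained sketch of that fluent-setter idiom (a hypothetical class, not from the patch):

    // Minimal illustration of the fluent-setter idiom used by Params:
    // each setter mutates the object and returns the same instance, so
    // construction collapses into one chained expression.
    class ReadRequest {
      private long startOffset = 0;  // sensible default, as in Params
      private long len = -1;         // -1: read to the end of the block

      ReadRequest setStartOffset(long startOffset) {
        this.startOffset = startOffset;
        return this;
      }
      ReadRequest setLen(long len) {
        this.len = len;
        return this;
      }
      @Override
      public String toString() {
        return "ReadRequest(startOffset=" + startOffset + ", len=" + len + ")";
      }

      public static void main(String[] args) {
        // Callers name only the fields they care about; the rest keep defaults.
        System.out.println(new ReadRequest().setStartOffset(4096).setLen(512));
      }
    }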
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 2bcd96e7644..f380d818ba4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -349,13 +349,6 @@ private RemoteBlockReader(String file, String bpid, long blockId,
     checksumSize = this.checksum.getChecksumSize();
   }
 
-  public static RemoteBlockReader newBlockReader(Socket sock, String file,
-      ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset,
-        len, bufferSize, true, "");
-  }
-
   /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
@@ -371,29 +364,26 @@ public static RemoteBlockReader newBlockReader(Socket sock, String file,
    * @param clientName Client name
    * @return New BlockReader instance, or null on error.
    */
-  public static RemoteBlockReader newBlockReader( Socket sock, String file,
-      ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len,
-      int bufferSize, boolean verifyChecksum,
-      String clientName)
-      throws IOException {
+  public static RemoteBlockReader newBlockReader(BlockReaderFactory.Params params)
+      throws IOException {
     // in and out will be closed when sock is closed (by the caller)
+    Socket sock = params.getSocket();
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
-    
+    new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
+        params.getClientName(), params.getStartOffset(), params.getLen());
+
     //
     // Get bytes in block, set streams
     //
-
     DataInputStream in = new DataInputStream(
         new BufferedInputStream(NetUtils.getInputStream(sock),
-            bufferSize));
-    
+            params.getBufferSize()));
+
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, sock, block, file);
+    RemoteBlockReader2.checkSuccess(status, sock, params.getBlock(),
+        params.getFile());
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -403,15 +393,16 @@ public static RemoteBlockReader newBlockReader( Socket sock, String file,
     // Read the first chunk offset.
     long firstChunkOffset = checksumInfo.getChunkOffset();
 
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+    if ( firstChunkOffset < 0 || firstChunkOffset > params.getStartOffset() ||
+        firstChunkOffset <= (params.getStartOffset() - checksum.getBytesPerChecksum())) {
       throw new IOException("BlockReader: error in first chunk offset (" +
                             firstChunkOffset + ") startOffset is " +
-                            startOffset + " for file " + file);
+                            params.getStartOffset() + " for file " + params.getFile());
     }
 
-    return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
-        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+    return new RemoteBlockReader(params.getFile(), params.getBlock().getBlockPoolId(),
+        params.getBlock().getBlockId(), in, checksum, params.getVerifyChecksum(),
+        params.getStartOffset(), firstChunkOffset, params.getLen(), sock);
   }
 
   @Override
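The firstChunkOffset sanity check above is unchanged by this patch except that its inputs now come from params: the datanode must start the stream at the checksum-chunk boundary at or immediately below the requested offset. A standalone sketch of that invariant, with hypothetical numbers:

    // Standalone sketch of the firstChunkOffset invariant checked above.
    // With bytesPerChecksum = 512 and startOffset = 1300, the only valid
    // chunk-aligned starting point is 1024: it is not past 1300, and it
    // is strictly greater than 1300 - 512 = 788.
    public class FirstChunkOffsetCheck {
      static boolean isValid(long firstChunkOffset, long startOffset,
          int bytesPerChecksum) {
        return firstChunkOffset >= 0
            && firstChunkOffset <= startOffset
            && firstChunkOffset > startOffset - bytesPerChecksum;
      }

      public static void main(String[] args) {
        System.out.println(isValid(1024, 1300, 512)); // true: chunk just below offset
        System.out.println(isValid(512, 1300, 512));  // false: more than one chunk early
        System.out.println(isValid(1536, 1300, 512)); // false: past the requested offset
      }
    }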
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index b9a5c76ec31..c250a538984 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -246,24 +246,22 @@ private void readTrailingEmptyPacket() throws IOException {
     }
   }
 
-  protected RemoteBlockReader2(String file, String bpid, long blockId,
-      ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
-      IOStreamPair ioStreams) {
+  protected RemoteBlockReader2(BlockReaderFactory.Params params,
+      DataChecksum checksum, long firstChunkOffset, ReadableByteChannel in) {
     // Path is used only for printing block and file information in debug
-    this.dnSock = dnSock;
-    this.ioStreams = ioStreams;
+    this.dnSock = params.getSocket();
+    this.ioStreams = params.getIoStreamPair();
     this.in = in;
     this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max( startOffset, 0 );
-    this.filename = file;
+    this.verifyChecksum = params.getVerifyChecksum();
+    this.startOffset = Math.max( params.getStartOffset(), 0 );
+    this.filename = params.getFile();
 
     // The total number of bytes that we need to transfer from the DN is
     // the amount that the user wants (bytesToRead), plus the padding at
     // the beginning in order to chunk-align. Note that the DN may elect
     // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+    this.bytesNeededToFinish = params.getLen() + (startOffset - firstChunkOffset);
     bytesPerChecksum = this.checksum.getBytesPerChecksum();
     checksumSize = this.checksum.getChecksumSize();
   }
@@ -373,16 +371,9 @@ public void readFully(byte[] buf, int off, int len) throws IOException {
    * @param clientName Client name
    * @return New BlockReader instance, or null on error.
    */
-  public static BlockReader newBlockReader(Socket sock, String file,
-      ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len,
-      int bufferSize, boolean verifyChecksum,
-      String clientName,
-      DataEncryptionKey encryptionKey,
-      IOStreamPair ioStreams)
+  public static BlockReader newBlockReader(BlockReaderFactory.Params params)
       throws IOException {
-
+    IOStreamPair ioStreams = params.getIoStreamPair();
     ReadableByteChannel ch;
     if (ioStreams.in instanceof SocketInputWrapper) {
       ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
     } else {
@@ -393,7 +384,8 @@ public static BlockReader newBlockReader(Socket sock, String file,
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
           ioStreams.out));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
+    new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
+        params.getClientName(), params.getStartOffset(), params.getLen());
 
     //
     // Get bytes in block
@@ -402,7 +394,8 @@ public static BlockReader newBlockReader(Socket sock, String file,
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    checkSuccess(status, sock, block, file);
+    checkSuccess(status, params.getSocket(), params.getBlock(),
+        params.getFile());
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -412,16 +405,14 @@ public static BlockReader newBlockReader(Socket sock, String file,
     // Read the first chunk offset.
     long firstChunkOffset = checksumInfo.getChunkOffset();
 
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+    if ( firstChunkOffset < 0 || firstChunkOffset > params.getStartOffset() ||
+        firstChunkOffset <= (params.getStartOffset() - checksum.getBytesPerChecksum())) {
       throw new IOException("BlockReader: error in first chunk offset (" +
-                            firstChunkOffset + ") startOffset is " +
-                            startOffset + " for file " + file);
+                            firstChunkOffset + ") startOffset is " +
+                            params.getStartOffset() + " for file " + params.getFile());
     }
 
-    return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
-        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
-        ioStreams);
+    return new RemoteBlockReader2(params, checksum, firstChunkOffset, ch);
   }
 
   static void checkSuccess(
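The bytesNeededToFinish computation in the constructor above (now fed from params.getLen()) adds the chunk-alignment padding to the caller's request. A small worked sketch, with hypothetical numbers:

    // Worked sketch of the bytesNeededToFinish computation above. The
    // datanode rewinds the stream to firstChunkOffset, so the reader must
    // consume the leading padding before the requested range begins.
    public class BytesNeededToFinish {
      public static void main(String[] args) {
        long startOffset = 1300;      // the caller asked to start here
        long firstChunkOffset = 1024; // DN starts at the chunk boundary below
        long len = 4000;              // bytes the caller actually wants

        // 4000 requested + 276 bytes of leading padding = 4276 to transfer.
        long bytesNeededToFinish = len + (startOffset - firstChunkOffset);
        System.out.println(bytesNeededToFinish);  // prints 4276
      }
    }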
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
index 3d03447a6c6..2cdfae7193f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -201,14 +202,16 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
     int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
-      
-      // Use the block name for file name.
-    String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
+
+    // Use the block name for file name.
     BlockReader blockReader = BlockReaderFactory.newBlockReader(
-        conf, s, file,
-        new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead, encryptionKey);
-        
+        new BlockReaderFactory.Params(new Conf(conf)).
+            setSocket(s).
+            setBlockToken(blockToken).setStartOffset(offsetIntoBlock).
+            setLen(amtToRead).
+            setEncryptionKey(encryptionKey).
+            setFile(BlockReaderFactory.getFileName(addr, poolId, blockId)).
+            setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)));
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
     int retries = 2;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 90542124076..32c643b560b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -556,12 +557,14 @@ private void copyBlock(DFSClient dfs, LocatedBlock lblock,
         s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
         s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
-        String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
-            block.getBlockId());
         blockReader = BlockReaderFactory.newBlockReader(
-            conf, s, file, block, lblock
-            .getBlockToken(), 0, -1,
-            namenode.getRpcServer().getDataEncryptionKey());
+            new BlockReaderFactory.Params(new Conf(conf)).
+                setSocket(s).setBlock(block).
+                setFile(BlockReaderFactory.getFileName(targetAddr,
+                    block.getBlockPoolId(), block.getBlockId())).
+                setBlockToken(lblock.getBlockToken()).
+                setEncryptionKey(namenode.getRpcServer().getDataEncryptionKey()).
+                setLen(-1));
 
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
index 29d8063426e..be75e1cd1a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -150,12 +151,14 @@ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToR
     sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
     return BlockReaderFactory.newBlockReader(
-      new DFSClient.Conf(conf),
-      sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
-      testBlock.getBlockToken(),
-      offset, lenToRead,
-      conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-      true, "", null, null);
+        new BlockReaderFactory.Params(new Conf(conf)).
+            setSocket(sock).
+            setFile(targetAddr.toString() + ":" + block.getBlockId()).
+            setBlock(block).setBlockToken(testBlock.getBlockToken()).
+            setStartOffset(offset).setLen(lenToRead).
+            setBufferSize(conf.getInt(
+                CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096)).
+            setVerifyChecksum(true));
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index c7dbf200b13..0dad0648879 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -145,9 +146,10 @@ private static void tryRead(Configuration conf, LocatedBlock lblock,
       String file = BlockReaderFactory.getFileName(targetAddr,
           "test-blockpoolid", block.getBlockId());
       blockReader = BlockReaderFactory.newBlockReader(
-          conf, s, file, block,
-          lblock.getBlockToken(), 0, -1, null);
-
+          new BlockReaderFactory.Params(new Conf(conf)).
+              setSocket(s).setBlock(block).setFile(file).
+              setBlockToken(lblock.getBlockToken()).setStartOffset(0).
+              setLen(-1));
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {
         assertFalse("OP_READ_BLOCK: access token is invalid, "
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 5a80098329b..10bcc3d7da0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -277,13 +279,13 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
     s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
     s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
 
-    String file = BlockReaderFactory.getFileName(targetAddr,
-        "test-blockpoolid",
-        block.getBlockId());
-    BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
-        .getBlockToken(), 0, -1, null);
-
-    // nothing - if it fails - it will throw and exception
+    BlockReader blockReader = BlockReaderFactory.
+        newBlockReader(new BlockReaderFactory.Params(new Conf(conf)).
+            setFile(BlockReaderFactory.getFileName(targetAddr,
+                "test-blockpoolid", block.getBlockId())).
+            setBlock(block).setBlockToken(lblock.getBlockToken()).
+            setSocket(s));
+    blockReader.close();
   }
 
   /**
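As the two test call sites above illustrate, fields left unset fall back to the defaults declared in Params: startOffset 0, len -1 (read to the end of the block), bufferSize from conf.ioBufferSize, verifyChecksum true, clientName "", and null for the socket, file, block, token, encryption key, and stream pair. A hedged sketch (variable names hypothetical) of the most compact form such a caller can now take:

    // Hypothetical minimal caller, for illustration only: relying on the
    // Params defaults, a probe like TestDataNodeVolumeFailure's
    // accessBlock() names only the four values it actually has.
    BlockReader blockReader = BlockReaderFactory.newBlockReader(
        new BlockReaderFactory.Params(new DFSClient.Conf(conf)).
            setSocket(s).
            setBlock(block).
            setBlockToken(lblock.getBlockToken()).
            setFile(BlockReaderFactory.getFileName(targetAddr,
                "test-blockpoolid", block.getBlockId())));
    blockReader.close();  // release the reader once the open succeeds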