HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1428729 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2013-01-04 06:59:12 +00:00
parent 228de3c987
commit 32052a1e3a
11 changed files with 219 additions and 142 deletions

View File

@ -177,6 +177,9 @@ Trunk (Unreleased)
HDFS-4346. Add SequentialNumber as a base class for INodeId and
GenerationStamp. (szetszwo)
HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class
(Colin Patrick McCabe via todd)
OPTIMIZATIONS
BUG FIXES

View File

@ -22,7 +22,6 @@
import java.net.Socket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
@ -40,71 +39,150 @@
@InterfaceAudience.Private
public class BlockReaderFactory {
/**
* @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String)
* Parameters for creating a BlockReader.
*
* Before you add something to here: think about whether it's already included
* in Conf (or should be).
*/
public static BlockReader newBlockReader(
Configuration conf,
Socket sock, String file,
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
long startOffset, long len, DataEncryptionKey encryptionKey)
throws IOException {
int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
return newBlockReader(new Conf(conf),
sock, file, block, blockToken, startOffset,
len, bufferSize, true, "", encryptionKey, null);
@InterfaceAudience.Private
public static class Params {
private final Conf conf;
private Socket socket = null;
private String file = null;
private ExtendedBlock block = null;
private Token<BlockTokenIdentifier> blockToken = null;
private long startOffset = 0;
private long len = -1;
private int bufferSize;
private boolean verifyChecksum = true;
private boolean shortCircuitLocalReads = false;
private String clientName = "";
private DataEncryptionKey encryptionKey = null;
private IOStreamPair ioStreamPair = null;
public Params(Conf conf) {
this.conf = conf;
this.bufferSize = conf.ioBufferSize;
}
public Conf getConf() {
return conf;
}
public Socket getSocket() {
return socket;
}
public Params setSocket(Socket socket) {
this.socket = socket;
return this;
}
public String getFile() {
return file;
}
public Params setFile(String file) {
this.file = file;
return this;
}
public ExtendedBlock getBlock() {
return block;
}
public Params setBlock(ExtendedBlock block) {
this.block = block;
return this;
}
public Token<BlockTokenIdentifier> getBlockToken() {
return blockToken;
}
public Params setBlockToken(Token<BlockTokenIdentifier> blockToken) {
this.blockToken = blockToken;
return this;
}
public long getStartOffset() {
return startOffset;
}
public Params setStartOffset(long startOffset) {
this.startOffset = startOffset;
return this;
}
public long getLen() {
return len;
}
public Params setLen(long len) {
this.len = len;
return this;
}
public int getBufferSize() {
return bufferSize;
}
public Params setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
public boolean getVerifyChecksum() {
return verifyChecksum;
}
public Params setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;
return this;
}
public boolean getShortCircuitLocalReads() {
return shortCircuitLocalReads;
}
public Params setShortCircuitLocalReads(boolean on) {
this.shortCircuitLocalReads = on;
return this;
}
public String getClientName() {
return clientName;
}
public Params setClientName(String clientName) {
this.clientName = clientName;
return this;
}
public Params setEncryptionKey(DataEncryptionKey encryptionKey) {
this.encryptionKey = encryptionKey;
return this;
}
public DataEncryptionKey getEncryptionKey() {
return encryptionKey;
}
public IOStreamPair getIoStreamPair() {
return ioStreamPair;
}
public Params setIoStreamPair(IOStreamPair ioStreamPair) {
this.ioStreamPair = ioStreamPair;
return this;
}
}
/**
* Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request.
*
* @param conf the DFSClient configuration
* @param sock An established Socket to the DN. The BlockReader will not close it normally
* @param file File location
* @param block The block object
* @param blockToken The block token for security
* @param startOffset The read offset, relative to block head
* @param len The number of bytes to read
* @param bufferSize The IO buffer size (not the client buffer size)
* @param verifyChecksum Whether to verify checksum
* @param clientName Client name
* @return New BlockReader instance, or null on error.
* @param params The parameters
*
* @return New BlockReader instance
* @throws IOException If there was an error creating the BlockReader
*/
@SuppressWarnings("deprecation")
public static BlockReader newBlockReader(
Conf conf,
Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName,
DataEncryptionKey encryptionKey,
IOStreamPair ioStreams)
throws IOException {
if (conf.useLegacyBlockReader) {
if (encryptionKey != null) {
public static BlockReader newBlockReader(Params params) throws IOException {
if (params.getConf().useLegacyBlockReader) {
if (params.getEncryptionKey() != null) {
throw new RuntimeException("Encryption is not supported with the legacy block reader.");
}
return RemoteBlockReader.newBlockReader(
sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
return RemoteBlockReader.newBlockReader(params);
} else {
if (ioStreams == null) {
ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
if (encryptionKey != null) {
Socket sock = params.getSocket();
if (params.getIoStreamPair() == null) {
params.setIoStreamPair(new IOStreamPair(NetUtils.getInputStream(sock),
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
if (params.getEncryptionKey() != null) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
ioStreams.out, ioStreams.in, encryptionKey);
ioStreams = encryptedStreams;
params.getIoStreamPair().out, params.getIoStreamPair().in,
params.getEncryptionKey());
params.setIoStreamPair(encryptedStreams);
}
}
return RemoteBlockReader2.newBlockReader(
sock, file, block, blockToken, startOffset, len, bufferSize,
verifyChecksum, clientName, encryptionKey, ioStreams);
return RemoteBlockReader2.newBlockReader(params);
}
}

View File

@ -200,7 +200,8 @@ public class DFSClient implements java.io.Closeable {
/**
* DFSClient configuration
*/
static class Conf {
@InterfaceAudience.Private
public static class Conf {
final int maxFailoverAttempts;
final int failoverSleepBaseMillis;
final int failoverSleepMaxMillis;
@ -228,7 +229,7 @@ static class Conf {
final int getFileBlockStorageLocationsNumThreads;
final int getFileBlockStorageLocationsTimeout;
Conf(Configuration conf) {
public Conf(Configuration conf) {
maxFailoverAttempts = conf.getInt(
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);

View File

@ -934,15 +934,15 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
try {
// The OP_READ_BLOCK request is sent as we make the BlockReader
BlockReader reader =
BlockReaderFactory.newBlockReader(dfsClient.getConf(),
sock, file, block,
blockToken,
startOffset, len,
bufferSize, verifyChecksum,
clientName,
dfsClient.getDataEncryptionKey(),
sockAndStreams == null ? null : sockAndStreams.ioStreams);
BlockReader reader = BlockReaderFactory.
newBlockReader(new BlockReaderFactory.Params(dfsClient.getConf()).
setFile(file).setBlock(block).setBlockToken(blockToken).
setStartOffset(startOffset).setLen(len).
setBufferSize(bufferSize).setVerifyChecksum(verifyChecksum).
setClientName(clientName).
setEncryptionKey(dfsClient.getDataEncryptionKey()).
setIoStreamPair(sockAndStreams == null ? null : sockAndStreams.ioStreams).
setSocket(sock));
return reader;
} catch (IOException ex) {
// Our socket is no good.

View File

@ -349,13 +349,6 @@ private RemoteBlockReader(String file, String bpid, long blockId,
checksumSize = this.checksum.getChecksumSize();
}
public static RemoteBlockReader newBlockReader(Socket sock, String file,
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
long startOffset, long len, int bufferSize) throws IOException {
return newBlockReader(sock, file, block, blockToken, startOffset,
len, bufferSize, true, "");
}
/**
* Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request.
@ -371,29 +364,26 @@ public static RemoteBlockReader newBlockReader(Socket sock, String file,
* @param clientName Client name
* @return New BlockReader instance, or null on error.
*/
public static RemoteBlockReader newBlockReader( Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName)
throws IOException {
public static RemoteBlockReader newBlockReader(BlockReaderFactory.Params params)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
Socket sock = params.getSocket();
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
params.getClientName(), params.getStartOffset(), params.getLen());
//
// Get bytes in block, set streams
//
DataInputStream in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(sock),
bufferSize));
params.getBufferSize()));
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
RemoteBlockReader2.checkSuccess(status, sock, block, file);
RemoteBlockReader2.checkSuccess(status, sock, params.getBlock(),
params.getFile());
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();
DataChecksum checksum = DataTransferProtoUtil.fromProto(
@ -403,15 +393,16 @@ public static RemoteBlockReader newBlockReader( Socket sock, String file,
// Read the first chunk offset.
long firstChunkOffset = checksumInfo.getChunkOffset();
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
if ( firstChunkOffset < 0 || firstChunkOffset > params.getStartOffset() ||
firstChunkOffset <= (params.getStartOffset() - checksum.getBytesPerChecksum())) {
throw new IOException("BlockReader: error in first chunk offset (" +
firstChunkOffset + ") startOffset is " +
startOffset + " for file " + file);
params.getStartOffset() + " for file " + params.getFile());
}
return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
return new RemoteBlockReader(params.getFile(), params.getBlock().getBlockPoolId(),
params.getBlock().getBlockId(), in, checksum, params.getVerifyChecksum(),
params.getStartOffset(), firstChunkOffset, params.getLen(), sock);
}
@Override

View File

@ -246,24 +246,22 @@ private void readTrailingEmptyPacket() throws IOException {
}
}
protected RemoteBlockReader2(String file, String bpid, long blockId,
ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
IOStreamPair ioStreams) {
protected RemoteBlockReader2(BlockReaderFactory.Params params,
DataChecksum checksum, long firstChunkOffset, ReadableByteChannel in) {
// Path is used only for printing block and file information in debug
this.dnSock = dnSock;
this.ioStreams = ioStreams;
this.dnSock = params.getSocket();
this.ioStreams = params.getIoStreamPair();
this.in = in;
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max( startOffset, 0 );
this.filename = file;
this.verifyChecksum = params.getVerifyChecksum();
this.startOffset = Math.max( params.getStartOffset(), 0 );
this.filename = params.getFile();
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
this.bytesNeededToFinish = params.getLen() + (startOffset - firstChunkOffset);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
}
@ -373,16 +371,9 @@ public void readFully(byte[] buf, int off, int len) throws IOException {
* @param clientName Client name
* @return New BlockReader instance, or null on error.
*/
public static BlockReader newBlockReader(Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName,
DataEncryptionKey encryptionKey,
IOStreamPair ioStreams)
public static BlockReader newBlockReader(BlockReaderFactory.Params params)
throws IOException {
IOStreamPair ioStreams = params.getIoStreamPair();
ReadableByteChannel ch;
if (ioStreams.in instanceof SocketInputWrapper) {
ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
@ -393,7 +384,8 @@ public static BlockReader newBlockReader(Socket sock, String file,
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
ioStreams.out));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
params.getClientName(), params.getStartOffset(), params.getLen());
//
// Get bytes in block
@ -402,7 +394,8 @@ public static BlockReader newBlockReader(Socket sock, String file,
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
checkSuccess(status, sock, block, file);
checkSuccess(status, params.getSocket(), params.getBlock(),
params.getFile());
ReadOpChecksumInfoProto checksumInfo =
status.getReadOpChecksumInfo();
DataChecksum checksum = DataTransferProtoUtil.fromProto(
@ -412,16 +405,14 @@ public static BlockReader newBlockReader(Socket sock, String file,
// Read the first chunk offset.
long firstChunkOffset = checksumInfo.getChunkOffset();
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
if ( firstChunkOffset < 0 || firstChunkOffset > params.getStartOffset() ||
firstChunkOffset <= (params.getStartOffset() - checksum.getBytesPerChecksum())) {
throw new IOException("BlockReader: error in first chunk offset (" +
firstChunkOffset + ") startOffset is " +
startOffset + " for file " + file);
firstChunkOffset + ") startOffset is " +
params.getStartOffset() + " for file " + params.getFile());
}
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
ioStreams);
return new RemoteBlockReader2(params, checksum, firstChunkOffset, ch);
}
static void checkSuccess(

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -201,14 +202,16 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
// Use the block name for file name.
String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
// Use the block name for file name.
BlockReader blockReader = BlockReaderFactory.newBlockReader(
conf, s, file,
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
offsetIntoBlock, amtToRead, encryptionKey);
new BlockReaderFactory.Params(new Conf(conf)).
setSocket(s).
setBlockToken(blockToken).setStartOffset(offsetIntoBlock).
setLen(amtToRead).
setEncryptionKey(encryptionKey).
setFile(BlockReaderFactory.getFileName(addr, poolId, blockId)).
setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)));
byte[] buf = new byte[(int)amtToRead];
int readOffset = 0;
int retries = 2;

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -556,12 +557,14 @@ private void copyBlock(DFSClient dfs, LocatedBlock lblock,
s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
conf, s, file, block, lblock
.getBlockToken(), 0, -1,
namenode.getRpcServer().getDataEncryptionKey());
new BlockReaderFactory.Params(new Conf(conf)).
setSocket(s).setBlock(block).
setFile(BlockReaderFactory.getFileName(targetAddr,
block.getBlockPoolId(), block.getBlockId())).
setBlockToken(lblock.getBlockToken()).
setEncryptionKey(namenode.getRpcServer().getDataEncryptionKey()).
setLen(-1));
} catch (IOException ex) {
// Put chosen node into dead list, continue

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -150,12 +151,14 @@ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToR
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
return BlockReaderFactory.newBlockReader(
new DFSClient.Conf(conf),
sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
testBlock.getBlockToken(),
offset, lenToRead,
conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
true, "", null, null);
new BlockReaderFactory.Params(new Conf(conf)).
setSocket(sock).
setFile(targetAddr.toString() + ":" + block.getBlockId()).
setBlock(block).setBlockToken(testBlock.getBlockToken()).
setStartOffset(offset).setLen(lenToRead).
setBufferSize(conf.getInt(
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096)).
setVerifyChecksum(true));
}
/**

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -145,9 +146,10 @@ private static void tryRead(Configuration conf, LocatedBlock lblock,
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader(
conf, s, file, block,
lblock.getBlockToken(), 0, -1, null);
new BlockReaderFactory.Params(new Conf(conf)).
setSocket(s).setBlock(block).setFile(file).
setBlockToken(lblock.getBlockToken()).setStartOffset(0).
setLen(-1));
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {
assertFalse("OP_READ_BLOCK: access token is invalid, "

View File

@ -32,7 +32,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -277,13 +279,13 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid",
block.getBlockId());
BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
.getBlockToken(), 0, -1, null);
// nothing - if it fails - it will throw and exception
BlockReader blockReader = BlockReaderFactory.
newBlockReader(new BlockReaderFactory.Params(new Conf(conf)).
setFile(BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId())).
setBlock(block).setBlockToken(lblock.getBlockToken()).
setSocket(s));
blockReader.close();
}
/**