HDFS-4661. A few little code cleanups of some HDFS-347-related code. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1483210 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2013-05-16 07:04:12 +00:00
parent 87a1e92453
commit e3021894e7
23 changed files with 116 additions and 118 deletions

View File

@ -291,13 +291,6 @@
<Bug pattern="OS_OPEN_STREAM" /> <Bug pattern="OS_OPEN_STREAM" />
</Match> </Match>
<!-- getShortCircuitFdsForRead is supposed to return open streams. -->
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl" />
<Method name="getShortCircuitFdsForRead" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<!-- Don't complain about LocalDatanodeInfo's anonymous class --> <!-- Don't complain about LocalDatanodeInfo's anonymous class -->
<Match> <Match>
<Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" /> <Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />

View File

@ -51,19 +51,9 @@ public interface BlockReader extends ByteBufferReadable {
/** /**
* Close the block reader. * Close the block reader.
* *
* @param peerCache The PeerCache to put the Peer we're using back
* into, or null if we should simply close the Peer
* we're using (along with its Socket).
* Ignored by Readers that don't maintain Peers.
* @param fisCache The FileInputStreamCache to put our FileInputStreams
* back into, or null if we should simply close them.
* Ignored by Readers that don't maintain
* FileInputStreams.
*
* @throws IOException * @throws IOException
*/ */
void close(PeerCache peerCache, FileInputStreamCache fisCache) void close() throws IOException;
throws IOException;
/** /**
* Read exactly the given amount of data, throwing an exception * Read exactly the given amount of data, throwing an exception

View File

@ -87,6 +87,8 @@ public static BlockReader newBlockReader(
Peer peer, Peer peer,
DatanodeID datanodeID, DatanodeID datanodeID,
DomainSocketFactory domSockFactory, DomainSocketFactory domSockFactory,
PeerCache peerCache,
FileInputStreamCache fisCache,
boolean allowShortCircuitLocalReads) boolean allowShortCircuitLocalReads)
throws IOException { throws IOException {
peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@ -101,16 +103,15 @@ public static BlockReader newBlockReader(
// enabled, try to set up a BlockReaderLocal. // enabled, try to set up a BlockReaderLocal.
BlockReader reader = newShortCircuitBlockReader(conf, file, BlockReader reader = newShortCircuitBlockReader(conf, file,
block, blockToken, startOffset, len, peer, datanodeID, block, blockToken, startOffset, len, peer, datanodeID,
domSockFactory, verifyChecksum); domSockFactory, verifyChecksum, fisCache);
if (reader != null) { if (reader != null) {
// One we've constructed the short-circuit block reader, we don't // One we've constructed the short-circuit block reader, we don't
// need the socket any more. So let's return it to the cache. // need the socket any more. So let's return it to the cache.
PeerCache peerCache = PeerCache.getInstance( if (peerCache != null) {
conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT),
conf.getLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT));
peerCache.put(datanodeID, peer); peerCache.put(datanodeID, peer);
} else {
IOUtils.cleanup(null, peer);
}
return reader; return reader;
} }
} }
@ -131,11 +132,11 @@ public static BlockReader newBlockReader(
block, blockToken, startOffset, len, block, blockToken, startOffset, len,
conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT), DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
verifyChecksum, clientName, peer, datanodeID); verifyChecksum, clientName, peer, datanodeID, peerCache);
} else { } else {
return RemoteBlockReader2.newBlockReader( return RemoteBlockReader2.newBlockReader(
file, block, blockToken, startOffset, len, file, block, blockToken, startOffset, len,
verifyChecksum, clientName, peer, datanodeID); verifyChecksum, clientName, peer, datanodeID, peerCache);
} }
} }
@ -175,8 +176,8 @@ private static BlockReaderLocal newShortCircuitBlockReader(
Configuration conf, String file, ExtendedBlock block, Configuration conf, String file, ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken, long startOffset, Token<BlockTokenIdentifier> blockToken, long startOffset,
long len, Peer peer, DatanodeID datanodeID, long len, Peer peer, DatanodeID datanodeID,
DomainSocketFactory domSockFactory, boolean verifyChecksum) DomainSocketFactory domSockFactory, boolean verifyChecksum,
throws IOException { FileInputStreamCache fisCache) throws IOException {
final DataOutputStream out = final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream( new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream())); peer.getOutputStream()));
@ -194,7 +195,8 @@ private static BlockReaderLocal newShortCircuitBlockReader(
sock.recvFileInputStreams(fis, buf, 0, buf.length); sock.recvFileInputStreams(fis, buf, 0, buf.length);
try { try {
reader = new BlockReaderLocal(conf, file, block, reader = new BlockReaderLocal(conf, file, block,
startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum); startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum,
fisCache);
} finally { } finally {
if (reader == null) { if (reader == null) {
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);

View File

@ -88,6 +88,8 @@ class BlockReaderLocal implements BlockReader {
private final DatanodeID datanodeID; private final DatanodeID datanodeID;
private final ExtendedBlock block; private final ExtendedBlock block;
private final FileInputStreamCache fisCache;
private static int getSlowReadBufferNumChunks(Configuration conf, private static int getSlowReadBufferNumChunks(Configuration conf,
int bytesPerChecksum) { int bytesPerChecksum) {
@ -109,13 +111,15 @@ private static int getSlowReadBufferNumChunks(Configuration conf,
public BlockReaderLocal(Configuration conf, String filename, public BlockReaderLocal(Configuration conf, String filename,
ExtendedBlock block, long startOffset, long length, ExtendedBlock block, long startOffset, long length,
FileInputStream dataIn, FileInputStream checksumIn, FileInputStream dataIn, FileInputStream checksumIn,
DatanodeID datanodeID, boolean verifyChecksum) throws IOException { DatanodeID datanodeID, boolean verifyChecksum,
FileInputStreamCache fisCache) throws IOException {
this.dataIn = dataIn; this.dataIn = dataIn;
this.checksumIn = checksumIn; this.checksumIn = checksumIn;
this.startOffset = Math.max(startOffset, 0); this.startOffset = Math.max(startOffset, 0);
this.filename = filename; this.filename = filename;
this.datanodeID = datanodeID; this.datanodeID = datanodeID;
this.block = block; this.block = block;
this.fisCache = fisCache;
// read and handle the common header here. For now just a version // read and handle the common header here. For now just a version
checksumIn.getChannel().position(0); checksumIn.getChannel().position(0);
@ -489,8 +493,7 @@ public synchronized long skip(long n) throws IOException {
} }
@Override @Override
public synchronized void close(PeerCache peerCache, public synchronized void close() throws IOException {
FileInputStreamCache fisCache) throws IOException {
if (fisCache != null) { if (fisCache != null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("putting FileInputStream for " + filename + LOG.debug("putting FileInputStream for " + filename +

View File

@ -671,8 +671,7 @@ public synchronized long skip(long n) throws IOException {
} }
@Override @Override
public synchronized void close(PeerCache peerCache, public synchronized void close() throws IOException {
FileInputStreamCache fisCache) throws IOException {
IOUtils.cleanup(LOG, dataIn, checksumIn); IOUtils.cleanup(LOG, dataIn, checksumIn);
if (slowReadBuff != null) { if (slowReadBuff != null) {
bufferPool.returnBuffer(slowReadBuff); bufferPool.returnBuffer(slowReadBuff);

View File

@ -441,7 +441,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
// Will be getting a new BlockReader. // Will be getting a new BlockReader.
if (blockReader != null) { if (blockReader != null) {
blockReader.close(peerCache, fileInputStreamCache); blockReader.close();
blockReader = null; blockReader = null;
} }
@ -527,7 +527,7 @@ public synchronized void close() throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
if (blockReader != null) { if (blockReader != null) {
blockReader.close(peerCache, fileInputStreamCache); blockReader.close();
blockReader = null; blockReader = null;
} }
super.close(); super.close();
@ -855,7 +855,7 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
} }
} finally { } finally {
if (reader != null) { if (reader != null) {
reader.close(peerCache, fileInputStreamCache); reader.close();
} }
} }
// Put chosen node into dead list, continue // Put chosen node into dead list, continue
@ -924,7 +924,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
"the FileInputStreamCache."); "the FileInputStreamCache.");
} }
return new BlockReaderLocal(dfsClient.conf, file, return new BlockReaderLocal(dfsClient.conf, file,
block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum); block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
fileInputStreamCache);
} }
// If the legacy local block reader is enabled and we are reading a local // If the legacy local block reader is enabled and we are reading a local
@ -957,7 +958,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, allowShortCircuitLocalReads); dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads);
return reader; return reader;
} catch (IOException ex) { } catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " + DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@ -979,7 +981,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, allowShortCircuitLocalReads); dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads);
return reader; return reader;
} catch (IOException e) { } catch (IOException e) {
DFSClient.LOG.warn("failed to connect to " + domSock, e); DFSClient.LOG.warn("failed to connect to " + domSock, e);
@ -1002,7 +1005,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, false); dsFactory, peerCache, fileInputStreamCache, false);
return reader; return reader;
} catch (IOException ex) { } catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader. Closing stale " + DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@ -1021,7 +1024,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
return BlockReaderFactory.newBlockReader( return BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, false); dsFactory, peerCache, fileInputStreamCache, false);
} }

View File

@ -118,8 +118,8 @@ public boolean equals(Object other) {
return false; return false;
} }
FileInputStreamCache.Key otherKey = (FileInputStreamCache.Key)other; FileInputStreamCache.Key otherKey = (FileInputStreamCache.Key)other;
return (block.equals(otherKey.block) & return (block.equals(otherKey.block) &&
(block.getGenerationStamp() == otherKey.block.getGenerationStamp()) & (block.getGenerationStamp() == otherKey.block.getGenerationStamp()) &&
datanodeID.equals(otherKey.datanodeID)); datanodeID.equals(otherKey.datanodeID));
} }
@ -233,8 +233,7 @@ public synchronized void close() {
executor.remove(cacheCleaner); executor.remove(cacheCleaner);
} }
for (Iterator<Entry<Key, Value>> iter = map.entries().iterator(); for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
iter.hasNext(); iter.hasNext();) {
iter = map.entries().iterator()) {
Entry<Key, Value> entry = iter.next(); Entry<Key, Value> entry = iter.next();
entry.getValue().close(); entry.getValue().close();
iter.remove(); iter.remove();

View File

@ -23,7 +23,6 @@
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -33,7 +32,6 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@ -41,9 +39,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -91,6 +87,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
/** Amount of unread data in the current received packet */ /** Amount of unread data in the current received packet */
int dataLeft = 0; int dataLeft = 0;
private final PeerCache peerCache;
/* FSInputChecker interface */ /* FSInputChecker interface */
/* same interface as inputStream java.io.InputStream#read() /* same interface as inputStream java.io.InputStream#read()
@ -324,7 +322,7 @@ protected synchronized int readChunk(long pos, byte[] buf, int offset,
private RemoteBlockReader(String file, String bpid, long blockId, private RemoteBlockReader(String file, String bpid, long blockId,
DataInputStream in, DataChecksum checksum, boolean verifyChecksum, DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
DatanodeID datanodeID) { DatanodeID datanodeID, PeerCache peerCache) {
// Path is used only for printing block and file information in debug // Path is used only for printing block and file information in debug
super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/, super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
1, verifyChecksum, 1, verifyChecksum,
@ -350,6 +348,7 @@ private RemoteBlockReader(String file, String bpid, long blockId,
bytesPerChecksum = this.checksum.getBytesPerChecksum(); bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize(); checksumSize = this.checksum.getChecksumSize();
this.peerCache = peerCache;
} }
/** /**
@ -373,7 +372,8 @@ public static RemoteBlockReader newBlockReader(String file,
long startOffset, long len, long startOffset, long len,
int bufferSize, boolean verifyChecksum, int bufferSize, boolean verifyChecksum,
String clientName, Peer peer, String clientName, Peer peer,
DatanodeID datanodeID) DatanodeID datanodeID,
PeerCache peerCache)
throws IOException { throws IOException {
// in and out will be closed when sock is closed (by the caller) // in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = final DataOutputStream out =
@ -409,12 +409,11 @@ public static RemoteBlockReader newBlockReader(String file,
return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
peer, datanodeID); peer, datanodeID, peerCache);
} }
@Override @Override
public synchronized void close(PeerCache peerCache, public synchronized void close() throws IOException {
FileInputStreamCache fisCache) throws IOException {
startOffset = -1; startOffset = -1;
checksum = null; checksum = null;
if (peerCache != null & sentStatusCode) { if (peerCache != null & sentStatusCode) {

View File

@ -82,6 +82,7 @@ public class RemoteBlockReader2 implements BlockReader {
final private Peer peer; final private Peer peer;
final private DatanodeID datanodeID; final private DatanodeID datanodeID;
final private PeerCache peerCache;
private final ReadableByteChannel in; private final ReadableByteChannel in;
private DataChecksum checksum; private DataChecksum checksum;
@ -253,7 +254,7 @@ private void readTrailingEmptyPacket() throws IOException {
protected RemoteBlockReader2(String file, String bpid, long blockId, protected RemoteBlockReader2(String file, String bpid, long blockId,
DataChecksum checksum, boolean verifyChecksum, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
DatanodeID datanodeID) { DatanodeID datanodeID, PeerCache peerCache) {
// Path is used only for printing block and file information in debug // Path is used only for printing block and file information in debug
this.peer = peer; this.peer = peer;
this.datanodeID = datanodeID; this.datanodeID = datanodeID;
@ -262,6 +263,7 @@ protected RemoteBlockReader2(String file, String bpid, long blockId,
this.verifyChecksum = verifyChecksum; this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max( startOffset, 0 ); this.startOffset = Math.max( startOffset, 0 );
this.filename = file; this.filename = file;
this.peerCache = peerCache;
// The total number of bytes that we need to transfer from the DN is // The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at // the amount that the user wants (bytesToRead), plus the padding at
@ -274,8 +276,7 @@ protected RemoteBlockReader2(String file, String bpid, long blockId,
@Override @Override
public synchronized void close(PeerCache peerCache, public synchronized void close() throws IOException {
FileInputStreamCache fisCache) throws IOException {
packetReceiver.close(); packetReceiver.close();
startOffset = -1; startOffset = -1;
checksum = null; checksum = null;
@ -365,8 +366,8 @@ public static BlockReader newBlockReader(String file,
long startOffset, long len, long startOffset, long len,
boolean verifyChecksum, boolean verifyChecksum,
String clientName, String clientName,
Peer peer, DatanodeID datanodeID) Peer peer, DatanodeID datanodeID,
throws IOException { PeerCache peerCache) throws IOException {
// in and out will be closed when sock is closed (by the caller) // in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream())); peer.getOutputStream()));
@ -399,7 +400,7 @@ public static BlockReader newBlockReader(String file,
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
datanodeID); datanodeID, peerCache);
} }
static void checkSuccess( static void checkSuccess(

View File

@ -215,7 +215,8 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
offsetIntoBlock, amtToRead, true, offsetIntoBlock, amtToRead, true,
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
new DatanodeID(addr.getAddress().toString(), new DatanodeID(addr.getAddress().toString(),
addr.getHostName(), poolId, addr.getPort(), 0, 0), null, false); addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
null, null, false);
byte[] buf = new byte[(int)amtToRead]; byte[] buf = new byte[(int)amtToRead];
int readOffset = 0; int readOffset = 0;
@ -234,7 +235,7 @@ public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
amtToRead -= numRead; amtToRead -= numRead;
readOffset += numRead; readOffset += numRead;
} }
blockReader.close(null, null); blockReader.close();
out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8))); out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8)));
} }

View File

@ -1137,7 +1137,17 @@ FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
maxVersion); maxVersion);
} }
metrics.incrBlocksGetLocalPathInfo(); metrics.incrBlocksGetLocalPathInfo();
return data.getShortCircuitFdsForRead(blk); FileInputStream fis[] = new FileInputStream[2];
try {
fis[0] = (FileInputStream)data.getBlockInputStream(blk, 0);
fis[1] = (FileInputStream)data.getMetaDataInputStream(blk).getWrappedStream();
} catch (ClassCastException e) {
LOG.debug("requestShortCircuitFdsForRead failed", e);
throw new ShortCircuitFdsUnsupportedException("This DataNode's " +
"FsDatasetSpi does not support short-circuit local reads");
}
return fis;
} }
@Override @Override

View File

@ -282,16 +282,10 @@ public void requestShortCircuitFds(final ExtendedBlock blk,
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
.getBlockPoolId()); .getBlockPoolId());
BlockSender.ClientTraceLog.info(String.format( BlockSender.ClientTraceLog.info(String.format(
String.format( "src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
"src: %s, dest: %s, op: %s, blockid: %s, srvID: %s, " + " blockid: %s, srvID: %s, success: %b",
"success: %b", blk.getBlockId(), dnR.getStorageID(), (fis != null)
"127.0.0.1", // src IP ));
"127.0.0.1", // dst IP
"REQUEST_SHORT_CIRCUIT_FDS", // operation
blk.getBlockId(), // block id
dnR.getStorageID(),
(fis != null)
)));
} }
if (fis != null) { if (fis != null) {
IOUtils.cleanup(LOG, fis); IOUtils.cleanup(LOG, fis);

View File

@ -19,7 +19,6 @@
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.List; import java.util.List;
@ -386,7 +385,4 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b
*/ */
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException; throws IOException;
FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
throws IOException;
} }

View File

@ -41,4 +41,8 @@ public LengthInputStream(InputStream in, long length) {
public long getLength() { public long getLength() {
return length; return length;
} }
public InputStream getWrappedStream() {
return in;
}
} }

View File

@ -1682,26 +1682,6 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
return info; return info;
} }
@Override // FsDatasetSpi
public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
throws IOException {
File datafile = getBlockFile(block);
File metafile = FsDatasetUtil.getMetaFile(datafile,
block.getGenerationStamp());
FileInputStream fis[] = new FileInputStream[2];
boolean success = false;
try {
fis[0] = new FileInputStream(datafile);
fis[1] = new FileInputStream(metafile);
success = true;
return fis;
} finally {
if (!success) {
IOUtils.cleanup(null, fis);
}
}
}
@Override // FsDatasetSpi @Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException { throws IOException {

View File

@ -563,7 +563,7 @@ private void copyBlock(DFSClient dfs, LocatedBlock lblock,
conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck", conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer(). TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
getDataEncryptionKey()), getDataEncryptionKey()),
chosenNode, null, false); chosenNode, null, null, null, false);
} catch (IOException ex) { } catch (IOException ex) {
// Put chosen node into dead list, continue // Put chosen node into dead list, continue

View File

@ -66,3 +66,33 @@ HDFS Short-Circuit Local Reads
</property> </property>
</configuration> </configuration>
---- ----
* {Configuration Keys}
* dfs.client.read.shortcircuit
This configuration parameter turns on short-circuit local reads.
* dfs.client.read.shortcircuit.skip.checkusm
If this configuration parameter is set, short-circuit local reads will skip
checksums. This is normally not recommended, but it may be useful for
special setups. You might consider using this if you are doing your own
checksumming outside of HDFS.
* dfs.client.read.shortcircuit.streams.cache.size
The DFSClient maintains a cache of recently opened file descriptors. This
parameter controls the size of that cache. Setting this higher will use more
file descriptors, but potentially provide better performance on workloads
involving lots of seeks.
* dfs.client.read.shortcircuit.streams.cache.expiry.ms
This controls the minimum amount of time file descriptors need to sit in the
FileInputStreamCache before they can be closed for being inactive for too long.
* dfs.client.domain.socket.data.traffic
This control whether we will try to pass normal data traffic over UNIX domain
sockets.

View File

@ -155,7 +155,7 @@ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToR
testBlock.getBlockToken(), testBlock.getBlockToken(),
offset, lenToRead, offset, lenToRead,
true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock), true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
nodes[0], null, false); nodes[0], null, null, null, false);
} }
/** /**

View File

@ -127,7 +127,7 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
checkIn = new FileInputStream(metaFile); checkIn = new FileInputStream(metaFile);
blockReaderLocal = new BlockReaderLocal(conf, blockReaderLocal = new BlockReaderLocal(conf,
TEST_PATH.getName(), block, 0, -1, TEST_PATH.getName(), block, 0, -1,
dataIn, checkIn, datanodeID, checksum); dataIn, checkIn, datanodeID, checksum, null);
dataIn = null; dataIn = null;
checkIn = null; checkIn = null;
test.doTest(blockReaderLocal, original); test.doTest(blockReaderLocal, original);
@ -136,7 +136,7 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
if (cluster != null) cluster.shutdown(); if (cluster != null) cluster.shutdown();
if (dataIn != null) dataIn.close(); if (dataIn != null) dataIn.close();
if (checkIn != null) checkIn.close(); if (checkIn != null) checkIn.close();
if (blockReaderLocal != null) blockReaderLocal.close(null, null); if (blockReaderLocal != null) blockReaderLocal.close();
} }
} }

View File

@ -61,7 +61,7 @@ public void testBlockVerification() throws Exception {
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK); verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close(null, null); reader.close();
} }
/** /**
@ -76,7 +76,7 @@ public void testIncompleteRead() throws Exception {
// We asked the blockreader for the whole file, and only read // We asked the blockreader for the whole file, and only read
// half of it, so no CHECKSUM_OK // half of it, so no CHECKSUM_OK
verify(reader, never()).sendReadResult(Status.CHECKSUM_OK); verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
reader.close(null, null); reader.close();
} }
/** /**
@ -92,7 +92,7 @@ public void testCompletePartialRead() throws Exception {
// And read half the file // And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK); verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close(null, null); reader.close();
} }
/** /**
@ -111,7 +111,7 @@ public void testUnalignedReads() throws Exception {
util.getBlockReader(testBlock, startOffset, length)); util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true); util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK); verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close(null, null); reader.close();
} }
} }
} }

View File

@ -148,7 +148,7 @@ private static void tryRead(Configuration conf, LocatedBlock lblock,
blockReader = BlockReaderFactory.newBlockReader( blockReader = BlockReaderFactory.newBlockReader(
conf, file, block, lblock.getBlockToken(), 0, -1, conf, file, block, lblock.getBlockToken(), 0, -1,
true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s), true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
nodes[0], null, false); nodes[0], null, null, null, false);
} catch (IOException ex) { } catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) { if (ex instanceof InvalidBlockTokenException) {

View File

@ -964,12 +964,6 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
throws IOException {
throw new UnsupportedOperationException();
}
@Override @Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException { throws IOException {

View File

@ -285,8 +285,8 @@ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
BlockReader blockReader = BlockReader blockReader =
BlockReaderFactory.newBlockReader(conf, file, block, BlockReaderFactory.newBlockReader(conf, file, block,
lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure", lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
TcpPeerServer.peerFromSocket(s), datanode, null, false); TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
blockReader.close(null, null); blockReader.close();
} }
/** /**