HDFS-4661. A few little code cleanups of some HDFS-347-related code. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1480839 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2013-05-10 00:03:17 +00:00
parent 6012f67977
commit a18fd620d0
24 changed files with 119 additions and 118 deletions

View File

@ -390,6 +390,9 @@ Trunk (Unreleased)
HDFS-4538. Allow use of legacy blockreader (Colin Patrick McCabe via todd) HDFS-4538. Allow use of legacy blockreader (Colin Patrick McCabe via todd)
HDFS-4661. A few little code cleanups of some HDFS-347-related code. (Colin
Patrick McCabe via atm)
Release 2.0.5-beta - UNRELEASED Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -291,13 +291,6 @@
<Bug pattern="OS_OPEN_STREAM" /> <Bug pattern="OS_OPEN_STREAM" />
</Match> </Match>
<!-- getShortCircuitFdsForRead is supposed to return open streams. -->
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl" />
<Method name="getShortCircuitFdsForRead" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
<!-- Don't complain about LocalDatanodeInfo's anonymous class --> <!-- Don't complain about LocalDatanodeInfo's anonymous class -->
<Match> <Match>
<Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" /> <Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />

View File

@ -51,19 +51,9 @@ public interface BlockReader extends ByteBufferReadable {
/** /**
* Close the block reader. * Close the block reader.
* *
* @param peerCache The PeerCache to put the Peer we're using back
* into, or null if we should simply close the Peer
* we're using (along with its Socket).
* Ignored by Readers that don't maintain Peers.
* @param fisCache The FileInputStreamCache to put our FileInputStreams
* back into, or null if we should simply close them.
* Ignored by Readers that don't maintain
* FileInputStreams.
*
* @throws IOException * @throws IOException
*/ */
void close(PeerCache peerCache, FileInputStreamCache fisCache) void close() throws IOException;
throws IOException;
/** /**
* Read exactly the given amount of data, throwing an exception * Read exactly the given amount of data, throwing an exception

View File

@ -87,6 +87,8 @@ public class BlockReaderFactory {
Peer peer, Peer peer,
DatanodeID datanodeID, DatanodeID datanodeID,
DomainSocketFactory domSockFactory, DomainSocketFactory domSockFactory,
PeerCache peerCache,
FileInputStreamCache fisCache,
boolean allowShortCircuitLocalReads) boolean allowShortCircuitLocalReads)
throws IOException { throws IOException {
peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@ -101,16 +103,15 @@ public class BlockReaderFactory {
// enabled, try to set up a BlockReaderLocal. // enabled, try to set up a BlockReaderLocal.
BlockReader reader = newShortCircuitBlockReader(conf, file, BlockReader reader = newShortCircuitBlockReader(conf, file,
block, blockToken, startOffset, len, peer, datanodeID, block, blockToken, startOffset, len, peer, datanodeID,
domSockFactory, verifyChecksum); domSockFactory, verifyChecksum, fisCache);
if (reader != null) { if (reader != null) {
// One we've constructed the short-circuit block reader, we don't // One we've constructed the short-circuit block reader, we don't
// need the socket any more. So let's return it to the cache. // need the socket any more. So let's return it to the cache.
PeerCache peerCache = PeerCache.getInstance( if (peerCache != null) {
conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, peerCache.put(datanodeID, peer);
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT), } else {
conf.getLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, IOUtils.cleanup(null, peer);
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT)); }
peerCache.put(datanodeID, peer);
return reader; return reader;
} }
} }
@ -131,11 +132,11 @@ public class BlockReaderFactory {
block, blockToken, startOffset, len, block, blockToken, startOffset, len,
conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT), DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
verifyChecksum, clientName, peer, datanodeID); verifyChecksum, clientName, peer, datanodeID, peerCache);
} else { } else {
return RemoteBlockReader2.newBlockReader( return RemoteBlockReader2.newBlockReader(
file, block, blockToken, startOffset, len, file, block, blockToken, startOffset, len,
verifyChecksum, clientName, peer, datanodeID); verifyChecksum, clientName, peer, datanodeID, peerCache);
} }
} }
@ -175,8 +176,8 @@ public class BlockReaderFactory {
Configuration conf, String file, ExtendedBlock block, Configuration conf, String file, ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken, long startOffset, Token<BlockTokenIdentifier> blockToken, long startOffset,
long len, Peer peer, DatanodeID datanodeID, long len, Peer peer, DatanodeID datanodeID,
DomainSocketFactory domSockFactory, boolean verifyChecksum) DomainSocketFactory domSockFactory, boolean verifyChecksum,
throws IOException { FileInputStreamCache fisCache) throws IOException {
final DataOutputStream out = final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream( new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream())); peer.getOutputStream()));
@ -194,7 +195,8 @@ public class BlockReaderFactory {
sock.recvFileInputStreams(fis, buf, 0, buf.length); sock.recvFileInputStreams(fis, buf, 0, buf.length);
try { try {
reader = new BlockReaderLocal(conf, file, block, reader = new BlockReaderLocal(conf, file, block,
startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum); startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum,
fisCache);
} finally { } finally {
if (reader == null) { if (reader == null) {
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);

View File

@ -88,6 +88,8 @@ class BlockReaderLocal implements BlockReader {
private final DatanodeID datanodeID; private final DatanodeID datanodeID;
private final ExtendedBlock block; private final ExtendedBlock block;
private final FileInputStreamCache fisCache;
private static int getSlowReadBufferNumChunks(Configuration conf, private static int getSlowReadBufferNumChunks(Configuration conf,
int bytesPerChecksum) { int bytesPerChecksum) {
@ -109,13 +111,15 @@ class BlockReaderLocal implements BlockReader {
public BlockReaderLocal(Configuration conf, String filename, public BlockReaderLocal(Configuration conf, String filename,
ExtendedBlock block, long startOffset, long length, ExtendedBlock block, long startOffset, long length,
FileInputStream dataIn, FileInputStream checksumIn, FileInputStream dataIn, FileInputStream checksumIn,
DatanodeID datanodeID, boolean verifyChecksum) throws IOException { DatanodeID datanodeID, boolean verifyChecksum,
FileInputStreamCache fisCache) throws IOException {
this.dataIn = dataIn; this.dataIn = dataIn;
this.checksumIn = checksumIn; this.checksumIn = checksumIn;
this.startOffset = Math.max(startOffset, 0); this.startOffset = Math.max(startOffset, 0);
this.filename = filename; this.filename = filename;
this.datanodeID = datanodeID; this.datanodeID = datanodeID;
this.block = block; this.block = block;
this.fisCache = fisCache;
// read and handle the common header here. For now just a version // read and handle the common header here. For now just a version
checksumIn.getChannel().position(0); checksumIn.getChannel().position(0);
@ -489,8 +493,7 @@ class BlockReaderLocal implements BlockReader {
} }
@Override @Override
public synchronized void close(PeerCache peerCache, public synchronized void close() throws IOException {
FileInputStreamCache fisCache) throws IOException {
if (fisCache != null) { if (fisCache != null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("putting FileInputStream for " + filename + LOG.debug("putting FileInputStream for " + filename +

View File

@ -671,8 +671,7 @@ class BlockReaderLocalLegacy implements BlockReader {
} }
@Override @Override
public synchronized void close(PeerCache peerCache, public synchronized void close() throws IOException {
FileInputStreamCache fisCache) throws IOException {
IOUtils.cleanup(LOG, dataIn, checksumIn); IOUtils.cleanup(LOG, dataIn, checksumIn);
if (slowReadBuff != null) { if (slowReadBuff != null) {
bufferPool.returnBuffer(slowReadBuff); bufferPool.returnBuffer(slowReadBuff);

View File

@ -441,7 +441,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
// Will be getting a new BlockReader. // Will be getting a new BlockReader.
if (blockReader != null) { if (blockReader != null) {
blockReader.close(peerCache, fileInputStreamCache); blockReader.close();
blockReader = null; blockReader = null;
} }
@ -527,7 +527,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
dfsClient.checkOpen(); dfsClient.checkOpen();
if (blockReader != null) { if (blockReader != null) {
blockReader.close(peerCache, fileInputStreamCache); blockReader.close();
blockReader = null; blockReader = null;
} }
super.close(); super.close();
@ -855,7 +855,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
} }
} finally { } finally {
if (reader != null) { if (reader != null) {
reader.close(peerCache, fileInputStreamCache); reader.close();
} }
} }
// Put chosen node into dead list, continue // Put chosen node into dead list, continue
@ -924,7 +924,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
"the FileInputStreamCache."); "the FileInputStreamCache.");
} }
return new BlockReaderLocal(dfsClient.conf, file, return new BlockReaderLocal(dfsClient.conf, file,
block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum); block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
fileInputStreamCache);
} }
// If the legacy local block reader is enabled and we are reading a local // If the legacy local block reader is enabled and we are reading a local
@ -957,7 +958,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, allowShortCircuitLocalReads); dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads);
return reader; return reader;
} catch (IOException ex) { } catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " + DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@ -978,8 +980,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
shortCircuitLocalReads && (!shortCircuitForbidden()); shortCircuitLocalReads && (!shortCircuitForbidden());
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, allowShortCircuitLocalReads); dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads);
return reader; return reader;
} catch (IOException e) { } catch (IOException e) {
DFSClient.LOG.warn("failed to connect to " + domSock, e); DFSClient.LOG.warn("failed to connect to " + domSock, e);
@ -1002,7 +1005,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
reader = BlockReaderFactory.newBlockReader( reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, false); dsFactory, peerCache, fileInputStreamCache, false);
return reader; return reader;
} catch (IOException ex) { } catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader. Closing stale " + DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@ -1021,7 +1024,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
return BlockReaderFactory.newBlockReader( return BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset, dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode, len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, false); dsFactory, peerCache, fileInputStreamCache, false);
} }

View File

@ -118,8 +118,8 @@ class FileInputStreamCache {
return false; return false;
} }
FileInputStreamCache.Key otherKey = (FileInputStreamCache.Key)other; FileInputStreamCache.Key otherKey = (FileInputStreamCache.Key)other;
return (block.equals(otherKey.block) & return (block.equals(otherKey.block) &&
(block.getGenerationStamp() == otherKey.block.getGenerationStamp()) & (block.getGenerationStamp() == otherKey.block.getGenerationStamp()) &&
datanodeID.equals(otherKey.datanodeID)); datanodeID.equals(otherKey.datanodeID));
} }
@ -233,8 +233,7 @@ class FileInputStreamCache {
executor.remove(cacheCleaner); executor.remove(cacheCleaner);
} }
for (Iterator<Entry<Key, Value>> iter = map.entries().iterator(); for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
iter.hasNext(); iter.hasNext();) {
iter = map.entries().iterator()) {
Entry<Key, Value> entry = iter.next(); Entry<Key, Value> entry = iter.next();
entry.getValue().close(); entry.getValue().close();
iter.remove(); iter.remove();

View File

@ -23,7 +23,6 @@ import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -33,7 +32,6 @@ import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@ -41,9 +39,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -91,6 +87,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
/** Amount of unread data in the current received packet */ /** Amount of unread data in the current received packet */
int dataLeft = 0; int dataLeft = 0;
private final PeerCache peerCache;
/* FSInputChecker interface */ /* FSInputChecker interface */
/* same interface as inputStream java.io.InputStream#read() /* same interface as inputStream java.io.InputStream#read()
@ -324,7 +322,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
private RemoteBlockReader(String file, String bpid, long blockId, private RemoteBlockReader(String file, String bpid, long blockId,
DataInputStream in, DataChecksum checksum, boolean verifyChecksum, DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
DatanodeID datanodeID) { DatanodeID datanodeID, PeerCache peerCache) {
// Path is used only for printing block and file information in debug // Path is used only for printing block and file information in debug
super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/, super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
1, verifyChecksum, 1, verifyChecksum,
@ -350,6 +348,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
bytesPerChecksum = this.checksum.getBytesPerChecksum(); bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize(); checksumSize = this.checksum.getChecksumSize();
this.peerCache = peerCache;
} }
/** /**
@ -373,7 +372,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
long startOffset, long len, long startOffset, long len,
int bufferSize, boolean verifyChecksum, int bufferSize, boolean verifyChecksum,
String clientName, Peer peer, String clientName, Peer peer,
DatanodeID datanodeID) DatanodeID datanodeID,
PeerCache peerCache)
throws IOException { throws IOException {
// in and out will be closed when sock is closed (by the caller) // in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = final DataOutputStream out =
@ -409,12 +409,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
peer, datanodeID); peer, datanodeID, peerCache);
} }
@Override @Override
public synchronized void close(PeerCache peerCache, public synchronized void close() throws IOException {
FileInputStreamCache fisCache) throws IOException {
startOffset = -1; startOffset = -1;
checksum = null; checksum = null;
if (peerCache != null & sentStatusCode) { if (peerCache != null & sentStatusCode) {

View File

@ -82,6 +82,7 @@ public class RemoteBlockReader2 implements BlockReader {
final private Peer peer; final private Peer peer;
final private DatanodeID datanodeID; final private DatanodeID datanodeID;
final private PeerCache peerCache;
private final ReadableByteChannel in; private final ReadableByteChannel in;
private DataChecksum checksum; private DataChecksum checksum;
@ -253,7 +254,7 @@ public class RemoteBlockReader2 implements BlockReader {
protected RemoteBlockReader2(String file, String bpid, long blockId, protected RemoteBlockReader2(String file, String bpid, long blockId,
DataChecksum checksum, boolean verifyChecksum, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
DatanodeID datanodeID) { DatanodeID datanodeID, PeerCache peerCache) {
// Path is used only for printing block and file information in debug // Path is used only for printing block and file information in debug
this.peer = peer; this.peer = peer;
this.datanodeID = datanodeID; this.datanodeID = datanodeID;
@ -262,6 +263,7 @@ public class RemoteBlockReader2 implements BlockReader {
this.verifyChecksum = verifyChecksum; this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max( startOffset, 0 ); this.startOffset = Math.max( startOffset, 0 );
this.filename = file; this.filename = file;
this.peerCache = peerCache;
// The total number of bytes that we need to transfer from the DN is // The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at // the amount that the user wants (bytesToRead), plus the padding at
@ -274,8 +276,7 @@ public class RemoteBlockReader2 implements BlockReader {
@Override @Override
public synchronized void close(PeerCache peerCache, public synchronized void close() throws IOException {
FileInputStreamCache fisCache) throws IOException {
packetReceiver.close(); packetReceiver.close();
startOffset = -1; startOffset = -1;
checksum = null; checksum = null;
@ -365,8 +366,8 @@ public class RemoteBlockReader2 implements BlockReader {
long startOffset, long len, long startOffset, long len,
boolean verifyChecksum, boolean verifyChecksum,
String clientName, String clientName,
Peer peer, DatanodeID datanodeID) Peer peer, DatanodeID datanodeID,
throws IOException { PeerCache peerCache) throws IOException {
// in and out will be closed when sock is closed (by the caller) // in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream())); peer.getOutputStream()));
@ -399,7 +400,7 @@ public class RemoteBlockReader2 implements BlockReader {
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
datanodeID); datanodeID, peerCache);
} }
static void checkSuccess( static void checkSuccess(

View File

@ -215,7 +215,8 @@ public class JspHelper {
offsetIntoBlock, amtToRead, true, offsetIntoBlock, amtToRead, true,
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
new DatanodeID(addr.getAddress().toString(), new DatanodeID(addr.getAddress().toString(),
addr.getHostName(), poolId, addr.getPort(), 0, 0), null, false); addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
null, null, false);
byte[] buf = new byte[(int)amtToRead]; byte[] buf = new byte[(int)amtToRead];
int readOffset = 0; int readOffset = 0;
@ -234,7 +235,7 @@ public class JspHelper {
amtToRead -= numRead; amtToRead -= numRead;
readOffset += numRead; readOffset += numRead;
} }
blockReader.close(null, null); blockReader.close();
out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8))); out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8)));
} }

View File

@ -1137,7 +1137,17 @@ public class DataNode extends Configured
maxVersion); maxVersion);
} }
metrics.incrBlocksGetLocalPathInfo(); metrics.incrBlocksGetLocalPathInfo();
return data.getShortCircuitFdsForRead(blk); FileInputStream fis[] = new FileInputStream[2];
try {
fis[0] = (FileInputStream)data.getBlockInputStream(blk, 0);
fis[1] = (FileInputStream)data.getMetaDataInputStream(blk).getWrappedStream();
} catch (ClassCastException e) {
LOG.debug("requestShortCircuitFdsForRead failed", e);
throw new ShortCircuitFdsUnsupportedException("This DataNode's " +
"FsDatasetSpi does not support short-circuit local reads");
}
return fis;
} }
@Override @Override

View File

@ -282,16 +282,10 @@ class DataXceiver extends Receiver implements Runnable {
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
.getBlockPoolId()); .getBlockPoolId());
BlockSender.ClientTraceLog.info(String.format( BlockSender.ClientTraceLog.info(String.format(
String.format( "src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
"src: %s, dest: %s, op: %s, blockid: %s, srvID: %s, " + " blockid: %s, srvID: %s, success: %b",
"success: %b", blk.getBlockId(), dnR.getStorageID(), (fis != null)
"127.0.0.1", // src IP ));
"127.0.0.1", // dst IP
"REQUEST_SHORT_CIRCUIT_FDS", // operation
blk.getBlockId(), // block id
dnR.getStorageID(),
(fis != null)
)));
} }
if (fis != null) { if (fis != null) {
IOUtils.cleanup(LOG, fis); IOUtils.cleanup(LOG, fis);

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.List; import java.util.List;
@ -386,7 +385,4 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/ */
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException; throws IOException;
FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
throws IOException;
} }

View File

@ -41,4 +41,8 @@ public class LengthInputStream extends FilterInputStream {
public long getLength() { public long getLength() {
return length; return length;
} }
public InputStream getWrappedStream() {
return in;
}
} }

View File

@ -1700,27 +1700,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
datafile.getAbsolutePath(), metafile.getAbsolutePath()); datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info; return info;
} }
@Override // FsDatasetSpi
public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
throws IOException {
File datafile = getBlockFile(block);
File metafile = FsDatasetUtil.getMetaFile(datafile,
block.getGenerationStamp());
FileInputStream fis[] = new FileInputStream[2];
boolean success = false;
try {
fis[0] = new FileInputStream(datafile);
fis[1] = new FileInputStream(metafile);
success = true;
return fis;
} finally {
if (!success) {
IOUtils.cleanup(null, fis);
}
}
}
@Override // FsDatasetSpi @Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException { throws IOException {

View File

@ -563,7 +563,7 @@ public class NamenodeFsck {
conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck", conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer(). TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
getDataEncryptionKey()), getDataEncryptionKey()),
chosenNode, null, false); chosenNode, null, null, null, false);
} catch (IOException ex) { } catch (IOException ex) {
// Put chosen node into dead list, continue // Put chosen node into dead list, continue

View File

@ -66,3 +66,33 @@ HDFS Short-Circuit Local Reads
</property> </property>
</configuration> </configuration>
---- ----
* {Configuration Keys}
* dfs.client.read.shortcircuit
This configuration parameter turns on short-circuit local reads.
* dfs.client.read.shortcircuit.skip.checkusm
If this configuration parameter is set, short-circuit local reads will skip
checksums. This is normally not recommended, but it may be useful for
special setups. You might consider using this if you are doing your own
checksumming outside of HDFS.
* dfs.client.read.shortcircuit.streams.cache.size
The DFSClient maintains a cache of recently opened file descriptors. This
parameter controls the size of that cache. Setting this higher will use more
file descriptors, but potentially provide better performance on workloads
involving lots of seeks.
* dfs.client.read.shortcircuit.streams.cache.expiry.ms
This controls the minimum amount of time file descriptors need to sit in the
FileInputStreamCache before they can be closed for being inactive for too long.
* dfs.client.domain.socket.data.traffic
This control whether we will try to pass normal data traffic over UNIX domain
sockets.

View File

@ -155,7 +155,7 @@ public class BlockReaderTestUtil {
testBlock.getBlockToken(), testBlock.getBlockToken(),
offset, lenToRead, offset, lenToRead,
true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock), true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
nodes[0], null, false); nodes[0], null, null, null, false);
} }
/** /**

View File

@ -127,7 +127,7 @@ public class TestBlockReaderLocal {
checkIn = new FileInputStream(metaFile); checkIn = new FileInputStream(metaFile);
blockReaderLocal = new BlockReaderLocal(conf, blockReaderLocal = new BlockReaderLocal(conf,
TEST_PATH.getName(), block, 0, -1, TEST_PATH.getName(), block, 0, -1,
dataIn, checkIn, datanodeID, checksum); dataIn, checkIn, datanodeID, checksum, null);
dataIn = null; dataIn = null;
checkIn = null; checkIn = null;
test.doTest(blockReaderLocal, original); test.doTest(blockReaderLocal, original);
@ -136,7 +136,7 @@ public class TestBlockReaderLocal {
if (cluster != null) cluster.shutdown(); if (cluster != null) cluster.shutdown();
if (dataIn != null) dataIn.close(); if (dataIn != null) dataIn.close();
if (checkIn != null) checkIn.close(); if (checkIn != null) checkIn.close();
if (blockReaderLocal != null) blockReaderLocal.close(null, null); if (blockReaderLocal != null) blockReaderLocal.close();
} }
} }

View File

@ -61,7 +61,7 @@ public class TestClientBlockVerification {
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK); verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close(null, null); reader.close();
} }
/** /**
@ -76,7 +76,7 @@ public class TestClientBlockVerification {
// We asked the blockreader for the whole file, and only read // We asked the blockreader for the whole file, and only read
// half of it, so no CHECKSUM_OK // half of it, so no CHECKSUM_OK
verify(reader, never()).sendReadResult(Status.CHECKSUM_OK); verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
reader.close(null, null); reader.close();
} }
/** /**
@ -92,7 +92,7 @@ public class TestClientBlockVerification {
// And read half the file // And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK); verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close(null, null); reader.close();
} }
/** /**
@ -111,7 +111,7 @@ public class TestClientBlockVerification {
util.getBlockReader(testBlock, startOffset, length)); util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true); util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK); verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close(null, null); reader.close();
} }
} }
} }

View File

@ -148,7 +148,7 @@ public class TestBlockTokenWithDFS {
blockReader = BlockReaderFactory.newBlockReader( blockReader = BlockReaderFactory.newBlockReader(
conf, file, block, lblock.getBlockToken(), 0, -1, conf, file, block, lblock.getBlockToken(), 0, -1,
true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s), true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
nodes[0], null, false); nodes[0], null, null, null, false);
} catch (IOException ex) { } catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) { if (ex instanceof InvalidBlockTokenException) {

View File

@ -964,12 +964,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
throws IOException {
throw new UnsupportedOperationException();
}
@Override @Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException { throws IOException {

View File

@ -286,8 +286,8 @@ public class TestDataNodeVolumeFailure {
BlockReader blockReader = BlockReader blockReader =
BlockReaderFactory.newBlockReader(conf, file, block, BlockReaderFactory.newBlockReader(conf, file, block,
lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure", lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
TcpPeerServer.peerFromSocket(s), datanode, null, false); TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
blockReader.close(null, null); blockReader.close();
} }
/** /**