diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index 4eaa65c8e4c..05f92b1f945 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -291,13 +291,6 @@ - - - - - - - diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index 17ecb9e30d6..5f3a5075728 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -51,19 +51,9 @@ public interface BlockReader extends ByteBufferReadable { /** * Close the block reader. * - * @param peerCache The PeerCache to put the Peer we're using back - * into, or null if we should simply close the Peer - * we're using (along with its Socket). - * Ignored by Readers that don't maintain Peers. - * @param fisCache The FileInputStreamCache to put our FileInputStreams - * back into, or null if we should simply close them. - * Ignored by Readers that don't maintain - * FileInputStreams. - * * @throws IOException */ - void close(PeerCache peerCache, FileInputStreamCache fisCache) - throws IOException; + void close() throws IOException; /** * Read exactly the given amount of data, throwing an exception diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 7fbb1a01d59..a67b3892f2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -87,6 +87,8 @@ public class BlockReaderFactory { Peer peer, DatanodeID datanodeID, DomainSocketFactory domSockFactory, + PeerCache peerCache, + FileInputStreamCache fisCache, boolean allowShortCircuitLocalReads) throws IOException { peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, @@ -101,16 +103,15 @@ public class BlockReaderFactory { // enabled, try to set up a BlockReaderLocal. BlockReader reader = newShortCircuitBlockReader(conf, file, block, blockToken, startOffset, len, peer, datanodeID, - domSockFactory, verifyChecksum); + domSockFactory, verifyChecksum, fisCache); if (reader != null) { // One we've constructed the short-circuit block reader, we don't // need the socket any more. So let's return it to the cache. - PeerCache peerCache = PeerCache.getInstance( - conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, - DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT), - conf.getLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, - DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT)); - peerCache.put(datanodeID, peer); + if (peerCache != null) { + peerCache.put(datanodeID, peer); + } else { + IOUtils.cleanup(null, peer); + } return reader; } } @@ -131,11 +132,11 @@ public class BlockReaderFactory { block, blockToken, startOffset, len, conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT), - verifyChecksum, clientName, peer, datanodeID); + verifyChecksum, clientName, peer, datanodeID, peerCache); } else { return RemoteBlockReader2.newBlockReader( file, block, blockToken, startOffset, len, - verifyChecksum, clientName, peer, datanodeID); + verifyChecksum, clientName, peer, datanodeID, peerCache); } } @@ -175,8 +176,8 @@ public class BlockReaderFactory { Configuration conf, String file, ExtendedBlock block, Token blockToken, long startOffset, long len, Peer peer, DatanodeID datanodeID, - DomainSocketFactory domSockFactory, boolean verifyChecksum) - throws IOException { + DomainSocketFactory domSockFactory, boolean verifyChecksum, + FileInputStreamCache fisCache) throws IOException { final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( peer.getOutputStream())); @@ -194,7 +195,8 @@ public class BlockReaderFactory { sock.recvFileInputStreams(fis, buf, 0, buf.length); try { reader = new BlockReaderLocal(conf, file, block, - startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum); + startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum, + fisCache); } finally { if (reader == null) { IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index ea22a9888f8..5a557089377 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -88,6 +88,8 @@ class BlockReaderLocal implements BlockReader { private final DatanodeID datanodeID; private final ExtendedBlock block; + private final FileInputStreamCache fisCache; + private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) { @@ -109,13 +111,15 @@ class BlockReaderLocal implements BlockReader { public BlockReaderLocal(Configuration conf, String filename, ExtendedBlock block, long startOffset, long length, FileInputStream dataIn, FileInputStream checksumIn, - DatanodeID datanodeID, boolean verifyChecksum) throws IOException { + DatanodeID datanodeID, boolean verifyChecksum, + FileInputStreamCache fisCache) throws IOException { this.dataIn = dataIn; this.checksumIn = checksumIn; this.startOffset = Math.max(startOffset, 0); this.filename = filename; this.datanodeID = datanodeID; this.block = block; + this.fisCache = fisCache; // read and handle the common header here. For now just a version checksumIn.getChannel().position(0); @@ -489,8 +493,7 @@ class BlockReaderLocal implements BlockReader { } @Override - public synchronized void close(PeerCache peerCache, - FileInputStreamCache fisCache) throws IOException { + public synchronized void close() throws IOException { if (fisCache != null) { if (LOG.isDebugEnabled()) { LOG.debug("putting FileInputStream for " + filename + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index 11fb4922fcb..1d5a334dc52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -671,8 +671,7 @@ class BlockReaderLocalLegacy implements BlockReader { } @Override - public synchronized void close(PeerCache peerCache, - FileInputStreamCache fisCache) throws IOException { + public synchronized void close() throws IOException { IOUtils.cleanup(LOG, dataIn, checksumIn); if (slowReadBuff != null) { bufferPool.returnBuffer(slowReadBuff); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 685487c8497..a78b6c83039 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -441,7 +441,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable // Will be getting a new BlockReader. if (blockReader != null) { - blockReader.close(peerCache, fileInputStreamCache); + blockReader.close(); blockReader = null; } @@ -527,7 +527,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable dfsClient.checkOpen(); if (blockReader != null) { - blockReader.close(peerCache, fileInputStreamCache); + blockReader.close(); blockReader = null; } super.close(); @@ -855,7 +855,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable } } finally { if (reader != null) { - reader.close(peerCache, fileInputStreamCache); + reader.close(); } } // Put chosen node into dead list, continue @@ -924,7 +924,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable "the FileInputStreamCache."); } return new BlockReaderLocal(dfsClient.conf, file, - block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum); + block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum, + fileInputStreamCache); } // If the legacy local block reader is enabled and we are reading a local @@ -957,7 +958,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable reader = BlockReaderFactory.newBlockReader( dfsClient.conf, file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, - dsFactory, allowShortCircuitLocalReads); + dsFactory, peerCache, fileInputStreamCache, + allowShortCircuitLocalReads); return reader; } catch (IOException ex) { DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " + @@ -978,8 +980,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable shortCircuitLocalReads && (!shortCircuitForbidden()); reader = BlockReaderFactory.newBlockReader( dfsClient.conf, file, block, blockToken, startOffset, - len, verifyChecksum, clientName, peer, chosenNode, - dsFactory, allowShortCircuitLocalReads); + len, verifyChecksum, clientName, peer, chosenNode, + dsFactory, peerCache, fileInputStreamCache, + allowShortCircuitLocalReads); return reader; } catch (IOException e) { DFSClient.LOG.warn("failed to connect to " + domSock, e); @@ -1002,7 +1005,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable reader = BlockReaderFactory.newBlockReader( dfsClient.conf, file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, - dsFactory, false); + dsFactory, peerCache, fileInputStreamCache, false); return reader; } catch (IOException ex) { DFSClient.LOG.debug("Error making BlockReader. Closing stale " + @@ -1021,7 +1024,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable return BlockReaderFactory.newBlockReader( dfsClient.conf, file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, - dsFactory, false); + dsFactory, peerCache, fileInputStreamCache, false); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java index ac0af814362..7879fd4a898 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java @@ -118,8 +118,8 @@ class FileInputStreamCache { return false; } FileInputStreamCache.Key otherKey = (FileInputStreamCache.Key)other; - return (block.equals(otherKey.block) & - (block.getGenerationStamp() == otherKey.block.getGenerationStamp()) & + return (block.equals(otherKey.block) && + (block.getGenerationStamp() == otherKey.block.getGenerationStamp()) && datanodeID.equals(otherKey.datanodeID)); } @@ -233,8 +233,7 @@ class FileInputStreamCache { executor.remove(cacheCleaner); } for (Iterator> iter = map.entries().iterator(); - iter.hasNext(); - iter = map.entries().iterator()) { + iter.hasNext();) { Entry entry = iter.next(); entry.getValue().close(); iter.remove(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 61120f39edc..d391728c889 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -23,7 +23,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; @@ -33,7 +32,6 @@ import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; @@ -41,9 +39,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -91,6 +87,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { /** Amount of unread data in the current received packet */ int dataLeft = 0; + private final PeerCache peerCache; + /* FSInputChecker interface */ /* same interface as inputStream java.io.InputStream#read() @@ -324,7 +322,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { private RemoteBlockReader(String file, String bpid, long blockId, DataInputStream in, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID) { + DatanodeID datanodeID, PeerCache peerCache) { // Path is used only for printing block and file information in debug super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/, 1, verifyChecksum, @@ -350,6 +348,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); + this.peerCache = peerCache; } /** @@ -373,7 +372,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName, Peer peer, - DatanodeID datanodeID) + DatanodeID datanodeID, + PeerCache peerCache) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = @@ -409,12 +409,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, - peer, datanodeID); + peer, datanodeID, peerCache); } @Override - public synchronized void close(PeerCache peerCache, - FileInputStreamCache fisCache) throws IOException { + public synchronized void close() throws IOException { startOffset = -1; checksum = null; if (peerCache != null & sentStatusCode) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 4d62734b1fa..01d83eaeeba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -82,6 +82,7 @@ public class RemoteBlockReader2 implements BlockReader { final private Peer peer; final private DatanodeID datanodeID; + final private PeerCache peerCache; private final ReadableByteChannel in; private DataChecksum checksum; @@ -253,7 +254,7 @@ public class RemoteBlockReader2 implements BlockReader { protected RemoteBlockReader2(String file, String bpid, long blockId, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID) { + DatanodeID datanodeID, PeerCache peerCache) { // Path is used only for printing block and file information in debug this.peer = peer; this.datanodeID = datanodeID; @@ -262,6 +263,7 @@ public class RemoteBlockReader2 implements BlockReader { this.verifyChecksum = verifyChecksum; this.startOffset = Math.max( startOffset, 0 ); this.filename = file; + this.peerCache = peerCache; // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at @@ -274,8 +276,7 @@ public class RemoteBlockReader2 implements BlockReader { @Override - public synchronized void close(PeerCache peerCache, - FileInputStreamCache fisCache) throws IOException { + public synchronized void close() throws IOException { packetReceiver.close(); startOffset = -1; checksum = null; @@ -365,8 +366,8 @@ public class RemoteBlockReader2 implements BlockReader { long startOffset, long len, boolean verifyChecksum, String clientName, - Peer peer, DatanodeID datanodeID) - throws IOException { + Peer peer, DatanodeID datanodeID, + PeerCache peerCache) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( peer.getOutputStream())); @@ -399,7 +400,7 @@ public class RemoteBlockReader2 implements BlockReader { return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, - datanodeID); + datanodeID, peerCache); } static void checkSuccess( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index 7b8af412327..590794cda7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -215,7 +215,8 @@ public class JspHelper { offsetIntoBlock, amtToRead, true, "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey), new DatanodeID(addr.getAddress().toString(), - addr.getHostName(), poolId, addr.getPort(), 0, 0), null, false); + addr.getHostName(), poolId, addr.getPort(), 0, 0), null, + null, null, false); byte[] buf = new byte[(int)amtToRead]; int readOffset = 0; @@ -234,7 +235,7 @@ public class JspHelper { amtToRead -= numRead; readOffset += numRead; } - blockReader.close(null, null); + blockReader.close(); out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8))); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 32031f391c7..37a6b6acb25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1137,7 +1137,17 @@ public class DataNode extends Configured maxVersion); } metrics.incrBlocksGetLocalPathInfo(); - return data.getShortCircuitFdsForRead(blk); + FileInputStream fis[] = new FileInputStream[2]; + + try { + fis[0] = (FileInputStream)data.getBlockInputStream(blk, 0); + fis[1] = (FileInputStream)data.getMetaDataInputStream(blk).getWrappedStream(); + } catch (ClassCastException e) { + LOG.debug("requestShortCircuitFdsForRead failed", e); + throw new ShortCircuitFdsUnsupportedException("This DataNode's " + + "FsDatasetSpi does not support short-circuit local reads"); + } + return fis; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 466635b5e88..b08959dbe2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -282,16 +282,10 @@ class DataXceiver extends Receiver implements Runnable { DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk .getBlockPoolId()); BlockSender.ClientTraceLog.info(String.format( - String.format( - "src: %s, dest: %s, op: %s, blockid: %s, srvID: %s, " + - "success: %b", - "127.0.0.1", // src IP - "127.0.0.1", // dst IP - "REQUEST_SHORT_CIRCUIT_FDS", // operation - blk.getBlockId(), // block id - dnR.getStorageID(), - (fis != null) - ))); + "src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," + + " blockid: %s, srvID: %s, success: %b", + blk.getBlockId(), dnR.getStorageID(), (fis != null) + )); } if (fis != null) { IOUtils.cleanup(LOG, fis); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 57e8887be81..4f633973111 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; @@ -386,7 +385,4 @@ public interface FsDatasetSpi extends FSDatasetMBean { */ public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) throws IOException; - - FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block) - throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java index 22ec6dc44a7..1fbb26ae3fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/LengthInputStream.java @@ -41,4 +41,8 @@ public class LengthInputStream extends FilterInputStream { public long getLength() { return length; } + + public InputStream getWrappedStream() { + return in; + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 691e1e7b45b..90cdd5888fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1681,27 +1681,7 @@ class FsDatasetImpl implements FsDatasetSpi { datafile.getAbsolutePath(), metafile.getAbsolutePath()); return info; } - - @Override // FsDatasetSpi - public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block) - throws IOException { - File datafile = getBlockFile(block); - File metafile = FsDatasetUtil.getMetaFile(datafile, - block.getGenerationStamp()); - FileInputStream fis[] = new FileInputStream[2]; - boolean success = false; - try { - fis[0] = new FileInputStream(datafile); - fis[1] = new FileInputStream(metafile); - success = true; - return fis; - } finally { - if (!success) { - IOUtils.cleanup(null, fis); - } - } - } - + @Override // FsDatasetSpi public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index e66611f8dab..26103fc2e22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -563,7 +563,7 @@ public class NamenodeFsck { conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck", TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer(). getDataEncryptionKey()), - chosenNode, null, false); + chosenNode, null, null, null, false); } catch (IOException ex) { // Put chosen node into dead list, continue diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/ShortCircuitLocalReads.apt.vm b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/ShortCircuitLocalReads.apt.vm index 6792f156b38..58f07124161 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/ShortCircuitLocalReads.apt.vm +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/ShortCircuitLocalReads.apt.vm @@ -66,3 +66,33 @@ HDFS Short-Circuit Local Reads ---- + +* {Configuration Keys} + + * dfs.client.read.shortcircuit + + This configuration parameter turns on short-circuit local reads. + + * dfs.client.read.shortcircuit.skip.checkusm + + If this configuration parameter is set, short-circuit local reads will skip + checksums. This is normally not recommended, but it may be useful for + special setups. You might consider using this if you are doing your own + checksumming outside of HDFS. + + * dfs.client.read.shortcircuit.streams.cache.size + + The DFSClient maintains a cache of recently opened file descriptors. This + parameter controls the size of that cache. Setting this higher will use more + file descriptors, but potentially provide better performance on workloads + involving lots of seeks. + + * dfs.client.read.shortcircuit.streams.cache.expiry.ms + + This controls the minimum amount of time file descriptors need to sit in the + FileInputStreamCache before they can be closed for being inactive for too long. + + * dfs.client.domain.socket.data.traffic + + This control whether we will try to pass normal data traffic over UNIX domain + sockets. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java index f96849e64cd..35381634527 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java @@ -155,7 +155,7 @@ public class BlockReaderTestUtil { testBlock.getBlockToken(), offset, lenToRead, true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock), - nodes[0], null, false); + nodes[0], null, null, null, false); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java index e35da42a7d4..704a5421a72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java @@ -127,7 +127,7 @@ public class TestBlockReaderLocal { checkIn = new FileInputStream(metaFile); blockReaderLocal = new BlockReaderLocal(conf, TEST_PATH.getName(), block, 0, -1, - dataIn, checkIn, datanodeID, checksum); + dataIn, checkIn, datanodeID, checksum, null); dataIn = null; checkIn = null; test.doTest(blockReaderLocal, original); @@ -136,7 +136,7 @@ public class TestBlockReaderLocal { if (cluster != null) cluster.shutdown(); if (dataIn != null) dataIn.close(); if (checkIn != null) checkIn.close(); - if (blockReaderLocal != null) blockReaderLocal.close(null, null); + if (blockReaderLocal != null) blockReaderLocal.close(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java index 4a88e0b1d30..8dd3d6fd38a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java @@ -61,7 +61,7 @@ public class TestClientBlockVerification { util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(null, null); + reader.close(); } /** @@ -76,7 +76,7 @@ public class TestClientBlockVerification { // We asked the blockreader for the whole file, and only read // half of it, so no CHECKSUM_OK verify(reader, never()).sendReadResult(Status.CHECKSUM_OK); - reader.close(null, null); + reader.close(); } /** @@ -92,7 +92,7 @@ public class TestClientBlockVerification { // And read half the file util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(null, null); + reader.close(); } /** @@ -111,7 +111,7 @@ public class TestClientBlockVerification { util.getBlockReader(testBlock, startOffset, length)); util.readAndCheckEOS(reader, length, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(null, null); + reader.close(); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index d68625de36a..689852e3fcf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -148,7 +148,7 @@ public class TestBlockTokenWithDFS { blockReader = BlockReaderFactory.newBlockReader( conf, file, block, lblock.getBlockToken(), 0, -1, true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s), - nodes[0], null, false); + nodes[0], null, null, null, false); } catch (IOException ex) { if (ex instanceof InvalidBlockTokenException) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index ef73d868151..274e5a3a6bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -964,12 +964,6 @@ public class SimulatedFSDataset implements FsDatasetSpi { throw new UnsupportedOperationException(); } - @Override - public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block) - throws IOException { - throw new UnsupportedOperationException(); - } - @Override public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 3ba91c4dc1c..3697ae25804 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -285,8 +285,8 @@ public class TestDataNodeVolumeFailure { BlockReader blockReader = BlockReaderFactory.newBlockReader(conf, file, block, lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure", - TcpPeerServer.peerFromSocket(s), datanode, null, false); - blockReader.close(null, null); + TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false); + blockReader.close(); } /**