From 40d15ba6ff0d3347ad2e47743d9b2708604329a0 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Thu, 3 Nov 2011 06:53:44 +0000 Subject: [PATCH] HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1196975 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/BlockReader.java | 24 +- .../hadoop/hdfs/BlockReaderFactory.java | 30 +- .../org/apache/hadoop/hdfs/DFSClient.java | 4 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../apache/hadoop/hdfs/DFSInputStream.java | 3 +- .../apache/hadoop/hdfs/RemoteBlockReader.java | 64 +-- .../hadoop/hdfs/RemoteBlockReader2.java | 500 ++++++++++++++++++ .../protocol/datatransfer/PacketHeader.java | 2 +- .../hadoop/hdfs/server/common/JspHelper.java | 32 +- .../server/datanode/DatanodeJspHelper.java | 10 +- .../hdfs/server/namenode/FileDataServlet.java | 11 +- .../hdfs/server/namenode/NamenodeFsck.java | 5 +- .../web/resources/NamenodeWebHdfsMethods.java | 9 +- .../hadoop/hdfs/util/DirectBufferPool.java | 112 ++++ .../hadoop/hdfs/BlockReaderTestUtil.java | 6 +- .../hdfs/TestClientBlockVerification.java | 14 +- .../org/apache/hadoop/hdfs/TestConnCache.java | 17 +- .../org/apache/hadoop/hdfs/TestSeekBug.java | 5 +- .../TestBlockTokenWithDFS.java | 8 +- .../datanode/TestDataNodeVolumeFailure.java | 9 +- .../hdfs/server/datanode/TestDatanodeJsp.java | 3 +- .../hdfs/util/TestDirectBufferPool.java | 95 ++++ 23 files changed, 842 insertions(+), 126 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 93918d78e15..85b6c7cdf6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -15,6 +15,9 @@ Release 0.23.1 - UNRELEASED HDFS-2533. Remove needless synchronization on some FSDataSet methods. (todd) + HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker. + (todd) + BUG FIXES Release 0.23.0 - 2011-11-01 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index 3ebbeec68bd..dfab7309b53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -20,14 +20,11 @@ package org.apache.hadoop.hdfs; import java.io.IOException; import java.net.Socket; -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.hadoop.fs.Seekable; - /** * A BlockReader is responsible for reading a single block * from a single datanode. */ -public interface BlockReader extends Seekable, PositionedReadable { +public interface BlockReader { /* same interface as inputStream java.io.InputStream#read() * used by DFSInputStream#read() @@ -43,16 +40,21 @@ public interface BlockReader extends Seekable, PositionedReadable { */ long skip(long n) throws IOException; - /** - * Read a single byte, returning -1 at enf of stream. 
- */ - int read() throws IOException; - void close() throws IOException; /** - * kind of like readFully(). Only reads as much as possible. - * And allows use of protected readFully(). + * Read exactly the given amount of data, throwing an exception + * if EOF is reached before that amount + */ + void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException; + + /** + * Similar to {@link #readFully(byte[], int, int)} except that it will + * not throw an exception on EOF. However, it differs from the simple + * {@link #read(byte[], int, int)} call in that it is guaranteed to + * read the data if it is available. In other words, if this call + * does not throw an exception, then either the buffer has been + * filled or the next call will return EOF. */ int readAll(byte[] buf, int offset, int len) throws IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 52c6cc42ce7..855d7ece795 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -22,6 +22,8 @@ import java.net.InetSocketAddress; import java.net.Socket; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.token.Token; @@ -32,17 +34,26 @@ import org.apache.hadoop.security.token.Token; */ @InterfaceAudience.Private public class BlockReaderFactory { - public static BlockReader newBlockReader(Socket sock, String file, + /** + * @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String) + */ + public static BlockReader newBlockReader( + Configuration conf, + Socket sock, String file, ExtendedBlock block, Token blockToken, - long startOffset, long len, int bufferSize) throws IOException { - return newBlockReader(sock, file, block, blockToken, startOffset, + long startOffset, long len) throws IOException { + int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, + DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT); + return newBlockReader(new Conf(conf), + sock, file, block, blockToken, startOffset, len, bufferSize, true, ""); } /** * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. - * + * + * @param conf the DFSClient configuration * @param sock An established Socket to the DN. The BlockReader will not close it normally * @param file File location * @param block The block object @@ -54,7 +65,9 @@ public class BlockReaderFactory { * @param clientName Client name * @return New BlockReader instance, or null on error. 
*/ + @SuppressWarnings("deprecation") public static BlockReader newBlockReader( + Conf conf, Socket sock, String file, ExtendedBlock block, Token blockToken, @@ -62,8 +75,13 @@ public class BlockReaderFactory { int bufferSize, boolean verifyChecksum, String clientName) throws IOException { - return RemoteBlockReader.newBlockReader( - sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName); + if (conf.useLegacyBlockReader) { + return RemoteBlockReader.newBlockReader( + sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName); + } else { + return RemoteBlockReader2.newBlockReader( + sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index f55ae4c0864..3da99da18e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -155,6 +155,7 @@ public class DFSClient implements java.io.Closeable { final short defaultReplication; final String taskId; final FsPermission uMask; + final boolean useLegacyBlockReader; Conf(Configuration conf) { maxBlockAcquireFailures = conf.getInt( @@ -192,6 +193,9 @@ public class DFSClient implements java.io.Closeable { .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); uMask = FsPermission.getUMask(conf); + useLegacyBlockReader = conf.getBoolean( + DFS_CLIENT_USE_LEGACY_BLOCKREADER, + DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); } private int getChecksumType(Configuration conf) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 461cc3b178d..763033537a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -181,6 +181,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT = 3; public static final String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = "dfs.client.max.block.acquire.failures"; public static final int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3; + public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader"; + public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false; public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth"; public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 2964ccb5a77..71ec00e20e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -780,7 +780,8 @@ public class DFSInputStream extends FSInputStream { try { // The OP_READ_BLOCK request is sent as we make the 
BlockReader BlockReader reader = - BlockReaderFactory.newBlockReader(sock, file, block, + BlockReaderFactory.newBlockReader(dfsClient.getConf(), + sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 8ffd56b9137..44b35b40229 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -50,27 +50,13 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -/** This is a wrapper around connection to datanode - * and understands checksum, offset etc. - * - * Terminology: - *
- * <dl>
- *   <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- *   <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- *   <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification. +/** + * @deprecated this is an old implementation that is being left around + * in case any issues spring up with the new {@link RemoteBlockReader2} implementation. + * It will be removed in the next release. */ @InterfaceAudience.Private +@Deprecated public class RemoteBlockReader extends FSInputChecker implements BlockReader { Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. @@ -410,7 +396,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { BlockOpResponseProto status = BlockOpResponseProto.parseFrom( vintPrefixed(in)); - checkSuccess(status, sock, block, file); + RemoteBlockReader2.checkSuccess(status, sock, block, file); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( @@ -431,28 +417,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); } - private static void checkSuccess( - BlockOpResponseProto status, Socket sock, - ExtendedBlock block, String file) - throws IOException { - if (status.getStatus() != Status.SUCCESS) { - if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) { - throw new InvalidBlockTokenException( - "Got access token error for OP_READ_BLOCK, self=" - + sock.getLocalSocketAddress() + ", remote=" - + sock.getRemoteSocketAddress() + ", for file " + file - + ", for pool " + block.getBlockPoolId() + " block " - + block.getBlockId() + "_" + block.getGenerationStamp()); - } else { - throw new IOException("Got error for OP_READ_BLOCK, self=" - + sock.getLocalSocketAddress() + ", remote=" - + sock.getRemoteSocketAddress() + ", for file " + file - + ", for pool " + block.getBlockPoolId() + " block " - + block.getBlockId() + "_" + block.getGenerationStamp()); - } - } - } - @Override public synchronized void close() throws IOException { startOffset = -1; @@ -464,6 +428,12 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { // in will be closed when its Socket is closed. } + @Override + public void readFully(byte[] buf, int readOffset, int amtToRead) + throws IOException { + IOUtils.readFully(this, buf, readOffset, amtToRead); + } + @Override public int readAll(byte[] buf, int offset, int len) throws IOException { return readFully(this, buf, offset, len); @@ -492,14 +462,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { void sendReadResult(Socket sock, Status statusCode) { assert !sentStatusCode : "already sent status code to " + sock; try { - OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT); - - ClientReadStatusProto.newBuilder() - .setStatus(statusCode) - .build() - .writeDelimitedTo(out); - - out.flush(); + RemoteBlockReader2.writeReadResult(sock, statusCode); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. 
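The @deprecated note above pairs with the new dfs.client.use.legacy.blockreader key added to DFSConfigKeys and read into DFSClient.Conf. A minimal sketch (outside the patch itself) of how a client could opt back into the legacy reader if RemoteBlockReader2 misbehaves; the demo class name and file path are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LegacyBlockReaderFallback {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Key added by this patch; the default (false) selects RemoteBlockReader2.
    conf.setBoolean("dfs.client.use.legacy.blockreader", true);
    FileSystem fs = FileSystem.get(conf);
    // Reads now go through the deprecated FSInputChecker-based RemoteBlockReader.
    FSDataInputStream in = fs.open(new Path("/tmp/example.txt")); // placeholder path
    byte[] buf = new byte[4096];
    int n = in.read(buf, 0, buf.length);
    System.out.println("read " + n + " bytes");
    in.close();
    fs.close();
  }
}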
@@ -519,4 +482,5 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { final String poolId, final long blockId) { return s.toString() + ":" + poolId + ":" + blockId; } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java new file mode 100644 index 00000000000..2b2f77ecb45 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -0,0 +1,500 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.util.DirectBufferPool; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.SocketInputStream; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.base.Preconditions; + +/** + * This is a wrapper around connection to datanode + * and understands checksum, offset etc. + * + * Terminology: + *
+ * <dl>
+ *   <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ *   <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ *   <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification. + * + * This is a new implementation introduced in Hadoop 0.23 which + * is more efficient and simpler than the older BlockReader + * implementation. It should be renamed to RemoteBlockReader + * once we are confident in it. + */ +@InterfaceAudience.Private +public class RemoteBlockReader2 implements BlockReader { + + static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class); + + Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. + private ReadableByteChannel in; + private DataChecksum checksum; + + private PacketHeader curHeader; + private ByteBuffer curPacketBuf = null; + private ByteBuffer curDataSlice = null; + + + /** offset in block of the last chunk received */ + private long lastSeqNo = -1; + + /** offset in block where reader wants to actually read */ + private long startOffset; + private final String filename; + + private static DirectBufferPool bufferPool = + new DirectBufferPool(); + private ByteBuffer headerBuf = ByteBuffer.allocate( + PacketHeader.PKT_HEADER_LEN); + + private int bytesPerChecksum; + private int checksumSize; + + /** + * The total number of bytes we need to transfer from the DN. + * This is the amount that the user has requested plus some padding + * at the beginning so that the read can begin on a chunk boundary. + */ + private long bytesNeededToFinish; + + private final boolean verifyChecksum; + + private boolean sentStatusCode = false; + + byte[] skipBuf = null; + ByteBuffer checksumBytes = null; + /** Amount of unread data in the current received packet */ + int dataLeft = 0; + + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + + if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + readNextPacket(); + } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; + } + + int nRead = Math.min(curDataSlice.remaining(), len); + curDataSlice.get(buf, off, nRead); + + return nRead; + } + + private void readNextPacket() throws IOException { + Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock()); + + //Read packet headers. + readPacketHeader(); + + if (LOG.isTraceEnabled()) { + LOG.trace("DFSClient readNextPacket got header " + curHeader); + } + + // Sanity check the lengths + if (!curHeader.sanityCheck(lastSeqNo)) { + throw new IOException("BlockReader: error in packet header " + + curHeader); + } + + if (curHeader.getDataLen() > 0) { + int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; + int checksumsLen = chunks * checksumSize; + int bufsize = checksumsLen + curHeader.getDataLen(); + + resetPacketBuffer(checksumsLen, curHeader.getDataLen()); + + lastSeqNo = curHeader.getSeqno(); + if (bufsize > 0) { + readChannelFully(in, curPacketBuf); + curPacketBuf.flip(); + if (verifyChecksum) { + verifyPacketChecksums(); + } + } + bytesNeededToFinish -= curHeader.getDataLen(); + } + + // First packet will include some data prior to the first byte + // the user requested. Skip it. 
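+ // (Worked example, not in the original patch: with bytesPerChecksum=512 and a
+ // requested startOffset of 517, the first packet starts at offsetInBlock=512, so
+ // newPos is 5 and curDataSlice.position(5) discards the padding before the user data.)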
+ if (curHeader.getOffsetInBlock() < startOffset) { + int newPos = (int) (startOffset - curHeader.getOffsetInBlock()); + curDataSlice.position(newPos); + } + + // If we've now satisfied the whole client read, read one last packet + // header, which should be empty + if (bytesNeededToFinish <= 0) { + readTrailingEmptyPacket(); + if (verifyChecksum) { + sendReadResult(dnSock, Status.CHECKSUM_OK); + } else { + sendReadResult(dnSock, Status.SUCCESS); + } + } + } + + private void verifyPacketChecksums() throws ChecksumException { + // N.B.: the checksum error offset reported here is actually + // relative to the start of the block, not the start of the file. + // This is slightly misleading, but preserves the behavior from + // the older BlockReader. + checksum.verifyChunkedSums(curDataSlice, curPacketBuf, + filename, curHeader.getOffsetInBlock()); + } + + private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf) + throws IOException { + while (buf.remaining() > 0) { + int n = ch.read(buf); + if (n < 0) { + throw new IOException("Premature EOF reading from " + ch); + } + } + } + + private void resetPacketBuffer(int checksumsLen, int dataLen) { + int packetLen = checksumsLen + dataLen; + if (curPacketBuf == null || + curPacketBuf.capacity() < packetLen) { + returnPacketBufToPool(); + curPacketBuf = bufferPool.getBuffer(packetLen); + } + curPacketBuf.position(checksumsLen); + curDataSlice = curPacketBuf.slice(); + curDataSlice.limit(dataLen); + curPacketBuf.clear(); + curPacketBuf.limit(checksumsLen + dataLen); + } + + @Override + public synchronized long skip(long n) throws IOException { + /* How can we make sure we don't throw a ChecksumException, at least + * in majority of the cases?. This one throws. */ + if ( skipBuf == null ) { + skipBuf = new byte[bytesPerChecksum]; + } + + long nSkipped = 0; + while ( nSkipped < n ) { + int toSkip = (int)Math.min(n-nSkipped, skipBuf.length); + int ret = read(skipBuf, 0, toSkip); + if ( ret <= 0 ) { + return nSkipped; + } + nSkipped += ret; + } + return nSkipped; + } + + private void readPacketHeader() throws IOException { + headerBuf.clear(); + readChannelFully(in, headerBuf); + headerBuf.flip(); + if (curHeader == null) curHeader = new PacketHeader(); + curHeader.readFields(headerBuf); + } + + private void readTrailingEmptyPacket() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Reading empty packet at end of read"); + } + headerBuf.clear(); + readChannelFully(in, headerBuf); + headerBuf.flip(); + PacketHeader trailer = new PacketHeader(); + trailer.readFields(headerBuf); + if (!trailer.isLastPacketInBlock() || + trailer.getDataLen() != 0) { + throw new IOException("Expected empty end-of-read packet! Header: " + + trailer); + } + } + + private RemoteBlockReader2(String file, String bpid, long blockId, + ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum, + long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) { + // Path is used only for printing block and file information in debug + this.dnSock = dnSock; + this.in = in; + this.checksum = checksum; + this.verifyChecksum = verifyChecksum; + this.startOffset = Math.max( startOffset, 0 ); + this.filename = file; + + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. 
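+ // (Worked example, not in the original patch: bytesToRead=100, startOffset=517 and
+ // firstChunkOffset=512 give bytesNeededToFinish = 100 + (517 - 512) = 105.)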
+ this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + } + + + @Override + public synchronized void close() throws IOException { + returnPacketBufToPool(); + + startOffset = -1; + checksum = null; + if (dnSock != null) { + dnSock.close(); + } + + // in will be closed when its Socket is closed. + } + + @Override + protected void finalize() throws Throwable { + try { + // just in case it didn't get closed, we + // may as well still try to return the buffer + returnPacketBufToPool(); + } finally { + super.finalize(); + } + } + + private void returnPacketBufToPool() { + if (curPacketBuf != null) { + bufferPool.returnBuffer(curPacketBuf); + curPacketBuf = null; + } + } + + /** + * Take the socket used to talk to the DN. + */ + public Socket takeSocket() { + assert hasSentStatusCode() : + "BlockReader shouldn't give back sockets mid-read"; + Socket res = dnSock; + dnSock = null; + return res; + } + + /** + * Whether the BlockReader has reached the end of its input stream + * and successfully sent a status code back to the datanode. + */ + public boolean hasSentStatusCode() { + return sentStatusCode; + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Socket sock, Status statusCode) { + assert !sentStatusCode : "already sent status code to " + sock; + try { + writeReadResult(sock, statusCode); + sentStatusCode = true; + } catch (IOException e) { + // It's ok not to be able to send this. But something is probably wrong. + LOG.info("Could not send read status (" + statusCode + ") to datanode " + + sock.getInetAddress() + ": " + e.getMessage()); + } + } + + /** + * Serialize the actual read result on the wire. + */ + static void writeReadResult(Socket sock, Status statusCode) + throws IOException { + OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT); + + ClientReadStatusProto.newBuilder() + .setStatus(statusCode) + .build() + .writeDelimitedTo(out); + + out.flush(); + } + + /** + * File name to print when accessing a block directly (from servlets) + * @param s Address of the block location + * @param poolId Block pool ID of the block + * @param blockId Block ID of the block + * @return string that has a file name for debug purposes + */ + public static String getFileName(final InetSocketAddress s, + final String poolId, final long blockId) { + return s.toString() + ":" + poolId + ":" + blockId; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + int n = 0; + for (;;) { + int nread = read(buf, offset + n, len - n); + if (nread <= 0) + return (n == 0) ? nread : n; + n += nread; + if (n >= len) + return n; + } + } + + @Override + public void readFully(byte[] buf, int off, int len) + throws IOException { + int toRead = len; + while (toRead > 0) { + int ret = read(buf, off, toRead); + if (ret < 0) { + throw new IOException("Premature EOF from inputStream"); + } + toRead -= ret; + off += ret; + } + } + + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param sock An established Socket to the DN. The BlockReader will not close it normally. + * This socket must have an associated Channel. 
+ * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param bufferSize The IO buffer size (not the client buffer size) + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @return New BlockReader instance, or null on error. + */ + public static BlockReader newBlockReader( Socket sock, String file, + ExtendedBlock block, + Token blockToken, + long startOffset, long len, + int bufferSize, boolean verifyChecksum, + String clientName) + throws IOException { + // in and out will be closed when sock is closed (by the caller) + final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + NetUtils.getOutputStream(sock, + HdfsServerConstants.WRITE_TIMEOUT))); + new Sender(out).readBlock(block, blockToken, clientName, startOffset, len); + + // + // Get bytes in block, set streams + // + Preconditions.checkArgument(sock.getChannel() != null, + "Socket %s does not have an associated Channel.", + sock); + SocketInputStream sin = + (SocketInputStream)NetUtils.getInputStream(sock); + DataInputStream in = new DataInputStream(sin); + + BlockOpResponseProto status = BlockOpResponseProto.parseFrom( + vintPrefixed(in)); + checkSuccess(status, sock, block, file); + ReadOpChecksumInfoProto checksumInfo = + status.getReadOpChecksumInfo(); + DataChecksum checksum = DataTransferProtoUtil.fromProto( + checksumInfo.getChecksum()); + //Warning when we get CHECKSUM_NULL? + + // Read the first chunk offset. + long firstChunkOffset = checksumInfo.getChunkOffset(); + + if ( firstChunkOffset < 0 || firstChunkOffset > startOffset || + firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) { + throw new IOException("BlockReader: error in first chunk offset (" + + firstChunkOffset + ") startOffset is " + + startOffset + " for file " + file); + } + + return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), + sin, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); + } + + static void checkSuccess( + BlockOpResponseProto status, Socket sock, + ExtendedBlock block, String file) + throws IOException { + if (status.getStatus() != Status.SUCCESS) { + if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) { + throw new InvalidBlockTokenException( + "Got access token error for OP_READ_BLOCK, self=" + + sock.getLocalSocketAddress() + ", remote=" + + sock.getRemoteSocketAddress() + ", for file " + file + + ", for pool " + block.getBlockPoolId() + " block " + + block.getBlockId() + "_" + block.getGenerationStamp()); + } else { + throw new IOException("Got error for OP_READ_BLOCK, self=" + + sock.getLocalSocketAddress() + ", remote=" + + sock.getRemoteSocketAddress() + ", for file " + file + + ", for pool " + block.getBlockPoolId() + " block " + + block.getBlockId() + "_" + block.getGenerationStamp()); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java index 73e1b20a710..d8b9f2b6206 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java @@ -136,7 +136,7 @@ public class PacketHeader { */ 
public boolean sanityCheck(long lastSeqNo) { // We should only have a non-positive data length for the last packet - if (proto.getDataLen() <= 0 && proto.getLastPacketInBlock()) return false; + if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false; // The last packet should not contain data if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false; // Seqnos should always increase by 1 with each packet received diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index 8ae1390ed8d..69879d81ed7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer; import org.apache.hadoop.hdfs.web.resources.DelegationParam; import org.apache.hadoop.hdfs.web.resources.UserParam; import org.apache.hadoop.http.HtmlQuoting; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; @@ -117,7 +118,8 @@ public class JspHelper { return 0; } } - public static DatanodeInfo bestNode(LocatedBlocks blks) throws IOException { + public static DatanodeInfo bestNode(LocatedBlocks blks, Configuration conf) + throws IOException { HashMap map = new HashMap(); for (LocatedBlock block : blks.getLocatedBlocks()) { @@ -133,16 +135,17 @@ public class JspHelper { } NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]); Arrays.sort(nodes, new NodeRecordComparator()); - return bestNode(nodes, false); + return bestNode(nodes, false, conf); } - public static DatanodeInfo bestNode(LocatedBlock blk) throws IOException { + public static DatanodeInfo bestNode(LocatedBlock blk, Configuration conf) + throws IOException { DatanodeInfo[] nodes = blk.getLocations(); - return bestNode(nodes, true); + return bestNode(nodes, true, conf); } - public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom) - throws IOException { + public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom, + Configuration conf) throws IOException { TreeSet deadNodes = new TreeSet(); DatanodeInfo chosenNode = null; int failures = 0; @@ -169,7 +172,7 @@ public class JspHelper { chosenNode.getHost() + ":" + chosenNode.getInfoPort()); try { - s = new Socket(); + s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); } catch (IOException e) { @@ -191,27 +194,26 @@ public class JspHelper { long blockSize, long offsetIntoBlock, long chunkSizeToView, JspWriter out, Configuration conf) throws IOException { if (chunkSizeToView == 0) return; - Socket s = new Socket(); + Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(addr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); - long amtToRead = Math.min(chunkSizeToView, blockSize - offsetIntoBlock); + int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock); // Use the block name for file name. 
- int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, - DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT); String file = BlockReaderFactory.getFileName(addr, poolId, blockId); - BlockReader blockReader = BlockReaderFactory.newBlockReader(s, file, + BlockReader blockReader = BlockReaderFactory.newBlockReader( + conf, s, file, new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken, - offsetIntoBlock, amtToRead, bufferSize); + offsetIntoBlock, amtToRead); byte[] buf = new byte[(int)amtToRead]; int readOffset = 0; int retries = 2; while ( amtToRead > 0 ) { - int numRead; + int numRead = amtToRead; try { - numRead = blockReader.readAll(buf, readOffset, (int)amtToRead); + blockReader.readFully(buf, readOffset, amtToRead); } catch (IOException e) { retries--; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java index 6217bb6fc47..d017679e0a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java @@ -124,7 +124,7 @@ public class DatanodeJspHelper { if (locations == null || locations.length == 0) { out.print("Empty file"); } else { - DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock); + DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock, conf); String fqdn = InetAddress.getByName(chosenNode.getHost()) .getCanonicalHostName(); String datanodeAddr = chosenNode.getName(); @@ -299,7 +299,7 @@ public class DatanodeJspHelper { // URL for TAIL LocatedBlock lastBlk = blocks.get(blocks.size() - 1); try { - chosenNode = JspHelper.bestNode(lastBlk); + chosenNode = JspHelper.bestNode(lastBlk, conf); } catch (IOException e) { out.print(e.toString()); dfs.close(); @@ -514,7 +514,7 @@ public class DatanodeJspHelper { .getGenerationStamp()); nextStartOffset = 0; nextBlockSize = nextBlock.getBlock().getNumBytes(); - DatanodeInfo d = JspHelper.bestNode(nextBlock); + DatanodeInfo d = JspHelper.bestNode(nextBlock, conf); String datanodeAddr = d.getName(); nextDatanodePort = Integer.parseInt(datanodeAddr.substring( datanodeAddr.indexOf(':') + 1, datanodeAddr.length())); @@ -569,7 +569,7 @@ public class DatanodeJspHelper { if (prevStartOffset < 0) prevStartOffset = 0; prevBlockSize = prevBlock.getBlock().getNumBytes(); - DatanodeInfo d = JspHelper.bestNode(prevBlock); + DatanodeInfo d = JspHelper.bestNode(prevBlock, conf); String datanodeAddr = d.getName(); prevDatanodePort = Integer.parseInt(datanodeAddr.substring( datanodeAddr.indexOf(':') + 1, datanodeAddr.length())); @@ -686,7 +686,7 @@ public class DatanodeJspHelper { long genStamp = lastBlk.getBlock().getGenerationStamp(); DatanodeInfo chosenNode; try { - chosenNode = JspHelper.bestNode(lastBlk); + chosenNode = JspHelper.bestNode(lastBlk, conf); } catch (IOException e) { out.print(e.toString()); dfs.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java index 54b0ec64abc..3e630661e94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java 
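The JspHelper hunks above and the test changes below replace bare new Socket() calls with NetUtils.getDefaultSocketFactory(conf).createSocket(), so the configured socket factory (hadoop.rpc.socket.factory.class.default) is honored. A minimal sketch of that connection pattern, with a hypothetical datanode address:

import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.NetUtils;

public class SocketFactoryConnect {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Uses the configured factory rather than hard-coding new Socket().
    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
    // Hypothetical datanode host and transfer port.
    s.connect(new InetSocketAddress("dn1.example.com", 50010),
        HdfsServerConstants.READ_TIMEOUT);
    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
    s.close();
  }
}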
@@ -52,7 +52,9 @@ public class FileDataServlet extends DfsServlet { String scheme = request.getScheme(); final LocatedBlocks blks = nnproxy.getBlockLocations( status.getFullPath(new Path(path)).toUri().getPath(), 0, 1); - final DatanodeID host = pickSrcDatanode(blks, status); + final Configuration conf = NameNodeHttpServer.getConfFromContext( + getServletContext()); + final DatanodeID host = pickSrcDatanode(blks, status, conf); final String hostname; if (host instanceof DatanodeInfo) { hostname = ((DatanodeInfo)host).getHostName(); @@ -83,16 +85,17 @@ public class FileDataServlet extends DfsServlet { /** Select a datanode to service this request. * Currently, this looks at no more than the first five blocks of a file, * selecting a datanode randomly from the most represented. + * @param conf */ - private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i) - throws IOException { + private DatanodeID pickSrcDatanode(LocatedBlocks blks, HdfsFileStatus i, + Configuration conf) throws IOException { if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) { // pick a random datanode NameNode nn = NameNodeHttpServer.getNameNodeFromContext( getServletContext()); return NamenodeJspHelper.getRandomDatanode(nn); } - return JspHelper.bestNode(blks); + return JspHelper.bestNode(blks, conf); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 358d778eaf3..e60f83b5366 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -509,8 +509,9 @@ public class NamenodeFsck { String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(), block.getBlockId()); - blockReader = BlockReaderFactory.newBlockReader(s, file, block, lblock - .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096)); + blockReader = BlockReaderFactory.newBlockReader( + conf, s, file, block, lblock + .getBlockToken(), 0, -1); } catch (IOException ex) { // Put chosen node into dead list, continue diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index 46ea367cb92..84b44640eaa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -118,8 +118,8 @@ public class NamenodeWebHdfsMethods { private @Context HttpServletResponse response; private static DatanodeInfo chooseDatanode(final NameNode namenode, - final String path, final HttpOpParam.Op op, final long openOffset - ) throws IOException { + final String path, final HttpOpParam.Op op, final long openOffset, + Configuration conf) throws IOException { if (op == GetOpParam.Op.OPEN || op == GetOpParam.Op.GETFILECHECKSUM || op == PostOpParam.Op.APPEND) { @@ -139,7 +139,7 @@ public class NamenodeWebHdfsMethods { final LocatedBlocks locations = np.getBlockLocations(path, offset, 1); final int count = locations.locatedBlockCount(); if (count > 0) { - return 
JspHelper.bestNode(locations.get(0)); + return JspHelper.bestNode(locations.get(0), conf); } } } @@ -165,7 +165,8 @@ public class NamenodeWebHdfsMethods { final UserGroupInformation ugi, final DelegationParam delegation, final String path, final HttpOpParam.Op op, final long openOffset, final Param... parameters) throws URISyntaxException, IOException { - final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset); + final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF); + final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, conf); final String delegationQuery; if (!UserGroupInformation.isSecurityEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java new file mode 100644 index 00000000000..69b238bbbf8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A simple class for pooling direct ByteBuffers. This is necessary + * because Direct Byte Buffers do not take up much space on the heap, + * and hence will not trigger GCs on their own. However, they do take + * native memory, and thus can cause high memory usage if not pooled. + * The pooled instances are referred to only via weak references, allowing + * them to be collected when a GC does run. + * + * This class only does effective pooling when many buffers will be + * allocated at the same size. There is no attempt to reuse larger + * buffers to satisfy smaller allocations. + */ +@InterfaceAudience.Private +public class DirectBufferPool { + + // Essentially implement a multimap with weak values. + ConcurrentMap>> buffersBySize = + new ConcurrentHashMap>>(); + + /** + * Allocate a direct buffer of the specified size, in bytes. + * If a pooled buffer is available, returns that. Otherwise + * allocates a new one. 
+ */ + public ByteBuffer getBuffer(int size) { + Queue> list = buffersBySize.get(size); + if (list == null) { + // no available buffers for this size + return ByteBuffer.allocateDirect(size); + } + + WeakReference ref; + while ((ref = list.poll()) != null) { + ByteBuffer b = ref.get(); + if (b != null) { + return b; + } + } + + return ByteBuffer.allocateDirect(size); + } + + /** + * Return a buffer into the pool. After being returned, + * the buffer may be recycled, so the user must not + * continue to use it in any way. + * @param buf the buffer to return + */ + public void returnBuffer(ByteBuffer buf) { + buf.clear(); // reset mark, limit, etc + int size = buf.capacity(); + Queue> list = buffersBySize.get(size); + if (list == null) { + list = new ConcurrentLinkedQueue>(); + Queue> prev = buffersBySize.putIfAbsent(size, list); + // someone else put a queue in the map before we did + if (prev != null) { + list = prev; + } + } + list.add(new WeakReference(buf)); + } + + /** + * Return the number of available buffers of a given size. + * This is used only for tests. + */ + @VisibleForTesting + int countBuffersOfSize(int size) { + Queue> list = buffersBySize.get(size); + if (list == null) { + return 0; + } + + return list.size(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java index c61e65b6c04..3c338e56f53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java @@ -139,15 +139,17 @@ public class BlockReaderTestUtil { ExtendedBlock block = testBlock.getBlock(); DatanodeInfo[] nodes = testBlock.getLocations(); targetAddr = NetUtils.createSocketAddr(nodes[0].getName()); - sock = new Socket(); + sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); sock.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT); sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); return BlockReaderFactory.newBlockReader( + new DFSClient.Conf(conf), sock, targetAddr.toString()+ ":" + block.getBlockId(), block, testBlock.getBlockToken(), offset, lenToRead, - conf.getInt("io.file.buffer.size", 4096)); + conf.getInt("io.file.buffer.size", 4096), + true, ""); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java index 8315e1f5d02..ec2d41c06de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java @@ -20,11 +20,12 @@ package org.apache.hadoop.hdfs; import java.util.List; -import org.apache.hadoop.hdfs.RemoteBlockReader; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.fs.Path; +import org.apache.log4j.Level; import org.junit.Test; import org.junit.AfterClass; @@ -40,6 +41,9 @@ public class TestClientBlockVerification { static final int FILE_SIZE_K = 256; static LocatedBlock testBlock = null; + static { + 
((Log4JLogger)RemoteBlockReader2.LOG).getLogger().setLevel(Level.ALL); + } @BeforeClass public static void setupCluster() throws Exception { final int REPLICATION_FACTOR = 1; @@ -54,7 +58,7 @@ public class TestClientBlockVerification { */ @Test public void testBlockVerification() throws Exception { - RemoteBlockReader reader = (RemoteBlockReader)spy( + RemoteBlockReader2 reader = (RemoteBlockReader2)spy( util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); @@ -66,7 +70,7 @@ public class TestClientBlockVerification { */ @Test public void testIncompleteRead() throws Exception { - RemoteBlockReader reader = (RemoteBlockReader)spy( + RemoteBlockReader2 reader = (RemoteBlockReader2)spy( util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false); @@ -84,7 +88,7 @@ public class TestClientBlockVerification { @Test public void testCompletePartialRead() throws Exception { // Ask for half the file - RemoteBlockReader reader = (RemoteBlockReader)spy( + RemoteBlockReader2 reader = (RemoteBlockReader2)spy( util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); // And read half the file util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); @@ -104,7 +108,7 @@ public class TestClientBlockVerification { for (int length : lengths) { DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " + " len=" + length); - RemoteBlockReader reader = (RemoteBlockReader)spy( + RemoteBlockReader2 reader = (RemoteBlockReader2)spy( util.getBlockReader(testBlock, startOffset, length)); util.readAndCheckEOS(reader, length, true); verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java index a7d11c165ad..136a72205c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.RemoteBlockReader; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.SocketCache; @@ -76,20 +75,20 @@ public class TestConnCache { * It verifies that all invocation to DFSInputStream.getBlockReader() * use the same socket. */ - private class MockGetBlockReader implements Answer { - public RemoteBlockReader reader = null; + private class MockGetBlockReader implements Answer { + public RemoteBlockReader2 reader = null; private Socket sock = null; - public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable { - RemoteBlockReader prevReader = reader; - reader = (RemoteBlockReader) invocation.callRealMethod(); + public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable { + RemoteBlockReader2 prevReader = reader; + reader = (RemoteBlockReader2) invocation.callRealMethod(); if (sock == null) { sock = reader.dnSock; - } else if (prevReader != null && prevReader.hasSentStatusCode()) { - // Can't reuse socket if the previous BlockReader didn't read till EOS. 
+ } else if (prevReader != null) { assertSame("DFSInputStream should use the same socket", sock, reader.dnSock); - } return reader; + } + return reader; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSeekBug.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSeekBug.java index d2073a56815..a34e00a03c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSeekBug.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSeekBug.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.ChecksumFileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; /** * This class tests the presence of seek bug as described @@ -67,12 +68,12 @@ public class TestSeekBug extends TestCase { stm.read(actual, 0, actual.length); // Now read a byte array that is bigger than the internal buffer actual = new byte[100000]; - stm.read(actual, 0, actual.length); + IOUtils.readFully(stm, actual, 0, actual.length); checkAndEraseData(actual, 128, expected, "First Read Test"); // now do a small seek, within the range that is already read stm.seek(96036); // 4 byte seek actual = new byte[128]; - stm.read(actual, 0, actual.length); + IOUtils.readFully(stm, actual, 0, actual.length); checkAndEraseData(actual, 96036, expected, "Seek Bug"); // all done stm.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index d9309edc1d8..ddef17ba0f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -137,15 +137,15 @@ public class TestBlockTokenWithDFS { try { DatanodeInfo[] nodes = lblock.getLocations(); targetAddr = NetUtils.createSocketAddr(nodes[0].getName()); - s = new Socket(); + s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); String file = BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId()); - blockReader = BlockReaderFactory.newBlockReader(s, file, block, - lblock.getBlockToken(), 0, -1, - conf.getInt("io.file.buffer.size", 4096)); + blockReader = BlockReaderFactory.newBlockReader( + conf, s, file, block, + lblock.getBlockToken(), 0, -1); } catch (IOException ex) { if (ex instanceof InvalidBlockTokenException) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 89e48fb586f..c6fb4304da2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -56,6 +56,7 @@ import static org.junit.Assert.*; public class TestDataNodeVolumeFailure { final private int block_size = 512; MiniDFSCluster cluster = null; + private Configuration conf; int dn_num = 2; int 
blocks_num = 30; short repl=2; @@ -74,7 +75,7 @@ public class TestDataNodeVolumeFailure { @Before public void setUp() throws Exception { // bring up a cluster of 2 - Configuration conf = new HdfsConfiguration(); + conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, block_size); // Allow a single volume failure (there are two volumes) conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); @@ -264,7 +265,7 @@ public class TestDataNodeVolumeFailure { targetAddr = NetUtils.createSocketAddr(datanode.getName()); - s = new Socket(); + s = NetUtils.getDefaultSocketFactory(conf).createSocket(); s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT); s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); @@ -272,8 +273,8 @@ public class TestDataNodeVolumeFailure { "test-blockpoolid", block.getBlockId()); BlockReader blockReader = - BlockReaderFactory.newBlockReader(s, file, block, lblock - .getBlockToken(), 0, -1, 4096); + BlockReaderFactory.newBlockReader(conf, s, file, block, lblock + .getBlockToken(), 0, -1); // nothing - if it fails - it will throw and exception } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java index eb78aced85a..d8a7f38ba3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java @@ -67,7 +67,8 @@ public class TestDatanodeJsp { String viewFilePage = DFSTestUtil.urlGet(url); - assertTrue("page should show preview of file contents", viewFilePage.contains(FILE_DATA)); + assertTrue("page should show preview of file contents, got: " + viewFilePage, + viewFilePage.contains(FILE_DATA)); if (!doTail) { assertTrue("page should show link to download file", viewFilePage diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java new file mode 100644 index 00000000000..8d2edf3e2e6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.util; + +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestDirectBufferPool { + DirectBufferPool pool = new DirectBufferPool(); + + @Test + public void testBasics() { + ByteBuffer a = pool.getBuffer(100); + assertEquals(100, a.capacity()); + assertEquals(100, a.remaining()); + pool.returnBuffer(a); + + // Getting a new buffer should return the same one + ByteBuffer b = pool.getBuffer(100); + assertSame(a, b); + + // Getting a new buffer before returning "B" should + // not return the same one + ByteBuffer c = pool.getBuffer(100); + assertNotSame(b, c); + pool.returnBuffer(b); + pool.returnBuffer(c); + } + + @Test + public void testBuffersAreReset() { + ByteBuffer a = pool.getBuffer(100); + a.putInt(0xdeadbeef); + assertEquals(96, a.remaining()); + pool.returnBuffer(a); + + // Even though we return the same buffer, + // its position should be reset to 0 + ByteBuffer b = pool.getBuffer(100); + assertSame(a, b); + assertEquals(100, a.remaining()); + pool.returnBuffer(b); + } + + @Test + public void testWeakRefClearing() { + // Allocate and return 10 buffers. + List bufs = Lists.newLinkedList(); + for (int i = 0; i < 10; i++) { + ByteBuffer buf = pool.getBuffer(100); + bufs.add(buf); + } + + for (ByteBuffer buf : bufs) { + pool.returnBuffer(buf); + } + + assertEquals(10, pool.countBuffersOfSize(100)); + + // Clear out any references to the buffers, and force + // GC. Weak refs should get cleared. + bufs.clear(); + bufs = null; + for (int i = 0; i < 3; i++) { + System.gc(); + } + + ByteBuffer buf = pool.getBuffer(100); + // the act of getting a buffer should clear all the nulled + // references from the pool. + assertEquals(0, pool.countBuffersOfSize(100)); + pool.returnBuffer(buf); + } +} \ No newline at end of file
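For reference, a minimal usage sketch of the DirectBufferPool added by this patch; the demo class name is hypothetical:

import java.nio.ByteBuffer;

import org.apache.hadoop.hdfs.util.DirectBufferPool;

public class DirectBufferPoolDemo {
  public static void main(String[] args) {
    DirectBufferPool pool = new DirectBufferPool();

    ByteBuffer buf = pool.getBuffer(4096);    // first call allocates a new direct buffer
    buf.putLong(42L);                         // use it as scratch space for packet data
    pool.returnBuffer(buf);                   // resets the buffer and pools a weak reference

    ByteBuffer again = pool.getBuffer(4096);  // usually the same instance, unless it was GC'd
    System.out.println("reused: " + (again == buf)); // commonly true, but never guaranteed
    pool.returnBuffer(again);
  }
}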