HBASE-22016 Rewrite the block reading methods by using hbase.nio.ByteBuff

This commit is contained in:
huzheng 2019-03-08 16:46:06 +08:00
parent 287e014b44
commit f97f6e3e6a
4 changed files with 453 additions and 243 deletions

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class BlockIOUtils {
static boolean isByteBufferReadable(FSDataInputStream is) {
InputStream cur = is.getWrappedStream();
for (;;) {
if ((cur instanceof FSDataInputStream)) {
cur = ((FSDataInputStream) cur).getWrappedStream();
} else {
break;
}
}
return cur instanceof ByteBufferReadable;
}
/**
* Read length bytes into ByteBuffers directly.
* @param buf the destination {@link ByteBuff}
* @param dis the HDFS input stream which implement the ByteBufferReadable interface.
* @param length bytes to read.
* @throws IOException exception to throw if any error happen
*/
static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
if (!isByteBufferReadable(dis)) {
// If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
// the destination ByteBuff.
byte[] heapBuf = new byte[length];
IOUtils.readFully(dis, heapBuf, 0, length);
copyToByteBuff(heapBuf, 0, length, buf);
return;
}
ByteBuffer[] buffers = buf.nioByteBuffers();
int remain = length;
int idx = 0;
ByteBuffer cur = buffers[idx];
while (remain > 0) {
while (!cur.hasRemaining()) {
if (++idx >= buffers.length) {
throw new IOException(
"Not enough ByteBuffers to read the reminding " + remain + " " + "bytes");
}
cur = buffers[idx];
}
cur.limit(cur.position() + Math.min(remain, cur.remaining()));
int bytesRead = dis.read(cur);
if (bytesRead < 0) {
throw new IOException(
"Premature EOF from inputStream, but still need " + remain + " " + "bytes");
}
remain -= bytesRead;
}
}
/**
* Read from an input stream at least <code>necessaryLen</code> and if possible,
* <code>extraLen</code> also if available. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra"
* bytes to also optionally read.
* @param in the input stream to read from
* @param buf the buffer to read into
* @param bufOffset the destination offset in the buffer
* @param necessaryLen the number of bytes that are absolutely necessary to read
* @param extraLen the number of extra bytes that would be nice to read
* @return true if succeeded reading the extra bytes
* @throws IOException if failed to read the necessary bytes
*/
private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
int necessaryLen, int extraLen) throws IOException {
int bytesRemaining = necessaryLen + extraLen;
while (bytesRemaining > 0) {
int ret = in.read(buf, bufOffset, bytesRemaining);
if (ret < 0) {
if (bytesRemaining <= extraLen) {
// We could not read the "extra data", but that is OK.
break;
}
throw new IOException("Premature EOF from inputStream (read " + "returned " + ret
+ ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+ " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining));
}
bufOffset += ret;
bytesRemaining -= ret;
}
return bytesRemaining <= 0;
}
/**
* Read bytes into ByteBuffers directly, those buffers either contains the extraLen bytes or only
* contains necessaryLen bytes, which depends on how much bytes do the last time we read.
* @param buf the destination {@link ByteBuff}.
* @param dis input stream to read.
* @param necessaryLen bytes which we must read
* @param extraLen bytes which we may read
* @return if the returned flag is true, then we've finished to read the extraLen into our
* ByteBuffers, otherwise we've not read the extraLen bytes yet.
* @throws IOException if failed to read the necessary bytes.
*/
static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, int extraLen)
throws IOException {
if (!isByteBufferReadable(dis)) {
// If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
// the destination ByteBuff.
byte[] heapBuf = new byte[necessaryLen + extraLen];
boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
return ret;
}
ByteBuffer[] buffers = buf.nioByteBuffers();
int bytesRead = 0;
int remain = necessaryLen + extraLen;
int idx = 0;
ByteBuffer cur = buffers[idx];
while (bytesRead < necessaryLen) {
while (!cur.hasRemaining()) {
if (++idx >= buffers.length) {
throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
}
cur = buffers[idx];
}
cur.limit(cur.position() + Math.min(remain, cur.remaining()));
int ret = dis.read(cur);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream (read returned " + ret
+ ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+ " extra bytes, successfully read " + bytesRead);
}
bytesRead += ret;
remain -= ret;
}
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
}
/**
* Read from an input stream at least <code>necessaryLen</code> and if possible,
* <code>extraLen</code> also if available. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
* specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
* read.
* @param buff ByteBuff to read into.
* @param dis the input stream to read from
* @param position the position within the stream from which to start reading
* @param necessaryLen the number of bytes that are absolutely necessary to read
* @param extraLen the number of extra bytes that would be nice to read
* @return true if and only if extraLen is > 0 and reading those extra bytes was successful
* @throws IOException if failed to read the necessary bytes
*/
static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen) throws IOException {
int remain = necessaryLen + extraLen;
byte[] buf = new byte[remain];
int bytesRead = 0;
while (bytesRead < necessaryLen) {
int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream (positional read returned " + ret
+ ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+ " extra bytes, successfully read " + bytesRead);
}
bytesRead += ret;
remain -= ret;
}
// Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we
// will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[].
// TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[]
// preadWithExtra method for the upper layer, only need to refactor this method if the
// ByteBuffer pread is OK.
copyToByteBuff(buf, 0, bytesRead, buff);
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
}
private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
throws IOException {
if (offset < 0 || len < 0 || offset + len > buf.length) {
throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length);
}
ByteBuffer[] buffers = out.nioByteBuffers();
int idx = 0, remain = len, copyLen;
ByteBuffer cur = buffers[idx];
while (remain > 0) {
while (!cur.hasRemaining()) {
if (++idx >= buffers.length) {
throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
}
cur = buffers[idx];
}
copyLen = Math.min(cur.remaining(), remain);
cur.put(buf, offset, copyLen);
remain -= copyLen;
offset += copyLen;
}
return len;
}
}

View File

@ -21,7 +21,6 @@ import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@ -276,9 +274,7 @@ public class HFileBlock implements Cacheable {
boolean usesChecksum = buf.get() == (byte) 1; boolean usesChecksum = buf.get() == (byte) 1;
long offset = buf.getLong(); long offset = buf.getLong();
int nextBlockOnDiskSize = buf.getInt(); int nextBlockOnDiskSize = buf.getInt();
HFileBlock hFileBlock = return new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
return hFileBlock;
} }
@Override @Override
@ -311,9 +307,9 @@ public class HFileBlock implements Cacheable {
* param. * param.
*/ */
private HFileBlock(HFileBlock that, boolean bufCopy) { private HFileBlock(HFileBlock that, boolean bufCopy) {
init(that.blockType, that.onDiskSizeWithoutHeader, init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
that.uncompressedSizeWithoutHeader, that.prevBlockOffset, that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize, that.fileContext); that.fileContext);
if (bufCopy) { if (bufCopy) {
this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit()))); this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
} else { } else {
@ -328,6 +324,7 @@ public class HFileBlock implements Cacheable {
* See {@link Writer#getBlockForCaching(CacheConfig)}. * See {@link Writer#getBlockForCaching(CacheConfig)}.
* *
* <p>TODO: The caller presumes no checksumming * <p>TODO: The caller presumes no checksumming
* <p>TODO: HFile block writer can also off-heap ? </p>
* required of this block instance since going into cache; checksum already verified on * required of this block instance since going into cache; checksum already verified on
* underlying block data pulled in from filesystem. Is that correct? What if cache is SSD? * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD?
* *
@ -346,8 +343,8 @@ public class HFileBlock implements Cacheable {
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader, int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
HFileContext fileContext) { HFileContext fileContext) {
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
this.buf = new SingleByteBuff(b); this.buf = new SingleByteBuff(b);
if (fillHeader) { if (fillHeader) {
overwriteHeader(); overwriteHeader();
@ -363,7 +360,8 @@ public class HFileBlock implements Cacheable {
* @param buf Has header, content, and trailing checksums if present. * @param buf Has header, content, and trailing checksums if present.
*/ */
HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset, HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException { final int nextBlockOnDiskSize, HFileContext fileContext)
throws IOException {
buf.rewind(); buf.rewind();
final BlockType blockType = BlockType.read(buf); final BlockType blockType = BlockType.read(buf);
final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX); final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX);
@ -391,8 +389,8 @@ public class HFileBlock implements Cacheable {
} }
fileContext = fileContextBuilder.build(); fileContext = fileContextBuilder.build();
assert usesHBaseChecksum == fileContext.isUseHBaseChecksum(); assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
this.memType = memType; this.memType = memType;
this.offset = offset; this.offset = offset;
this.buf = buf; this.buf = buf;
@ -403,9 +401,8 @@ public class HFileBlock implements Cacheable {
* Called from constructors. * Called from constructors.
*/ */
private void init(BlockType blockType, int onDiskSizeWithoutHeader, private void init(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext) {
HFileContext fileContext) {
this.blockType = blockType; this.blockType = blockType;
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader; this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
@ -422,10 +419,9 @@ public class HFileBlock implements Cacheable {
* @param verifyChecksum true if checksum verification is in use. * @param verifyChecksum true if checksum verification is in use.
* @return Size of the block with header included. * @return Size of the block with header included.
*/ */
private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf, private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf,
boolean verifyChecksum) { boolean verifyChecksum) {
return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum);
headerSize(verifyChecksum);
} }
/** /**
@ -648,9 +644,10 @@ public class HFileBlock implements Cacheable {
ByteBuff dup = this.buf.duplicate(); ByteBuff dup = this.buf.duplicate();
dup.position(this.headerSize()); dup.position(this.headerSize());
dup = dup.slice(); dup = dup.slice();
ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
dup);
return unpacked; return unpacked;
} }
@ -664,15 +661,14 @@ public class HFileBlock implements Cacheable {
int headerSize = headerSize(); int headerSize = headerSize();
int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes; int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
// TODO we need consider allocating offheap here? ByteBuff newBuf = new SingleByteBuff(ByteBuffer.allocate(capacityNeeded));
ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
// Copy header bytes into newBuf. // Copy header bytes into newBuf.
// newBuf is HBB so no issue in calling array() // newBuf is HBB so no issue in calling array()
buf.position(0); buf.position(0);
buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize); newBuf.put(0, buf, 0, headerSize);
buf = new SingleByteBuff(newBuf); buf = newBuf;
// set limit to exclude next block's header // set limit to exclude next block's header
buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
} }
@ -689,17 +685,6 @@ public class HFileBlock implements Cacheable {
return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
} }
/** An additional sanity-check in case no compression or encryption is being used. */
@VisibleForTesting
void sanityCheckUncompressedSize() throws IOException {
if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) {
throw new IOException("Using no compression but "
+ "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
+ "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader
+ ", numChecksumbytes=" + totalChecksumBytes());
}
}
/** /**
* Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} when * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} when
* block is returned to the cache. * block is returned to the cache.
@ -744,82 +729,6 @@ public class HFileBlock implements Cacheable {
return ClassSize.align(size); return ClassSize.align(size);
} }
/**
* Read from an input stream at least <code>necessaryLen</code> and if possible,
* <code>extraLen</code> also if available. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
* number of "extra" bytes to also optionally read.
*
* @param in the input stream to read from
* @param buf the buffer to read into
* @param bufOffset the destination offset in the buffer
* @param necessaryLen the number of bytes that are absolutely necessary to read
* @param extraLen the number of extra bytes that would be nice to read
* @return true if succeeded reading the extra bytes
* @throws IOException if failed to read the necessary bytes
*/
static boolean readWithExtra(InputStream in, byte[] buf,
int bufOffset, int necessaryLen, int extraLen) throws IOException {
int bytesRemaining = necessaryLen + extraLen;
while (bytesRemaining > 0) {
int ret = in.read(buf, bufOffset, bytesRemaining);
if (ret == -1 && bytesRemaining <= extraLen) {
// We could not read the "extra data", but that is OK.
break;
}
if (ret < 0) {
throw new IOException("Premature EOF from inputStream (read "
+ "returned " + ret + ", was trying to read " + necessaryLen
+ " necessary bytes and " + extraLen + " extra bytes, "
+ "successfully read "
+ (necessaryLen + extraLen - bytesRemaining));
}
bufOffset += ret;
bytesRemaining -= ret;
}
return bytesRemaining <= 0;
}
/**
* Read from an input stream at least <code>necessaryLen</code> and if possible,
* <code>extraLen</code> also if available. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
* positional read and specifies a number of "extra" bytes that would be
* desirable but not absolutely necessary to read.
*
* @param in the input stream to read from
* @param position the position within the stream from which to start reading
* @param buf the buffer to read into
* @param bufOffset the destination offset in the buffer
* @param necessaryLen the number of bytes that are absolutely necessary to
* read
* @param extraLen the number of extra bytes that would be nice to read
* @return true if and only if extraLen is > 0 and reading those extra bytes
* was successful
* @throws IOException if failed to read the necessary bytes
*/
@VisibleForTesting
static boolean positionalReadWithExtra(FSDataInputStream in,
long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen)
throws IOException {
int bytesRemaining = necessaryLen + extraLen;
int bytesRead = 0;
while (bytesRead < necessaryLen) {
int ret = in.read(position, buf, bufOffset, bytesRemaining);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream (positional read "
+ "returned " + ret + ", was trying to read " + necessaryLen
+ " necessary bytes and " + extraLen + " extra bytes, "
+ "successfully read " + bytesRead);
}
position += ret;
bufOffset += ret;
bytesRemaining -= ret;
bytesRead += ret;
}
return bytesRead != necessaryLen && bytesRemaining <= 0;
}
/** /**
* Unified version 2 {@link HFile} block writer. The intended usage pattern * Unified version 2 {@link HFile} block writer. The intended usage pattern
* is as follows: * is as follows:
@ -984,18 +893,6 @@ public class HFileBlock implements Cacheable {
this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode; this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode;
} }
/**
* Returns the stream for the user to write to. The block writer takes care
* of handling compression and buffering for caching on write. Can only be
* called in the "writing" state.
*
* @return the data output stream for the user to write to
*/
DataOutputStream getUserDataStream() {
expectState(State.WRITING);
return userDataStream;
}
/** /**
* Transitions the block writer from the "writing" state to the "block * Transitions the block writer from the "writing" state to the "block
* ready" state. Does nothing if a block is already finished. * ready" state. Does nothing if a block is already finished.
@ -1258,11 +1155,9 @@ public class HFileBlock implements Cacheable {
} }
/** /**
* Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed
* needed for storing packed blocks in the block cache. Expects calling semantics identical to * for storing packed blocks in the block cache. Returns only the header and data, Does not
* {@link #getUncompressedBufferWithHeader()}. Returns only the header and data, * include checksum data.
* Does not include checksum data.
*
* @return Returns a copy of block bytes for caching on write * @return Returns a copy of block bytes for caching on write
*/ */
private ByteBuffer cloneOnDiskBufferWithHeader() { private ByteBuffer cloneOnDiskBufferWithHeader() {
@ -1320,9 +1215,8 @@ public class HFileBlock implements Cacheable {
.build(); .build();
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedSizeWithoutHeader(), prevOffset,
cacheConf.shouldCacheCompressed(blockType.getCategory())? cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
cloneOnDiskBufferWithHeader() : : cloneUncompressedBufferWithHeader(),
cloneUncompressedBufferWithHeader(),
FILL_HEADER, startOffset, UNSET, FILL_HEADER, startOffset, UNSET,
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext); onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext);
} }
@ -1413,7 +1307,7 @@ public class HFileBlock implements Cacheable {
private static class PrefetchedHeader { private static class PrefetchedHeader {
long offset = -1; long offset = -1;
byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length));
@Override @Override
public String toString() { public String toString() {
@ -1476,8 +1370,8 @@ public class HFileBlock implements Cacheable {
} }
/** /**
* A constructor that reads files with the latest minor version. * A constructor that reads files with the latest minor version. This is used by unit tests
* This is used by unit tests only. * only.
*/ */
FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext) FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
throws IOException { throws IOException {
@ -1517,60 +1411,49 @@ public class HFileBlock implements Cacheable {
} }
/** /**
* Does a positional read or a seek and read into the given buffer. Returns * Does a positional read or a seek and read into the given byte buffer. We need take care that
* the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF. * we will call the {@link ByteBuff#release()} for every exit to deallocate the ByteBuffers,
* * otherwise the memory leak may happen.
* @param dest destination buffer * @param dest destination buffer
* @param destOffset offset into the destination buffer at where to put the bytes we read
* @param size size of read * @param size size of read
* @param peekIntoNextBlock whether to read the next block's on-disk size * @param peekIntoNextBlock whether to read the next block's on-disk size
* @param fileOffset position in the stream to read at * @param fileOffset position in the stream to read at
* @param pread whether we should do a positional read * @param pread whether we should do a positional read
* @param istream The input source of data * @param istream The input source of data
* @return the on-disk size of the next block with header size included, or * @return true to indicate the destination buffer include the next block header, otherwise only
* -1 if it could not be determined; if not -1, the <code>dest</code> INCLUDES the * include the current block data without the next block header.
* next header * @throws IOException if any IO error happen.
* @throws IOException
*/ */
@VisibleForTesting protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size, boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
boolean peekIntoNextBlock, long fileOffset, boolean pread)
throws IOException {
if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
// We are asked to read the next block's header as well, but there is
// not enough room in the array.
throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
" bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
}
if (!pread) { if (!pread) {
// Seek + read. Better for scanning. // Seek + read. Better for scanning.
HFileUtil.seekOnMultipleSources(istream, fileOffset); HFileUtil.seekOnMultipleSources(istream, fileOffset);
// TODO: do we need seek time latencies?
long realOffset = istream.getPos(); long realOffset = istream.getPos();
if (realOffset != fileOffset) { if (realOffset != fileOffset) {
throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size + throw new IOException("Tried to seek to " + fileOffset + " to read " + size
" bytes, but pos=" + realOffset + " after seek"); + " bytes, but pos=" + realOffset + " after seek");
} }
if (!peekIntoNextBlock) { if (!peekIntoNextBlock) {
IOUtils.readFully(istream, dest, destOffset, size); BlockIOUtils.readFully(dest, istream, size);
return -1; return false;
} }
// Try to read the next block header. // Try to read the next block header
if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) { if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) {
return -1; // did not read the next block header.
return false;
} }
} else { } else {
// Positional read. Better for random reads; or when the streamLock is already locked. // Positional read. Better for random reads; or when the streamLock is already locked.
int extraSize = peekIntoNextBlock ? hdrSize : 0; int extraSize = peekIntoNextBlock ? hdrSize : 0;
if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) { if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
return -1; // did not read the next block header.
return false;
} }
} }
assert peekIntoNextBlock; assert peekIntoNextBlock;
return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize; return true;
} }
/** /**
@ -1669,7 +1552,7 @@ public class HFileBlock implements Cacheable {
* is not right. * is not right.
* @throws IOException * @throws IOException
*/ */
private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf, private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf,
final long offset, boolean verifyChecksum) final long offset, boolean verifyChecksum)
throws IOException { throws IOException {
// Assert size provided aligns with what is in the header // Assert size provided aligns with what is in the header
@ -1688,9 +1571,9 @@ public class HFileBlock implements Cacheable {
* we have to backup the stream because we over-read (the next block's header). * we have to backup the stream because we over-read (the next block's header).
* @see PrefetchedHeader * @see PrefetchedHeader
* @return The cached block header or null if not found. * @return The cached block header or null if not found.
* @see #cacheNextBlockHeader(long, byte[], int, int) * @see #cacheNextBlockHeader(long, ByteBuff, int, int)
*/ */
private ByteBuffer getCachedHeader(final long offset) { private ByteBuff getCachedHeader(final long offset) {
PrefetchedHeader ph = this.prefetchedHeader.get(); PrefetchedHeader ph = this.prefetchedHeader.get();
return ph != null && ph.offset == offset ? ph.buf : null; return ph != null && ph.offset == offset ? ph.buf : null;
} }
@ -1701,13 +1584,24 @@ public class HFileBlock implements Cacheable {
* @see PrefetchedHeader * @see PrefetchedHeader
*/ */
private void cacheNextBlockHeader(final long offset, private void cacheNextBlockHeader(final long offset,
final byte [] header, final int headerOffset, final int headerLength) { ByteBuff onDiskBlock, int onDiskSizeWithHeader, int headerLength) {
PrefetchedHeader ph = new PrefetchedHeader(); PrefetchedHeader ph = new PrefetchedHeader();
ph.offset = offset; ph.offset = offset;
System.arraycopy(header, headerOffset, ph.header, 0, headerLength); onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength);
this.prefetchedHeader.set(ph); this.prefetchedHeader.set(ph);
} }
private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock,
int onDiskSizeWithHeader) {
int nextBlockOnDiskSize = -1;
if (readNextHeader) {
nextBlockOnDiskSize =
onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH)
+ hdrSize;
}
return nextBlockOnDiskSize;
}
/** /**
* Reads a version 2 block. * Reads a version 2 block.
* *
@ -1734,7 +1628,7 @@ public class HFileBlock implements Cacheable {
// Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1 // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
// and will save us having to seek the stream backwards to reread the header we // and will save us having to seek the stream backwards to reread the header we
// read the last time through here. // read the last time through here.
ByteBuffer headerBuf = getCachedHeader(offset); ByteBuff headerBuf = getCachedHeader(offset);
LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " + LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
"onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread, "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
verifyChecksum, headerBuf, onDiskSizeWithHeader); verifyChecksum, headerBuf, onDiskSizeWithHeader);
@ -1754,9 +1648,9 @@ public class HFileBlock implements Cacheable {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Extra see to get block size!", new RuntimeException()); LOG.trace("Extra see to get block size!", new RuntimeException());
} }
headerBuf = ByteBuffer.allocate(hdrSize); headerBuf = new SingleByteBuff(ByteBuffer.allocate(hdrSize));
readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false, readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
offset, pread); headerBuf.rewind();
} }
onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
} }
@ -1767,24 +1661,27 @@ public class HFileBlock implements Cacheable {
// says where to start reading. If we have the header cached, then we don't need to read // says where to start reading. If we have the header cached, then we don't need to read
// it again and we can likely read from last place we left off w/o need to backup and reread // it again and we can likely read from last place we left off w/o need to backup and reread
// the header we read last time through here. // the header we read last time through here.
// TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap). ByteBuff onDiskBlock =
byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; new SingleByteBuff(ByteBuffer.allocate(onDiskSizeWithHeader + hdrSize));
int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, boolean initHFileBlockSuccess = false;
onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); try {
if (headerBuf != null) { if (headerBuf != null) {
// The header has been read when reading the previous block OR in a distinct header-only onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize);
// read. Copy to this block's header. }
System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); boolean readNextHeader = readAtOffset(is, onDiskBlock,
} else { onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); onDiskBlock.rewind(); // in case of moving position when copying a cached header
int nextBlockOnDiskSize =
getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader);
if (headerBuf == null) {
headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize);
} }
// Do a few checks before we go instantiate HFileBlock. // Do a few checks before we go instantiate HFileBlock.
assert onDiskSizeWithHeader > this.hdrSize; assert onDiskSizeWithHeader > this.hdrSize;
verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport); verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
ByteBuff onDiskBlockByteBuff = ByteBuff curBlock = onDiskBlock.duplicate().limit(onDiskSizeWithHeader);
new SingleByteBuff(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader));
// Verify checksum of the data before using it for building HFileBlock. // Verify checksum of the data before using it for building HFileBlock.
if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuff, hdrSize)) { if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
return null; return null;
} }
long duration = System.currentTimeMillis() - startTime; long duration = System.currentTimeMillis() - startTime;
@ -1794,8 +1691,8 @@ public class HFileBlock implements Cacheable {
// The onDiskBlock will become the headerAndDataBuffer for this block. // The onDiskBlock will become the headerAndDataBuffer for this block.
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
// contains the header of next block, so no need to set next block's header in it. // contains the header of next block, so no need to set next block's header in it.
HFileBlock hFileBlock = new HFileBlock(onDiskBlockByteBuff, checksumSupport, HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext); offset, nextBlockOnDiskSize, fileContext);
// Run check on uncompressed sizings. // Run check on uncompressed sizings.
if (!fileContext.isCompressedOrEncrypted()) { if (!fileContext.isCompressedOrEncrypted()) {
hFileBlock.sanityCheckUncompressed(); hFileBlock.sanityCheckUncompressed();
@ -1803,10 +1700,16 @@ public class HFileBlock implements Cacheable {
LOG.trace("Read {} in {} ns", hFileBlock, duration); LOG.trace("Read {} in {} ns", hFileBlock, duration);
// Cache next block header if we read it for the next time through here. // Cache next block header if we read it for the next time through here.
if (nextBlockOnDiskSize != -1) { if (nextBlockOnDiskSize != -1) {
cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
onDiskBlock, onDiskSizeWithHeader, hdrSize); onDiskSizeWithHeader, hdrSize);
} }
initHFileBlockSuccess = true;
return hFileBlock; return hFileBlock;
} finally {
if (!initHFileBlockSuccess) {
onDiskBlock.release();
}
}
} }
@Override @Override

View File

@ -17,33 +17,115 @@
*/ */
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.*; import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Mockito.*; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
/**
* Unit test suite covering HFileBlock positional read logic.
*/
@Category({ IOTests.class, SmallTests.class }) @Category({ IOTests.class, SmallTests.class })
public class TestHFileBlockPositionalRead { public class TestBlockIOUtils {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHFileBlockPositionalRead.class); HBaseClassTestRule.forClass(TestBlockIOUtils.class);
@Rule @Rule
public ExpectedException exception = ExpectedException.none(); public ExpectedException exception = ExpectedException.none();
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Test
public void testIsByteBufferReadable() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testIsByteBufferReadable");
try (FSDataOutputStream out = fs.create(p)) {
out.writeInt(23);
}
try (FSDataInputStream is = fs.open(p)) {
assertFalse(BlockIOUtils.isByteBufferReadable(is));
}
}
@Test
public void testReadFully() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
String s = "hello world";
try (FSDataOutputStream out = fs.create(p)) {
out.writeBytes(s);
}
ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11));
try (FSDataInputStream in = fs.open(p)) {
BlockIOUtils.readFully(buf, in, 11);
}
buf.rewind();
byte[] heapBuf = new byte[s.length()];
buf.get(heapBuf, 0, heapBuf.length);
assertArrayEquals(Bytes.toBytes(s), heapBuf);
}
@Test
public void testReadWithExtra() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadWithExtra");
String s = "hello world";
try (FSDataOutputStream out = fs.create(p)) {
out.writeBytes(s);
}
ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
try (FSDataInputStream in = fs.open(p)) {
assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
}
buf.rewind();
byte[] heapBuf = new byte[buf.capacity()];
buf.get(heapBuf, 0, heapBuf.length);
assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
buf = new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
try (FSDataInputStream in = fs.open(p)) {
assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
}
buf.rewind();
heapBuf = new byte[11];
buf.get(heapBuf, 0, heapBuf.length);
assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
buf.position(0).limit(12);
try (FSDataInputStream in = fs.open(p)) {
try {
BlockIOUtils.readWithExtra(buf, in, 12, 0);
fail("Should only read 11 bytes");
} catch (IOException e) {
}
}
}
@Test @Test
public void testPositionalReadNoExtra() throws IOException { public void testPositionalReadNoExtra() throws IOException {
long position = 0; long position = 0;
@ -52,10 +134,10 @@ public class TestHFileBlockPositionalRead {
int extraLen = 0; int extraLen = 0;
int totalLen = necessaryLen + extraLen; int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen]; byte[] buf = new byte[totalLen];
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class); FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
bufOffset, necessaryLen, extraLen);
assertFalse("Expect false return when no extra bytes requested", ret); assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(position, buf, bufOffset, totalLen);
verifyNoMoreInteractions(in); verifyNoMoreInteractions(in);
@ -69,11 +151,11 @@ public class TestHFileBlockPositionalRead {
int extraLen = 0; int extraLen = 0;
int totalLen = necessaryLen + extraLen; int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen]; byte[] buf = new byte[totalLen];
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class); FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 5)).thenReturn(5); when(in.read(5, buf, 5, 5)).thenReturn(5);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
bufOffset, necessaryLen, extraLen);
assertFalse("Expect false return when no extra bytes requested", ret); assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 5); verify(in).read(5, buf, 5, 5);
@ -88,10 +170,10 @@ public class TestHFileBlockPositionalRead {
int extraLen = 5; int extraLen = 5;
int totalLen = necessaryLen + extraLen; int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen]; byte[] buf = new byte[totalLen];
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class); FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
bufOffset, necessaryLen, extraLen);
assertTrue("Expect true return when reading extra bytes succeeds", ret); assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(position, buf, bufOffset, totalLen);
verifyNoMoreInteractions(in); verifyNoMoreInteractions(in);
@ -105,10 +187,10 @@ public class TestHFileBlockPositionalRead {
int extraLen = 5; int extraLen = 5;
int totalLen = necessaryLen + extraLen; int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen]; byte[] buf = new byte[totalLen];
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class); FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
bufOffset, necessaryLen, extraLen);
assertFalse("Expect false return when reading extra bytes fails", ret); assertFalse("Expect false return when reading extra bytes fails", ret);
verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(position, buf, bufOffset, totalLen);
verifyNoMoreInteractions(in); verifyNoMoreInteractions(in);
@ -123,11 +205,11 @@ public class TestHFileBlockPositionalRead {
int extraLen = 5; int extraLen = 5;
int totalLen = necessaryLen + extraLen; int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen]; byte[] buf = new byte[totalLen];
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class); FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 10)).thenReturn(10); when(in.read(5, buf, 5, 10)).thenReturn(10);
boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
bufOffset, necessaryLen, extraLen);
assertTrue("Expect true return when reading extra bytes succeeds", ret); assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 10); verify(in).read(5, buf, 5, 10);
@ -142,12 +224,12 @@ public class TestHFileBlockPositionalRead {
int extraLen = 0; int extraLen = 0;
int totalLen = necessaryLen + extraLen; int totalLen = necessaryLen + extraLen;
byte[] buf = new byte[totalLen]; byte[] buf = new byte[totalLen];
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class); FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
exception.expect(IOException.class); exception.expect(IOException.class);
exception.expectMessage("EOF"); exception.expectMessage("EOF");
HFileBlock.positionalReadWithExtra(in, position, buf, bufOffset, BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
necessaryLen, extraLen);
} }
} }

View File

@ -398,23 +398,25 @@ public class TestChecksum {
return b; return b;
} }
@Override @Override
protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size, protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
int returnValue = super.readAtOffset(istream, dest, destOffset, size, peekIntoNextBlock, int destOffset = dest.position();
fileOffset, pread); boolean returnValue =
super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
if (!corruptDataStream) { if (!corruptDataStream) {
return returnValue; return returnValue;
} }
// Corrupt 3rd character of block magic of next block's header. // Corrupt 3rd character of block magic of next block's header.
if (peekIntoNextBlock) { if (peekIntoNextBlock) {
dest[destOffset + size + 3] = 0b00000000; dest.put(destOffset + size + 3, (byte) 0b00000000);
} }
// We might be reading this block's header too, corrupt it. // We might be reading this block's header too, corrupt it.
dest[destOffset + 1] = 0b00000000; dest.put(destOffset + 1, (byte) 0b00000000);
// Corrupt non header data // Corrupt non header data
if (size > hdrSize) { if (size > hdrSize) {
dest[destOffset + hdrSize + 1] = 0b00000000; dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
} }
return returnValue; return returnValue;
} }