Revert "HBASE-15477 Purge 'next block header' from cached blocks"

Overcommit. Revert to fix.

This reverts commit 000117ad9f.
stack 2016-03-22 18:37:25 -07:00
parent 3f3613a234
commit 54a543de22
23 changed files with 1378 additions and 611 deletions
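For orientation, here is a minimal sketch (not part of the patch) of the block-scan pattern this revert restores: callers keep the previously read block and pass its cached next-block-on-disk-size hint into the following readBlock() call instead of always passing -1. The reader calls mirror those visible in the HFileReaderImpl and test hunks below; the wrapper class and method names are invented for illustration and assume the pre-HBASE-15477 reader interface.

    import java.io.IOException;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileBlock;

    public class BlockScanSketch {
      // Walk all blocks up to the load-on-open section, threading the size hint forward.
      static void walkBlocks(HFile.Reader reader) throws IOException {
        long offset = 0;
        long end = reader.getTrailer().getLoadOnOpenDataOffset();
        HFileBlock prevBlock = null;
        while (offset < end) {
          // -1 means "size unknown"; otherwise reuse the hint read along with the previous block.
          long onDiskSize = prevBlock != null ? prevBlock.getNextBlockOnDiskSizeWithHeader() : -1;
          HFileBlock block = reader.readBlock(offset, onDiskSize, /*cacheBlock*/ true,
              /*pread*/ false, /*isCompaction*/ false, /*updateCacheMetrics*/ false,
              /*expectedBlockType*/ null, /*expectedDataBlockEncoding*/ null);
          prevBlock = block;
          offset += block.getOnDiskSizeWithHeader();
        }
      }
    }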


@ -132,10 +132,6 @@ public enum BlockType {
out.write(magic);
}
public void write(ByteBuffer buf) {
buf.put(magic);
}
public void write(ByteBuff buf) {
buf.put(magic);
}


@ -55,26 +55,6 @@ public class HFileContextBuilder {
private String hfileName = null;
public HFileContextBuilder() {}
/**
* Use this constructor if you want to change a few settings only in another context.
*/
public HFileContextBuilder(final HFileContext hfc) {
this.usesHBaseChecksum = hfc.isUseHBaseChecksum();
this.includesMvcc = hfc.isIncludesMvcc();
this.includesTags = hfc.isIncludesTags();
this.compression = hfc.getCompression();
this.compressTags = hfc.isCompressTags();
this.checksumType = hfc.getChecksumType();
this.bytesPerChecksum = hfc.getBytesPerChecksum();
this.blocksize = hfc.getBlocksize();
this.encoding = hfc.getDataBlockEncoding();
this.cryptoContext = hfc.getEncryptionContext();
this.fileCreateTime = hfc.getFileCreateTime();
this.hfileName = hfc.getHFileName();
}
public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) {
this.usesHBaseChecksum = useHBaseCheckSum;
return this;


@ -496,12 +496,6 @@ public abstract class ByteBuff {
return -(low + 1); // key not found.
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
", cap= " + capacity() + "]";
}
public static String toStringBinary(final ByteBuff b, int off, int len) {
StringBuilder result = new StringBuilder();
// Just in case we are passed a 'len' that is > buffer length...


@ -260,7 +260,7 @@ public class MemcachedBlockCache implements BlockCache {
public HFileBlock decode(CachedData d) {
try {
ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, true,
return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true,
MemoryType.EXCLUSIVE);
} catch (IOException e) {
LOG.warn("Error deserializing data from memcached",e);


@ -91,7 +91,7 @@ public class ChecksumUtil {
// If this is an older version of the block that does not have
// checksums, then return false indicating that checksum verification
// did not succeed. Actually, this method should never be called
// did not succeed. Actually, this methiod should never be called
// when the minorVersion is 0, thus this is a defensive check for a
// cannot-happen case. Since this is a cannot-happen case, it is
// better to return false to indicate a checksum validation failure.
@ -141,7 +141,8 @@ public class ChecksumUtil {
* @return The number of bytes needed to store the checksum values
*/
static long numBytes(long datasize, int bytesPerChecksum) {
return numChunks(datasize, bytesPerChecksum) * HFileBlock.CHECKSUM_SIZE;
return numChunks(datasize, bytesPerChecksum) *
HFileBlock.CHECKSUM_SIZE;
}
/**

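As a worked example of the checksum sizing above (not part of the patch, and assuming HBase's defaults of 4-byte per-chunk checksums and bytesPerChecksum = 16384): a 64 KiB data block spans ceil(65536 / 16384) = 4 chunks, so numBytes returns 4 * HFileBlock.CHECKSUM_SIZE = 16 bytes of checksum data.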

@ -60,7 +60,7 @@ import org.apache.hadoop.util.StringUtils;
* Examples of how to use the block index writer can be found in
* {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
* {@link HFileWriterImpl}. Examples of how to use the reader can be
* found in {@link HFileReaderImpl} and
* found in {@link HFileWriterImpl} and
* {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
*/
@InterfaceAudience.Private


@ -252,20 +252,18 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
long end = 0;
try {
end = getTrailer().getLoadOnOpenDataOffset();
HFileBlock prevBlock = null;
if (LOG.isTraceEnabled()) {
LOG.trace("File=" + path.toString() + ", offset=" + offset + ", end=" + end);
}
// TODO: Could we use block iterator in here? Would that get stuff into the cache?
HFileBlock prevBlock = null;
while (offset < end) {
if (Thread.interrupted()) {
break;
}
// Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
// the internal-to-hfileblock thread local which holds the overread that gets the
// next header, will not have happened...so, pass in the onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely I'd say.
long onDiskSize = prevBlock != null? prevBlock.getNextBlockOnDiskSize(): -1;
long onDiskSize = -1;
if (prevBlock != null) {
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
}
HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
null, null);
// Need not update the current block. Ideally here the readBlock won't find the
@ -905,8 +903,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// We are reading the next block without block type validation, because
// it might turn out to be a non-data block.
block = reader.readBlock(block.getOffset() + block.getOnDiskSizeWithHeader(),
block.getNextBlockOnDiskSize(), cacheBlocks, pread,
block = reader.readBlock(block.getOffset()
+ block.getOnDiskSizeWithHeader(),
block.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
isCompaction, true, null, getEffectiveDataBlockEncoding());
if (block != null && !block.getBlockType().isData()) { // Findbugs: NP_NULL_ON_SOME_PATH
// Whatever block we read we will be returning it unless
@ -1440,8 +1439,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache Miss, please load.
}
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, blockSize, true).
unpack(hfileContext, fsBlockReader);
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
blockSize, -1, true).unpack(hfileContext, fsBlockReader);
// Cache the block
if (cacheBlock) {
@ -1527,8 +1526,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
}
// Load block from filesystem.
HFileBlock hfileBlock =
fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread);
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
pread);
validateBlockType(hfileBlock, expectedBlockType);
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
@ -1872,7 +1871,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
* @return Scanner on this file.
*/
@Override
@VisibleForTesting
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
return getScanner(cacheBlocks, pread, false);
}


@ -99,21 +99,18 @@ public interface HFileScanner extends Shipper, Closeable {
* @throws IOException
*/
boolean seekTo() throws IOException;
/**
* Scans to the next entry in the file.
* @return Returns false if you are at the end otherwise true if more in file.
* @throws IOException
*/
boolean next() throws IOException;
/**
* Gets the current key in the form of a cell. You must call
* {@link #seekTo(Cell)} before this method.
* @return gets the current key as a Cell.
*/
Cell getKey();
/**
* Gets a buffer view to the current value. You must call
* {@link #seekTo(Cell)} before this method.
@ -122,35 +119,26 @@ public interface HFileScanner extends Shipper, Closeable {
* the position is 0, the start of the buffer view.
*/
ByteBuffer getValue();
/**
* @return Instance of {@link org.apache.hadoop.hbase.Cell}.
*/
Cell getCell();
/**
* Convenience method to get a copy of the key as a string - interpreting the
* bytes as UTF8. You must call {@link #seekTo(Cell)} before this method.
* @return key as a string
* @deprecated Since hbase-2.0.0
*/
@Deprecated
String getKeyString();
/**
* Convenience method to get a copy of the value as a string - interpreting
* the bytes as UTF8. You must call {@link #seekTo(Cell)} before this method.
* @return value as a string
* @deprecated Since hbase-2.0.0
*/
@Deprecated
String getValueString();
/**
* @return Reader that underlies this Scanner instance.
*/
HFile.Reader getReader();
/**
* @return True is scanner has had one of the seek calls invoked; i.e.
* {@link #seekBefore(Cell)} or {@link #seekTo()} or {@link #seekTo(Cell)}.


@ -1317,22 +1317,25 @@ public class BucketCache implements BlockCache, HeapSize {
final AtomicLong realCacheSize) throws CacheFullException, IOException,
BucketAllocatorException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
// This cacheable thing can't be serialized...
if (len == 0) return null;
long offset = bucketAllocator.allocateBlock(len);
BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
try {
if (data instanceof HFileBlock) {
// If an instance of HFileBlock, save on some allocations.
HFileBlock block = (HFileBlock)data;
ByteBuff sliceBuf = block.getBufferReadOnly();
ByteBuffer metadata = block.getMetaData();
HFileBlock block = (HFileBlock) data;
ByteBuff sliceBuf = block.getBufferReadOnlyWithHeader();
sliceBuf.rewind();
assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
block.serializeExtraInfo(extraInfoBuffer);
if (LOG.isTraceEnabled()) {
LOG.trace("Write offset=" + offset + ", len=" + len);
}
ioEngine.write(sliceBuf, offset);
ioEngine.write(metadata, offset + len - metadata.limit());
ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
} else {
ByteBuffer bb = ByteBuffer.allocate(len);
data.serialize(bb);


@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -33,7 +32,7 @@ import org.apache.hadoop.hbase.client.Scan;
// TODO: Change name from KeyValueScanner to CellScanner only we already have a simple CellScanner
// so this should be something else altogether, a decoration on our base CellScanner. TODO.
// This class shows in CPs so do it all in one swell swoop. HBase-2.0.0.
public interface KeyValueScanner extends Shipper, Closeable {
public interface KeyValueScanner extends Shipper {
/**
* The byte array represents for NO_NEXT_INDEXED_KEY;
* The actual value is irrelevant because this is always compared by reference.
@ -75,7 +74,6 @@ public interface KeyValueScanner extends Shipper, Closeable {
* The default implementation for this would be to return 0. A file having
* lower sequence id will be considered to be the older one.
*/
// TODO: Implement SequenceId Interface instead.
long getSequenceID();
/**
@ -139,11 +137,11 @@ public interface KeyValueScanner extends Shipper, Closeable {
* peek KeyValue of scanner has the same row with specified Cell,
* otherwise seek the scanner at the first Cell of the row which is the
* previous row of specified KeyValue
*
*
* @param key seek KeyValue
* @return true if the scanner is at the valid KeyValue, false if such
* KeyValue does not exist
*
*
*/
public boolean backwardSeek(Cell key) throws IOException;
@ -158,7 +156,7 @@ public interface KeyValueScanner extends Shipper, Closeable {
/**
* Seek the scanner at the first KeyValue of last row
*
*
* @return true if scanner has values left, false if the underlying data is
* empty
* @throws IOException
@ -171,4 +169,4 @@ public interface KeyValueScanner extends Shipper, Closeable {
* see HFileWriterImpl#getMidpoint, or null if not known.
*/
public Cell getNextIndexedKey();
}
}


@ -1271,7 +1271,7 @@ public class StoreFile {
}
/**
* @deprecated Do not write further code which depends on this call. Instead
* Warning: Do not write further code which depends on this call. Instead
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
* which is the preferred way to scan a store with higher level concepts.
*
@ -1285,7 +1285,7 @@ public class StoreFile {
}
/**
* @deprecated Do not write further code which depends on this call. Instead
* Warning: Do not write further code which depends on this call. Instead
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
* which is the preferred way to scan a store with higher level concepts.
*


@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ChecksumType;
public class CacheTestUtils {
@ -65,7 +66,6 @@ public class CacheTestUtils {
/*Post eviction, heapsize should be the same */
assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
}
public static void testCacheMultiThreaded(final BlockCache toBeTested,
final int blockSize, final int numThreads, final int numQueries,
final double passingScore) throws Exception {
@ -339,16 +339,25 @@ public class CacheTestUtils {
}
private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
private static HFileBlockPair[] generateHFileBlocks(int blockSize,
int numBlocks) {
HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
Random rand = new Random();
HashSet<String> usedStrings = new HashSet<String>();
for (int i = 0; i < numBlocks; i++) {
ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
// The buffer serialized size needs to match the size of BlockSize. So we
// declare our data size to be smaller than it by the serialization space
// required.
SingleByteBuff cachedBuffer = new SingleByteBuff(ByteBuffer.allocate(blockSize
- HFileBlock.EXTRA_SERIALIZATION_SPACE));
rand.nextBytes(cachedBuffer.array());
cachedBuffer.rewind();
int onDiskSizeWithoutHeader = blockSize;
int uncompressedSizeWithoutHeader = blockSize;
int onDiskSizeWithoutHeader = blockSize
- HFileBlock.EXTRA_SERIALIZATION_SPACE;
int uncompressedSizeWithoutHeader = blockSize
- HFileBlock.EXTRA_SERIALIZATION_SPACE;
long prevBlockOffset = rand.nextLong();
BlockType.DATA.write(cachedBuffer);
cachedBuffer.putInt(onDiskSizeWithoutHeader);
@ -367,7 +376,7 @@ public class CacheTestUtils {
onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
blockSize,
onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta);
onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, meta);
String strKey;
/* No conflicting keys */
@ -386,4 +395,4 @@ public class CacheTestUtils {
BlockCacheKey blockName;
HFileBlock block;
}
}
}


@ -259,6 +259,7 @@ public class TestCacheOnWrite {
assertTrue(testDescription, scanner.seekTo());
long offset = 0;
HFileBlock prevBlock = null;
EnumMap<BlockType, Integer> blockCountByType =
new EnumMap<BlockType, Integer>(BlockType.class);
@ -266,10 +267,14 @@ public class TestCacheOnWrite {
List<Long> cachedBlocksOffset = new ArrayList<Long>();
Map<Long, HFileBlock> cachedBlocks = new HashMap<Long, HFileBlock>();
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
long onDiskSize = -1;
if (prevBlock != null) {
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
}
// Flags: don't cache the block, use pread, this is not a compaction.
// Also, pass null for expected block type to avoid checking it.
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null,
encodingInCache);
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
false, true, null, encodingInCache);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
offset);
HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
@ -302,6 +307,7 @@ public class TestCacheOnWrite {
assertEquals(
block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
}
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
BlockType bt = block.getBlockType();
Integer count = blockCountByType.get(bt);


@ -94,7 +94,7 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
is, totalSize, (HFileSystem) fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, false);
HFileBlock b = hbr.readBlockData(0, -1, -1, false);
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
}
@ -108,14 +108,12 @@ public class TestChecksum {
ChecksumType cktype = itr.next();
Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
FSDataOutputStream os = fs.create(path);
HFileContext meta = new HFileContextBuilder().
withChecksumType(cktype).
build();
HFileContext meta = new HFileContextBuilder()
.withChecksumType(cktype).build();
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (int i = 0; i < 1000; ++i) {
for (int i = 0; i < 1000; ++i)
dos.writeInt(i);
}
hbw.writeHeaderAndData(os);
int totalSize = hbw.getOnDiskSizeWithHeader();
os.close();
@ -127,7 +125,7 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
is, totalSize, (HFileSystem) fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, false);
HFileBlock b = hbr.readBlockData(0, -1, -1, false);
ByteBuff data = b.getBufferWithoutHeader();
for (int i = 0; i < 1000; i++) {
assertEquals(i, data.getInt());
@ -190,7 +188,7 @@ public class TestChecksum {
.withHBaseCheckSum(true)
.build();
HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
@ -211,17 +209,17 @@ public class TestChecksum {
// requests. Verify that this is correct.
for (int i = 0; i <
HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
b = hbr.readBlockData(0, -1, pread);
b = hbr.readBlockData(0, -1, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
}
// The next read should have hbase checksum verification reanabled,
// we verify this by assertng that there was a hbase-checksum failure.
b = hbr.readBlockData(0, -1, pread);
b = hbr.readBlockData(0, -1, -1, pread);
assertEquals(1, HFile.getChecksumFailuresCount());
// Since the above encountered a checksum failure, we switch
// back to not checking hbase checksums.
b = hbr.readBlockData(0, -1, pread);
b = hbr.readBlockData(0, -1, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
is.close();
@ -232,7 +230,7 @@ public class TestChecksum {
assertEquals(false, newfs.useHBaseChecksum());
is = new FSDataInputStreamWrapper(newfs, path);
hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta);
b = hbr.readBlockData(0, -1, pread);
b = hbr.readBlockData(0, -1, -1, pread);
is.close();
b.sanityCheck();
b = b.unpack(meta, hbr);
@ -316,7 +314,7 @@ public class TestChecksum {
.build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
is, nochecksum), totalSize, hfs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
is.close();
b.sanityCheck();
assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
@ -356,4 +354,5 @@ public class TestChecksum {
return false; // checksum validation failure
}
}
}
}


@ -320,7 +320,7 @@ public class TestHFileBlock {
.withIncludesTags(includesTag)
.withCompression(algo).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
is.close();
assertEquals(0, HFile.getChecksumFailuresCount());
@ -334,15 +334,17 @@ public class TestHFileBlock {
is = fs.open(path);
hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
b.totalChecksumBytes(), pread);
b.totalChecksumBytes(), -1, pread);
assertEquals(expected, b);
int wrongCompressedSize = 2172;
try {
b = hbr.readBlockData(0, wrongCompressedSize
+ HConstants.HFILEBLOCK_HEADER_SIZE, pread);
+ HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "Passed in onDiskSizeWithHeader=";
String expectedPrefix = "On-disk size without header provided is "
+ wrongCompressedSize + ", but block header contains "
+ b.getOnDiskSizeWithoutHeader() + ".";
assertTrue("Invalid exception message: '" + ex.getMessage()
+ "'.\nMessage is expected to start with: '" + expectedPrefix
+ "'", ex.getMessage().startsWith(expectedPrefix));
@ -422,7 +424,7 @@ public class TestHFileBlock {
HFileBlock blockFromHFile, blockUnpacked;
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
blockFromHFile = hbr.readBlockData(pos, -1, pread);
blockFromHFile = hbr.readBlockData(pos, -1, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount());
blockFromHFile.sanityCheck();
pos += blockFromHFile.getOnDiskSizeWithHeader();
@ -558,7 +560,7 @@ public class TestHFileBlock {
if (detailedLogging) {
LOG.info("Reading block #" + i + " at offset " + curOffset);
}
HFileBlock b = hbr.readBlockData(curOffset, -1, pread);
HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
if (detailedLogging) {
LOG.info("Block #" + i + ": " + b);
}
@ -572,7 +574,8 @@ public class TestHFileBlock {
// Now re-load this block knowing the on-disk size. This tests a
// different branch in the loader.
HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread);
HFileBlock b2 = hbr.readBlockData(curOffset,
b.getOnDiskSizeWithHeader(), -1, pread);
b2.sanityCheck();
assertEquals(b.getBlockType(), b2.getBlockType());
@ -598,7 +601,7 @@ public class TestHFileBlock {
b = b.unpack(meta, hbr);
// b's buffer has header + data + checksum while
// expectedContents have header + data only
ByteBuff bufRead = b.getBufferReadOnly();
ByteBuff bufRead = b.getBufferWithHeader();
ByteBuffer bufExpected = expectedContents.get(i);
boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
bufRead.arrayOffset(),
@ -681,7 +684,7 @@ public class TestHFileBlock {
HFileBlock b;
try {
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
b = hbr.readBlockData(offset, onDiskSizeArg, pread);
b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
} catch (IOException ex) {
LOG.error("Error in client " + clientId + " trying to read block at "
+ offset + ", pread=" + pread + ", withOnDiskSize=" +
@ -716,7 +719,8 @@ public class TestHFileBlock {
protected void testConcurrentReadingInternals() throws IOException,
InterruptedException, ExecutionException {
for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
Path path =
new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
Random rand = defaultRandom();
List<Long> offsets = new ArrayList<Long>();
List<BlockType> types = new ArrayList<BlockType>();
@ -839,7 +843,8 @@ public class TestHFileBlock {
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
.withChecksumType(ChecksumType.NULL).build();
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, 0, -1, meta);
HFileBlock.FILL_HEADER, -1,
0, meta);
long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
new MultiByteBuff(buf).getClass(), true)
+ HConstants.HFILEBLOCK_HEADER_SIZE + size);


@ -0,0 +1,750 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.base.Preconditions;
/**
* This class has unit tests to prove that older versions of
* HFiles (without checksums) are compatible with current readers.
*/
@Category({IOTests.class, SmallTests.class})
@RunWith(Parameterized.class)
public class TestHFileBlockCompatibility {
private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
NONE, GZ };
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private HFileSystem fs;
private final boolean includesMemstoreTS;
private final boolean includesTag;
public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) {
this.includesMemstoreTS = includesMemstoreTS;
this.includesTag = includesTag;
}
@Parameters
public static Collection<Object[]> parameters() {
return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
}
@Before
public void setUp() throws IOException {
fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration());
}
public byte[] createTestV1Block(Compression.Algorithm algo)
throws IOException {
Compressor compressor = algo.getCompressor();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStream os = algo.createCompressionStream(baos, compressor, 0);
DataOutputStream dos = new DataOutputStream(os);
BlockType.META.write(dos); // Let's make this a meta block.
TestHFileBlock.writeTestBlockContents(dos);
dos.flush();
algo.returnCompressor(compressor);
return baos.toByteArray();
}
private Writer createTestV2Block(Compression.Algorithm algo)
throws IOException {
final BlockType blockType = BlockType.DATA;
Writer hbw = new Writer(algo, null,
includesMemstoreTS, includesTag);
DataOutputStream dos = hbw.startWriting(blockType);
TestHFileBlock.writeTestBlockContents(dos);
// make sure the block is ready by calling hbw.getHeaderAndData()
hbw.getHeaderAndData();
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
hbw.releaseCompressor();
return hbw;
}
private String createTestBlockStr(Compression.Algorithm algo,
int correctLength) throws IOException {
Writer hbw = createTestV2Block(algo);
byte[] testV2Block = hbw.getHeaderAndData();
int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9;
if (testV2Block.length == correctLength) {
// Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
// variations across operating systems.
// See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
testV2Block[osOffset] = 3;
}
return Bytes.toStringBinary(testV2Block);
}
@Test
public void testNoCompression() throws IOException {
assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
getUncompressedSizeWithoutHeader());
}
@Test
public void testGzipCompression() throws IOException {
final String correctTestBlockStr =
"DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
+ "\\xFF\\xFF\\xFF\\xFF"
// gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
+ "\\x1F\\x8B" // gzip magic signature
+ "\\x08" // Compression method: 8 = "deflate"
+ "\\x00" // Flags
+ "\\x00\\x00\\x00\\x00" // mtime
+ "\\x00" // XFL (extra flags)
// OS (0 = FAT filesystems, 3 = Unix). However, this field
// sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
+ "\\x03"
+ "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
+ "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
+ "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00";
final int correctGzipBlockLength = 82;
String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
assertEquals(correctTestBlockStr, returnedStr);
}
@Test
public void testReaderV2() throws IOException {
if(includesTag) {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
}
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
for (boolean pread : new boolean[] { false, true }) {
LOG.info("testReaderV2: Compression algorithm: " + algo +
", pread=" + pread);
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo);
FSDataOutputStream os = fs.create(path);
Writer hbw = new Writer(algo, null,
includesMemstoreTS, includesTag);
long totalSize = 0;
for (int blockId = 0; blockId < 2; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (int i = 0; i < 1234; ++i)
dos.writeInt(i);
hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader();
}
os.close();
FSDataInputStream is = fs.open(path);
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTag)
.withCompression(algo)
.build();
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
is.close();
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
HFileBlock expected = b;
if (algo == GZ) {
is = fs.open(path);
hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path,
meta);
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
b.totalChecksumBytes(), -1, pread);
assertEquals(expected, b);
int wrongCompressedSize = 2172;
try {
b = hbr.readBlockData(0, wrongCompressedSize
+ HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "On-disk size without header provided is "
+ wrongCompressedSize + ", but block header contains "
+ b.getOnDiskSizeWithoutHeader() + ".";
assertTrue("Invalid exception message: '" + ex.getMessage()
+ "'.\nMessage is expected to start with: '" + expectedPrefix
+ "'", ex.getMessage().startsWith(expectedPrefix));
}
is.close();
}
}
}
}
/**
* Test encoding/decoding data blocks.
* @throws IOException a bug or a problem with temporary files.
*/
@Test
public void testDataBlockEncoding() throws IOException {
if(includesTag) {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
}
final int numBlocks = 5;
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
for (boolean pread : new boolean[] { false, true }) {
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
LOG.info("testDataBlockEncoding algo " + algo +
" pread = " + pread +
" encoding " + encoding);
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo + "_" + encoding.toString());
FSDataOutputStream os = fs.create(path);
HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
TestHFileBlockCompatibility.Writer hbw =
new TestHFileBlockCompatibility.Writer(algo,
dataBlockEncoder, includesMemstoreTS, includesTag);
long totalSize = 0;
final List<Integer> encodedSizes = new ArrayList<Integer>();
final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
for (int blockId = 0; blockId < numBlocks; ++blockId) {
hbw.startWriting(BlockType.DATA);
TestHFileBlock.writeTestKeyValues(hbw, blockId, pread, includesTag);
hbw.writeHeaderAndData(os);
int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
byte[] encodedResultWithHeader = hbw.getUncompressedDataWithHeader();
final int encodedSize = encodedResultWithHeader.length - headerLen;
if (encoding != DataBlockEncoding.NONE) {
// We need to account for the two-byte encoding algorithm ID that
// comes after the 24-byte block header but before encoded KVs.
headerLen += DataBlockEncoding.ID_SIZE;
}
byte[] encodedDataSection =
new byte[encodedResultWithHeader.length - headerLen];
System.arraycopy(encodedResultWithHeader, headerLen,
encodedDataSection, 0, encodedDataSection.length);
final ByteBuffer encodedBuf =
ByteBuffer.wrap(encodedDataSection);
encodedSizes.add(encodedSize);
encodedBlocks.add(encodedBuf);
totalSize += hbw.getOnDiskSizeWithHeader();
}
os.close();
FSDataInputStream is = fs.open(path);
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTag)
.withCompression(algo)
.build();
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is),
totalSize, fs, path, meta);
hbr.setDataBlockEncoder(dataBlockEncoder);
hbr.setIncludesMemstoreTS(includesMemstoreTS);
HFileBlock b;
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
b = hbr.readBlockData(pos, -1, -1, pread);
b.sanityCheck();
if (meta.isCompressedOrEncrypted()) {
assertFalse(b.isUnpacked());
b = b.unpack(meta, hbr);
}
pos += b.getOnDiskSizeWithHeader();
assertEquals((int) encodedSizes.get(blockId),
b.getUncompressedSizeWithoutHeader());
ByteBuff actualBuffer = b.getBufferWithoutHeader();
if (encoding != DataBlockEncoding.NONE) {
// We expect a two-byte big-endian encoding id.
assertEquals(0, actualBuffer.get(0));
assertEquals(encoding.getId(), actualBuffer.get(1));
actualBuffer.position(2);
actualBuffer = actualBuffer.slice();
}
ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
expectedBuffer.rewind();
// test if content matches, produce nice message
TestHFileBlock.assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer,
algo, encoding, pread);
}
is.close();
}
}
}
}
/**
* This is the version of the HFileBlock.Writer that is used to
* create V2 blocks with minor version 0. These blocks do not
* have hbase-level checksums. The code is here to test
* backward compatibility. The reason we do not inherit from
* HFileBlock.Writer is because we never ever want to change the code
* in this class but the code in HFileBlock.Writer will continually
* evolve.
*/
public static final class Writer extends HFileBlock.Writer {
// These constants are as they were in minorVersion 0.
private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
private static final byte[] DUMMY_HEADER = HFileBlock.DUMMY_HEADER_NO_CHECKSUM;
private enum State {
INIT,
WRITING,
BLOCK_READY
};
/** Writer state. Used to ensure the correct usage protocol. */
private State state = State.INIT;
/** Compression algorithm for all blocks this instance writes. */
private final Compression.Algorithm compressAlgo;
/** Data block encoder used for data blocks */
private final HFileDataBlockEncoder dataBlockEncoder;
private HFileBlockEncodingContext dataBlockEncodingCtx;
/** block encoding context for non-data blocks */
private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
/**
* The stream we use to accumulate data in uncompressed format for each
* block. We reset this stream at the end of each block and reuse it. The
* header is written as the first {@link #HEADER_SIZE} bytes into this
* stream.
*/
private ByteArrayOutputStream baosInMemory;
/** Compressor, which is also reused between consecutive blocks. */
private Compressor compressor;
/**
* Current block type. Set in {@link #startWriting(BlockType)}. Could be
* changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
* to {@link BlockType#ENCODED_DATA}.
*/
private BlockType blockType;
/**
* A stream that we write uncompressed bytes to, which compresses them and
* writes them to {@link #baosInMemory}.
*/
private DataOutputStream userDataStream;
/**
* Bytes to be written to the file system, including the header. Compressed
* if compression is turned on.
*/
private byte[] onDiskBytesWithHeader;
/**
* Valid in the READY state. Contains the header and the uncompressed (but
* potentially encoded, if this is a data block) bytes, so the length is
* {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
*/
private byte[] uncompressedBytesWithHeader;
/**
* Current block's start offset in the {@link HFile}. Set in
* {@link #writeHeaderAndData(FSDataOutputStream)}.
*/
private long startOffset;
/**
* Offset of previous block by block type. Updated when the next block is
* started.
*/
private long[] prevOffsetByType;
/** The offset of the previous block of the same type */
private long prevOffset;
private int unencodedDataSizeWritten;
public Writer(Compression.Algorithm compressionAlgorithm,
HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
.withCompression(compressionAlgorithm).build());
}
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext meta) {
super(dataBlockEncoder, meta);
compressAlgo = meta.getCompression() == null ? NONE : meta.getCompression();
this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder
: NoOpDataBlockEncoder.INSTANCE;
defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(DUMMY_HEADER, meta);
baosInMemory = new ByteArrayOutputStream();
prevOffsetByType = new long[BlockType.values().length];
for (int i = 0; i < prevOffsetByType.length; ++i)
prevOffsetByType[i] = -1;
}
/**
* Starts writing into the block. The previous block's data is discarded.
*
* @return the stream the user can write their data into
* @throws IOException
*/
public DataOutputStream startWriting(BlockType newBlockType)
throws IOException {
if (state == State.BLOCK_READY && startOffset != -1) {
// We had a previous block that was written to a stream at a specific
// offset. Save that offset as the last offset of a block of that type.
prevOffsetByType[blockType.getId()] = startOffset;
}
startOffset = -1;
blockType = newBlockType;
baosInMemory.reset();
baosInMemory.write(DUMMY_HEADER);
state = State.WRITING;
// We will compress it later in finishBlock()
userDataStream = new DataOutputStream(baosInMemory);
if (newBlockType == BlockType.DATA) {
this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
}
this.unencodedDataSizeWritten = 0;
return userDataStream;
}
@Override
public void write(Cell c) throws IOException {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
expectState(State.WRITING);
this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream);
this.unencodedDataSizeWritten += kv.getLength();
if (dataBlockEncodingCtx.getHFileContext().isIncludesMvcc()) {
this.unencodedDataSizeWritten += WritableUtils.getVIntSize(kv.getSequenceId());
}
}
/**
* Returns the stream for the user to write to. The block writer takes care
* of handling compression and buffering for caching on write. Can only be
* called in the "writing" state.
*
* @return the data output stream for the user to write to
*/
DataOutputStream getUserDataStream() {
expectState(State.WRITING);
return userDataStream;
}
/**
* Transitions the block writer from the "writing" state to the "block
* ready" state. Does nothing if a block is already finished.
*/
void ensureBlockReady() throws IOException {
Preconditions.checkState(state != State.INIT,
"Unexpected state: " + state);
if (state == State.BLOCK_READY)
return;
// This will set state to BLOCK_READY.
finishBlock();
}
/**
* An internal method that flushes the compressing stream (if using
* compression), serializes the header, and takes care of the separate
* uncompressed stream for caching on write, if applicable. Sets block
* write state to "block ready".
*/
void finishBlock() throws IOException {
if (blockType == BlockType.DATA) {
this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
baosInMemory.toByteArray(), blockType);
blockType = dataBlockEncodingCtx.getBlockType();
}
userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array.
uncompressedBytesWithHeader = baosInMemory.toByteArray();
prevOffset = prevOffsetByType[blockType.getId()];
// We need to set state before we can package the block up for
// cache-on-write. In a way, the block is ready, but not yet encoded or
// compressed.
state = State.BLOCK_READY;
if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
onDiskBytesWithHeader = dataBlockEncodingCtx
.compressAndEncrypt(uncompressedBytesWithHeader);
} else {
onDiskBytesWithHeader = defaultBlockEncodingCtx
.compressAndEncrypt(uncompressedBytesWithHeader);
}
// put the header for on disk bytes
putHeader(onDiskBytesWithHeader, 0,
onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length);
//set the header for the uncompressed bytes (for cache-on-write)
putHeader(uncompressedBytesWithHeader, 0,
onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length);
}
/**
* Put the header into the given byte array at the given offset.
* @param onDiskSize size of the block on disk
* @param uncompressedSize size of the block after decompression (but
* before optional data block decoding)
*/
private void putHeader(byte[] dest, int offset, int onDiskSize,
int uncompressedSize) {
offset = blockType.put(dest, offset);
offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
Bytes.putLong(dest, offset, prevOffset);
}
/**
* Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
* the offset of this block so that it can be referenced in the next block
* of the same type.
*
* @param out
* @throws IOException
*/
public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
long offset = out.getPos();
if (startOffset != -1 && offset != startOffset) {
throw new IOException("A " + blockType + " block written to a "
+ "stream twice, first at offset " + startOffset + ", then at "
+ offset);
}
startOffset = offset;
writeHeaderAndData((DataOutputStream) out);
}
/**
* Writes the header and the compressed data of this block (or uncompressed
* data when not using compression) into the given stream. Can be called in
* the "writing" state or in the "block ready" state. If called in the
* "writing" state, transitions the writer to the "block ready" state.
*
* @param out the output stream to write the
* @throws IOException
*/
private void writeHeaderAndData(DataOutputStream out) throws IOException {
ensureBlockReady();
out.write(onDiskBytesWithHeader);
}
/**
* Returns the header or the compressed data (or uncompressed data when not
* using compression) as a byte array. Can be called in the "writing" state
* or in the "block ready" state. If called in the "writing" state,
* transitions the writer to the "block ready" state.
*
* @return header and data as they would be stored on disk in a byte array
* @throws IOException
*/
public byte[] getHeaderAndData() throws IOException {
ensureBlockReady();
return onDiskBytesWithHeader;
}
/**
* Releases the compressor this writer uses to compress blocks into the
* compressor pool. Needs to be called before the writer is discarded.
*/
public void releaseCompressor() {
if (compressor != null) {
compressAlgo.returnCompressor(compressor);
compressor = null;
}
}
/**
* Returns the on-disk size of the data portion of the block. This is the
* compressed size if compression is enabled. Can only be called in the
* "block ready" state. Header is not compressed, and its size is not
* included in the return value.
*
* @return the on-disk size of the block, not including the header.
*/
public int getOnDiskSizeWithoutHeader() {
expectState(State.BLOCK_READY);
return onDiskBytesWithHeader.length - HEADER_SIZE;
}
/**
* Returns the on-disk size of the block. Can only be called in the
* "block ready" state.
*
* @return the on-disk size of the block ready to be written, including the
* header size
*/
public int getOnDiskSizeWithHeader() {
expectState(State.BLOCK_READY);
return onDiskBytesWithHeader.length;
}
/**
* The uncompressed size of the block data. Does not include header size.
*/
public int getUncompressedSizeWithoutHeader() {
expectState(State.BLOCK_READY);
return uncompressedBytesWithHeader.length - HEADER_SIZE;
}
/**
* The uncompressed size of the block data, including header size.
*/
public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return uncompressedBytesWithHeader.length;
}
/** @return true if a block is being written */
public boolean isWriting() {
return state == State.WRITING;
}
/**
* Returns the number of bytes written into the current block so far, or
* zero if not writing the block at the moment. Note that this will return
* zero in the "block ready" state as well.
*
* @return the number of bytes written
*/
public int blockSizeWritten() {
if (state != State.WRITING)
return 0;
return this.unencodedDataSizeWritten;
}
/**
* Returns the header followed by the uncompressed data, even if using
* compression. This is needed for storing uncompressed blocks in the block
* cache. Can be called in the "writing" state or the "block ready" state.
*
* @return uncompressed block bytes for caching on write
*/
private byte[] getUncompressedDataWithHeader() {
expectState(State.BLOCK_READY);
return uncompressedBytesWithHeader;
}
private void expectState(State expectedState) {
if (state != expectedState) {
throw new IllegalStateException("Expected state: " + expectedState +
", actual state: " + state);
}
}
/**
* Similar to {@link #getUncompressedBufferWithHeader()} but returns a byte
* buffer.
*
* @return uncompressed block for caching on write in the form of a buffer
*/
public ByteBuffer getUncompressedBufferWithHeader() {
byte[] b = getUncompressedDataWithHeader();
return ByteBuffer.wrap(b, 0, b.length);
}
/**
* Takes the given {@link BlockWritable} instance, creates a new block of
* its appropriate type, writes the writable into this block, and flushes
* the block into the output stream. The writer is instructed not to buffer
* uncompressed bytes for cache-on-write.
*
* @param bw the block-writable object to write as a block
* @param out the file system output stream
* @throws IOException
*/
public void writeBlock(BlockWritable bw, FSDataOutputStream out)
throws IOException {
bw.writeToBlock(startWriting(bw.getBlockType()));
writeHeaderAndData(out);
}
/**
* Creates a new HFileBlock.
*/
public HFileBlock getBlockForCaching() {
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withChecksumType(ChecksumType.NULL)
.withBytesPerCheckSum(0)
.build();
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
getUncompressedSizeWithoutHeader(), prevOffset,
getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
getOnDiskSizeWithoutHeader(), meta);
}
}
}


@ -185,7 +185,8 @@ public class TestHFileBlockIndex {
}
missCount += 1;
prevBlock = realReader.readBlockData(offset, onDiskSize, pread);
prevBlock = realReader.readBlockData(offset, onDiskSize,
-1, pread);
prevOffset = offset;
prevOnDiskSize = onDiskSize;
prevPread = pread;


@ -92,7 +92,8 @@ public class TestHFileDataBlockEncoder {
if (blockEncoder.getDataBlockEncoding() ==
DataBlockEncoding.NONE) {
assertEquals(block.getBufferReadOnly(), returnedBlock.getBufferReadOnly());
assertEquals(block.getBufferWithHeader(),
returnedBlock.getBufferWithHeader());
} else {
if (BlockType.ENCODED_DATA != returnedBlock.getBlockType()) {
System.out.println(blockEncoder);
@ -126,7 +127,7 @@ public class TestHFileDataBlockEncoder {
.build();
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, 0,
0, -1, hfileContext);
0, hfileContext);
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
}
@ -197,7 +198,7 @@ public class TestHFileDataBlockEncoder {
.build();
HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, 0,
0, -1, meta);
0, meta);
return b;
}
@ -219,8 +220,7 @@ public class TestHFileDataBlockEncoder {
byte[] encodedBytes = baos.toByteArray();
size = encodedBytes.length - block.getDummyHeaderForVersion().length;
return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), -1,
block.getHFileContext());
HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), block.getHFileContext());
}
private void writeBlock(List<Cell> kvs, HFileContext fileContext, boolean useTags)


@ -99,7 +99,7 @@ public class TestHFileEncryption {
private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
throws IOException {
HFileBlock b = hbr.readBlockData(pos, -1, false);
HFileBlock b = hbr.readBlockData(pos, -1, -1, false);
assertEquals(0, HFile.getChecksumFailuresCount());
b.sanityCheck();
assertFalse(b.isUnpacked());


@ -218,7 +218,7 @@ public class TestHFileWriterV3 {
fsdis.seek(0);
long curBlockPos = 0;
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
.unpack(context, blockReader);
assertEquals(BlockType.DATA, block.getBlockType());
ByteBuff buf = block.getBufferWithoutHeader();
@ -279,14 +279,13 @@ public class TestHFileWriterV3 {
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
trailer.getLoadOnOpenDataOffset());
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
.unpack(context, blockReader);
assertEquals(BlockType.META, block.getBlockType());
Text t = new Text();
ByteBuff buf = block.getBufferWithoutHeader();
if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
throw new IOException("Failed to deserialize block " + this +
" into a " + t.getClass().getSimpleName());
throw new IOException("Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
}
Text expectedText =
(metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ? new Text(


@ -78,8 +78,14 @@ public class TestPrefetch {
// Check that all of the data blocks were preloaded
BlockCache blockCache = cacheConf.getBlockCache();
long offset = 0;
HFileBlock prevBlock = null;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
long onDiskSize = -1;
if (prevBlock != null) {
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
}
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null,
null);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
if (block.getBlockType() == BlockType.DATA ||
@ -87,6 +93,7 @@ public class TestPrefetch {
block.getBlockType() == BlockType.INTERMEDIATE_INDEX) {
assertTrue(isCached);
}
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
}
}


@ -227,10 +227,15 @@ public class TestCacheOnWriteInSchema {
assertTrue(testDescription, scanner.seekTo());
// Cribbed from io.hfile.TestCacheOnWrite
long offset = 0;
HFileBlock prevBlock = null;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
long onDiskSize = -1;
if (prevBlock != null) {
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
}
// Flags: don't cache the block, use pread, this is not a compaction.
// Also, pass null for expected block type to avoid checking it.
HFileBlock block = reader.readBlock(offset, -1, false, true,
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
false, true, null, DataBlockEncoding.NONE);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
offset);
@ -244,6 +249,7 @@ public class TestCacheOnWriteInSchema {
"block: " + block + "\n" +
"blockCacheKey: " + blockCacheKey);
}
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
}
} finally {