HBASE-15477 Purge 'next block header' from cached blocks
When we read from HDFS, we overread to pick up the next blocks header. Doing this saves a seek as we move through the hfile; we save having to do an explicit seek just to read the block header every time we need to read the body. We used to read in the next header as part of the current blocks buffer. This buffer was then what got persisted to blockcache; so we were over-persisting: our block plus the next blocks' header (33 bytes). This patch undoes this over-persisting. Removes support for version 1 blocks (0.2 was added in hbase-0.92.0). Not needed any more. There is an open question on whether checksums should be persisted when caching. The code seems to say no but if cache is SSD backed or backed by anything that does not do error correction, we'll want checksums. Adds loads of documentation. M hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (write) Add writing from a ByteBuff. M hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java (toString) Add one so ByteBuff looks like ByteBuffer when you click on it in IDE M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Remove support for version 1 blocks. Cleaned up handling of metadata added when we serialize a block to caches. Metadata is smaller now. When we serialize (used when caching), do not persist the next blocks header if present. Removed a bunch of methods, a few of which had overlapping functionality and others that exposed too much of our internals. Also removed a bunch of constructors and unified the constructors we had left over making them share a common init method. Shutdown access to defines that should only be used internally here. Renamed all to do w/ 'EXTRA' and 'extraSerialization' to instead talk about metadata saved to caches; was unclear previously what EXTRA was about. Renamed static final declarations as all uppercase. (readBlockDataInternal): Redid. Couldn't make sense of it previously. Undid heavy-duty parse of header by constructing HFileBlock. Other cleanups. Its 1/3rd the length it used to be. More to do in here.
This commit is contained in:
parent
ef94b55239
commit
000117ad9f
|
@ -132,6 +132,10 @@ public enum BlockType {
|
||||||
out.write(magic);
|
out.write(magic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void write(ByteBuffer buf) {
|
||||||
|
buf.put(magic);
|
||||||
|
}
|
||||||
|
|
||||||
public void write(ByteBuff buf) {
|
public void write(ByteBuff buf) {
|
||||||
buf.put(magic);
|
buf.put(magic);
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,26 @@ public class HFileContextBuilder {
|
||||||
|
|
||||||
private String hfileName = null;
|
private String hfileName = null;
|
||||||
|
|
||||||
|
public HFileContextBuilder() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use this constructor if you want to change a few settings only in another context.
|
||||||
|
*/
|
||||||
|
public HFileContextBuilder(final HFileContext hfc) {
|
||||||
|
this.usesHBaseChecksum = hfc.isUseHBaseChecksum();
|
||||||
|
this.includesMvcc = hfc.isIncludesMvcc();
|
||||||
|
this.includesTags = hfc.isIncludesTags();
|
||||||
|
this.compression = hfc.getCompression();
|
||||||
|
this.compressTags = hfc.isCompressTags();
|
||||||
|
this.checksumType = hfc.getChecksumType();
|
||||||
|
this.bytesPerChecksum = hfc.getBytesPerChecksum();
|
||||||
|
this.blocksize = hfc.getBlocksize();
|
||||||
|
this.encoding = hfc.getDataBlockEncoding();
|
||||||
|
this.cryptoContext = hfc.getEncryptionContext();
|
||||||
|
this.fileCreateTime = hfc.getFileCreateTime();
|
||||||
|
this.hfileName = hfc.getHFileName();
|
||||||
|
}
|
||||||
|
|
||||||
public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) {
|
public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) {
|
||||||
this.usesHBaseChecksum = useHBaseCheckSum;
|
this.usesHBaseChecksum = useHBaseCheckSum;
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -496,6 +496,12 @@ public abstract class ByteBuff {
|
||||||
return -(low + 1); // key not found.
|
return -(low + 1); // key not found.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
|
||||||
|
", cap= " + capacity() + "]";
|
||||||
|
}
|
||||||
|
|
||||||
public static String toStringBinary(final ByteBuff b, int off, int len) {
|
public static String toStringBinary(final ByteBuff b, int off, int len) {
|
||||||
StringBuilder result = new StringBuilder();
|
StringBuilder result = new StringBuilder();
|
||||||
// Just in case we are passed a 'len' that is > buffer length...
|
// Just in case we are passed a 'len' that is > buffer length...
|
||||||
|
|
|
@ -260,7 +260,7 @@ public class MemcachedBlockCache implements BlockCache {
|
||||||
public HFileBlock decode(CachedData d) {
|
public HFileBlock decode(CachedData d) {
|
||||||
try {
|
try {
|
||||||
ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
|
ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
|
||||||
return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true,
|
return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, true,
|
||||||
MemoryType.EXCLUSIVE);
|
MemoryType.EXCLUSIVE);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Error deserializing data from memcached",e);
|
LOG.warn("Error deserializing data from memcached",e);
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class ChecksumUtil {
|
||||||
|
|
||||||
// If this is an older version of the block that does not have
|
// If this is an older version of the block that does not have
|
||||||
// checksums, then return false indicating that checksum verification
|
// checksums, then return false indicating that checksum verification
|
||||||
// did not succeed. Actually, this methiod should never be called
|
// did not succeed. Actually, this method should never be called
|
||||||
// when the minorVersion is 0, thus this is a defensive check for a
|
// when the minorVersion is 0, thus this is a defensive check for a
|
||||||
// cannot-happen case. Since this is a cannot-happen case, it is
|
// cannot-happen case. Since this is a cannot-happen case, it is
|
||||||
// better to return false to indicate a checksum validation failure.
|
// better to return false to indicate a checksum validation failure.
|
||||||
|
@ -141,8 +141,7 @@ public class ChecksumUtil {
|
||||||
* @return The number of bytes needed to store the checksum values
|
* @return The number of bytes needed to store the checksum values
|
||||||
*/
|
*/
|
||||||
static long numBytes(long datasize, int bytesPerChecksum) {
|
static long numBytes(long datasize, int bytesPerChecksum) {
|
||||||
return numChunks(datasize, bytesPerChecksum) *
|
return numChunks(datasize, bytesPerChecksum) * HFileBlock.CHECKSUM_SIZE;
|
||||||
HFileBlock.CHECKSUM_SIZE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -60,7 +60,7 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
* Examples of how to use the block index writer can be found in
|
* Examples of how to use the block index writer can be found in
|
||||||
* {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
|
* {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
|
||||||
* {@link HFileWriterImpl}. Examples of how to use the reader can be
|
* {@link HFileWriterImpl}. Examples of how to use the reader can be
|
||||||
* found in {@link HFileWriterImpl} and
|
* found in {@link HFileReaderImpl} and
|
||||||
* {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
|
* {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
|
|
@ -252,18 +252,20 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
||||||
long end = 0;
|
long end = 0;
|
||||||
try {
|
try {
|
||||||
end = getTrailer().getLoadOnOpenDataOffset();
|
end = getTrailer().getLoadOnOpenDataOffset();
|
||||||
HFileBlock prevBlock = null;
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("File=" + path.toString() + ", offset=" + offset + ", end=" + end);
|
LOG.trace("File=" + path.toString() + ", offset=" + offset + ", end=" + end);
|
||||||
}
|
}
|
||||||
|
// TODO: Could we use block iterator in here? Would that get stuff into the cache?
|
||||||
|
HFileBlock prevBlock = null;
|
||||||
while (offset < end) {
|
while (offset < end) {
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
long onDiskSize = -1;
|
// Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
|
||||||
if (prevBlock != null) {
|
// the internal-to-hfileblock thread local which holds the overread that gets the
|
||||||
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
|
// next header, will not have happened...so, pass in the onDiskSize gotten from the
|
||||||
}
|
// cached block. This 'optimization' triggers extremely rarely I'd say.
|
||||||
|
long onDiskSize = prevBlock != null? prevBlock.getNextBlockOnDiskSize(): -1;
|
||||||
HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
|
HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
|
||||||
null, null);
|
null, null);
|
||||||
// Need not update the current block. Ideally here the readBlock won't find the
|
// Need not update the current block. Ideally here the readBlock won't find the
|
||||||
|
@ -903,9 +905,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
||||||
|
|
||||||
// We are reading the next block without block type validation, because
|
// We are reading the next block without block type validation, because
|
||||||
// it might turn out to be a non-data block.
|
// it might turn out to be a non-data block.
|
||||||
block = reader.readBlock(block.getOffset()
|
block = reader.readBlock(block.getOffset() + block.getOnDiskSizeWithHeader(),
|
||||||
+ block.getOnDiskSizeWithHeader(),
|
block.getNextBlockOnDiskSize(), cacheBlocks, pread,
|
||||||
block.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
|
|
||||||
isCompaction, true, null, getEffectiveDataBlockEncoding());
|
isCompaction, true, null, getEffectiveDataBlockEncoding());
|
||||||
if (block != null && !block.getBlockType().isData()) { // Findbugs: NP_NULL_ON_SOME_PATH
|
if (block != null && !block.getBlockType().isData()) { // Findbugs: NP_NULL_ON_SOME_PATH
|
||||||
// Whatever block we read we will be returning it unless
|
// Whatever block we read we will be returning it unless
|
||||||
|
@ -1439,8 +1440,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
||||||
// Cache Miss, please load.
|
// Cache Miss, please load.
|
||||||
}
|
}
|
||||||
|
|
||||||
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
|
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, blockSize, true).
|
||||||
blockSize, -1, true).unpack(hfileContext, fsBlockReader);
|
unpack(hfileContext, fsBlockReader);
|
||||||
|
|
||||||
// Cache the block
|
// Cache the block
|
||||||
if (cacheBlock) {
|
if (cacheBlock) {
|
||||||
|
@ -1526,8 +1527,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
||||||
traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
|
traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
|
||||||
}
|
}
|
||||||
// Load block from filesystem.
|
// Load block from filesystem.
|
||||||
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
|
HFileBlock hfileBlock =
|
||||||
pread);
|
fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread);
|
||||||
validateBlockType(hfileBlock, expectedBlockType);
|
validateBlockType(hfileBlock, expectedBlockType);
|
||||||
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
|
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
|
||||||
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
|
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
|
||||||
|
@ -1871,6 +1872,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
||||||
* @return Scanner on this file.
|
* @return Scanner on this file.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@VisibleForTesting
|
||||||
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
|
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
|
||||||
return getScanner(cacheBlocks, pread, false);
|
return getScanner(cacheBlocks, pread, false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,18 +99,21 @@ public interface HFileScanner extends Shipper, Closeable {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
boolean seekTo() throws IOException;
|
boolean seekTo() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scans to the next entry in the file.
|
* Scans to the next entry in the file.
|
||||||
* @return Returns false if you are at the end otherwise true if more in file.
|
* @return Returns false if you are at the end otherwise true if more in file.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
boolean next() throws IOException;
|
boolean next() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the current key in the form of a cell. You must call
|
* Gets the current key in the form of a cell. You must call
|
||||||
* {@link #seekTo(Cell)} before this method.
|
* {@link #seekTo(Cell)} before this method.
|
||||||
* @return gets the current key as a Cell.
|
* @return gets the current key as a Cell.
|
||||||
*/
|
*/
|
||||||
Cell getKey();
|
Cell getKey();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets a buffer view to the current value. You must call
|
* Gets a buffer view to the current value. You must call
|
||||||
* {@link #seekTo(Cell)} before this method.
|
* {@link #seekTo(Cell)} before this method.
|
||||||
|
@ -119,26 +122,35 @@ public interface HFileScanner extends Shipper, Closeable {
|
||||||
* the position is 0, the start of the buffer view.
|
* the position is 0, the start of the buffer view.
|
||||||
*/
|
*/
|
||||||
ByteBuffer getValue();
|
ByteBuffer getValue();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Instance of {@link org.apache.hadoop.hbase.Cell}.
|
* @return Instance of {@link org.apache.hadoop.hbase.Cell}.
|
||||||
*/
|
*/
|
||||||
Cell getCell();
|
Cell getCell();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convenience method to get a copy of the key as a string - interpreting the
|
* Convenience method to get a copy of the key as a string - interpreting the
|
||||||
* bytes as UTF8. You must call {@link #seekTo(Cell)} before this method.
|
* bytes as UTF8. You must call {@link #seekTo(Cell)} before this method.
|
||||||
* @return key as a string
|
* @return key as a string
|
||||||
|
* @deprecated Since hbase-2.0.0
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
String getKeyString();
|
String getKeyString();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convenience method to get a copy of the value as a string - interpreting
|
* Convenience method to get a copy of the value as a string - interpreting
|
||||||
* the bytes as UTF8. You must call {@link #seekTo(Cell)} before this method.
|
* the bytes as UTF8. You must call {@link #seekTo(Cell)} before this method.
|
||||||
* @return value as a string
|
* @return value as a string
|
||||||
|
* @deprecated Since hbase-2.0.0
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
String getValueString();
|
String getValueString();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Reader that underlies this Scanner instance.
|
* @return Reader that underlies this Scanner instance.
|
||||||
*/
|
*/
|
||||||
HFile.Reader getReader();
|
HFile.Reader getReader();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return True is scanner has had one of the seek calls invoked; i.e.
|
* @return True is scanner has had one of the seek calls invoked; i.e.
|
||||||
* {@link #seekBefore(Cell)} or {@link #seekTo()} or {@link #seekTo(Cell)}.
|
* {@link #seekBefore(Cell)} or {@link #seekTo()} or {@link #seekTo(Cell)}.
|
||||||
|
|
|
@ -1317,25 +1317,22 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
final AtomicLong realCacheSize) throws CacheFullException, IOException,
|
final AtomicLong realCacheSize) throws CacheFullException, IOException,
|
||||||
BucketAllocatorException {
|
BucketAllocatorException {
|
||||||
int len = data.getSerializedLength();
|
int len = data.getSerializedLength();
|
||||||
// This cacheable thing can't be serialized...
|
// This cacheable thing can't be serialized
|
||||||
if (len == 0) return null;
|
if (len == 0) return null;
|
||||||
long offset = bucketAllocator.allocateBlock(len);
|
long offset = bucketAllocator.allocateBlock(len);
|
||||||
BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
|
BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
|
||||||
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
|
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
|
||||||
try {
|
try {
|
||||||
if (data instanceof HFileBlock) {
|
if (data instanceof HFileBlock) {
|
||||||
HFileBlock block = (HFileBlock) data;
|
// If an instance of HFileBlock, save on some allocations.
|
||||||
ByteBuff sliceBuf = block.getBufferReadOnlyWithHeader();
|
HFileBlock block = (HFileBlock)data;
|
||||||
sliceBuf.rewind();
|
ByteBuff sliceBuf = block.getBufferReadOnly();
|
||||||
assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
|
ByteBuffer metadata = block.getMetaData();
|
||||||
len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
|
|
||||||
ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
|
|
||||||
block.serializeExtraInfo(extraInfoBuffer);
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Write offset=" + offset + ", len=" + len);
|
LOG.trace("Write offset=" + offset + ", len=" + len);
|
||||||
}
|
}
|
||||||
ioEngine.write(sliceBuf, offset);
|
ioEngine.write(sliceBuf, offset);
|
||||||
ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
|
ioEngine.write(metadata, offset + len - metadata.limit());
|
||||||
} else {
|
} else {
|
||||||
ByteBuffer bb = ByteBuffer.allocate(len);
|
ByteBuffer bb = ByteBuffer.allocate(len);
|
||||||
data.serialize(bb);
|
data.serialize(bb);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
@ -32,7 +33,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
// TODO: Change name from KeyValueScanner to CellScanner only we already have a simple CellScanner
|
// TODO: Change name from KeyValueScanner to CellScanner only we already have a simple CellScanner
|
||||||
// so this should be something else altogether, a decoration on our base CellScanner. TODO.
|
// so this should be something else altogether, a decoration on our base CellScanner. TODO.
|
||||||
// This class shows in CPs so do it all in one swell swoop. HBase-2.0.0.
|
// This class shows in CPs so do it all in one swell swoop. HBase-2.0.0.
|
||||||
public interface KeyValueScanner extends Shipper {
|
public interface KeyValueScanner extends Shipper, Closeable {
|
||||||
/**
|
/**
|
||||||
* The byte array represents for NO_NEXT_INDEXED_KEY;
|
* The byte array represents for NO_NEXT_INDEXED_KEY;
|
||||||
* The actual value is irrelevant because this is always compared by reference.
|
* The actual value is irrelevant because this is always compared by reference.
|
||||||
|
@ -74,6 +75,7 @@ public interface KeyValueScanner extends Shipper {
|
||||||
* The default implementation for this would be to return 0. A file having
|
* The default implementation for this would be to return 0. A file having
|
||||||
* lower sequence id will be considered to be the older one.
|
* lower sequence id will be considered to be the older one.
|
||||||
*/
|
*/
|
||||||
|
// TODO: Implement SequenceId Interface instead.
|
||||||
long getSequenceID();
|
long getSequenceID();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1271,7 +1271,7 @@ public class StoreFile {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Warning: Do not write further code which depends on this call. Instead
|
* @deprecated Do not write further code which depends on this call. Instead
|
||||||
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
|
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
|
||||||
* which is the preferred way to scan a store with higher level concepts.
|
* which is the preferred way to scan a store with higher level concepts.
|
||||||
*
|
*
|
||||||
|
@ -1285,7 +1285,7 @@ public class StoreFile {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Warning: Do not write further code which depends on this call. Instead
|
* @deprecated Do not write further code which depends on this call. Instead
|
||||||
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
|
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
|
||||||
* which is the preferred way to scan a store with higher level concepts.
|
* which is the preferred way to scan a store with higher level concepts.
|
||||||
*
|
*
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.io.HeapSize;
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
|
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
|
||||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||||
import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
|
||||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||||
|
|
||||||
public class CacheTestUtils {
|
public class CacheTestUtils {
|
||||||
|
@ -66,6 +65,7 @@ public class CacheTestUtils {
|
||||||
/*Post eviction, heapsize should be the same */
|
/*Post eviction, heapsize should be the same */
|
||||||
assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
|
assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void testCacheMultiThreaded(final BlockCache toBeTested,
|
public static void testCacheMultiThreaded(final BlockCache toBeTested,
|
||||||
final int blockSize, final int numThreads, final int numQueries,
|
final int blockSize, final int numThreads, final int numQueries,
|
||||||
final double passingScore) throws Exception {
|
final double passingScore) throws Exception {
|
||||||
|
@ -339,25 +339,16 @@ public class CacheTestUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static HFileBlockPair[] generateHFileBlocks(int blockSize,
|
private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
|
||||||
int numBlocks) {
|
|
||||||
HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
|
HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
|
||||||
Random rand = new Random();
|
Random rand = new Random();
|
||||||
HashSet<String> usedStrings = new HashSet<String>();
|
HashSet<String> usedStrings = new HashSet<String>();
|
||||||
for (int i = 0; i < numBlocks; i++) {
|
for (int i = 0; i < numBlocks; i++) {
|
||||||
|
ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
|
||||||
// The buffer serialized size needs to match the size of BlockSize. So we
|
|
||||||
// declare our data size to be smaller than it by the serialization space
|
|
||||||
// required.
|
|
||||||
|
|
||||||
SingleByteBuff cachedBuffer = new SingleByteBuff(ByteBuffer.allocate(blockSize
|
|
||||||
- HFileBlock.EXTRA_SERIALIZATION_SPACE));
|
|
||||||
rand.nextBytes(cachedBuffer.array());
|
rand.nextBytes(cachedBuffer.array());
|
||||||
cachedBuffer.rewind();
|
cachedBuffer.rewind();
|
||||||
int onDiskSizeWithoutHeader = blockSize
|
int onDiskSizeWithoutHeader = blockSize;
|
||||||
- HFileBlock.EXTRA_SERIALIZATION_SPACE;
|
int uncompressedSizeWithoutHeader = blockSize;
|
||||||
int uncompressedSizeWithoutHeader = blockSize
|
|
||||||
- HFileBlock.EXTRA_SERIALIZATION_SPACE;
|
|
||||||
long prevBlockOffset = rand.nextLong();
|
long prevBlockOffset = rand.nextLong();
|
||||||
BlockType.DATA.write(cachedBuffer);
|
BlockType.DATA.write(cachedBuffer);
|
||||||
cachedBuffer.putInt(onDiskSizeWithoutHeader);
|
cachedBuffer.putInt(onDiskSizeWithoutHeader);
|
||||||
|
@ -376,7 +367,7 @@ public class CacheTestUtils {
|
||||||
onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
|
onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
|
||||||
prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
|
prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
|
||||||
blockSize,
|
blockSize,
|
||||||
onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, meta);
|
onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta);
|
||||||
|
|
||||||
String strKey;
|
String strKey;
|
||||||
/* No conflicting keys */
|
/* No conflicting keys */
|
||||||
|
|
|
@ -259,7 +259,6 @@ public class TestCacheOnWrite {
|
||||||
assertTrue(testDescription, scanner.seekTo());
|
assertTrue(testDescription, scanner.seekTo());
|
||||||
|
|
||||||
long offset = 0;
|
long offset = 0;
|
||||||
HFileBlock prevBlock = null;
|
|
||||||
EnumMap<BlockType, Integer> blockCountByType =
|
EnumMap<BlockType, Integer> blockCountByType =
|
||||||
new EnumMap<BlockType, Integer>(BlockType.class);
|
new EnumMap<BlockType, Integer>(BlockType.class);
|
||||||
|
|
||||||
|
@ -267,14 +266,10 @@ public class TestCacheOnWrite {
|
||||||
List<Long> cachedBlocksOffset = new ArrayList<Long>();
|
List<Long> cachedBlocksOffset = new ArrayList<Long>();
|
||||||
Map<Long, HFileBlock> cachedBlocks = new HashMap<Long, HFileBlock>();
|
Map<Long, HFileBlock> cachedBlocks = new HashMap<Long, HFileBlock>();
|
||||||
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
||||||
long onDiskSize = -1;
|
|
||||||
if (prevBlock != null) {
|
|
||||||
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
|
|
||||||
}
|
|
||||||
// Flags: don't cache the block, use pread, this is not a compaction.
|
// Flags: don't cache the block, use pread, this is not a compaction.
|
||||||
// Also, pass null for expected block type to avoid checking it.
|
// Also, pass null for expected block type to avoid checking it.
|
||||||
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
|
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null,
|
||||||
false, true, null, encodingInCache);
|
encodingInCache);
|
||||||
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
|
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
|
||||||
offset);
|
offset);
|
||||||
HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
|
HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
|
||||||
|
@ -307,7 +302,6 @@ public class TestCacheOnWrite {
|
||||||
assertEquals(
|
assertEquals(
|
||||||
block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
|
block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
|
||||||
}
|
}
|
||||||
prevBlock = block;
|
|
||||||
offset += block.getOnDiskSizeWithHeader();
|
offset += block.getOnDiskSizeWithHeader();
|
||||||
BlockType bt = block.getBlockType();
|
BlockType bt = block.getBlockType();
|
||||||
Integer count = blockCountByType.get(bt);
|
Integer count = blockCountByType.get(bt);
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class TestChecksum {
|
||||||
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
|
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
|
||||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
|
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
|
||||||
is, totalSize, (HFileSystem) fs, path, meta);
|
is, totalSize, (HFileSystem) fs, path, meta);
|
||||||
HFileBlock b = hbr.readBlockData(0, -1, -1, false);
|
HFileBlock b = hbr.readBlockData(0, -1, false);
|
||||||
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
|
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,12 +108,14 @@ public class TestChecksum {
|
||||||
ChecksumType cktype = itr.next();
|
ChecksumType cktype = itr.next();
|
||||||
Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
|
Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
|
||||||
FSDataOutputStream os = fs.create(path);
|
FSDataOutputStream os = fs.create(path);
|
||||||
HFileContext meta = new HFileContextBuilder()
|
HFileContext meta = new HFileContextBuilder().
|
||||||
.withChecksumType(cktype).build();
|
withChecksumType(cktype).
|
||||||
|
build();
|
||||||
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
|
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
|
||||||
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
|
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
|
||||||
for (int i = 0; i < 1000; ++i)
|
for (int i = 0; i < 1000; ++i) {
|
||||||
dos.writeInt(i);
|
dos.writeInt(i);
|
||||||
|
}
|
||||||
hbw.writeHeaderAndData(os);
|
hbw.writeHeaderAndData(os);
|
||||||
int totalSize = hbw.getOnDiskSizeWithHeader();
|
int totalSize = hbw.getOnDiskSizeWithHeader();
|
||||||
os.close();
|
os.close();
|
||||||
|
@ -125,7 +127,7 @@ public class TestChecksum {
|
||||||
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
|
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
|
||||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
|
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
|
||||||
is, totalSize, (HFileSystem) fs, path, meta);
|
is, totalSize, (HFileSystem) fs, path, meta);
|
||||||
HFileBlock b = hbr.readBlockData(0, -1, -1, false);
|
HFileBlock b = hbr.readBlockData(0, -1, false);
|
||||||
ByteBuff data = b.getBufferWithoutHeader();
|
ByteBuff data = b.getBufferWithoutHeader();
|
||||||
for (int i = 0; i < 1000; i++) {
|
for (int i = 0; i < 1000; i++) {
|
||||||
assertEquals(i, data.getInt());
|
assertEquals(i, data.getInt());
|
||||||
|
@ -188,7 +190,7 @@ public class TestChecksum {
|
||||||
.withHBaseCheckSum(true)
|
.withHBaseCheckSum(true)
|
||||||
.build();
|
.build();
|
||||||
HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta);
|
HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta);
|
||||||
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
|
HFileBlock b = hbr.readBlockData(0, -1, pread);
|
||||||
b.sanityCheck();
|
b.sanityCheck();
|
||||||
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
|
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
|
||||||
assertEquals(algo == GZ ? 2173 : 4936,
|
assertEquals(algo == GZ ? 2173 : 4936,
|
||||||
|
@ -209,17 +211,17 @@ public class TestChecksum {
|
||||||
// requests. Verify that this is correct.
|
// requests. Verify that this is correct.
|
||||||
for (int i = 0; i <
|
for (int i = 0; i <
|
||||||
HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
|
HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
|
||||||
b = hbr.readBlockData(0, -1, -1, pread);
|
b = hbr.readBlockData(0, -1, pread);
|
||||||
assertEquals(0, HFile.getChecksumFailuresCount());
|
assertEquals(0, HFile.getChecksumFailuresCount());
|
||||||
}
|
}
|
||||||
// The next read should have hbase checksum verification reanabled,
|
// The next read should have hbase checksum verification reanabled,
|
||||||
// we verify this by assertng that there was a hbase-checksum failure.
|
// we verify this by assertng that there was a hbase-checksum failure.
|
||||||
b = hbr.readBlockData(0, -1, -1, pread);
|
b = hbr.readBlockData(0, -1, pread);
|
||||||
assertEquals(1, HFile.getChecksumFailuresCount());
|
assertEquals(1, HFile.getChecksumFailuresCount());
|
||||||
|
|
||||||
// Since the above encountered a checksum failure, we switch
|
// Since the above encountered a checksum failure, we switch
|
||||||
// back to not checking hbase checksums.
|
// back to not checking hbase checksums.
|
||||||
b = hbr.readBlockData(0, -1, -1, pread);
|
b = hbr.readBlockData(0, -1, pread);
|
||||||
assertEquals(0, HFile.getChecksumFailuresCount());
|
assertEquals(0, HFile.getChecksumFailuresCount());
|
||||||
is.close();
|
is.close();
|
||||||
|
|
||||||
|
@ -230,7 +232,7 @@ public class TestChecksum {
|
||||||
assertEquals(false, newfs.useHBaseChecksum());
|
assertEquals(false, newfs.useHBaseChecksum());
|
||||||
is = new FSDataInputStreamWrapper(newfs, path);
|
is = new FSDataInputStreamWrapper(newfs, path);
|
||||||
hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta);
|
hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta);
|
||||||
b = hbr.readBlockData(0, -1, -1, pread);
|
b = hbr.readBlockData(0, -1, pread);
|
||||||
is.close();
|
is.close();
|
||||||
b.sanityCheck();
|
b.sanityCheck();
|
||||||
b = b.unpack(meta, hbr);
|
b = b.unpack(meta, hbr);
|
||||||
|
@ -314,7 +316,7 @@ public class TestChecksum {
|
||||||
.build();
|
.build();
|
||||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
|
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
|
||||||
is, nochecksum), totalSize, hfs, path, meta);
|
is, nochecksum), totalSize, hfs, path, meta);
|
||||||
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
|
HFileBlock b = hbr.readBlockData(0, -1, pread);
|
||||||
is.close();
|
is.close();
|
||||||
b.sanityCheck();
|
b.sanityCheck();
|
||||||
assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
|
assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
|
||||||
|
@ -355,4 +357,3 @@ public class TestChecksum {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -320,7 +320,7 @@ public class TestHFileBlock {
|
||||||
.withIncludesTags(includesTag)
|
.withIncludesTags(includesTag)
|
||||||
.withCompression(algo).build();
|
.withCompression(algo).build();
|
||||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
|
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
|
||||||
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
|
HFileBlock b = hbr.readBlockData(0, -1, pread);
|
||||||
is.close();
|
is.close();
|
||||||
assertEquals(0, HFile.getChecksumFailuresCount());
|
assertEquals(0, HFile.getChecksumFailuresCount());
|
||||||
|
|
||||||
|
@ -334,17 +334,15 @@ public class TestHFileBlock {
|
||||||
is = fs.open(path);
|
is = fs.open(path);
|
||||||
hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
|
hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
|
||||||
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
|
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
|
||||||
b.totalChecksumBytes(), -1, pread);
|
b.totalChecksumBytes(), pread);
|
||||||
assertEquals(expected, b);
|
assertEquals(expected, b);
|
||||||
int wrongCompressedSize = 2172;
|
int wrongCompressedSize = 2172;
|
||||||
try {
|
try {
|
||||||
b = hbr.readBlockData(0, wrongCompressedSize
|
b = hbr.readBlockData(0, wrongCompressedSize
|
||||||
+ HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread);
|
+ HConstants.HFILEBLOCK_HEADER_SIZE, pread);
|
||||||
fail("Exception expected");
|
fail("Exception expected");
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
String expectedPrefix = "On-disk size without header provided is "
|
String expectedPrefix = "Passed in onDiskSizeWithHeader=";
|
||||||
+ wrongCompressedSize + ", but block header contains "
|
|
||||||
+ b.getOnDiskSizeWithoutHeader() + ".";
|
|
||||||
assertTrue("Invalid exception message: '" + ex.getMessage()
|
assertTrue("Invalid exception message: '" + ex.getMessage()
|
||||||
+ "'.\nMessage is expected to start with: '" + expectedPrefix
|
+ "'.\nMessage is expected to start with: '" + expectedPrefix
|
||||||
+ "'", ex.getMessage().startsWith(expectedPrefix));
|
+ "'", ex.getMessage().startsWith(expectedPrefix));
|
||||||
|
@ -424,7 +422,7 @@ public class TestHFileBlock {
|
||||||
HFileBlock blockFromHFile, blockUnpacked;
|
HFileBlock blockFromHFile, blockUnpacked;
|
||||||
int pos = 0;
|
int pos = 0;
|
||||||
for (int blockId = 0; blockId < numBlocks; ++blockId) {
|
for (int blockId = 0; blockId < numBlocks; ++blockId) {
|
||||||
blockFromHFile = hbr.readBlockData(pos, -1, -1, pread);
|
blockFromHFile = hbr.readBlockData(pos, -1, pread);
|
||||||
assertEquals(0, HFile.getChecksumFailuresCount());
|
assertEquals(0, HFile.getChecksumFailuresCount());
|
||||||
blockFromHFile.sanityCheck();
|
blockFromHFile.sanityCheck();
|
||||||
pos += blockFromHFile.getOnDiskSizeWithHeader();
|
pos += blockFromHFile.getOnDiskSizeWithHeader();
|
||||||
|
@ -560,7 +558,7 @@ public class TestHFileBlock {
|
||||||
if (detailedLogging) {
|
if (detailedLogging) {
|
||||||
LOG.info("Reading block #" + i + " at offset " + curOffset);
|
LOG.info("Reading block #" + i + " at offset " + curOffset);
|
||||||
}
|
}
|
||||||
HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
|
HFileBlock b = hbr.readBlockData(curOffset, -1, pread);
|
||||||
if (detailedLogging) {
|
if (detailedLogging) {
|
||||||
LOG.info("Block #" + i + ": " + b);
|
LOG.info("Block #" + i + ": " + b);
|
||||||
}
|
}
|
||||||
|
@ -574,8 +572,7 @@ public class TestHFileBlock {
|
||||||
|
|
||||||
// Now re-load this block knowing the on-disk size. This tests a
|
// Now re-load this block knowing the on-disk size. This tests a
|
||||||
// different branch in the loader.
|
// different branch in the loader.
|
||||||
HFileBlock b2 = hbr.readBlockData(curOffset,
|
HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread);
|
||||||
b.getOnDiskSizeWithHeader(), -1, pread);
|
|
||||||
b2.sanityCheck();
|
b2.sanityCheck();
|
||||||
|
|
||||||
assertEquals(b.getBlockType(), b2.getBlockType());
|
assertEquals(b.getBlockType(), b2.getBlockType());
|
||||||
|
@ -601,7 +598,7 @@ public class TestHFileBlock {
|
||||||
b = b.unpack(meta, hbr);
|
b = b.unpack(meta, hbr);
|
||||||
// b's buffer has header + data + checksum while
|
// b's buffer has header + data + checksum while
|
||||||
// expectedContents have header + data only
|
// expectedContents have header + data only
|
||||||
ByteBuff bufRead = b.getBufferWithHeader();
|
ByteBuff bufRead = b.getBufferReadOnly();
|
||||||
ByteBuffer bufExpected = expectedContents.get(i);
|
ByteBuffer bufExpected = expectedContents.get(i);
|
||||||
boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
|
boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
|
||||||
bufRead.arrayOffset(),
|
bufRead.arrayOffset(),
|
||||||
|
@ -684,7 +681,7 @@ public class TestHFileBlock {
|
||||||
HFileBlock b;
|
HFileBlock b;
|
||||||
try {
|
try {
|
||||||
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
|
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
|
||||||
b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
|
b = hbr.readBlockData(offset, onDiskSizeArg, pread);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.error("Error in client " + clientId + " trying to read block at "
|
LOG.error("Error in client " + clientId + " trying to read block at "
|
||||||
+ offset + ", pread=" + pread + ", withOnDiskSize=" +
|
+ offset + ", pread=" + pread + ", withOnDiskSize=" +
|
||||||
|
@ -719,8 +716,7 @@ public class TestHFileBlock {
|
||||||
protected void testConcurrentReadingInternals() throws IOException,
|
protected void testConcurrentReadingInternals() throws IOException,
|
||||||
InterruptedException, ExecutionException {
|
InterruptedException, ExecutionException {
|
||||||
for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
|
for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
|
||||||
Path path =
|
Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
|
||||||
new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
|
|
||||||
Random rand = defaultRandom();
|
Random rand = defaultRandom();
|
||||||
List<Long> offsets = new ArrayList<Long>();
|
List<Long> offsets = new ArrayList<Long>();
|
||||||
List<BlockType> types = new ArrayList<BlockType>();
|
List<BlockType> types = new ArrayList<BlockType>();
|
||||||
|
@ -843,8 +839,7 @@ public class TestHFileBlock {
|
||||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||||
.withChecksumType(ChecksumType.NULL).build();
|
.withChecksumType(ChecksumType.NULL).build();
|
||||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||||
HFileBlock.FILL_HEADER, -1,
|
HFileBlock.FILL_HEADER, -1, 0, -1, meta);
|
||||||
0, meta);
|
|
||||||
long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
|
long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
|
||||||
new MultiByteBuff(buf).getClass(), true)
|
new MultiByteBuff(buf).getClass(), true)
|
||||||
+ HConstants.HFILEBLOCK_HEADER_SIZE + size);
|
+ HConstants.HFILEBLOCK_HEADER_SIZE + size);
|
||||||
|
|
|
@ -1,750 +0,0 @@
|
||||||
/*
|
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.io.hfile;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
|
|
||||||
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
|
||||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
|
||||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
|
||||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
|
||||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
|
||||||
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
|
|
||||||
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
|
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
|
|
||||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
|
||||||
import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
|
||||||
import org.apache.hadoop.io.compress.Compressor;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.Parameterized;
|
|
||||||
import org.junit.runners.Parameterized.Parameters;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class has unit tests to prove that older versions of
|
|
||||||
* HFiles (without checksums) are compatible with current readers.
|
|
||||||
*/
|
|
||||||
@Category({IOTests.class, SmallTests.class})
|
|
||||||
@RunWith(Parameterized.class)
|
|
||||||
public class TestHFileBlockCompatibility {
|
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
|
|
||||||
private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
|
|
||||||
NONE, GZ };
|
|
||||||
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
private HFileSystem fs;
|
|
||||||
|
|
||||||
private final boolean includesMemstoreTS;
|
|
||||||
private final boolean includesTag;
|
|
||||||
|
|
||||||
public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) {
|
|
||||||
this.includesMemstoreTS = includesMemstoreTS;
|
|
||||||
this.includesTag = includesTag;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Parameters
|
|
||||||
public static Collection<Object[]> parameters() {
|
|
||||||
return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws IOException {
|
|
||||||
fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration());
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] createTestV1Block(Compression.Algorithm algo)
|
|
||||||
throws IOException {
|
|
||||||
Compressor compressor = algo.getCompressor();
|
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
||||||
OutputStream os = algo.createCompressionStream(baos, compressor, 0);
|
|
||||||
DataOutputStream dos = new DataOutputStream(os);
|
|
||||||
BlockType.META.write(dos); // Let's make this a meta block.
|
|
||||||
TestHFileBlock.writeTestBlockContents(dos);
|
|
||||||
dos.flush();
|
|
||||||
algo.returnCompressor(compressor);
|
|
||||||
return baos.toByteArray();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Writer createTestV2Block(Compression.Algorithm algo)
|
|
||||||
throws IOException {
|
|
||||||
final BlockType blockType = BlockType.DATA;
|
|
||||||
Writer hbw = new Writer(algo, null,
|
|
||||||
includesMemstoreTS, includesTag);
|
|
||||||
DataOutputStream dos = hbw.startWriting(blockType);
|
|
||||||
TestHFileBlock.writeTestBlockContents(dos);
|
|
||||||
// make sure the block is ready by calling hbw.getHeaderAndData()
|
|
||||||
hbw.getHeaderAndData();
|
|
||||||
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
|
|
||||||
hbw.releaseCompressor();
|
|
||||||
return hbw;
|
|
||||||
}
|
|
||||||
|
|
||||||
private String createTestBlockStr(Compression.Algorithm algo,
|
|
||||||
int correctLength) throws IOException {
|
|
||||||
Writer hbw = createTestV2Block(algo);
|
|
||||||
byte[] testV2Block = hbw.getHeaderAndData();
|
|
||||||
int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9;
|
|
||||||
if (testV2Block.length == correctLength) {
|
|
||||||
// Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
|
|
||||||
// variations across operating systems.
|
|
||||||
// See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
|
|
||||||
testV2Block[osOffset] = 3;
|
|
||||||
}
|
|
||||||
return Bytes.toStringBinary(testV2Block);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testNoCompression() throws IOException {
|
|
||||||
assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
|
|
||||||
getUncompressedSizeWithoutHeader());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGzipCompression() throws IOException {
|
|
||||||
final String correctTestBlockStr =
|
|
||||||
"DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
|
|
||||||
+ "\\xFF\\xFF\\xFF\\xFF"
|
|
||||||
// gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
|
|
||||||
+ "\\x1F\\x8B" // gzip magic signature
|
|
||||||
+ "\\x08" // Compression method: 8 = "deflate"
|
|
||||||
+ "\\x00" // Flags
|
|
||||||
+ "\\x00\\x00\\x00\\x00" // mtime
|
|
||||||
+ "\\x00" // XFL (extra flags)
|
|
||||||
// OS (0 = FAT filesystems, 3 = Unix). However, this field
|
|
||||||
// sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
|
|
||||||
+ "\\x03"
|
|
||||||
+ "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
|
|
||||||
+ "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
|
|
||||||
+ "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00";
|
|
||||||
final int correctGzipBlockLength = 82;
|
|
||||||
|
|
||||||
String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
|
|
||||||
assertEquals(correctTestBlockStr, returnedStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReaderV2() throws IOException {
|
|
||||||
if(includesTag) {
|
|
||||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
|
||||||
}
|
|
||||||
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
|
|
||||||
for (boolean pread : new boolean[] { false, true }) {
|
|
||||||
LOG.info("testReaderV2: Compression algorithm: " + algo +
|
|
||||||
", pread=" + pread);
|
|
||||||
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
|
|
||||||
+ algo);
|
|
||||||
FSDataOutputStream os = fs.create(path);
|
|
||||||
Writer hbw = new Writer(algo, null,
|
|
||||||
includesMemstoreTS, includesTag);
|
|
||||||
long totalSize = 0;
|
|
||||||
for (int blockId = 0; blockId < 2; ++blockId) {
|
|
||||||
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
|
|
||||||
for (int i = 0; i < 1234; ++i)
|
|
||||||
dos.writeInt(i);
|
|
||||||
hbw.writeHeaderAndData(os);
|
|
||||||
totalSize += hbw.getOnDiskSizeWithHeader();
|
|
||||||
}
|
|
||||||
os.close();
|
|
||||||
|
|
||||||
FSDataInputStream is = fs.open(path);
|
|
||||||
HFileContext meta = new HFileContextBuilder()
|
|
||||||
.withHBaseCheckSum(false)
|
|
||||||
.withIncludesMvcc(includesMemstoreTS)
|
|
||||||
.withIncludesTags(includesTag)
|
|
||||||
.withCompression(algo)
|
|
||||||
.build();
|
|
||||||
HFileBlock.FSReader hbr =
|
|
||||||
new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path, meta);
|
|
||||||
HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
|
|
||||||
is.close();
|
|
||||||
|
|
||||||
b.sanityCheck();
|
|
||||||
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
|
|
||||||
assertEquals(algo == GZ ? 2173 : 4936,
|
|
||||||
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
|
|
||||||
HFileBlock expected = b;
|
|
||||||
|
|
||||||
if (algo == GZ) {
|
|
||||||
is = fs.open(path);
|
|
||||||
hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path,
|
|
||||||
meta);
|
|
||||||
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
|
|
||||||
b.totalChecksumBytes(), -1, pread);
|
|
||||||
assertEquals(expected, b);
|
|
||||||
int wrongCompressedSize = 2172;
|
|
||||||
try {
|
|
||||||
b = hbr.readBlockData(0, wrongCompressedSize
|
|
||||||
+ HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread);
|
|
||||||
fail("Exception expected");
|
|
||||||
} catch (IOException ex) {
|
|
||||||
String expectedPrefix = "On-disk size without header provided is "
|
|
||||||
+ wrongCompressedSize + ", but block header contains "
|
|
||||||
+ b.getOnDiskSizeWithoutHeader() + ".";
|
|
||||||
assertTrue("Invalid exception message: '" + ex.getMessage()
|
|
||||||
+ "'.\nMessage is expected to start with: '" + expectedPrefix
|
|
||||||
+ "'", ex.getMessage().startsWith(expectedPrefix));
|
|
||||||
}
|
|
||||||
is.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test encoding/decoding data blocks.
|
|
||||||
* @throws IOException a bug or a problem with temporary files.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testDataBlockEncoding() throws IOException {
|
|
||||||
if(includesTag) {
|
|
||||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
|
||||||
}
|
|
||||||
final int numBlocks = 5;
|
|
||||||
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
|
|
||||||
for (boolean pread : new boolean[] { false, true }) {
|
|
||||||
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
|
|
||||||
LOG.info("testDataBlockEncoding algo " + algo +
|
|
||||||
" pread = " + pread +
|
|
||||||
" encoding " + encoding);
|
|
||||||
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
|
|
||||||
+ algo + "_" + encoding.toString());
|
|
||||||
FSDataOutputStream os = fs.create(path);
|
|
||||||
HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
|
|
||||||
new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
|
|
||||||
TestHFileBlockCompatibility.Writer hbw =
|
|
||||||
new TestHFileBlockCompatibility.Writer(algo,
|
|
||||||
dataBlockEncoder, includesMemstoreTS, includesTag);
|
|
||||||
long totalSize = 0;
|
|
||||||
final List<Integer> encodedSizes = new ArrayList<Integer>();
|
|
||||||
final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
|
|
||||||
for (int blockId = 0; blockId < numBlocks; ++blockId) {
|
|
||||||
hbw.startWriting(BlockType.DATA);
|
|
||||||
TestHFileBlock.writeTestKeyValues(hbw, blockId, pread, includesTag);
|
|
||||||
hbw.writeHeaderAndData(os);
|
|
||||||
int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
|
|
||||||
byte[] encodedResultWithHeader = hbw.getUncompressedDataWithHeader();
|
|
||||||
final int encodedSize = encodedResultWithHeader.length - headerLen;
|
|
||||||
if (encoding != DataBlockEncoding.NONE) {
|
|
||||||
// We need to account for the two-byte encoding algorithm ID that
|
|
||||||
// comes after the 24-byte block header but before encoded KVs.
|
|
||||||
headerLen += DataBlockEncoding.ID_SIZE;
|
|
||||||
}
|
|
||||||
byte[] encodedDataSection =
|
|
||||||
new byte[encodedResultWithHeader.length - headerLen];
|
|
||||||
System.arraycopy(encodedResultWithHeader, headerLen,
|
|
||||||
encodedDataSection, 0, encodedDataSection.length);
|
|
||||||
final ByteBuffer encodedBuf =
|
|
||||||
ByteBuffer.wrap(encodedDataSection);
|
|
||||||
encodedSizes.add(encodedSize);
|
|
||||||
encodedBlocks.add(encodedBuf);
|
|
||||||
totalSize += hbw.getOnDiskSizeWithHeader();
|
|
||||||
}
|
|
||||||
os.close();
|
|
||||||
|
|
||||||
FSDataInputStream is = fs.open(path);
|
|
||||||
HFileContext meta = new HFileContextBuilder()
|
|
||||||
.withHBaseCheckSum(false)
|
|
||||||
.withIncludesMvcc(includesMemstoreTS)
|
|
||||||
.withIncludesTags(includesTag)
|
|
||||||
.withCompression(algo)
|
|
||||||
.build();
|
|
||||||
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is),
|
|
||||||
totalSize, fs, path, meta);
|
|
||||||
hbr.setDataBlockEncoder(dataBlockEncoder);
|
|
||||||
hbr.setIncludesMemstoreTS(includesMemstoreTS);
|
|
||||||
|
|
||||||
HFileBlock b;
|
|
||||||
int pos = 0;
|
|
||||||
for (int blockId = 0; blockId < numBlocks; ++blockId) {
|
|
||||||
b = hbr.readBlockData(pos, -1, -1, pread);
|
|
||||||
b.sanityCheck();
|
|
||||||
if (meta.isCompressedOrEncrypted()) {
|
|
||||||
assertFalse(b.isUnpacked());
|
|
||||||
b = b.unpack(meta, hbr);
|
|
||||||
}
|
|
||||||
pos += b.getOnDiskSizeWithHeader();
|
|
||||||
|
|
||||||
assertEquals((int) encodedSizes.get(blockId),
|
|
||||||
b.getUncompressedSizeWithoutHeader());
|
|
||||||
ByteBuff actualBuffer = b.getBufferWithoutHeader();
|
|
||||||
if (encoding != DataBlockEncoding.NONE) {
|
|
||||||
// We expect a two-byte big-endian encoding id.
|
|
||||||
assertEquals(0, actualBuffer.get(0));
|
|
||||||
assertEquals(encoding.getId(), actualBuffer.get(1));
|
|
||||||
actualBuffer.position(2);
|
|
||||||
actualBuffer = actualBuffer.slice();
|
|
||||||
}
|
|
||||||
|
|
||||||
ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
|
|
||||||
expectedBuffer.rewind();
|
|
||||||
|
|
||||||
// test if content matches, produce nice message
|
|
||||||
TestHFileBlock.assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer,
|
|
||||||
algo, encoding, pread);
|
|
||||||
}
|
|
||||||
is.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* This is the version of the HFileBlock.Writer that is used to
|
|
||||||
* create V2 blocks with minor version 0. These blocks do not
|
|
||||||
* have hbase-level checksums. The code is here to test
|
|
||||||
* backward compatibility. The reason we do not inherit from
|
|
||||||
* HFileBlock.Writer is because we never ever want to change the code
|
|
||||||
* in this class but the code in HFileBlock.Writer will continually
|
|
||||||
* evolve.
|
|
||||||
*/
|
|
||||||
public static final class Writer extends HFileBlock.Writer {
|
|
||||||
|
|
||||||
// These constants are as they were in minorVersion 0.
|
|
||||||
private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
|
|
||||||
private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
|
|
||||||
private static final byte[] DUMMY_HEADER = HFileBlock.DUMMY_HEADER_NO_CHECKSUM;
|
|
||||||
|
|
||||||
private enum State {
|
|
||||||
INIT,
|
|
||||||
WRITING,
|
|
||||||
BLOCK_READY
|
|
||||||
};
|
|
||||||
|
|
||||||
/** Writer state. Used to ensure the correct usage protocol. */
|
|
||||||
private State state = State.INIT;
|
|
||||||
|
|
||||||
/** Compression algorithm for all blocks this instance writes. */
|
|
||||||
private final Compression.Algorithm compressAlgo;
|
|
||||||
|
|
||||||
/** Data block encoder used for data blocks */
|
|
||||||
private final HFileDataBlockEncoder dataBlockEncoder;
|
|
||||||
|
|
||||||
private HFileBlockEncodingContext dataBlockEncodingCtx;
|
|
||||||
/** block encoding context for non-data blocks */
|
|
||||||
private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The stream we use to accumulate data in uncompressed format for each
|
|
||||||
* block. We reset this stream at the end of each block and reuse it. The
|
|
||||||
* header is written as the first {@link #HEADER_SIZE} bytes into this
|
|
||||||
* stream.
|
|
||||||
*/
|
|
||||||
private ByteArrayOutputStream baosInMemory;
|
|
||||||
|
|
||||||
/** Compressor, which is also reused between consecutive blocks. */
|
|
||||||
private Compressor compressor;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Current block type. Set in {@link #startWriting(BlockType)}. Could be
|
|
||||||
* changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
|
|
||||||
* to {@link BlockType#ENCODED_DATA}.
|
|
||||||
*/
|
|
||||||
private BlockType blockType;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A stream that we write uncompressed bytes to, which compresses them and
|
|
||||||
* writes them to {@link #baosInMemory}.
|
|
||||||
*/
|
|
||||||
private DataOutputStream userDataStream;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Bytes to be written to the file system, including the header. Compressed
|
|
||||||
* if compression is turned on.
|
|
||||||
*/
|
|
||||||
private byte[] onDiskBytesWithHeader;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Valid in the READY state. Contains the header and the uncompressed (but
|
|
||||||
* potentially encoded, if this is a data block) bytes, so the length is
|
|
||||||
* {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
|
|
||||||
*/
|
|
||||||
private byte[] uncompressedBytesWithHeader;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Current block's start offset in the {@link HFile}. Set in
|
|
||||||
* {@link #writeHeaderAndData(FSDataOutputStream)}.
|
|
||||||
*/
|
|
||||||
private long startOffset;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Offset of previous block by block type. Updated when the next block is
|
|
||||||
* started.
|
|
||||||
*/
|
|
||||||
private long[] prevOffsetByType;
|
|
||||||
|
|
||||||
/** The offset of the previous block of the same type */
|
|
||||||
private long prevOffset;
|
|
||||||
|
|
||||||
private int unencodedDataSizeWritten;
|
|
||||||
|
|
||||||
public Writer(Compression.Algorithm compressionAlgorithm,
|
|
||||||
HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
|
|
||||||
this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false)
|
|
||||||
.withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
|
|
||||||
.withCompression(compressionAlgorithm).build());
|
|
||||||
}
|
|
||||||
|
|
||||||
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext meta) {
|
|
||||||
super(dataBlockEncoder, meta);
|
|
||||||
compressAlgo = meta.getCompression() == null ? NONE : meta.getCompression();
|
|
||||||
this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder
|
|
||||||
: NoOpDataBlockEncoder.INSTANCE;
|
|
||||||
defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
|
|
||||||
dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(DUMMY_HEADER, meta);
|
|
||||||
baosInMemory = new ByteArrayOutputStream();
|
|
||||||
|
|
||||||
prevOffsetByType = new long[BlockType.values().length];
|
|
||||||
for (int i = 0; i < prevOffsetByType.length; ++i)
|
|
||||||
prevOffsetByType[i] = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Starts writing into the block. The previous block's data is discarded.
|
|
||||||
*
|
|
||||||
* @return the stream the user can write their data into
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public DataOutputStream startWriting(BlockType newBlockType)
|
|
||||||
throws IOException {
|
|
||||||
if (state == State.BLOCK_READY && startOffset != -1) {
|
|
||||||
// We had a previous block that was written to a stream at a specific
|
|
||||||
// offset. Save that offset as the last offset of a block of that type.
|
|
||||||
prevOffsetByType[blockType.getId()] = startOffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
startOffset = -1;
|
|
||||||
blockType = newBlockType;
|
|
||||||
|
|
||||||
baosInMemory.reset();
|
|
||||||
baosInMemory.write(DUMMY_HEADER);
|
|
||||||
|
|
||||||
state = State.WRITING;
|
|
||||||
|
|
||||||
// We will compress it later in finishBlock()
|
|
||||||
userDataStream = new DataOutputStream(baosInMemory);
|
|
||||||
if (newBlockType == BlockType.DATA) {
|
|
||||||
this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
|
|
||||||
}
|
|
||||||
this.unencodedDataSizeWritten = 0;
|
|
||||||
return userDataStream;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(Cell c) throws IOException {
|
|
||||||
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
|
|
||||||
expectState(State.WRITING);
|
|
||||||
this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream);
|
|
||||||
this.unencodedDataSizeWritten += kv.getLength();
|
|
||||||
if (dataBlockEncodingCtx.getHFileContext().isIncludesMvcc()) {
|
|
||||||
this.unencodedDataSizeWritten += WritableUtils.getVIntSize(kv.getSequenceId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the stream for the user to write to. The block writer takes care
|
|
||||||
* of handling compression and buffering for caching on write. Can only be
|
|
||||||
* called in the "writing" state.
|
|
||||||
*
|
|
||||||
* @return the data output stream for the user to write to
|
|
||||||
*/
|
|
||||||
DataOutputStream getUserDataStream() {
|
|
||||||
expectState(State.WRITING);
|
|
||||||
return userDataStream;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Transitions the block writer from the "writing" state to the "block
|
|
||||||
* ready" state. Does nothing if a block is already finished.
|
|
||||||
*/
|
|
||||||
void ensureBlockReady() throws IOException {
|
|
||||||
Preconditions.checkState(state != State.INIT,
|
|
||||||
"Unexpected state: " + state);
|
|
||||||
|
|
||||||
if (state == State.BLOCK_READY)
|
|
||||||
return;
|
|
||||||
|
|
||||||
// This will set state to BLOCK_READY.
|
|
||||||
finishBlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An internal method that flushes the compressing stream (if using
|
|
||||||
* compression), serializes the header, and takes care of the separate
|
|
||||||
* uncompressed stream for caching on write, if applicable. Sets block
|
|
||||||
* write state to "block ready".
|
|
||||||
*/
|
|
||||||
void finishBlock() throws IOException {
|
|
||||||
if (blockType == BlockType.DATA) {
|
|
||||||
this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
|
|
||||||
baosInMemory.toByteArray(), blockType);
|
|
||||||
blockType = dataBlockEncodingCtx.getBlockType();
|
|
||||||
}
|
|
||||||
userDataStream.flush();
|
|
||||||
// This does an array copy, so it is safe to cache this byte array.
|
|
||||||
uncompressedBytesWithHeader = baosInMemory.toByteArray();
|
|
||||||
prevOffset = prevOffsetByType[blockType.getId()];
|
|
||||||
|
|
||||||
// We need to set state before we can package the block up for
|
|
||||||
// cache-on-write. In a way, the block is ready, but not yet encoded or
|
|
||||||
// compressed.
|
|
||||||
state = State.BLOCK_READY;
|
|
||||||
if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
|
|
||||||
onDiskBytesWithHeader = dataBlockEncodingCtx
|
|
||||||
.compressAndEncrypt(uncompressedBytesWithHeader);
|
|
||||||
} else {
|
|
||||||
onDiskBytesWithHeader = defaultBlockEncodingCtx
|
|
||||||
.compressAndEncrypt(uncompressedBytesWithHeader);
|
|
||||||
}
|
|
||||||
|
|
||||||
// put the header for on disk bytes
|
|
||||||
putHeader(onDiskBytesWithHeader, 0,
|
|
||||||
onDiskBytesWithHeader.length,
|
|
||||||
uncompressedBytesWithHeader.length);
|
|
||||||
//set the header for the uncompressed bytes (for cache-on-write)
|
|
||||||
putHeader(uncompressedBytesWithHeader, 0,
|
|
||||||
onDiskBytesWithHeader.length,
|
|
||||||
uncompressedBytesWithHeader.length);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Put the header into the given byte array at the given offset.
|
|
||||||
* @param onDiskSize size of the block on disk
|
|
||||||
* @param uncompressedSize size of the block after decompression (but
|
|
||||||
* before optional data block decoding)
|
|
||||||
*/
|
|
||||||
private void putHeader(byte[] dest, int offset, int onDiskSize,
|
|
||||||
int uncompressedSize) {
|
|
||||||
offset = blockType.put(dest, offset);
|
|
||||||
offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
|
|
||||||
offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
|
|
||||||
Bytes.putLong(dest, offset, prevOffset);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
|
|
||||||
* the offset of this block so that it can be referenced in the next block
|
|
||||||
* of the same type.
|
|
||||||
*
|
|
||||||
* @param out
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
|
|
||||||
long offset = out.getPos();
|
|
||||||
if (startOffset != -1 && offset != startOffset) {
|
|
||||||
throw new IOException("A " + blockType + " block written to a "
|
|
||||||
+ "stream twice, first at offset " + startOffset + ", then at "
|
|
||||||
+ offset);
|
|
||||||
}
|
|
||||||
startOffset = offset;
|
|
||||||
|
|
||||||
writeHeaderAndData((DataOutputStream) out);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Writes the header and the compressed data of this block (or uncompressed
|
|
||||||
* data when not using compression) into the given stream. Can be called in
|
|
||||||
* the "writing" state or in the "block ready" state. If called in the
|
|
||||||
* "writing" state, transitions the writer to the "block ready" state.
|
|
||||||
*
|
|
||||||
* @param out the output stream to write the
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void writeHeaderAndData(DataOutputStream out) throws IOException {
|
|
||||||
ensureBlockReady();
|
|
||||||
out.write(onDiskBytesWithHeader);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the header or the compressed data (or uncompressed data when not
|
|
||||||
* using compression) as a byte array. Can be called in the "writing" state
|
|
||||||
* or in the "block ready" state. If called in the "writing" state,
|
|
||||||
* transitions the writer to the "block ready" state.
|
|
||||||
*
|
|
||||||
* @return header and data as they would be stored on disk in a byte array
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public byte[] getHeaderAndData() throws IOException {
|
|
||||||
ensureBlockReady();
|
|
||||||
return onDiskBytesWithHeader;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Releases the compressor this writer uses to compress blocks into the
|
|
||||||
* compressor pool. Needs to be called before the writer is discarded.
|
|
||||||
*/
|
|
||||||
public void releaseCompressor() {
|
|
||||||
if (compressor != null) {
|
|
||||||
compressAlgo.returnCompressor(compressor);
|
|
||||||
compressor = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the on-disk size of the data portion of the block. This is the
|
|
||||||
* compressed size if compression is enabled. Can only be called in the
|
|
||||||
* "block ready" state. Header is not compressed, and its size is not
|
|
||||||
* included in the return value.
|
|
||||||
*
|
|
||||||
* @return the on-disk size of the block, not including the header.
|
|
||||||
*/
|
|
||||||
public int getOnDiskSizeWithoutHeader() {
|
|
||||||
expectState(State.BLOCK_READY);
|
|
||||||
return onDiskBytesWithHeader.length - HEADER_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the on-disk size of the block. Can only be called in the
|
|
||||||
* "block ready" state.
|
|
||||||
*
|
|
||||||
* @return the on-disk size of the block ready to be written, including the
|
|
||||||
* header size
|
|
||||||
*/
|
|
||||||
public int getOnDiskSizeWithHeader() {
|
|
||||||
expectState(State.BLOCK_READY);
|
|
||||||
return onDiskBytesWithHeader.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The uncompressed size of the block data. Does not include header size.
|
|
||||||
*/
|
|
||||||
public int getUncompressedSizeWithoutHeader() {
|
|
||||||
expectState(State.BLOCK_READY);
|
|
||||||
return uncompressedBytesWithHeader.length - HEADER_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The uncompressed size of the block data, including header size.
|
|
||||||
*/
|
|
||||||
public int getUncompressedSizeWithHeader() {
|
|
||||||
expectState(State.BLOCK_READY);
|
|
||||||
return uncompressedBytesWithHeader.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @return true if a block is being written */
|
|
||||||
public boolean isWriting() {
|
|
||||||
return state == State.WRITING;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the number of bytes written into the current block so far, or
|
|
||||||
* zero if not writing the block at the moment. Note that this will return
|
|
||||||
* zero in the "block ready" state as well.
|
|
||||||
*
|
|
||||||
* @return the number of bytes written
|
|
||||||
*/
|
|
||||||
public int blockSizeWritten() {
|
|
||||||
if (state != State.WRITING)
|
|
||||||
return 0;
|
|
||||||
return this.unencodedDataSizeWritten;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the header followed by the uncompressed data, even if using
|
|
||||||
* compression. This is needed for storing uncompressed blocks in the block
|
|
||||||
* cache. Can be called in the "writing" state or the "block ready" state.
|
|
||||||
*
|
|
||||||
* @return uncompressed block bytes for caching on write
|
|
||||||
*/
|
|
||||||
private byte[] getUncompressedDataWithHeader() {
|
|
||||||
expectState(State.BLOCK_READY);
|
|
||||||
|
|
||||||
return uncompressedBytesWithHeader;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void expectState(State expectedState) {
|
|
||||||
if (state != expectedState) {
|
|
||||||
throw new IllegalStateException("Expected state: " + expectedState +
|
|
||||||
", actual state: " + state);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Similar to {@link #getUncompressedBufferWithHeader()} but returns a byte
|
|
||||||
* buffer.
|
|
||||||
*
|
|
||||||
* @return uncompressed block for caching on write in the form of a buffer
|
|
||||||
*/
|
|
||||||
public ByteBuffer getUncompressedBufferWithHeader() {
|
|
||||||
byte[] b = getUncompressedDataWithHeader();
|
|
||||||
return ByteBuffer.wrap(b, 0, b.length);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Takes the given {@link BlockWritable} instance, creates a new block of
|
|
||||||
* its appropriate type, writes the writable into this block, and flushes
|
|
||||||
* the block into the output stream. The writer is instructed not to buffer
|
|
||||||
* uncompressed bytes for cache-on-write.
|
|
||||||
*
|
|
||||||
* @param bw the block-writable object to write as a block
|
|
||||||
* @param out the file system output stream
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public void writeBlock(BlockWritable bw, FSDataOutputStream out)
|
|
||||||
throws IOException {
|
|
||||||
bw.writeToBlock(startWriting(bw.getBlockType()));
|
|
||||||
writeHeaderAndData(out);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new HFileBlock.
|
|
||||||
*/
|
|
||||||
public HFileBlock getBlockForCaching() {
|
|
||||||
HFileContext meta = new HFileContextBuilder()
|
|
||||||
.withHBaseCheckSum(false)
|
|
||||||
.withChecksumType(ChecksumType.NULL)
|
|
||||||
.withBytesPerCheckSum(0)
|
|
||||||
.build();
|
|
||||||
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
|
|
||||||
getUncompressedSizeWithoutHeader(), prevOffset,
|
|
||||||
getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
|
|
||||||
getOnDiskSizeWithoutHeader(), meta);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
@ -185,8 +185,7 @@ public class TestHFileBlockIndex {
|
||||||
}
|
}
|
||||||
|
|
||||||
missCount += 1;
|
missCount += 1;
|
||||||
prevBlock = realReader.readBlockData(offset, onDiskSize,
|
prevBlock = realReader.readBlockData(offset, onDiskSize, pread);
|
||||||
-1, pread);
|
|
||||||
prevOffset = offset;
|
prevOffset = offset;
|
||||||
prevOnDiskSize = onDiskSize;
|
prevOnDiskSize = onDiskSize;
|
||||||
prevPread = pread;
|
prevPread = pread;
|
||||||
|
|
|
@ -92,8 +92,7 @@ public class TestHFileDataBlockEncoder {
|
||||||
|
|
||||||
if (blockEncoder.getDataBlockEncoding() ==
|
if (blockEncoder.getDataBlockEncoding() ==
|
||||||
DataBlockEncoding.NONE) {
|
DataBlockEncoding.NONE) {
|
||||||
assertEquals(block.getBufferWithHeader(),
|
assertEquals(block.getBufferReadOnly(), returnedBlock.getBufferReadOnly());
|
||||||
returnedBlock.getBufferWithHeader());
|
|
||||||
} else {
|
} else {
|
||||||
if (BlockType.ENCODED_DATA != returnedBlock.getBlockType()) {
|
if (BlockType.ENCODED_DATA != returnedBlock.getBlockType()) {
|
||||||
System.out.println(blockEncoder);
|
System.out.println(blockEncoder);
|
||||||
|
@ -127,7 +126,7 @@ public class TestHFileDataBlockEncoder {
|
||||||
.build();
|
.build();
|
||||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||||
HFileBlock.FILL_HEADER, 0,
|
HFileBlock.FILL_HEADER, 0,
|
||||||
0, hfileContext);
|
0, -1, hfileContext);
|
||||||
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
|
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
|
||||||
assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
|
assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
|
||||||
}
|
}
|
||||||
|
@ -198,7 +197,7 @@ public class TestHFileDataBlockEncoder {
|
||||||
.build();
|
.build();
|
||||||
HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||||
HFileBlock.FILL_HEADER, 0,
|
HFileBlock.FILL_HEADER, 0,
|
||||||
0, meta);
|
0, -1, meta);
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,7 +219,8 @@ public class TestHFileDataBlockEncoder {
|
||||||
byte[] encodedBytes = baos.toByteArray();
|
byte[] encodedBytes = baos.toByteArray();
|
||||||
size = encodedBytes.length - block.getDummyHeaderForVersion().length;
|
size = encodedBytes.length - block.getDummyHeaderForVersion().length;
|
||||||
return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
|
return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
|
||||||
HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), block.getHFileContext());
|
HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), -1,
|
||||||
|
block.getHFileContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeBlock(List<Cell> kvs, HFileContext fileContext, boolean useTags)
|
private void writeBlock(List<Cell> kvs, HFileContext fileContext, boolean useTags)
|
||||||
|
|
|
@ -99,7 +99,7 @@ public class TestHFileEncryption {
|
||||||
|
|
||||||
private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
|
private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HFileBlock b = hbr.readBlockData(pos, -1, -1, false);
|
HFileBlock b = hbr.readBlockData(pos, -1, false);
|
||||||
assertEquals(0, HFile.getChecksumFailuresCount());
|
assertEquals(0, HFile.getChecksumFailuresCount());
|
||||||
b.sanityCheck();
|
b.sanityCheck();
|
||||||
assertFalse(b.isUnpacked());
|
assertFalse(b.isUnpacked());
|
||||||
|
|
|
@ -218,7 +218,7 @@ public class TestHFileWriterV3 {
|
||||||
fsdis.seek(0);
|
fsdis.seek(0);
|
||||||
long curBlockPos = 0;
|
long curBlockPos = 0;
|
||||||
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
|
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
|
||||||
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
|
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
|
||||||
.unpack(context, blockReader);
|
.unpack(context, blockReader);
|
||||||
assertEquals(BlockType.DATA, block.getBlockType());
|
assertEquals(BlockType.DATA, block.getBlockType());
|
||||||
ByteBuff buf = block.getBufferWithoutHeader();
|
ByteBuff buf = block.getBufferWithoutHeader();
|
||||||
|
@ -279,13 +279,14 @@ public class TestHFileWriterV3 {
|
||||||
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
|
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
|
||||||
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
|
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
|
||||||
trailer.getLoadOnOpenDataOffset());
|
trailer.getLoadOnOpenDataOffset());
|
||||||
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
|
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
|
||||||
.unpack(context, blockReader);
|
.unpack(context, blockReader);
|
||||||
assertEquals(BlockType.META, block.getBlockType());
|
assertEquals(BlockType.META, block.getBlockType());
|
||||||
Text t = new Text();
|
Text t = new Text();
|
||||||
ByteBuff buf = block.getBufferWithoutHeader();
|
ByteBuff buf = block.getBufferWithoutHeader();
|
||||||
if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
|
if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
|
||||||
throw new IOException("Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
|
throw new IOException("Failed to deserialize block " + this +
|
||||||
|
" into a " + t.getClass().getSimpleName());
|
||||||
}
|
}
|
||||||
Text expectedText =
|
Text expectedText =
|
||||||
(metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ? new Text(
|
(metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ? new Text(
|
||||||
|
|
|
@ -78,14 +78,8 @@ public class TestPrefetch {
|
||||||
// Check that all of the data blocks were preloaded
|
// Check that all of the data blocks were preloaded
|
||||||
BlockCache blockCache = cacheConf.getBlockCache();
|
BlockCache blockCache = cacheConf.getBlockCache();
|
||||||
long offset = 0;
|
long offset = 0;
|
||||||
HFileBlock prevBlock = null;
|
|
||||||
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
||||||
long onDiskSize = -1;
|
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
|
||||||
if (prevBlock != null) {
|
|
||||||
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
|
|
||||||
}
|
|
||||||
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null,
|
|
||||||
null);
|
|
||||||
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
|
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
|
||||||
boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
|
boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
|
||||||
if (block.getBlockType() == BlockType.DATA ||
|
if (block.getBlockType() == BlockType.DATA ||
|
||||||
|
@ -93,7 +87,6 @@ public class TestPrefetch {
|
||||||
block.getBlockType() == BlockType.INTERMEDIATE_INDEX) {
|
block.getBlockType() == BlockType.INTERMEDIATE_INDEX) {
|
||||||
assertTrue(isCached);
|
assertTrue(isCached);
|
||||||
}
|
}
|
||||||
prevBlock = block;
|
|
||||||
offset += block.getOnDiskSizeWithHeader();
|
offset += block.getOnDiskSizeWithHeader();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -227,15 +227,10 @@ public class TestCacheOnWriteInSchema {
|
||||||
assertTrue(testDescription, scanner.seekTo());
|
assertTrue(testDescription, scanner.seekTo());
|
||||||
// Cribbed from io.hfile.TestCacheOnWrite
|
// Cribbed from io.hfile.TestCacheOnWrite
|
||||||
long offset = 0;
|
long offset = 0;
|
||||||
HFileBlock prevBlock = null;
|
|
||||||
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
||||||
long onDiskSize = -1;
|
|
||||||
if (prevBlock != null) {
|
|
||||||
onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
|
|
||||||
}
|
|
||||||
// Flags: don't cache the block, use pread, this is not a compaction.
|
// Flags: don't cache the block, use pread, this is not a compaction.
|
||||||
// Also, pass null for expected block type to avoid checking it.
|
// Also, pass null for expected block type to avoid checking it.
|
||||||
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
|
HFileBlock block = reader.readBlock(offset, -1, false, true,
|
||||||
false, true, null, DataBlockEncoding.NONE);
|
false, true, null, DataBlockEncoding.NONE);
|
||||||
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
|
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
|
||||||
offset);
|
offset);
|
||||||
|
@ -249,7 +244,6 @@ public class TestCacheOnWriteInSchema {
|
||||||
"block: " + block + "\n" +
|
"block: " + block + "\n" +
|
||||||
"blockCacheKey: " + blockCacheKey);
|
"blockCacheKey: " + blockCacheKey);
|
||||||
}
|
}
|
||||||
prevBlock = block;
|
|
||||||
offset += block.getOnDiskSizeWithHeader();
|
offset += block.getOnDiskSizeWithHeader();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Reference in New Issue