HBASE-7660 Remove HFileV1 code (Ted Yu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1439753 13f79535-47bb-0310-9956-ffa450edef68
parent 657c26acb8
commit 0d95dc61e8
@@ -165,6 +165,9 @@ public class HFile {
  public final static String DEFAULT_COMPRESSION =
    DEFAULT_COMPRESSION_ALGORITHM.getName();

  /** Meta data block name for bloom filter bits. */
  public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";

  /**
   * We assume that HFile path ends with
   * ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
@@ -447,8 +450,6 @@ public class HFile {
      CacheConfig cacheConf) {
    int version = getFormatVersion(conf);
    switch (version) {
    case 1:
      return new HFileWriterV1.WriterFactoryV1(conf, cacheConf);
    case 2:
      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
    default:
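The hunk above covers the writer-factory dispatch from which the version-1 branch is removed. As a rough sketch of what remains once only version 2 can be written (the method shape and exception text are assumptions for illustration, not the committed code):

    static HFile.WriterFactory getWriterFactory(Configuration conf, CacheConfig cacheConf) {
      int version = getFormatVersion(conf);
      if (version != 2) {
        // With HFileWriterV1 gone, any other configured format version is rejected up front.
        throw new IllegalArgumentException("Cannot create writer for HFile format version " + version);
      }
      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
    }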
@@ -557,9 +558,6 @@ public class HFile {
      throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
    }
    switch (trailer.getMajorVersion()) {
    case 1:
      return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
          cacheConf);
    case 2:
      return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum,
          size, closeIStream,
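On the read side the same simplification applies: once HFileReaderV1 is gone, a trailer that reports major version 1 has no reader to dispatch to. A hedged sketch of the guard a caller would then hit (the message wording is an assumption, not the committed text):

      if (trailer.getMajorVersion() != 2) {
        // Sketch only: version-1 files can no longer be opened after this commit.
        throw new CorruptHFileException("Invalid HFile major version " + trailer.getMajorVersion()
            + " in file " + path + "; only version 2 is supported");
      }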
@@ -1316,110 +1316,6 @@ public class HFileBlock implements Cacheable {
    }
  }

  /**
   * Reads version 1 blocks from the file system. In version 1 blocks,
   * everything is compressed, including the magic record, if compression is
   * enabled. Everything might be uncompressed if no compression is used. This
   * reader returns blocks represented in the uniform version 2 format in
   * memory.
   */
  static class FSReaderV1 extends AbstractFSReader {

    /** Header size difference between version 1 and 2 */
    private static final int HEADER_DELTA = HEADER_SIZE_NO_CHECKSUM -
        MAGIC_LENGTH;

    public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
        long fileSize) throws IOException {
      super(istream, istream, compressAlgo, fileSize, 0, null, null);
    }

    /**
     * Read a version 1 block. There is no uncompressed header, and the block
     * type (the magic record) is part of the compressed data. This
     * implementation assumes that the bounded range file input stream is
     * needed to stop the decompressor reading into next block, because the
     * decompressor just grabs a bunch of data without regard to whether it is
     * coming to end of the compressed section.
     *
     * The block returned is still a version 2 block, and in particular, its
     * first {@link #HEADER_SIZE} bytes contain a valid version 2 header.
     *
     * @param offset the offset of the block to read in the file
     * @param onDiskSizeWithMagic the on-disk size of the version 1 block,
     *     including the magic record, which is the part of compressed
     *     data if using compression
     * @param uncompressedSizeWithMagic uncompressed size of the version 1
     *     block, including the magic record
     */
    @Override
    public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
        int uncompressedSizeWithMagic, boolean pread) throws IOException {
      if (uncompressedSizeWithMagic <= 0) {
        throw new IOException("Invalid uncompressedSize="
            + uncompressedSizeWithMagic + " for a version 1 block");
      }

      if (onDiskSizeWithMagic <= 0 || onDiskSizeWithMagic >= Integer.MAX_VALUE)
      {
        throw new IOException("Invalid onDiskSize=" + onDiskSizeWithMagic
            + " (maximum allowed: " + Integer.MAX_VALUE + ")");
      }

      int onDiskSize = (int) onDiskSizeWithMagic;

      if (uncompressedSizeWithMagic < MAGIC_LENGTH) {
        throw new IOException("Uncompressed size for a version 1 block is "
            + uncompressedSizeWithMagic + " but must be at least "
            + MAGIC_LENGTH);
      }

      // The existing size already includes magic size, and we are inserting
      // a version 2 header.
      ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic
          + HEADER_DELTA);

      int onDiskSizeWithoutHeader;
      if (compressAlgo == Compression.Algorithm.NONE) {
        // A special case when there is no compression.
        if (onDiskSize != uncompressedSizeWithMagic) {
          throw new IOException("onDiskSize=" + onDiskSize
              + " and uncompressedSize=" + uncompressedSizeWithMagic
              + " must be equal for version 1 with no compression");
        }

        // The first MAGIC_LENGTH bytes of what this will read will be
        // overwritten.
        readAtOffset(istream, buf.array(), buf.arrayOffset() + HEADER_DELTA,
            onDiskSize, false, offset, pread);

        onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH;
      } else {
        InputStream bufferedBoundedStream = createBufferedBoundedStream(
            offset, onDiskSize, pread);
        Compression.decompress(buf.array(), buf.arrayOffset()
            + HEADER_DELTA, bufferedBoundedStream, onDiskSize,
            uncompressedSizeWithMagic, this.compressAlgo);

        // We don't really have a good way to exclude the "magic record" size
        // from the compressed block's size, since it is compressed as well.
        onDiskSizeWithoutHeader = onDiskSize;
      }

      BlockType newBlockType = BlockType.parse(buf.array(), buf.arrayOffset()
          + HEADER_DELTA, MAGIC_LENGTH);

      // We set the uncompressed size of the new HFile block we are creating
      // to the size of the data portion of the block without the magic record,
      // since the magic record gets moved to the header.
      HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
          uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
          offset, MemStore.NO_PERSISTENT_TS, 0, 0, ChecksumType.NULL.getCode(),
          onDiskSizeWithoutHeader + HEADER_SIZE_NO_CHECKSUM);
      return b;
    }
  }

  /**
   * We always prefetch the header of the next block, so that we know its
   * on-disk size in advance and can read it in one operation.
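The deleted FSReaderV1 rebuilt each version-1 block as a version-2 block in memory: it reserved HEADER_DELTA extra bytes ahead of the payload so the magic record could later be replaced by a full version-2 header. A toy, self-contained illustration of that offset arithmetic follows; the values 24 and 8 are assumptions standing in for HEADER_SIZE_NO_CHECKSUM and MAGIC_LENGTH, not constants quoted from this commit.

    // Toy sketch of the v1-to-v2 in-memory layout conversion (not HBase code).
    int headerSizeNoChecksum = 24;  // assumed v2 header size without checksums
    int magicLength = 8;            // assumed length of the v1 magic record
    int headerDelta = headerSizeNoChecksum - magicLength;  // extra room reserved up front
    int uncompressedSizeWithMagic = 4096;                  // example v1 block size, magic included
    java.nio.ByteBuffer buf =
        java.nio.ByteBuffer.allocate(uncompressedSizeWithMagic + headerDelta);
    // The v1 payload is written starting at headerDelta, so its leading magic record
    // fills the tail of the header region and can be overwritten with a synthetic
    // v2 header, while the data that follows keeps its offsets.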
@ -1,689 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.RawComparator;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* {@link HFile} reader for version 1. Does not support data block encoding,
|
||||
* even in cache only, i.e. HFile v1 blocks are always brought into cache
|
||||
* unencoded.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HFileReaderV1 extends AbstractHFileReader {
|
||||
private static final Log LOG = LogFactory.getLog(HFileReaderV1.class);
|
||||
|
||||
private volatile boolean fileInfoLoaded = false;
|
||||
|
||||
/**
|
||||
* Opens a HFile. You must load the index before you can
|
||||
* use it by calling {@link #loadFileInfo()}.
|
||||
*
|
||||
* @param fsdis input stream. Caller is responsible for closing the passed
|
||||
* stream.
|
||||
* @param size Length of the stream.
|
||||
* @param cacheConf cache references and configuration
|
||||
*/
|
||||
public HFileReaderV1(Path path, FixedFileTrailer trailer,
|
||||
final FSDataInputStream fsdis, final long size,
|
||||
final boolean closeIStream,
|
||||
final CacheConfig cacheConf) throws IOException {
|
||||
super(path, trailer, fsdis, size, closeIStream, cacheConf);
|
||||
|
||||
trailer.expectMajorVersion(1);
|
||||
fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
|
||||
}
|
||||
|
||||
private byte[] readAllIndex(final FSDataInputStream in,
|
||||
final long indexOffset, final int indexSize) throws IOException {
|
||||
byte[] allIndex = new byte[indexSize];
|
||||
in.seek(indexOffset);
|
||||
IOUtils.readFully(in, allIndex, 0, allIndex.length);
|
||||
|
||||
return allIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read in the index and file info.
|
||||
*
|
||||
* @return A map of fileinfo data.
|
||||
* @see Writer#appendFileInfo(byte[], byte[])
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public FileInfo loadFileInfo() throws IOException {
|
||||
if (fileInfoLoaded)
|
||||
return fileInfo;
|
||||
|
||||
// Read in the fileinfo and get what we need from it.
|
||||
istream.seek(trailer.getFileInfoOffset());
|
||||
fileInfo = new FileInfo();
|
||||
fileInfo.read(istream);
|
||||
lastKey = fileInfo.get(FileInfo.LASTKEY);
|
||||
avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
|
||||
avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
|
||||
|
||||
// Comparator is stored in the file info in version 1.
|
||||
String clazzName = Bytes.toString(fileInfo.get(FileInfo.COMPARATOR));
|
||||
comparator = getComparator(clazzName);
|
||||
|
||||
dataBlockIndexReader =
|
||||
new HFileBlockIndex.BlockIndexReader(comparator, 1);
|
||||
metaBlockIndexReader =
|
||||
new HFileBlockIndex.BlockIndexReader(Bytes.BYTES_RAWCOMPARATOR, 1);
|
||||
|
||||
int sizeToLoadOnOpen = (int) (fileSize - trailer.getLoadOnOpenDataOffset() -
|
||||
trailer.getTrailerSize());
|
||||
byte[] dataAndMetaIndex = readAllIndex(istream,
|
||||
trailer.getLoadOnOpenDataOffset(), sizeToLoadOnOpen);
|
||||
|
||||
ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex);
|
||||
DataInputStream dis = new DataInputStream(bis);
|
||||
|
||||
// Read in the data index.
|
||||
if (trailer.getDataIndexCount() > 0)
|
||||
BlockType.INDEX_V1.readAndCheck(dis);
|
||||
dataBlockIndexReader.readRootIndex(dis, trailer.getDataIndexCount());
|
||||
|
||||
// Read in the metadata index.
|
||||
if (trailer.getMetaIndexCount() > 0)
|
||||
BlockType.INDEX_V1.readAndCheck(dis);
|
||||
metaBlockIndexReader.readRootIndex(dis, trailer.getMetaIndexCount());
|
||||
|
||||
fileInfoLoaded = true;
|
||||
return fileInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates comparator from the given class name.
|
||||
*
|
||||
* @param clazzName the comparator class name read from the trailer
|
||||
* @return an instance of the comparator to use
|
||||
* @throws IOException in case comparator class name is invalid
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private RawComparator<byte[]> getComparator(final String clazzName)
|
||||
throws IOException {
|
||||
if (clazzName == null || clazzName.length() == 0) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return (RawComparator<byte[]>)Class.forName(clazzName).newInstance();
|
||||
} catch (InstantiationException e) {
|
||||
throw new IOException(e);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new IOException(e);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Scanner on this file. No seeks or reads are done on creation. Call
|
||||
* {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
|
||||
* nothing to clean up in a Scanner. Letting go of your references to the
|
||||
* scanner is sufficient.
|
||||
*
|
||||
* @param cacheBlocks True if we should cache blocks read in by this scanner.
|
||||
* @param pread Use positional read rather than seek+read if true (pread is
|
||||
* better for random reads, seek+read is better scanning).
|
||||
* @param isCompaction is scanner being used for a compaction?
|
||||
* @return Scanner on this file.
|
||||
*/
|
||||
@Override
|
||||
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
|
||||
final boolean isCompaction) {
|
||||
return new ScannerV1(this, cacheBlocks, pread, isCompaction);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param key Key to search.
|
||||
* @return Block number of the block containing the key or -1 if not in this
|
||||
* file.
|
||||
*/
|
||||
protected int blockContainingKey(final byte[] key, int offset, int length) {
|
||||
Preconditions.checkState(!dataBlockIndexReader.isEmpty(),
|
||||
"Block index not loaded");
|
||||
return dataBlockIndexReader.rootBlockContainingKey(key, offset, length);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param metaBlockName
|
||||
* @param cacheBlock Add block to cache, if found
|
||||
* @return Block wrapped in a ByteBuffer
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
|
||||
throws IOException {
|
||||
if (trailer.getMetaIndexCount() == 0) {
|
||||
return null; // there are no meta blocks
|
||||
}
|
||||
if (metaBlockIndexReader == null) {
|
||||
throw new IOException("Meta index not loaded");
|
||||
}
|
||||
|
||||
byte[] nameBytes = Bytes.toBytes(metaBlockName);
|
||||
int block = metaBlockIndexReader.rootBlockContainingKey(nameBytes, 0,
|
||||
nameBytes.length);
|
||||
if (block == -1)
|
||||
return null;
|
||||
long offset = metaBlockIndexReader.getRootBlockOffset(block);
|
||||
long nextOffset;
|
||||
if (block == metaBlockIndexReader.getRootBlockCount() - 1) {
|
||||
nextOffset = trailer.getFileInfoOffset();
|
||||
} else {
|
||||
nextOffset = metaBlockIndexReader.getRootBlockOffset(block + 1);
|
||||
}
|
||||
|
||||
long startTimeNs = System.nanoTime();
|
||||
|
||||
BlockCacheKey cacheKey = new BlockCacheKey(name, offset,
|
||||
DataBlockEncoding.NONE, BlockType.META);
|
||||
|
||||
BlockCategory effectiveCategory = BlockCategory.META;
|
||||
if (metaBlockName.equals(HFileWriterV1.BLOOM_FILTER_META_KEY) ||
|
||||
metaBlockName.equals(HFileWriterV1.BLOOM_FILTER_DATA_KEY)) {
|
||||
effectiveCategory = BlockCategory.BLOOM;
|
||||
}
|
||||
|
||||
// Per meta key from any given file, synchronize reads for said block
|
||||
synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
|
||||
// Check cache for block. If found return.
|
||||
if (cacheConf.isBlockCacheEnabled()) {
|
||||
HFileBlock cachedBlock =
|
||||
(HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
|
||||
cacheConf.shouldCacheBlockOnRead(effectiveCategory), false);
|
||||
if (cachedBlock != null) {
|
||||
return cachedBlock.getBufferWithoutHeader();
|
||||
}
|
||||
// Cache Miss, please load.
|
||||
}
|
||||
|
||||
HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
|
||||
nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block),
|
||||
true);
|
||||
hfileBlock.expectType(BlockType.META);
|
||||
|
||||
final long delta = System.nanoTime() - startTimeNs;
|
||||
HFile.offerReadLatency(delta, true);
|
||||
|
||||
// Cache the block
|
||||
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(effectiveCategory)) {
|
||||
cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
|
||||
cacheConf.isInMemory());
|
||||
}
|
||||
|
||||
return hfileBlock.getBufferWithoutHeader();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read in a file block.
|
||||
* @param block Index of block to read.
|
||||
* @param pread Use positional read instead of seek+read (positional is
|
||||
* better doing random reads whereas seek+read is better scanning).
|
||||
* @param isCompaction is this block being read as part of a compaction
|
||||
* @return Block wrapped in a ByteBuffer.
|
||||
* @throws IOException
|
||||
*/
|
||||
ByteBuffer readBlockBuffer(int block, boolean cacheBlock,
|
||||
final boolean pread, final boolean isCompaction) throws IOException {
|
||||
if (dataBlockIndexReader == null) {
|
||||
throw new IOException("Block index not loaded");
|
||||
}
|
||||
if (block < 0 || block >= dataBlockIndexReader.getRootBlockCount()) {
|
||||
throw new IOException("Requested block is out of range: " + block +
|
||||
", max: " + dataBlockIndexReader.getRootBlockCount());
|
||||
}
|
||||
|
||||
long offset = dataBlockIndexReader.getRootBlockOffset(block);
|
||||
BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
|
||||
|
||||
// For any given block from any given file, synchronize reads for said
|
||||
// block.
|
||||
// Without a cache, this synchronizing is needless overhead, but really
|
||||
// the other choice is to duplicate work (which the cache would prevent you
|
||||
// from doing).
|
||||
synchronized (dataBlockIndexReader.getRootBlockKey(block)) {
|
||||
// Check cache for block. If found return.
|
||||
if (cacheConf.isBlockCacheEnabled()) {
|
||||
HFileBlock cachedBlock =
|
||||
(HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
|
||||
cacheConf.shouldCacheDataOnRead(), false);
|
||||
if (cachedBlock != null) {
|
||||
return cachedBlock.getBufferWithoutHeader();
|
||||
}
|
||||
// Carry on, please load.
|
||||
}
|
||||
|
||||
// Load block from filesystem.
|
||||
long startTimeNs = System.nanoTime();
|
||||
long nextOffset;
|
||||
|
||||
if (block == dataBlockIndexReader.getRootBlockCount() - 1) {
|
||||
// last block! The end of data block is first meta block if there is
|
||||
// one or if there isn't, the fileinfo offset.
|
||||
nextOffset = (metaBlockIndexReader.getRootBlockCount() == 0) ?
|
||||
this.trailer.getFileInfoOffset() :
|
||||
metaBlockIndexReader.getRootBlockOffset(0);
|
||||
} else {
|
||||
nextOffset = dataBlockIndexReader.getRootBlockOffset(block + 1);
|
||||
}
|
||||
|
||||
HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
|
||||
- offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
|
||||
hfileBlock.expectType(BlockType.DATA);
|
||||
|
||||
final long delta = System.nanoTime() - startTimeNs;
|
||||
HFile.offerReadLatency(delta, pread);
|
||||
|
||||
// Cache the block
|
||||
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
|
||||
hfileBlock.getBlockType().getCategory())) {
|
||||
cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
|
||||
cacheConf.isInMemory());
|
||||
}
|
||||
return hfileBlock.getBufferWithoutHeader();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Last key in the file. May be null if file has no entries.
|
||||
* Note that this is not the last rowkey, but rather the byte form of
|
||||
* the last KeyValue.
|
||||
*/
|
||||
public byte[] getLastKey() {
|
||||
if (!fileInfoLoaded) {
|
||||
throw new RuntimeException("Load file info first");
|
||||
}
|
||||
return dataBlockIndexReader.isEmpty() ? null : lastKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Midkey for this file. We work with block boundaries only so
|
||||
* returned midkey is an approximation only.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public byte[] midkey() throws IOException {
|
||||
Preconditions.checkState(isFileInfoLoaded(), "File info is not loaded");
|
||||
Preconditions.checkState(!dataBlockIndexReader.isEmpty(),
|
||||
"Data block index is not loaded or is empty");
|
||||
return dataBlockIndexReader.midkey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
close(cacheConf.shouldEvictOnClose());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(boolean evictOnClose) throws IOException {
|
||||
if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
|
||||
int numEvicted = 0;
|
||||
for (int i = 0; i < dataBlockIndexReader.getRootBlockCount(); i++) {
|
||||
if (cacheConf.getBlockCache().evictBlock(
|
||||
new BlockCacheKey(name,
|
||||
dataBlockIndexReader.getRootBlockOffset(i),
|
||||
DataBlockEncoding.NONE, BlockType.DATA))) {
|
||||
numEvicted++;
|
||||
}
|
||||
}
|
||||
LOG.debug("On close of file " + name + " evicted " + numEvicted
|
||||
+ " block(s) of " + dataBlockIndexReader.getRootBlockCount()
|
||||
+ " total blocks");
|
||||
}
|
||||
if (this.closeIStream && this.istream != null) {
|
||||
this.istream.close();
|
||||
this.istream = null;
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract static class AbstractScannerV1
|
||||
extends AbstractHFileReader.Scanner {
|
||||
protected int currBlock;
|
||||
|
||||
/**
|
||||
* This masks a field with the same name in the superclass and saves us the
|
||||
* runtime overhead of casting from abstract reader to reader V1.
|
||||
*/
|
||||
protected HFileReaderV1 reader;
|
||||
|
||||
public AbstractScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
|
||||
final boolean pread, final boolean isCompaction) {
|
||||
super(reader, cacheBlocks, pread, isCompaction);
|
||||
this.reader = (HFileReaderV1) reader;
|
||||
}
|
||||
|
||||
/**
|
||||
* Within a loaded block, seek looking for the first key
|
||||
* that is smaller than (or equal to?) the key we are interested in.
|
||||
*
|
||||
* A note on the seekBefore - if you have seekBefore = true, AND the
|
||||
* first key in the block = key, then you'll get thrown exceptions.
|
||||
* @param key to find
|
||||
* @param seekBefore find the key before the exact match.
|
||||
*/
|
||||
protected abstract int blockSeek(byte[] key, int offset, int length,
|
||||
boolean seekBefore);
|
||||
|
||||
protected abstract void loadBlock(int bloc, boolean rewind)
|
||||
throws IOException;
|
||||
|
||||
@Override
|
||||
public int seekTo(byte[] key, int offset, int length) throws IOException {
|
||||
int b = reader.blockContainingKey(key, offset, length);
|
||||
if (b < 0) return -1; // falls before the beginning of the file! :-(
|
||||
// Avoid re-reading the same block (that'd be dumb).
|
||||
loadBlock(b, true);
|
||||
return blockSeek(key, offset, length, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int reseekTo(byte[] key, int offset, int length)
|
||||
throws IOException {
|
||||
if (blockBuffer != null && currKeyLen != 0) {
|
||||
ByteBuffer bb = getKey();
|
||||
int compared = reader.getComparator().compare(key, offset,
|
||||
length, bb.array(), bb.arrayOffset(), bb.limit());
|
||||
if (compared < 1) {
|
||||
// If the required key is less than or equal to current key, then
|
||||
// don't do anything.
|
||||
return compared;
|
||||
}
|
||||
}
|
||||
|
||||
int b = reader.blockContainingKey(key, offset, length);
|
||||
if (b < 0) {
|
||||
return -1;
|
||||
}
|
||||
loadBlock(b, false);
|
||||
return blockSeek(key, offset, length, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean seekBefore(byte[] key, int offset, int length)
|
||||
throws IOException {
|
||||
int b = reader.blockContainingKey(key, offset, length);
|
||||
if (b < 0)
|
||||
return false; // key is before the start of the file.
|
||||
|
||||
// Question: does this block begin with 'key'?
|
||||
byte[] firstkKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
|
||||
if (reader.getComparator().compare(firstkKey, 0, firstkKey.length,
|
||||
key, offset, length) == 0) {
|
||||
// Ok the key we're interested in is the first of the block, so go back
|
||||
// by one.
|
||||
if (b == 0) {
|
||||
// we have a 'problem', the key we want is the first of the file.
|
||||
return false;
|
||||
}
|
||||
b--;
|
||||
// TODO shortcut: seek forward in this block to the last key of the
|
||||
// block.
|
||||
}
|
||||
loadBlock(b, true);
|
||||
blockSeek(key, offset, length, true);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of {@link HFileScanner} interface.
|
||||
*/
|
||||
|
||||
protected static class ScannerV1 extends AbstractScannerV1 {
|
||||
private HFileReaderV1 reader;
|
||||
|
||||
public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
|
||||
final boolean pread, final boolean isCompaction) {
|
||||
super(reader, cacheBlocks, pread, isCompaction);
|
||||
this.reader = reader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValue getKeyValue() {
|
||||
if (blockBuffer == null) {
|
||||
return null;
|
||||
}
|
||||
return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
|
||||
+ blockBuffer.position() - 8);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getKey() {
|
||||
Preconditions.checkState(blockBuffer != null && currKeyLen > 0,
|
||||
"you need to seekTo() before calling getKey()");
|
||||
|
||||
ByteBuffer keyBuff = blockBuffer.slice();
|
||||
keyBuff.limit(currKeyLen);
|
||||
keyBuff.rewind();
|
||||
// Do keyBuff.asReadOnly()?
|
||||
return keyBuff;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getValue() {
|
||||
if (blockBuffer == null || currKeyLen == 0) {
|
||||
throw new RuntimeException(
|
||||
"you need to seekTo() before calling getValue()");
|
||||
}
|
||||
|
||||
// TODO: Could this be done with one ByteBuffer rather than create two?
|
||||
ByteBuffer valueBuff = blockBuffer.slice();
|
||||
valueBuff.position(currKeyLen);
|
||||
valueBuff = valueBuff.slice();
|
||||
valueBuff.limit(currValueLen);
|
||||
valueBuff.rewind();
|
||||
return valueBuff;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next() throws IOException {
|
||||
if (blockBuffer == null) {
|
||||
throw new IOException("Next called on non-seeked scanner");
|
||||
}
|
||||
|
||||
try {
|
||||
blockBuffer.position(blockBuffer.position() + currKeyLen
|
||||
+ currValueLen);
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("Current pos = " + blockBuffer.position() +
|
||||
"; currKeyLen = " + currKeyLen +
|
||||
"; currValLen = " + currValueLen +
|
||||
"; block limit = " + blockBuffer.limit() +
|
||||
"; HFile name = " + reader.getName() +
|
||||
"; currBlock id = " + currBlock, e);
|
||||
throw e;
|
||||
}
|
||||
if (blockBuffer.remaining() <= 0) {
|
||||
currBlock++;
|
||||
if (currBlock >= reader.getDataBlockIndexReader().getRootBlockCount()) {
|
||||
// damn we are at the end
|
||||
currBlock = 0;
|
||||
blockBuffer = null;
|
||||
return false;
|
||||
}
|
||||
blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
|
||||
isCompaction);
|
||||
currKeyLen = blockBuffer.getInt();
|
||||
currValueLen = blockBuffer.getInt();
|
||||
blockFetches++;
|
||||
return true;
|
||||
}
|
||||
|
||||
currKeyLen = blockBuffer.getInt();
|
||||
currValueLen = blockBuffer.getInt();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int blockSeek(byte[] key, int offset, int length,
|
||||
boolean seekBefore) {
|
||||
int klen, vlen;
|
||||
int lastLen = 0;
|
||||
do {
|
||||
klen = blockBuffer.getInt();
|
||||
vlen = blockBuffer.getInt();
|
||||
int comp = reader.getComparator().compare(key, offset, length,
|
||||
blockBuffer.array(),
|
||||
blockBuffer.arrayOffset() + blockBuffer.position(), klen);
|
||||
if (comp == 0) {
|
||||
if (seekBefore) {
|
||||
blockBuffer.position(blockBuffer.position() - lastLen - 16);
|
||||
currKeyLen = blockBuffer.getInt();
|
||||
currValueLen = blockBuffer.getInt();
|
||||
return 1; // non exact match.
|
||||
}
|
||||
currKeyLen = klen;
|
||||
currValueLen = vlen;
|
||||
return 0; // indicate exact match
|
||||
}
|
||||
if (comp < 0) {
|
||||
// go back one key:
|
||||
blockBuffer.position(blockBuffer.position() - lastLen - 16);
|
||||
currKeyLen = blockBuffer.getInt();
|
||||
currValueLen = blockBuffer.getInt();
|
||||
return 1;
|
||||
}
|
||||
blockBuffer.position(blockBuffer.position() + klen + vlen);
|
||||
lastLen = klen + vlen;
|
||||
} while (blockBuffer.remaining() > 0);
|
||||
|
||||
// ok we are at the end, so go back a littleeeeee....
|
||||
// The 8 in the below is intentionally different to the 16s in the above
|
||||
// Do the math you you'll figure it.
|
||||
blockBuffer.position(blockBuffer.position() - lastLen - 8);
|
||||
currKeyLen = blockBuffer.getInt();
|
||||
currValueLen = blockBuffer.getInt();
|
||||
return 1; // didn't exactly find it.
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKeyString() {
|
||||
return Bytes.toStringBinary(blockBuffer.array(),
|
||||
blockBuffer.arrayOffset() + blockBuffer.position(), currKeyLen);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValueString() {
|
||||
return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset() +
|
||||
blockBuffer.position() + currKeyLen, currValueLen);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean seekTo() throws IOException {
|
||||
if (reader.getDataBlockIndexReader().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (blockBuffer != null && currBlock == 0) {
|
||||
blockBuffer.rewind();
|
||||
currKeyLen = blockBuffer.getInt();
|
||||
currValueLen = blockBuffer.getInt();
|
||||
return true;
|
||||
}
|
||||
currBlock = 0;
|
||||
blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
|
||||
isCompaction);
|
||||
currKeyLen = blockBuffer.getInt();
|
||||
currValueLen = blockBuffer.getInt();
|
||||
blockFetches++;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void loadBlock(int bloc, boolean rewind) throws IOException {
|
||||
if (blockBuffer == null) {
|
||||
blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
|
||||
isCompaction);
|
||||
currBlock = bloc;
|
||||
blockFetches++;
|
||||
} else {
|
||||
if (bloc != currBlock) {
|
||||
blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
|
||||
isCompaction);
|
||||
currBlock = bloc;
|
||||
blockFetches++;
|
||||
} else {
|
||||
// we are already in the same block, just rewind to seek again.
|
||||
if (rewind) {
|
||||
blockBuffer.rewind();
|
||||
}
|
||||
else {
|
||||
// Go back by (size of rowlength + size of valuelength) = 8 bytes
|
||||
blockBuffer.position(blockBuffer.position()-8);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlock readBlock(long offset, long onDiskBlockSize,
|
||||
boolean cacheBlock, boolean pread, boolean isCompaction,
|
||||
BlockType expectedBlockType) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataInput getGeneralBloomFilterMetadata() throws IOException {
|
||||
// Shouldn't cache Bloom filter blocks, otherwise server would abort when
|
||||
// splitting, see HBASE-6479
|
||||
ByteBuffer buf = getMetaBlock(HFileWriterV1.BLOOM_FILTER_META_KEY, false);
|
||||
if (buf == null)
|
||||
return null;
|
||||
ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),
|
||||
buf.arrayOffset(), buf.limit());
|
||||
return new DataInputStream(bais);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataInput getDeleteBloomFilterMetadata() throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFileInfoLoaded() {
|
||||
return fileInfoLoaded;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,448 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStore;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterWriter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
|
||||
/**
|
||||
* Writes version 1 HFiles. Mainly used for testing backwards-compatibility.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HFileWriterV1 extends AbstractHFileWriter {
|
||||
|
||||
/** Meta data block name for bloom filter parameters. */
|
||||
static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
|
||||
|
||||
/** Meta data block name for bloom filter bits. */
|
||||
public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HFileWriterV1.class);
|
||||
|
||||
// A stream made per block written.
|
||||
private DataOutputStream out;
|
||||
|
||||
// Offset where the current block began.
|
||||
private long blockBegin;
|
||||
|
||||
// First keys of every block.
|
||||
private ArrayList<byte[]> blockKeys = new ArrayList<byte[]>();
|
||||
|
||||
// Block offset in backing stream.
|
||||
private ArrayList<Long> blockOffsets = new ArrayList<Long>();
|
||||
|
||||
// Raw (decompressed) data size.
|
||||
private ArrayList<Integer> blockDataSizes = new ArrayList<Integer>();
|
||||
|
||||
private Compressor compressor;
|
||||
|
||||
// Additional byte array output stream used to fill block cache
|
||||
private ByteArrayOutputStream baos;
|
||||
private DataOutputStream baosDos;
|
||||
|
||||
static class WriterFactoryV1 extends HFile.WriterFactory {
|
||||
WriterFactoryV1(Configuration conf, CacheConfig cacheConf) {
|
||||
super(conf, cacheConf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Writer createWriter(FileSystem fs, Path path,
|
||||
FSDataOutputStream ostream, int blockSize,
|
||||
Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
|
||||
KeyComparator comparator, final ChecksumType checksumType,
|
||||
final int bytesPerChecksum) throws IOException {
|
||||
// version 1 does not implement checksums
|
||||
return new HFileWriterV1(conf, cacheConf, fs, path, ostream, blockSize,
|
||||
compressAlgo, dataBlockEncoder, comparator);
|
||||
}
|
||||
}
|
||||
|
||||
/** Constructor that takes a path, creates and closes the output stream. */
|
||||
public HFileWriterV1(Configuration conf, CacheConfig cacheConf,
|
||||
FileSystem fs, Path path, FSDataOutputStream ostream,
|
||||
int blockSize, Compression.Algorithm compress,
|
||||
HFileDataBlockEncoder blockEncoder,
|
||||
final KeyComparator comparator) throws IOException {
|
||||
super(cacheConf, ostream == null ? createOutputStream(conf, fs, path) : ostream, path,
|
||||
blockSize, compress, blockEncoder, comparator);
|
||||
}
|
||||
|
||||
/**
|
||||
* If at block boundary, opens new block.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void checkBlockBoundary() throws IOException {
|
||||
if (this.out != null && this.out.size() < blockSize)
|
||||
return;
|
||||
finishBlock();
|
||||
newBlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Do the cleanup if a current block.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void finishBlock() throws IOException {
|
||||
if (this.out == null)
|
||||
return;
|
||||
long startTimeNs = System.nanoTime();
|
||||
|
||||
int size = releaseCompressingStream(this.out);
|
||||
this.out = null;
|
||||
blockKeys.add(firstKeyInBlock);
|
||||
blockOffsets.add(Long.valueOf(blockBegin));
|
||||
blockDataSizes.add(Integer.valueOf(size));
|
||||
this.totalUncompressedBytes += size;
|
||||
|
||||
HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
|
||||
|
||||
if (cacheConf.shouldCacheDataOnWrite()) {
|
||||
baosDos.flush();
|
||||
// we do not do data block encoding on disk for HFile v1
|
||||
byte[] bytes = baos.toByteArray();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA,
|
||||
(int) (outputStream.getPos() - blockBegin), bytes.length, -1,
|
||||
ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
|
||||
blockBegin, MemStore.NO_PERSISTENT_TS,
|
||||
HFileBlock.MINOR_VERSION_NO_CHECKSUM, // minor version
|
||||
0, // bytesPerChecksum
|
||||
ChecksumType.NULL.getCode(), // checksum type
|
||||
(int) (outputStream.getPos() - blockBegin) +
|
||||
HFileBlock.HEADER_SIZE_NO_CHECKSUM); // onDiskDataSizeWithHeader
|
||||
|
||||
block = blockEncoder.diskToCacheFormat(block, false);
|
||||
cacheConf.getBlockCache().cacheBlock(
|
||||
new BlockCacheKey(name, blockBegin, DataBlockEncoding.NONE,
|
||||
block.getBlockType()), block);
|
||||
baosDos.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ready a new block for writing.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void newBlock() throws IOException {
|
||||
// This is where the next block begins.
|
||||
blockBegin = outputStream.getPos();
|
||||
this.out = getCompressingStream();
|
||||
BlockType.DATA.write(out);
|
||||
firstKeyInBlock = null;
|
||||
if (cacheConf.shouldCacheDataOnWrite()) {
|
||||
this.baos = new ByteArrayOutputStream();
|
||||
this.baosDos = new DataOutputStream(baos);
|
||||
baosDos.write(HFileBlock.DUMMY_HEADER_NO_CHECKSUM);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up a compressor and creates a compression stream on top of
|
||||
* this.outputStream. Get one per block written.
|
||||
*
|
||||
* @return A compressing stream; if 'none' compression, returned stream does
|
||||
* not compress.
|
||||
*
|
||||
* @throws IOException
|
||||
*
|
||||
* @see {@link #releaseCompressingStream(DataOutputStream)}
|
||||
*/
|
||||
private DataOutputStream getCompressingStream() throws IOException {
|
||||
this.compressor = compressAlgo.getCompressor();
|
||||
// Get new DOS compression stream. In tfile, the DOS, is not closed,
|
||||
// just finished, and that seems to be fine over there. TODO: Check
|
||||
// no memory retention of the DOS. Should I disable the 'flush' on the
|
||||
// DOS as the BCFile over in tfile does? It wants to make it so flushes
|
||||
// don't go through to the underlying compressed stream. Flush on the
|
||||
// compressed downstream should be only when done. I was going to but
|
||||
// looks like when we call flush in here, its legitimate flush that
|
||||
// should go through to the compressor.
|
||||
OutputStream os = this.compressAlgo.createCompressionStream(
|
||||
this.outputStream, this.compressor, 0);
|
||||
return new DataOutputStream(os);
|
||||
}
|
||||
|
||||
/**
|
||||
* Let go of block compressor and compressing stream gotten in call {@link
|
||||
* #getCompressingStream}.
|
||||
*
|
||||
* @param dos
|
||||
*
|
||||
* @return How much was written on this stream since it was taken out.
|
||||
*
|
||||
* @see #getCompressingStream()
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private int releaseCompressingStream(final DataOutputStream dos)
|
||||
throws IOException {
|
||||
dos.flush();
|
||||
this.compressAlgo.returnCompressor(this.compressor);
|
||||
this.compressor = null;
|
||||
return dos.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a meta block to the end of the file. Call before close(). Metadata
|
||||
* blocks are expensive. Fill one with a bunch of serialized data rather than
|
||||
* do a metadata block per metadata instance. If metadata is small, consider
|
||||
* adding to file info using {@link #appendFileInfo(byte[], byte[])}
|
||||
*
|
||||
* @param metaBlockName
|
||||
* name of the block
|
||||
* @param content
|
||||
* will call readFields to get data later (DO NOT REUSE)
|
||||
*/
|
||||
public void appendMetaBlock(String metaBlockName, Writable content) {
|
||||
byte[] key = Bytes.toBytes(metaBlockName);
|
||||
int i;
|
||||
for (i = 0; i < metaNames.size(); ++i) {
|
||||
// stop when the current key is greater than our own
|
||||
byte[] cur = metaNames.get(i);
|
||||
if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
|
||||
key.length) > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
metaNames.add(i, key);
|
||||
metaData.add(i, content);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add key/value to file. Keys must be added in an order that agrees with the
|
||||
* Comparator passed on construction.
|
||||
*
|
||||
* @param kv
|
||||
* KeyValue to add. Cannot be empty nor null.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void append(final KeyValue kv) throws IOException {
|
||||
append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
|
||||
kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add key/value to file. Keys must be added in an order that agrees with the
|
||||
* Comparator passed on construction.
|
||||
*
|
||||
* @param key
|
||||
* Key to add. Cannot be empty nor null.
|
||||
* @param value
|
||||
* Value to add. Cannot be empty nor null.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void append(final byte[] key, final byte[] value) throws IOException {
|
||||
append(key, 0, key.length, value, 0, value.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add key/value to file. Keys must be added in an order that agrees with the
|
||||
* Comparator passed on construction.
|
||||
*
|
||||
* @param key
|
||||
* @param koffset
|
||||
* @param klength
|
||||
* @param value
|
||||
* @param voffset
|
||||
* @param vlength
|
||||
* @throws IOException
|
||||
*/
|
||||
private void append(final byte[] key, final int koffset, final int klength,
|
||||
final byte[] value, final int voffset, final int vlength)
|
||||
throws IOException {
|
||||
boolean dupKey = checkKey(key, koffset, klength);
|
||||
checkValue(value, voffset, vlength);
|
||||
if (!dupKey) {
|
||||
checkBlockBoundary();
|
||||
}
|
||||
// Write length of key and value and then actual key and value bytes.
|
||||
this.out.writeInt(klength);
|
||||
totalKeyLength += klength;
|
||||
this.out.writeInt(vlength);
|
||||
totalValueLength += vlength;
|
||||
this.out.write(key, koffset, klength);
|
||||
this.out.write(value, voffset, vlength);
|
||||
// Are we the first key in this block?
|
||||
if (this.firstKeyInBlock == null) {
|
||||
// Copy the key.
|
||||
this.firstKeyInBlock = new byte[klength];
|
||||
System.arraycopy(key, koffset, this.firstKeyInBlock, 0, klength);
|
||||
}
|
||||
this.lastKeyBuffer = key;
|
||||
this.lastKeyOffset = koffset;
|
||||
this.lastKeyLength = klength;
|
||||
this.entryCount++;
|
||||
// If we are pre-caching blocks on write, fill byte array stream
|
||||
if (cacheConf.shouldCacheDataOnWrite()) {
|
||||
this.baosDos.writeInt(klength);
|
||||
this.baosDos.writeInt(vlength);
|
||||
this.baosDos.write(key, koffset, klength);
|
||||
this.baosDos.write(value, voffset, vlength);
|
||||
}
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
if (this.outputStream == null) {
|
||||
return;
|
||||
}
|
||||
// Save data block encoder metadata in the file info.
|
||||
blockEncoder.saveMetadata(this);
|
||||
// Write out the end of the data blocks, then write meta data blocks.
|
||||
// followed by fileinfo, data block index and meta block index.
|
||||
|
||||
finishBlock();
|
||||
|
||||
FixedFileTrailer trailer = new FixedFileTrailer(1,
|
||||
HFileBlock.MINOR_VERSION_NO_CHECKSUM);
|
||||
|
||||
// Write out the metadata blocks if any.
|
||||
ArrayList<Long> metaOffsets = null;
|
||||
ArrayList<Integer> metaDataSizes = null;
|
||||
if (metaNames.size() > 0) {
|
||||
metaOffsets = new ArrayList<Long>(metaNames.size());
|
||||
metaDataSizes = new ArrayList<Integer>(metaNames.size());
|
||||
for (int i = 0; i < metaNames.size(); ++i) {
|
||||
// store the beginning offset
|
||||
long curPos = outputStream.getPos();
|
||||
metaOffsets.add(curPos);
|
||||
// write the metadata content
|
||||
DataOutputStream dos = getCompressingStream();
|
||||
BlockType.META.write(dos);
|
||||
metaData.get(i).write(dos);
|
||||
int size = releaseCompressingStream(dos);
|
||||
// store the metadata size
|
||||
metaDataSizes.add(size);
|
||||
}
|
||||
}
|
||||
|
||||
writeFileInfo(trailer, outputStream);
|
||||
|
||||
// Write the data block index.
|
||||
trailer.setLoadOnOpenOffset(writeBlockIndex(this.outputStream,
|
||||
this.blockKeys, this.blockOffsets, this.blockDataSizes));
|
||||
LOG.info("Wrote a version 1 block index with " + this.blockKeys.size()
|
||||
+ " keys");
|
||||
|
||||
if (metaNames.size() > 0) {
|
||||
// Write the meta index.
|
||||
writeBlockIndex(this.outputStream, metaNames, metaOffsets, metaDataSizes);
|
||||
}
|
||||
|
||||
// Now finish off the trailer.
|
||||
trailer.setDataIndexCount(blockKeys.size());
|
||||
|
||||
finishClose(trailer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finishFileInfo() throws IOException {
|
||||
super.finishFileInfo();
|
||||
|
||||
// In version 1, we store comparator name in the file info.
|
||||
fileInfo.append(FileInfo.COMPARATOR,
|
||||
Bytes.toBytes(comparator.getClass().getName()), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addInlineBlockWriter(InlineBlockWriter bloomWriter) {
|
||||
// Inline blocks only exist in HFile format version 2.
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Version 1 general Bloom filters are stored in two meta blocks with two different
|
||||
* keys.
|
||||
*/
|
||||
@Override
|
||||
public void addGeneralBloomFilter(BloomFilterWriter bfw) {
|
||||
appendMetaBlock(BLOOM_FILTER_META_KEY,
|
||||
bfw.getMetaWriter());
|
||||
Writable dataWriter = bfw.getDataWriter();
|
||||
if (dataWriter != null) {
|
||||
appendMetaBlock(BLOOM_FILTER_DATA_KEY, dataWriter);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDeleteFamilyBloomFilter(BloomFilterWriter bfw)
|
||||
throws IOException {
|
||||
throw new IOException("Delete Bloom filter is not supported in HFile V1");
|
||||
}
|
||||
|
||||
/**
|
||||
* Write out the index in the version 1 format. This conforms to the legacy
|
||||
* version 1 format, but can still be read by
|
||||
* {@link HFileBlockIndex.BlockIndexReader#readRootIndex(java.io.DataInputStream,
|
||||
* int)}.
|
||||
*
|
||||
* @param out the stream to write to
|
||||
* @param keys
|
||||
* @param offsets
|
||||
* @param uncompressedSizes in contrast with a version 2 root index format,
|
||||
* the sizes stored in the version 1 are uncompressed sizes
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
private static long writeBlockIndex(final FSDataOutputStream out,
|
||||
final List<byte[]> keys, final List<Long> offsets,
|
||||
final List<Integer> uncompressedSizes) throws IOException {
|
||||
long pos = out.getPos();
|
||||
// Don't write an index if nothing in the index.
|
||||
if (keys.size() > 0) {
|
||||
BlockType.INDEX_V1.write(out);
|
||||
// Write the index.
|
||||
for (int i = 0; i < keys.size(); ++i) {
|
||||
out.writeLong(offsets.get(i).longValue());
|
||||
out.writeInt(uncompressedSizes.get(i).intValue());
|
||||
byte[] key = keys.get(i);
|
||||
Bytes.writeByteArray(out, key);
|
||||
}
|
||||
}
|
||||
return pos;
|
||||
}
|
||||
|
||||
}
|
|
@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
@@ -1535,7 +1534,7 @@ public class StoreFile {
          bloom = null;
          shouldCheckBloom = true;
        } else {
          bloom = reader.getMetaBlock(HFileWriterV1.BLOOM_FILTER_DATA_KEY,
          bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
              true);
          shouldCheckBloom = bloom != null;
        }
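Because BLOOM_FILTER_DATA_KEY now lives on HFile (see the first hunk) instead of the deleted HFileWriterV1, the Bloom data meta block is fetched through the constant's new home. A hedged sketch of that call site, with variable names assumed from the context above rather than taken verbatim from the commit:

          // Sketch only: 'reader' is the store file's HFile reader used in this hunk.
          ByteBuffer bloomBits = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, true);
          boolean shouldCheckBloom = (bloomBits != null);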
@@ -1,123 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.apache.hadoop.hbase.io.encoding.TestChangingEncoding.CF;
import static org.apache.hadoop.hbase.io.encoding.TestChangingEncoding.CF_BYTES;

import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(LargeTests.class)
public class TestUpgradeFromHFileV1ToEncoding {

  private static final Log LOG =
      LogFactory.getLog(TestUpgradeFromHFileV1ToEncoding.class);

  private static final String TABLE = "UpgradeTable";
  private static final byte[] TABLE_BYTES = Bytes.toBytes(TABLE);

  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final Configuration conf = TEST_UTIL.getConfiguration();

  private static final int NUM_HFILE_V1_BATCHES = 10;
  private static final int NUM_HFILE_V2_BATCHES = 20;

  private static final int NUM_SLAVES = 3;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Use a small flush size to create more HFiles.
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    conf.setInt(HFile.FORMAT_VERSION_KEY, 1); // Use HFile v1 initially
    TEST_UTIL.startMiniCluster(NUM_SLAVES);
    LOG.debug("Started an HFile v1 cluster");
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testUpgrade() throws Exception {
    int numBatches = 0;
    HTableDescriptor htd = new HTableDescriptor(TABLE);

    // We don't want a split in this test.
    htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
    htd.setMaxFileSize(Long.MAX_VALUE);
    HColumnDescriptor hcd = new HColumnDescriptor(CF);
    htd.addFamily(hcd);
    HBaseAdmin admin = new HBaseAdmin(conf);
    admin.createTable(htd);
    admin.close();
    for (int i = 0; i < NUM_HFILE_V1_BATCHES; ++i) {
      TestChangingEncoding.writeTestDataBatch(conf, TABLE, numBatches++);
    }
    TEST_UTIL.shutdownMiniHBaseCluster();

    conf.setInt(HFile.FORMAT_VERSION_KEY, 2);
    TEST_UTIL.startMiniHBaseCluster(1, NUM_SLAVES);
    LOG.debug("Started an HFile v2 cluster");
    admin = new HBaseAdmin(conf);
    htd = admin.getTableDescriptor(TABLE_BYTES);
    hcd = htd.getFamily(CF_BYTES);
    hcd.setDataBlockEncoding(DataBlockEncoding.PREFIX);
    admin.disableTable(TABLE);
    admin.modifyColumn(TABLE, hcd);
    admin.enableTable(TABLE);
    admin.close();
    for (int i = 0; i < NUM_HFILE_V2_BATCHES; ++i) {
      TestChangingEncoding.writeTestDataBatch(conf, TABLE, numBatches++);
    }

    LOG.debug("Verifying all 'batches', both HFile v1 and encoded HFile v2");
    verifyBatches(numBatches);

    LOG.debug("Doing a manual compaction");
    admin.compact(TABLE);
    Thread.sleep(TimeUnit.SECONDS.toMillis(10));

    LOG.debug("Verify all the data again");
    verifyBatches(numBatches);
  }

  private void verifyBatches(int numBatches) throws Exception {
    for (int i = 0; i < numBatches; ++i) {
      TestChangingEncoding.verifyTestDataBatch(conf, TABLE, i);
    }
  }

}
@@ -80,8 +80,6 @@ public class TestForceCacheImportantBlocks {
  public static Collection<Object[]> parameters() {
    // HFile versions
    return Arrays.asList(new Object[][] {
        new Object[] { new Integer(1), false },
        new Object[] { new Integer(1), true },
        new Object[] { new Integer(2), false },
        new Object[] { new Integer(2), true }
    });
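Given the hunk counts (eight rows down to six), the two version-1 parameter rows are the ones removed, leaving the test parameterized only over HFile version 2 and its boolean flag. A sketch of the remaining list, inferred from those counts rather than quoted from the commit:

    return Arrays.asList(new Object[][] {
        new Object[] { new Integer(2), false },
        new Object[] { new Integer(2), true }
    });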
@@ -257,41 +257,6 @@ public class TestHFileBlock {
        correctGzipBlockLength));
  }

  @Test
  public void testReaderV1() throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        byte[] block = createTestV1Block(algo);
        Path path = new Path(TEST_UTIL.getDataTestDir(),
            "blocks_v1_"+ algo);
        LOG.info("Creating temporary file at " + path);
        FSDataOutputStream os = fs.create(path);
        int totalSize = 0;
        int numBlocks = 50;
        for (int i = 0; i < numBlocks; ++i) {
          os.write(block);
          totalSize += block.length;
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV1(is, algo,
            totalSize);
        HFileBlock b;
        int numBlocksRead = 0;
        long pos = 0;
        while (pos < totalSize) {
          b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread);
          b.sanityCheck();
          pos += block.length;
          numBlocksRead++;
        }
        assertEquals(numBlocks, numBlocksRead);
        is.close();
      }
    }
  }

  @Test
  public void testReaderV2() throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
@ -168,41 +168,6 @@ public class TestHFileBlockCompatibility {
    assertEquals(correctTestBlockStr, returnedStr);
  }

  @Test
  public void testReaderV1() throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        byte[] block = createTestV1Block(algo);
        Path path = new Path(TEST_UTIL.getDataTestDir(),
            "blocks_v1_" + algo);
        LOG.info("Creating temporary file at " + path);
        FSDataOutputStream os = fs.create(path);
        int totalSize = 0;
        int numBlocks = 50;
        for (int i = 0; i < numBlocks; ++i) {
          os.write(block);
          totalSize += block.length;
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderV1(is, algo,
            totalSize);
        HFileBlock b;
        int numBlocksRead = 0;
        long pos = 0;
        while (pos < totalSize) {
          b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread);
          b.sanityCheck();
          pos += block.length;
          numBlocksRead++;
        }
        assertEquals(numBlocks, numBlocksRead);
        is.close();
      }
    }
  }

  @Test
  public void testReaderV2() throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
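Both testReaderV1 removals above exercise the same pattern: write a fixed number of identical blocks, then walk the file with readBlockData and sanityCheck. A sketch of that walk on its own, assuming a same-package helper (FSReader is not public) and leaving the reader construction out, since FSReaderV1 itself is gone after this patch:

package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;

class BlockWalkSketch {
  // Hypothetical helper: read consecutive fixed-size blocks and count them.
  static int readAllBlocks(HFileBlock.FSReader hbr, long totalSize, int blockSize,
      int uncompressedSize, boolean pread) throws IOException {
    int numBlocksRead = 0;
    long pos = 0;
    while (pos < totalSize) {
      HFileBlock b = hbr.readBlockData(pos, blockSize, uncompressedSize, pread);
      b.sanityCheck();   // consistency checks on the in-memory block
      pos += blockSize;
      numBlocksRead++;
    }
    return numBlocksRead;
  }
}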
@ -1,92 +0,0 @@
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mortbay.log.Log;

@Category(SmallTests.class)
public class TestHFileReaderV1 {

  private static final HBaseTestingUtility TEST_UTIL =
    new HBaseTestingUtility();

  private Configuration conf;
  private FileSystem fs;

  private static final int N = 1000;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    fs = FileSystem.get(conf);
  }

  @Test
  public void testReadingExistingVersion1HFile() throws IOException {
    URL url = TestHFileReaderV1.class.getResource(
        "8e8ab58dcf39412da19833fcd8f687ac");
    Path existingHFilePath = new Path(url.getPath());
    HFile.Reader reader =
      HFile.createReader(fs, existingHFilePath, new CacheConfig(conf));
    reader.loadFileInfo();
    FixedFileTrailer trailer = reader.getTrailer();

    assertEquals(N, reader.getEntries());
    assertEquals(N, trailer.getEntryCount());
    assertEquals(1, trailer.getMajorVersion());
    assertEquals(Compression.Algorithm.GZ, trailer.getCompressionCodec());

    for (boolean pread : new boolean[] { false, true }) {
      int totalDataSize = 0;
      int n = 0;

      HFileScanner scanner = reader.getScanner(false, pread);
      assertTrue(scanner.seekTo());
      do {
        totalDataSize += scanner.getKey().limit() + scanner.getValue().limit()
            + Bytes.SIZEOF_INT * 2;
        ++n;
      } while (scanner.next());

      // Add magic record sizes, one per data block.
      totalDataSize += 8 * trailer.getDataIndexCount();

      assertEquals(N, n);
      assertEquals(trailer.getTotalUncompressedBytes(), totalDataSize);
    }
    reader.close();
  }
}
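The deleted TestHFileReaderV1 above is also the clearest end-to-end example of the open/scan/close sequence on an HFile.Reader. A sketch of that sequence using only the calls visible in the deleted test; the class and method names are assumptions, and the file path is whatever the caller supplies:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

public class HFileScanSketch {
  // Hypothetical helper: count the entries in an HFile by scanning it once.
  public static long countEntries(Configuration conf, Path path) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf));
    try {
      reader.loadFileInfo();
      long n = 0;
      // cacheBlocks=false, pread=false, as in the deleted test's sequential pass
      HFileScanner scanner = reader.getScanner(false, false);
      if (scanner.seekTo()) {
        do {
          n++;
        } while (scanner.next());
      }
      return n;
    } finally {
      reader.close();
    }
  }
}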
@ -202,21 +202,6 @@ public class TestSplitTransaction {
    assertFalse(st.prepare());
  }

  @Test public void testWholesomeSplitWithHFileV1() throws IOException {
    int defaultVersion = TEST_UTIL.getConfiguration().getInt(
        HFile.FORMAT_VERSION_KEY, 2);
    TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 1);
    try {
      for (Store store : this.parent.stores.values()) {
        store.getFamily().setBloomFilterType(BloomType.ROW);
      }
      testWholesomeSplit();
    } finally {
      TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY,
          defaultVersion);
    }
  }

  @Test public void testWholesomeSplit() throws IOException {
    final int rowcount = TEST_UTIL.loadRegion(this.parent, CF, true);
    assertTrue(rowcount > 0);
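The deleted split test wraps its format-version override in a try/finally restore. With version 1 gone this particular override disappears, but the guard idiom itself is worth a sketch; the helper name and the Runnable body are assumptions, and the only version left to pin after this patch is presumably 2:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class FormatVersionGuardSketch {
  // Hypothetical helper: pin the hfile format version for the duration of `body`,
  // then restore whatever was configured before, as the deleted test did.
  public static void withFormatVersion(Configuration conf, int version, Runnable body) {
    int defaultVersion = conf.getInt(HFile.FORMAT_VERSION_KEY, 2);
    conf.setInt(HFile.FORMAT_VERSION_KEY, version);
    try {
      body.run();
    } finally {
      conf.setInt(HFile.FORMAT_VERSION_KEY, defaultVersion);
    }
  }
}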
@ -587,60 +587,6 @@ public class TestStoreFile extends HBaseTestCase {
    }
  }

  public void testBloomEdgeCases() throws Exception {
    float err = (float)0.005;
    FileSystem fs = FileSystem.getLocal(conf);
    Path f = new Path(ROOT_DIR, getName());
    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err);
    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_MAX_KEYS, 1000);

    // This test only runs for HFile format version 1.
    conf.setInt(HFile.FORMAT_VERSION_KEY, 1);

    // this should not create a bloom because the max keys is too small
    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
        StoreFile.DEFAULT_BLOCKSIZE_SMALL)
            .withFilePath(f)
            .withBloomType(BloomType.ROW)
            .withMaxKeyCount(2000)
            .withChecksumType(CKTYPE)
            .withBytesPerChecksum(CKBYTES)
            .build();
    assertFalse(writer.hasGeneralBloom());
    writer.close();
    fs.delete(f, true);

    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_MAX_KEYS,
        Integer.MAX_VALUE);

    // TODO: commented out because we run out of java heap space on trunk
    // the below config caused IllegalArgumentException in our production cluster
    // however, the resulting byteSize is < MAX_INT, so this should work properly
    writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
        StoreFile.DEFAULT_BLOCKSIZE_SMALL)
            .withFilePath(f)
            .withBloomType(BloomType.ROW)
            .withMaxKeyCount(27244696)
            .build();
    assertTrue(writer.hasGeneralBloom());
    bloomWriteRead(writer, fs);

    // this, however, is too large and should not create a bloom
    // because Java can't create a contiguous array > MAX_INT
    writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
        StoreFile.DEFAULT_BLOCKSIZE_SMALL)
            .withFilePath(f)
            .withBloomType(BloomType.ROW)
            .withMaxKeyCount(Integer.MAX_VALUE)
            .withChecksumType(CKTYPE)
            .withBytesPerChecksum(CKBYTES)
            .build();
    assertFalse(writer.hasGeneralBloom());
    writer.close();
    fs.delete(f, true);
  }

  public void testSeqIdComparator() {
    assertOrdering(StoreFile.Comparators.SEQ_ID,
        mockStoreFile(true, 1000, -1, "/foo/123"),
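The deleted bloom test above is the main place that drives StoreFile.WriterBuilder with bloom settings directly. A sketch of a bloom-enabled writer using only the knobs that test touches; the class name, the concrete values, and the exact packages of BloomType and BloomFilterFactory are assumptions rather than something this patch establishes:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.BloomFilterFactory;

public class BloomWriterSketch {
  // Hypothetical helper: build a store file writer with a ROW bloom filter.
  public static StoreFile.Writer newBloomWriter(Configuration conf, FileSystem fs, Path path)
      throws IOException {
    conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, 0.01f);
    conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_MAX_KEYS, 1000000);
    return new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs,
        StoreFile.DEFAULT_BLOCKSIZE_SMALL)
            .withFilePath(path)
            .withBloomType(BloomType.ROW)
            .withMaxKeyCount(10000)   // expected number of keys, used to size the bloom
            .build();
  }
}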