HBASE-2856 Cross Column Family Read Atomicity

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1203465 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Nicolas Spiegelberg 2011-11-18 02:19:24 +00:00
parent a9253a7104
commit d116edef93
14 changed files with 317 additions and 40 deletions

View File

@ -262,7 +262,6 @@ public abstract class AbstractHFileReader extends SchemaConfigured
}
protected static abstract class Scanner implements HFileScanner {
protected HFile.Reader reader;
protected ByteBuffer blockBuffer;
protected boolean cacheBlocks;
@ -271,22 +270,18 @@ public abstract class AbstractHFileReader extends SchemaConfigured
protected int currKeyLen;
protected int currValueLen;
protected int currMemstoreTSLen;
protected long currMemstoreTS;
protected int blockFetches;
public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
public Scanner(final boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
this.reader = reader;
this.cacheBlocks = cacheBlocks;
this.pread = pread;
this.isCompaction = isCompaction;
}
@Override
public Reader getReader() {
return reader;
}
@Override
public boolean isSeeked(){
return blockBuffer != null;
@ -294,7 +289,7 @@ public abstract class AbstractHFileReader extends SchemaConfigured
@Override
public String toString() {
return "HFileScanner for reader " + String.valueOf(reader);
return "HFileScanner for reader " + String.valueOf(getReader());
}
protected void assertSeeked() {

View File

@ -385,13 +385,13 @@ public class HFileReaderV1 extends AbstractHFileReader {
* Implementation of {@link HFileScanner} interface.
*/
protected static class ScannerV1 extends AbstractHFileReader.Scanner {
private final HFileReaderV1 readerV1;
private final HFileReaderV1 reader;
private int currBlock;
public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
super(reader, cacheBlocks, pread, isCompaction);
readerV1 = reader;
super(cacheBlocks, pread, isCompaction);
this.reader = reader;
}
@Override
@ -458,7 +458,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
blockBuffer = null;
return false;
}
blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
isCompaction);
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
@ -478,7 +478,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
@Override
public int seekTo(byte[] key, int offset, int length) throws IOException {
int b = readerV1.blockContainingKey(key, offset, length);
int b = reader.blockContainingKey(key, offset, length);
if (b < 0) return -1; // falls before the beginning of the file! :-(
// Avoid re-reading the same block (that'd be dumb).
loadBlock(b, true);
@ -504,7 +504,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
}
}
int b = readerV1.blockContainingKey(key, offset, length);
int b = reader.blockContainingKey(key, offset, length);
if (b < 0) {
return -1;
}
@ -571,7 +571,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
@Override
public boolean seekBefore(byte[] key, int offset, int length)
throws IOException {
int b = readerV1.blockContainingKey(key, offset, length);
int b = reader.blockContainingKey(key, offset, length);
if (b < 0)
return false; // key is before the start of the file.
@ -623,7 +623,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
return true;
}
currBlock = 0;
blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
isCompaction);
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
@ -633,13 +633,13 @@ public class HFileReaderV1 extends AbstractHFileReader {
private void loadBlock(int bloc, boolean rewind) throws IOException {
if (blockBuffer == null) {
blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
isCompaction);
currBlock = bloc;
blockFetches++;
} else {
if (bloc != currBlock) {
blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
isCompaction);
currBlock = bloc;
blockFetches++;

View File

@ -19,7 +19,9 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -34,6 +36,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.io.WritableUtils;
/**
* {@link HFile} reader for version 2.
@ -46,7 +49,13 @@ public class HFileReaderV2 extends AbstractHFileReader {
* The size of a (key length, value length) tuple that prefixes each entry in
* a data block.
*/
private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
private boolean includesMemstoreTS = false;
private boolean shouldIncludeMemstoreTS() {
return includesMemstoreTS;
}
/**
* A "sparse lock" implementation allowing to lock on a particular block
@ -115,6 +124,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
lastKey = fileInfo.get(FileInfo.LASTKEY);
avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
includesMemstoreTS = (keyValueFormatVersion != null &&
Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE);
// Store all other load-on-open blocks for further consumption.
HFileBlock b;
@ -333,10 +345,17 @@ public class HFileReaderV2 extends AbstractHFileReader {
*/
protected static class ScannerV2 extends AbstractHFileReader.Scanner {
private HFileBlock block;
private HFileReaderV2 reader;
public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
super(r, cacheBlocks, pread, isCompaction);
super(cacheBlocks, pread, isCompaction);
this.reader = r;
}
@Override
public HFileReaderV2 getReader() {
return reader;
}
@Override
@ -344,8 +363,12 @@ public class HFileReaderV2 extends AbstractHFileReader {
if (!isSeeked())
return null;
return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position());
KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position());
if (this.reader.shouldIncludeMemstoreTS()) {
ret.setMemstoreTS(currMemstoreTS);
}
return ret;
}
@Override
@ -371,6 +394,8 @@ public class HFileReaderV2 extends AbstractHFileReader {
blockBuffer = null;
currKeyLen = 0;
currValueLen = 0;
currMemstoreTS = 0;
currMemstoreTSLen = 0;
}
/**
@ -386,7 +411,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
try {
blockBuffer.position(blockBuffer.position() + KEY_VALUE_LEN_SIZE
+ currKeyLen + currValueLen);
+ currKeyLen + currValueLen + currMemstoreTSLen);
} catch (IllegalArgumentException e) {
LOG.error("Current pos = " + blockBuffer.position()
+ "; currKeyLen = " + currKeyLen + "; currValLen = "
@ -579,6 +604,19 @@ public class HFileReaderV2 extends AbstractHFileReader {
currKeyLen = blockBuffer.getInt();
currValueLen = blockBuffer.getInt();
blockBuffer.reset();
if (this.reader.shouldIncludeMemstoreTS()) {
try {
ByteArrayInputStream byte_input = new ByteArrayInputStream(blockBuffer.array());
byte_input.skip(blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen);
DataInputStream data_input = new DataInputStream(byte_input);
currMemstoreTS = WritableUtils.readVLong(data_input);
currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
} catch (Exception e) {
throw new RuntimeException("Error reading memstoreTS. " + e);
}
}
if (currKeyLen < 0 || currValueLen < 0
|| currKeyLen > blockBuffer.limit()
@ -606,12 +644,27 @@ public class HFileReaderV2 extends AbstractHFileReader {
private int blockSeek(byte[] key, int offset, int length,
boolean seekBefore) {
int klen, vlen;
long memstoreTS = 0;
int memstoreTSLen = 0;
int lastKeyValueSize = -1;
do {
blockBuffer.mark();
klen = blockBuffer.getInt();
vlen = blockBuffer.getInt();
blockBuffer.reset();
if (this.reader.shouldIncludeMemstoreTS()) {
try {
ByteArrayInputStream byte_input = new ByteArrayInputStream(blockBuffer.array());
byte_input.skip(blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE + klen + vlen);
DataInputStream data_input = new DataInputStream(byte_input);
memstoreTS = WritableUtils.readVLong(data_input);
memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
} catch (Exception e) {
throw new RuntimeException("Error reading memstoreTS. " + e);
}
}
int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE;
@ -633,6 +686,10 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
currKeyLen = klen;
currValueLen = vlen;
if (this.reader.shouldIncludeMemstoreTS()) {
currMemstoreTS = memstoreTS;
currMemstoreTSLen = memstoreTSLen;
}
return 0; // indicate exact match
}
@ -644,7 +701,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
// The size of this key/value tuple, including key/value length fields.
lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE;
lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
} while (blockBuffer.remaining() > 0);

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
* Writes HFile format version 2.
@ -47,6 +48,13 @@ import org.apache.hadoop.io.Writable;
public class HFileWriterV2 extends AbstractHFileWriter {
static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
/** Max memstore (rwcc) timestamp in FileInfo */
public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
/** KeyValue version in FileInfo */
public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
/** Version for KeyValue which includes memstore timestamp */
public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
/** Inline block writers for multi-level block index and compound Blooms. */
private List<InlineBlockWriter> inlineBlockWriters =
new ArrayList<InlineBlockWriter>();
@ -67,6 +75,9 @@ public class HFileWriterV2 extends AbstractHFileWriter {
private List<BlockWritable> additionalLoadOnOpenData =
new ArrayList<BlockWritable>();
private final boolean includeMemstoreTS = true;
private long maxMemstoreTS = 0;
static class WriterFactoryV2 extends HFile.WriterFactory {
WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
@ -311,8 +322,9 @@ public class HFileWriterV2 extends AbstractHFileWriter {
*/
@Override
public void append(final KeyValue kv) throws IOException {
append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS());
}
/**
@ -327,7 +339,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
*/
@Override
public void append(final byte[] key, final byte[] value) throws IOException {
append(key, 0, key.length, value, 0, value.length);
append(0, key, 0, key.length, value, 0, value.length);
}
/**
@ -342,7 +354,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
* @param vlength
* @throws IOException
*/
private void append(final byte[] key, final int koffset, final int klength,
private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
final byte[] value, final int voffset, final int vlength)
throws IOException {
boolean dupKey = checkKey(key, koffset, klength);
@ -355,6 +367,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
newBlock();
// Write length of key and value and then actual key and value bytes.
// Additionally, we may also write down the memstoreTS.
{
DataOutputStream out = fsBlockWriter.getUserDataStream();
out.writeInt(klength);
@ -363,6 +376,9 @@ public class HFileWriterV2 extends AbstractHFileWriter {
totalValueLength += vlength;
out.write(key, koffset, klength);
out.write(value, voffset, vlength);
if (this.includeMemstoreTS) {
WritableUtils.writeVLong(out, memstoreTS);
}
}
// Are we the first key in this block?
@ -378,6 +394,10 @@ public class HFileWriterV2 extends AbstractHFileWriter {
entryCount++;
}
public static int getEncodedLength(long value) {
return WritableUtils.getVIntSize(value);
}
@Override
public void close() throws IOException {
if (outputStream == null) {
@ -428,6 +448,11 @@ public class HFileWriterV2 extends AbstractHFileWriter {
fsBlockWriter.writeHeaderAndData(outputStream);
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
if (this.includeMemstoreTS) {
appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
}
// File info
writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
false));
@ -449,6 +474,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
trailer.setComparatorClass(comparator.getClass());
trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
finishClose(trailer);
fsBlockWriter.releaseCompressor();

View File

@ -212,6 +212,29 @@ public class HRegion implements HeapSize { // , Writable{
final Path regiondir;
KeyValue.KVComparator comparator;
private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
/*
* @return The smallest rwcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
* read operation.
*/
public long getSmallestReadPoint() {
long minimumReadPoint;
// We need to ensure that while we are calculating the smallestReadPoint
// no new RegionScanners can grab a readPoint that we are unaware of.
// We achieve this by synchronizing on the scannerReadPoints object.
synchronized(scannerReadPoints) {
minimumReadPoint = rwcc.memstoreReadPoint();
for (Long readPoint: this.scannerReadPoints.values()) {
if (readPoint < minimumReadPoint) {
minimumReadPoint = readPoint;
}
}
}
return minimumReadPoint;
}
/*
* Data structure of write state flags used coordinating flushes,
* compactions and closes.
@ -371,6 +394,7 @@ public class HRegion implements HeapSize { // , Writable{
this.htableDescriptor = null;
this.threadWakeFrequency = 0L;
this.coprocessorHost = null;
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
}
/**
@ -414,6 +438,7 @@ public class HRegion implements HeapSize { // , Writable{
String encodedNameStr = this.regionInfo.getEncodedName();
setHTableSpecificConf();
this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
// don't initialize coprocessors if not running within a regionserver
// TODO: revisit if coprocessors should load in other cases
@ -495,6 +520,8 @@ public class HRegion implements HeapSize { // , Writable{
// min across all the max.
long minSeqId = -1;
long maxSeqId = -1;
// initialized to -1 so that we pick up MemstoreTS from column families
long maxMemstoreTS = -1;
for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) {
status.setStatus("Instantiating store for column family " + c);
Store store = instantiateHStore(this.tableDir, c);
@ -506,7 +533,12 @@ public class HRegion implements HeapSize { // , Writable{
if (maxSeqId == -1 || storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
}
long maxStoreMemstoreTS = store.getMaxMemstoreTS();
if (maxStoreMemstoreTS > maxMemstoreTS) {
maxMemstoreTS = maxStoreMemstoreTS;
}
}
rwcc.initialize(maxMemstoreTS + 1);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
this.regiondir, minSeqId, reporter, status));
@ -1666,6 +1698,8 @@ public class HRegion implements HeapSize { // , Writable{
this.put(put, lockid, put.getWriteToWAL());
}
/**
* @param put
* @param lockid
@ -2285,6 +2319,7 @@ public class HRegion implements HeapSize { // , Writable{
rwcc.completeMemstoreInsert(localizedWriteEntry);
}
}
return size;
}
@ -2963,6 +2998,7 @@ public class HRegion implements HeapSize { // , Writable{
}
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
//DebugPrint.println("HRegionScanner.<init>");
this.filter = scan.getFilter();
this.batch = scan.getBatch();
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
@ -2974,7 +3010,12 @@ public class HRegion implements HeapSize { // , Writable{
// it is [startRow,endRow) and if startRow=endRow we get nothing.
this.isScan = scan.isGetScan() ? -1 : 0;
this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
// synchronize on scannerReadPoints so that nobody calculates
// getSmallestReadPoint, before scannerReadPoints is updated.
synchronized(scannerReadPoints) {
this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
scannerReadPoints.put(this, this.readPt);
}
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
if (additionalScanners != null) {
@ -2984,7 +3025,9 @@ public class HRegion implements HeapSize { // , Writable{
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
scanners.add(store.getScanner(scan, entry.getValue()));
StoreScanner scanner = store.getScanner(scan, entry.getValue());
scanner.useRWCC(true);
scanners.add(scanner);
}
this.storeHeap = new KeyValueHeap(scanners, comparator);
}
@ -3135,6 +3178,8 @@ public class HRegion implements HeapSize { // , Writable{
storeHeap.close();
storeHeap = null;
}
// no need to sychronize here.
scannerReadPoints.remove(this);
this.filterClosed = true;
}
@ -4186,7 +4231,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
28 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
29 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
(4 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
@ -4195,7 +4240,7 @@ public class HRegion implements HeapSize { // , Writable{
(2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
ClassSize.ATOMIC_LONG + // memStoreSize
ClassSize.ATOMIC_INTEGER + // lockIdGenerator
(2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds
(3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints
WriteState.HEAP_SIZE + // writestate
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock

View File

@ -24,6 +24,9 @@ import java.util.LinkedList;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
/**
* Manages the read/write consistency within memstore. This provides
* an interface for readers to determine what entries to ignore, and
@ -43,6 +46,31 @@ public class ReadWriteConsistencyControl {
private static final ThreadLocal<Long> perThreadReadPoint =
new ThreadLocal<Long>();
/**
* Default constructor. Initializes the memstoreRead/Write points to 0.
*/
public ReadWriteConsistencyControl() {
this.memstoreRead = this.memstoreWrite = 0;
}
/**
* Initializes the memstoreRead/Write points appropriately.
* @param startPoint
*/
public void initialize(long startPoint) {
synchronized (writeQueue) {
if (this.memstoreWrite != this.memstoreRead) {
throw new RuntimeException("Already used this rwcc. Too late to initialize");
}
if (this.memstoreWrite > startPoint) {
throw new RuntimeException("Cannot decrease RWCC timestamp");
}
this.memstoreRead = this.memstoreWrite = startPoint;
}
}
/**
* Get this thread's read point. Used primarily by the memstore scanner to
* know which values to skip (ie: have not been completed/committed to
@ -151,6 +179,7 @@ public class ReadWriteConsistencyControl {
}
}
if (interrupted) Thread.currentThread().interrupt();
}
public long memstoreReadPoint() {

View File

@ -92,6 +92,12 @@ public class ScanQueryMatcher {
*/
private final long earliestPutTs;
/** Should we ignore KV's with a newer RWCC timestamp **/
private boolean enforceRWCC = false;
public void useRWCC(boolean flag) {
this.enforceRWCC = flag;
}
/**
* This variable shows whether there is an null column in the query. There
* always exists a null column in the wildcard column query.
@ -228,6 +234,13 @@ public class ScanQueryMatcher {
return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
}
// The compaction thread has no readPoint set. For other operations, we
// will ignore updates that are done after the read operation has started.
if (this.enforceRWCC &&
kv.getMemstoreTS() > ReadWriteConsistencyControl.getThreadReadPoint()) {
return MatchCode.SKIP;
}
/*
* The delete logic is pretty complicated now.
* This is corroborated by the following:

View File

@ -237,6 +237,13 @@ public class Store extends SchemaConfigured implements HeapSize {
return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
}
/**
* @return The maximum memstoreTS in all store files.
*/
public long getMaxMemstoreTS() {
return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
}
/**
* @param tabledir
* @param encodedName Encoded region name.
@ -507,6 +514,9 @@ public class Store extends SchemaConfigured implements HeapSize {
MonitoredTask status)
throws IOException {
StoreFile.Writer writer;
String fileName;
// Find the smallest read point across all the Scanners.
long smallestReadPoint = region.getSmallestReadPoint();
long flushed = 0;
Path pathName;
// Don't flush if there are no entries.
@ -538,6 +548,11 @@ public class Store extends SchemaConfigured implements HeapSize {
hasMore = scanner.next(kvs);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0. This will help us save space when writing to disk.
if (kv.getMemstoreTS() <= smallestReadPoint) {
kv.setMemstoreTS(0);
}
writer.append(kv);
flushed += this.memstore.heapSizeChange(kv, true);
}
@ -1224,6 +1239,8 @@ public class Store extends SchemaConfigured implements HeapSize {
// Make the instantiation lazy in case compaction produces no product; i.e.
// where all source cells are expired or deleted.
StoreFile.Writer writer = null;
// Find the smallest read point across all the Scanners.
long smallestReadPoint = region.getSmallestReadPoint();
try {
InternalScanner scanner = null;
try {
@ -1259,6 +1276,9 @@ public class Store extends SchemaConfigured implements HeapSize {
if (writer != null) {
// output to writer:
for (KeyValue kv : kvs) {
if (kv.getMemstoreTS() <= smallestReadPoint) {
kv.setMemstoreTS(0);
}
writer.append(kv);
// update progress per key
++progress.currentCompactedKVs;
@ -1707,7 +1727,7 @@ public class Store extends SchemaConfigured implements HeapSize {
* Return a scanner for both the memstore and the HStore files
* @throws IOException
*/
public KeyValueScanner getScanner(Scan scan,
public StoreScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols) throws IOException {
lock.readLock().lock();
try {

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.util.BloomFilter;
@ -156,6 +157,18 @@ public class StoreFile {
// Set when we obtain a Reader.
private long sequenceid = -1;
// max of the MemstoreTS in the KV's in this store
// Set when we obtain a Reader.
private long maxMemstoreTS = -1;
public long getMaxMemstoreTS() {
return maxMemstoreTS;
}
public void setMaxMemstoreTS(long maxMemstoreTS) {
this.maxMemstoreTS = maxMemstoreTS;
}
// If true, this file was product of a major compaction. Its then set
// whenever you get a Reader.
private AtomicBoolean majorCompaction = null;
@ -342,6 +355,24 @@ public class StoreFile {
return modificationTimeStamp;
}
/**
* Return the largest memstoreTS found across all storefiles in
* the given list. Store files that were created by a mapreduce
* bulk load are ignored, as they do not correspond to any specific
* put operation, and thus do not have a memstoreTS associated with them.
* @return 0 if no non-bulk-load files are provided or, this is Store that
* does not yet have any store files.
*/
public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
long max = 0;
for (StoreFile sf : sfs) {
if (!sf.isBulkLoadResult()) {
max = Math.max(max, sf.getMaxMemstoreTS());
}
}
return max;
}
/**
* Return the highest sequence ID found across all storefiles in
* the given list. Store files that were created by a mapreduce
@ -491,6 +522,11 @@ public class StoreFile {
}
this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
if (b != null) {
this.maxMemstoreTS = Bytes.toLong(b);
}
b = metadataMap.get(MAJOR_COMPACTION_KEY);
if (b != null) {
boolean mc = Bytes.toBoolean(b);

View File

@ -159,6 +159,15 @@ class StoreScanner extends NonLazyKeyValueScanner
heap = new KeyValueHeap(scanners, scanInfo.getComparator());
}
/**
* Advise the StoreScanner if it should enforce the RWCC mechanism
* for ignoring newer KVs or not.
* @param flag
*/
public void useRWCC(boolean flag) {
matcher.useRWCC(flag);
}
/*
* @return List of scanners ordered properly.
*/

View File

@ -252,6 +252,12 @@ public class TestAcidGuarantees {
writers.add(writer);
ctx.addThread(writer);
}
// Add a flusher
ctx.addThread(new RepeatingTestThread(ctx) {
public void doAnAction() throws Exception {
util.flush();
}
});
List<AtomicGetReader> getters = Lists.newArrayList();
for (int i = 0; i < numGetters; i++) {
@ -288,7 +294,6 @@ public class TestAcidGuarantees {
}
@Test
@Ignore("Currently not passing - see HBASE-2856")
public void testGetAtomicity() throws Exception {
util.startMiniCluster(1);
try {
@ -299,7 +304,6 @@ public class TestAcidGuarantees {
}
@Test
@Ignore("Currently not passing - see HBASE-2670")
public void testScanAtomicity() throws Exception {
util.startMiniCluster(1);
try {
@ -310,7 +314,6 @@ public class TestAcidGuarantees {
}
@Test
@Ignore("Currently not passing - see HBASE-2670")
public void testMixedAtomicity() throws Exception {
util.startMiniCluster(1);
try {
@ -324,7 +327,7 @@ public class TestAcidGuarantees {
Configuration c = HBaseConfiguration.create();
TestAcidGuarantees test = new TestAcidGuarantees();
test.setConf(c);
test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
test.runTestAtomicity(5000, 50, 2, 2, 3);
}
private void setConf(Configuration c) {

View File

@ -185,9 +185,10 @@ public class TestCacheOnWrite {
}
LOG.info("Block count by type: " + blockCountByType);
String countByType = blockCountByType.toString();
assertEquals(
"{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
blockCountByType.toString());
"{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
countByType);
reader.close();
}

View File

@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -35,8 +37,13 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -116,10 +123,36 @@ public class TestHFileWriterV2 {
HFileBlock.FSReader blockReader =
new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
// Comparator class name is stored in the trailer in version 2.
RawComparator<byte []> comparator = trailer.createComparator();
HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
trailer.getNumDataIndexLevels());
HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
Bytes.BYTES_RAWCOMPARATOR, 1);
HFileBlock.BlockIterator blockIter = blockReader.blockRange(
trailer.getLoadOnOpenDataOffset(),
fileSize - trailer.getTrailerSize());
// Data index. We also read statistics about the block index written after
// the root level.
dataBlockIndexReader.readMultiLevelIndexRoot(
blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
trailer.getDataIndexCount());
// Meta index.
metaBlockIndexReader.readRootIndex(
blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
trailer.getMetaIndexCount());
// File info
FileInfo fileInfo = new FileInfo();
fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
boolean includeMemstoreTS = (keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0);
// Counters for the number of key/value pairs and the number of blocks
int entriesRead = 0;
int blocksRead = 0;
long memstoreTS = 0;
// Scan blocks the way the reader would scan them
fsdis.seek(0);
@ -138,6 +171,15 @@ public class TestHFileWriterV2 {
byte[] value = new byte[valueLen];
buf.get(value);
if (includeMemstoreTS) {
ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(),
buf.arrayOffset() + buf.position(), buf.remaining());
DataInputStream data_input = new DataInputStream(byte_input);
memstoreTS = WritableUtils.readVLong(data_input);
buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
}
// A brute-force check to see that all keys and values are correct.
assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0);
assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0);

View File

@ -818,7 +818,8 @@ public class TestStoreFile extends HBaseTestCase {
for (int i=numKVs;i>0;i--) {
KeyValue kv = new KeyValue(b, b, b, i, b);
kvs.add(kv);
totalSize += kv.getLength();
// kv has memstoreTS 0, which takes 1 byte to store.
totalSize += kv.getLength() + 1;
}
int blockSize = totalSize / numBlocks;
StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize,