diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
index 6723b615ba6..9123e70be08 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
@@ -262,7 +262,6 @@ public abstract class AbstractHFileReader extends SchemaConfigured
   }
 
   protected static abstract class Scanner implements HFileScanner {
-    protected HFile.Reader reader;
     protected ByteBuffer blockBuffer;
 
     protected boolean cacheBlocks;
@@ -271,22 +270,18 @@ public abstract class AbstractHFileReader extends SchemaConfigured
 
     protected int currKeyLen;
     protected int currValueLen;
+    protected int currMemstoreTSLen;
+    protected long currMemstoreTS;
 
     protected int blockFetches;
 
-    public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
+    public Scanner(final boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      this.reader = reader;
       this.cacheBlocks = cacheBlocks;
       this.pread = pread;
       this.isCompaction = isCompaction;
     }
 
-    @Override
-    public Reader getReader() {
-      return reader;
-    }
-
     @Override
     public boolean isSeeked(){
       return blockBuffer != null;
@@ -294,7 +289,7 @@ public abstract class AbstractHFileReader extends SchemaConfigured
 
     @Override
     public String toString() {
-      return "HFileScanner for reader " + String.valueOf(reader);
+      return "HFileScanner for reader " + String.valueOf(getReader());
    }
 
     protected void assertSeeked() {
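Note: the base Scanner no longer caches an HFile.Reader; each concrete scanner keeps its own, covariantly typed reader and exposes it through getReader(), which toString() now goes through. A minimal sketch of that pattern with illustrative names (not the real HBase types):

// Hypothetical sketch of the covariant getReader() pattern used above.
interface FileScanner {
  BaseReader getReader();
}

abstract class BaseScanner implements FileScanner {
  // No reader field in the base class; subclasses own the concrete reader.
  @Override
  public String toString() {
    return "Scanner for reader " + String.valueOf(getReader());
  }
}

class BaseReader { }
class ReaderV2 extends BaseReader { }

class ScannerV2 extends BaseScanner {
  private final ReaderV2 reader;
  ScannerV2(ReaderV2 reader) { this.reader = reader; }
  @Override
  public ReaderV2 getReader() { return reader; }  // covariant return type
}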
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
index 69589eb4f5b..1eb316ae046 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
@@ -385,13 +385,13 @@ public class HFileReaderV1 extends AbstractHFileReader {
    * Implementation of {@link HFileScanner} interface.
    */
   protected static class ScannerV1 extends AbstractHFileReader.Scanner {
-    private final HFileReaderV1 readerV1;
+    private final HFileReaderV1 reader;
     private int currBlock;
 
     public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      super(reader, cacheBlocks, pread, isCompaction);
-      readerV1 = reader;
+      super(cacheBlocks, pread, isCompaction);
+      this.reader = reader;
     }
 
     @Override
@@ -458,7 +458,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
           blockBuffer = null;
           return false;
         }
-        blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
+        blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
             isCompaction);
         currKeyLen = blockBuffer.getInt();
         currValueLen = blockBuffer.getInt();
@@ -478,7 +478,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
     @Override
     public int seekTo(byte[] key, int offset, int length) throws IOException {
-      int b = readerV1.blockContainingKey(key, offset, length);
+      int b = reader.blockContainingKey(key, offset, length);
       if (b < 0) return -1; // falls before the beginning of the file! :-(
       // Avoid re-reading the same block (that'd be dumb).
       loadBlock(b, true);
@@ -504,7 +504,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
         }
       }
-      int b = readerV1.blockContainingKey(key, offset, length);
+      int b = reader.blockContainingKey(key, offset, length);
       if (b < 0) {
         return -1;
       }
@@ -571,7 +571,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
     @Override
     public boolean seekBefore(byte[] key, int offset, int length)
         throws IOException {
-      int b = readerV1.blockContainingKey(key, offset, length);
+      int b = reader.blockContainingKey(key, offset, length);
       if (b < 0)
         return false; // key is before the start of the file.
@@ -623,7 +623,7 @@ public class HFileReaderV1 extends AbstractHFileReader {
         return true;
       }
       currBlock = 0;
-      blockBuffer = readerV1.readBlockBuffer(currBlock, cacheBlocks, pread,
+      blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread,
           isCompaction);
       currKeyLen = blockBuffer.getInt();
       currValueLen = blockBuffer.getInt();
@@ -633,13 +633,13 @@ public class HFileReaderV1 extends AbstractHFileReader {
     private void loadBlock(int bloc, boolean rewind) throws IOException {
       if (blockBuffer == null) {
-        blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
+        blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
             isCompaction);
         currBlock = bloc;
         blockFetches++;
       } else {
         if (bloc != currBlock) {
-          blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
+          blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
              isCompaction);
          currBlock = bloc;
          blockFetches++;
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 64d9fbc4677..d21dd82f293 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -19,7 +19,9 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * {@link HFile} reader for version 2.
@@ -46,7 +49,13 @@ public class HFileReaderV2 extends AbstractHFileReader {
  /**
   * The size of a (key length, value length) tuple that prefixes each entry in
   * a data block.
   */
-  private static final int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+  private static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+
+  private boolean includesMemstoreTS = false;
+
+  private boolean shouldIncludeMemstoreTS() {
+    return includesMemstoreTS;
+  }
 
  /**
   * A "sparse lock" implementation allowing to lock on a particular block
@@ -115,6 +124,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
     lastKey = fileInfo.get(FileInfo.LASTKEY);
     avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
     avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
+    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
+    includesMemstoreTS = (keyValueFormatVersion != null &&
+        Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE);
 
     // Store all other load-on-open blocks for further consumption.
     HFileBlock b;
@@ -333,10 +345,17 @@
    */
   protected static class ScannerV2 extends AbstractHFileReader.Scanner {
     private HFileBlock block;
+    private HFileReaderV2 reader;
 
     public ScannerV2(HFileReaderV2 r, boolean cacheBlocks, final boolean pread,
         final boolean isCompaction) {
-      super(r, cacheBlocks, pread, isCompaction);
+      super(cacheBlocks, pread, isCompaction);
+      this.reader = r;
+    }
+
+    @Override
+    public HFileReaderV2 getReader() {
+      return reader;
     }
 
     @Override
@@ -344,8 +363,12 @@
       if (!isSeeked())
         return null;
 
-      return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position());
+      KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+          + blockBuffer.position());
+      if (this.reader.shouldIncludeMemstoreTS()) {
+        ret.setMemstoreTS(currMemstoreTS);
+      }
+      return ret;
     }
 
     @Override
@@ -371,6 +394,8 @@
       blockBuffer = null;
       currKeyLen = 0;
       currValueLen = 0;
+      currMemstoreTS = 0;
+      currMemstoreTSLen = 0;
     }
 
     /**
" + e); + } + } int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE; @@ -633,6 +686,10 @@ public class HFileReaderV2 extends AbstractHFileReader { } currKeyLen = klen; currValueLen = vlen; + if (this.reader.shouldIncludeMemstoreTS()) { + currMemstoreTS = memstoreTS; + currMemstoreTSLen = memstoreTSLen; + } return 0; // indicate exact match } @@ -644,7 +701,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } // The size of this key/value tuple, including key/value length fields. - lastKeyValueSize = klen + vlen + KEY_VALUE_LEN_SIZE; + lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; blockBuffer.position(blockBuffer.position() + lastKeyValueSize); } while (blockBuffer.remaining() > 0); diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 92dcec41865..e8d74b6678e 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; /** * Writes HFile format version 2. @@ -47,6 +48,13 @@ import org.apache.hadoop.io.Writable; public class HFileWriterV2 extends AbstractHFileWriter { static final Log LOG = LogFactory.getLog(HFileWriterV2.class); + /** Max memstore (rwcc) timestamp in FileInfo */ + public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); + /** KeyValue version in FileInfo */ + public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); + /** Version for KeyValue which includes memstore timestamp */ + public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1; + /** Inline block writers for multi-level block index and compound Blooms. 
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index 92dcec41865..e8d74b6678e 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Writes HFile format version 2.
@@ -47,6 +48,13 @@ import org.apache.hadoop.io.Writable;
 public class HFileWriterV2 extends AbstractHFileWriter {
   static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
 
+  /** Max memstore (rwcc) timestamp in FileInfo */
+  public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+  /** KeyValue version in FileInfo */
+  public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
+  /** Version for KeyValue which includes memstore timestamp */
+  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
+
   /** Inline block writers for multi-level block index and compound Blooms. */
   private List<InlineBlockWriter> inlineBlockWriters =
       new ArrayList<InlineBlockWriter>();
@@ -67,6 +75,9 @@
   private List<BlockWritable> additionalLoadOnOpenData =
       new ArrayList<BlockWritable>();
 
+  private final boolean includeMemstoreTS = true;
+  private long maxMemstoreTS = 0;
+
   static class WriterFactoryV2 extends HFile.WriterFactory {
     WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
@@ -311,8 +322,9 @@
    */
   @Override
   public void append(final KeyValue kv) throws IOException {
-    append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
+    append(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
         kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMemstoreTS());
   }
 
   /**
@@ -327,7 +339,7 @@
    */
   @Override
   public void append(final byte[] key, final byte[] value) throws IOException {
-    append(key, 0, key.length, value, 0, value.length);
+    append(0, key, 0, key.length, value, 0, value.length);
   }
 
   /**
@@ -342,7 +354,7 @@
    * @param vlength
    * @throws IOException
    */
-  private void append(final byte[] key, final int koffset, final int klength,
+  private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
       final byte[] value, final int voffset, final int vlength)
       throws IOException {
     boolean dupKey = checkKey(key, koffset, klength);
@@ -355,6 +367,7 @@
       newBlock();
 
     // Write length of key and value and then actual key and value bytes.
+    // Additionally, we may also write down the memstoreTS.
     {
       DataOutputStream out = fsBlockWriter.getUserDataStream();
       out.writeInt(klength);
@@ -363,6 +376,9 @@
       totalValueLength += vlength;
       out.write(key, koffset, klength);
       out.write(value, voffset, vlength);
+      if (this.includeMemstoreTS) {
+        WritableUtils.writeVLong(out, memstoreTS);
+      }
     }
 
     // Are we the first key in this block?
@@ -378,6 +394,10 @@ public class HFileWriterV2 extends AbstractHFileWriter {
     entryCount++;
   }
 
+  public static int getEncodedLength(long value) {
+    return WritableUtils.getVIntSize(value);
+  }
+
   @Override
   public void close() throws IOException {
     if (outputStream == null) {
@@ -428,6 +448,11 @@
     fsBlockWriter.writeHeaderAndData(outputStream);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
 
+    if (this.includeMemstoreTS) {
+      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
+      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
+    }
+
     // File info
     writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO,
         false));
@@ -449,6 +474,7 @@
     trailer.setComparatorClass(comparator.getClass());
     trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
+
     finishClose(trailer);
 
     fsBlockWriter.releaseCompressor();
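Note: with this change each data-block entry is laid out as (int klen)(int vlen)(key bytes)(value bytes) followed, when includeMemstoreTS is on, by the memstoreTS as a WritableUtils vlong. A self-contained round-trip sketch of that layout (illustrative class, not HBase code):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;

// Round-trip sketch of the record layout written by append() above.
public final class KvRecordSketch {
  static byte[] encode(byte[] key, byte[] value, long memstoreTS)
      throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(key.length);
    out.writeInt(value.length);
    out.write(key);
    out.write(value);
    WritableUtils.writeVLong(out, memstoreTS);  // optional trailing field
    out.flush();
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] rec = encode(new byte[] {1, 2}, new byte[] {3}, 42L);
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(rec));
    int klen = in.readInt(), vlen = in.readInt();
    in.skipBytes(klen + vlen);
    long ts = WritableUtils.readVLong(in);  // 42
    System.out.println(klen + "/" + vlen + " memstoreTS=" + ts);
  }
}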
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 00fd0863a55..654b64cadb2 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -212,6 +212,29 @@ public class HRegion implements HeapSize { // , Writable{
   final Path regiondir;
   KeyValue.KVComparator comparator;
 
+  private ConcurrentHashMap<RegionScannerImpl, Long> scannerReadPoints;
+
+  /*
+   * @return The smallest rwcc readPoint across all the scanners in this
+   * region. Writes older than this readPoint are included in every
+   * read operation.
+   */
+  public long getSmallestReadPoint() {
+    long minimumReadPoint;
+    // We need to ensure that while we are calculating the smallestReadPoint
+    // no new RegionScanners can grab a readPoint that we are unaware of.
+    // We achieve this by synchronizing on the scannerReadPoints object.
+    synchronized(scannerReadPoints) {
+      minimumReadPoint = rwcc.memstoreReadPoint();
+
+      for (Long readPoint: this.scannerReadPoints.values()) {
+        if (readPoint < minimumReadPoint) {
+          minimumReadPoint = readPoint;
+        }
+      }
+    }
+    return minimumReadPoint;
+  }
+
   /*
    * Data structure of write state flags used coordinating flushes,
    * compactions and closes.
@@ -371,6 +394,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.htableDescriptor = null;
     this.threadWakeFrequency = 0L;
     this.coprocessorHost = null;
+    this.scannerReadPoints = new ConcurrentHashMap<RegionScannerImpl, Long>();
   }
 
   /**
@@ -414,6 +438,7 @@
     String encodedNameStr = this.regionInfo.getEncodedName();
     setHTableSpecificConf();
     this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
+    this.scannerReadPoints = new ConcurrentHashMap<RegionScannerImpl, Long>();
 
     // don't initialize coprocessors if not running within a regionserver
     // TODO: revisit if coprocessors should load in other cases
@@ -495,6 +520,8 @@
     // min across all the max.
     long minSeqId = -1;
     long maxSeqId = -1;
+    // initialized to -1 so that we pick up MemstoreTS from column families
+    long maxMemstoreTS = -1;
     for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) {
       status.setStatus("Instantiating store for column family " + c);
       Store store = instantiateHStore(this.tableDir, c);
@@ -506,7 +533,12 @@
       if (maxSeqId == -1 || storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
       }
+      long maxStoreMemstoreTS = store.getMaxMemstoreTS();
+      if (maxStoreMemstoreTS > maxMemstoreTS) {
+        maxMemstoreTS = maxStoreMemstoreTS;
+      }
     }
+    rwcc.initialize(maxMemstoreTS + 1);
     // Recover any edits if available.
     maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
         this.regiondir, minSeqId, reporter, status));
@@ -1666,6 +1698,8 @@
     this.put(put, lockid, put.getWriteToWAL());
   }
 
+
+
   /**
    * @param put
    * @param lockid
@@ -2285,6 +2319,7 @@
         rwcc.completeMemstoreInsert(localizedWriteEntry);
       }
     }
+
     return size;
   }
 
@@ -2963,6 +2998,7 @@
     }
 
     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
       //DebugPrint.println("HRegionScanner.<init>");
+
       this.filter = scan.getFilter();
       this.batch = scan.getBatch();
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
@@ -2974,7 +3010,12 @@
       // it is [startRow,endRow) and if startRow=endRow we get nothing.
       this.isScan = scan.isGetScan() ? -1 : 0;
 
-      this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+      // synchronize on scannerReadPoints so that nobody calculates
+      // getSmallestReadPoint, before scannerReadPoints is updated.
+      synchronized(scannerReadPoints) {
+        this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+        scannerReadPoints.put(this, this.readPt);
+      }
 
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
       if (additionalScanners != null) {
@@ -2984,7 +3025,9 @@
       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        scanners.add(store.getScanner(scan, entry.getValue()));
+        StoreScanner scanner = store.getScanner(scan, entry.getValue());
+        scanner.useRWCC(true);
+        scanners.add(scanner);
       }
       this.storeHeap = new KeyValueHeap(scanners, comparator);
     }
@@ -3135,6 +3178,8 @@
         storeHeap.close();
         storeHeap = null;
       }
+      // no need to synchronize here.
+      scannerReadPoints.remove(this);
       this.filterClosed = true;
     }
 
@@ -4186,7 +4231,7 @@
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      28 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
+      29 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
       (4 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
 
@@ -4195,7 +4240,7 @@
       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
       ClassSize.ATOMIC_LONG + // memStoreSize
       ClassSize.ATOMIC_INTEGER + // lockIdGenerator
-      (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds
+      (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints
       WriteState.HEAP_SIZE + // writestate
       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
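Note: every RegionScanner publishes its read point into scannerReadPoints under the same monitor that getSmallestReadPoint() takes, so a flush or compaction can never compute a "smallest read point" that misses a scanner being opened concurrently. A condensed sketch of the pattern, with hypothetical names:

import java.util.concurrent.ConcurrentHashMap;

// Pattern sketch only: scanners register their read point and the region
// computes the minimum under the same lock, so no scanner can slip in
// between "read the global point" and "publish my read point".
// Advancing memstoreReadPoint on writes is omitted here.
class ReadPointTracker {
  private final ConcurrentHashMap<Object, Long> scannerReadPoints =
      new ConcurrentHashMap<Object, Long>();
  private volatile long memstoreReadPoint = 0;

  long openScanner(Object scanner) {
    synchronized (scannerReadPoints) {
      long readPt = memstoreReadPoint;      // like resetThreadReadPoint(rwcc)
      scannerReadPoints.put(scanner, readPt);
      return readPt;
    }
  }

  void closeScanner(Object scanner) {
    scannerReadPoints.remove(scanner);      // no lock needed on close
  }

  long getSmallestReadPoint() {
    synchronized (scannerReadPoints) {
      long min = memstoreReadPoint;
      for (Long readPoint : scannerReadPoints.values()) {
        min = Math.min(min, readPoint);
      }
      return min;
    }
  }
}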
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
index e68d9866f31..10918188ce5 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
@@ -24,6 +24,9 @@ import java.util.LinkedList;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
 /**
  * Manages the read/write consistency within memstore. This provides
  * an interface for readers to determine what entries to ignore, and
@@ -43,6 +46,31 @@ public class ReadWriteConsistencyControl {
   private static final ThreadLocal<Long> perThreadReadPoint =
       new ThreadLocal<Long>();
 
+  /**
+   * Default constructor. Initializes the memstoreRead/Write points to 0.
+   */
+  public ReadWriteConsistencyControl() {
+    this.memstoreRead = this.memstoreWrite = 0;
+  }
+
+  /**
+   * Initializes the memstoreRead/Write points appropriately.
+   * @param startPoint
+   */
+  public void initialize(long startPoint) {
+    synchronized (writeQueue) {
+      if (this.memstoreWrite != this.memstoreRead) {
+        throw new RuntimeException("Already used this rwcc. Too late to initialize");
+      }
+
+      if (this.memstoreWrite > startPoint) {
+        throw new RuntimeException("Cannot decrease RWCC timestamp");
+      }
+
+      this.memstoreRead = this.memstoreWrite = startPoint;
+    }
+  }
+
   /**
    * Get this thread's read point. Used primarily by the memstore scanner to
    * know which values to skip (ie: have not been completed/committed to
@@ -151,6 +179,7 @@
       }
     }
     if (interrupted) Thread.currentThread().interrupt();
+
   }
 
   public long memstoreReadPoint() {
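Note: initialize() lets a re-opened region seed the RWCC counter from the largest memstoreTS persisted in its store files (maxMemstoreTS + 1, as wired up in HRegion above), and refuses to run once writes have started or to move the counter backwards. A small usage sketch of that API:

import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;

// Usage sketch of the new initialize() API (not the exact HRegion code).
final class RwccInitSketch {
  static ReadWriteConsistencyControl seedFromStoreFiles(long maxMemstoreTS) {
    // maxMemstoreTS is the largest memstoreTS found in any store file;
    // -1 means no store file carried one, so the counter starts at 0.
    ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
    rwcc.initialize(maxMemstoreTS + 1);  // must run before the first write
    // New writes now get memstoreTS values strictly greater than anything
    // already on disk, so MVCC ordering survives a region re-open.
    return rwcc;
  }
}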
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index acea2f2201e..4da8595ec95 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -92,6 +92,12 @@ public class ScanQueryMatcher {
    */
   private final long earliestPutTs;
 
+  /** Should we ignore KV's with a newer RWCC timestamp **/
+  private boolean enforceRWCC = false;
+  public void useRWCC(boolean flag) {
+    this.enforceRWCC = flag;
+  }
+
   /**
    * This variable shows whether there is an null column in the query. There
    * always exists a null column in the wildcard column query.
@@ -228,6 +234,13 @@
       return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
     }
 
+    // The compaction thread has no readPoint set. For other operations, we
+    // will ignore updates that are done after the read operation has started.
+    if (this.enforceRWCC &&
+        kv.getMemstoreTS() > ReadWriteConsistencyControl.getThreadReadPoint()) {
+      return MatchCode.SKIP;
+    }
+
     /*
      * The delete logic is pretty complicated now.
      * This is corroborated by the following:
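Note: the visibility rule added here is a single comparison: a KeyValue is invisible to a scan if its memstoreTS is greater than the scanner's read point, and compactions leave enforceRWCC off so they still see everything. A minimal restatement as a standalone predicate (hypothetical helper, not HBase API):

// Minimal restatement of the check added to ScanQueryMatcher above.
final class RwccVisibility {
  static boolean isVisible(long kvMemstoreTS, long threadReadPoint,
      boolean enforceRWCC) {
    // Compactions pass enforceRWCC = false and therefore see every KV.
    return !enforceRWCC || kvMemstoreTS <= threadReadPoint;
  }
}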
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index fec5547a6c3..d8755f5bfbf 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -237,6 +237,13 @@ public class Store extends SchemaConfigured implements HeapSize {
     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
   }
 
+  /**
+   * @return The maximum memstoreTS in all store files.
+   */
+  public long getMaxMemstoreTS() {
+    return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
+  }
+
   /**
    * @param tabledir
    * @param encodedName Encoded region name.
@@ -507,6 +514,9 @@
       MonitoredTask status)
       throws IOException {
     StoreFile.Writer writer;
+    String fileName;
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = region.getSmallestReadPoint();
     long flushed = 0;
     Path pathName;
     // Don't flush if there are no entries.
@@ -538,6 +548,11 @@
           hasMore = scanner.next(kvs);
           if (!kvs.isEmpty()) {
             for (KeyValue kv : kvs) {
+              // If we know that this KV is going to be included always, then let us
+              // set its memstoreTS to 0. This will help us save space when writing to disk.
+              if (kv.getMemstoreTS() <= smallestReadPoint) {
+                kv.setMemstoreTS(0);
+              }
               writer.append(kv);
               flushed += this.memstore.heapSizeChange(kv, true);
             }
@@ -1224,6 +1239,8 @@
     // Make the instantiation lazy in case compaction produces no product; i.e.
     // where all source cells are expired or deleted.
     StoreFile.Writer writer = null;
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = region.getSmallestReadPoint();
     try {
       InternalScanner scanner = null;
       try {
@@ -1259,6 +1276,9 @@
           if (writer != null) {
             // output to writer:
             for (KeyValue kv : kvs) {
+              if (kv.getMemstoreTS() <= smallestReadPoint) {
+                kv.setMemstoreTS(0);
+              }
               writer.append(kv);
               // update progress per key
               ++progress.currentCompactedKVs;
@@ -1707,7 +1727,7 @@
    * Return a scanner for both the memstore and the HStore files
    * @throws IOException
    */
-  public KeyValueScanner getScanner(Scan scan,
+  public StoreScanner getScanner(Scan scan,
       final NavigableSet<byte[]> targetCols) throws IOException {
     lock.readLock().lock();
     try {
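Note: during flush and compaction, a KV whose memstoreTS is at or below the smallest read point of any open scanner is already visible to every reader, so its memstoreTS can be rewritten to 0 before it hits disk, keeping the trailing vlong at one byte. A sketch of that rule, assuming the KeyValue.getMemstoreTS()/setMemstoreTS() accessors this patch relies on:

import org.apache.hadoop.hbase.KeyValue;

// Sketch of the flush/compaction rule above (hypothetical helper).
final class MemstoreTsFlattener {
  static void maybeFlatten(KeyValue kv, long smallestReadPoint) {
    // Visible to all current and future scanners: drop the MVCC stamp so a
    // zero vlong (one byte) is written instead of the full value.
    if (kv.getMemstoreTS() <= smallestReadPoint) {
      kv.setMemstoreTS(0);
    }
  }
}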
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 8fa63b879a7..fdcf3123997 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.util.BloomFilter;
@@ -156,6 +157,18 @@ public class StoreFile {
   // Set when we obtain a Reader.
   private long sequenceid = -1;
 
+  // max of the MemstoreTS in the KV's in this store
+  // Set when we obtain a Reader.
+  private long maxMemstoreTS = -1;
+
+  public long getMaxMemstoreTS() {
+    return maxMemstoreTS;
+  }
+
+  public void setMaxMemstoreTS(long maxMemstoreTS) {
+    this.maxMemstoreTS = maxMemstoreTS;
+  }
+
   // If true, this file was product of a major compaction. Its then set
   // whenever you get a Reader.
   private AtomicBoolean majorCompaction = null;
@@ -342,6 +355,24 @@
     return modificationTimeStamp;
   }
 
+  /**
+   * Return the largest memstoreTS found across all storefiles in
+   * the given list. Store files that were created by a mapreduce
+   * bulk load are ignored, as they do not correspond to any specific
+   * put operation, and thus do not have a memstoreTS associated with them.
+   * @return 0 if no non-bulk-load files are provided or this is a Store that
+   * does not yet have any store files.
+   */
+  public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
+    long max = 0;
+    for (StoreFile sf : sfs) {
+      if (!sf.isBulkLoadResult()) {
+        max = Math.max(max, sf.getMaxMemstoreTS());
+      }
+    }
+    return max;
+  }
+
   /**
    * Return the highest sequence ID found across all storefiles in
    * the given list. Store files that were created by a mapreduce
@@ -491,6 +522,11 @@
     }
     this.reader.setSequenceID(this.sequenceid);
 
+    b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
+    if (b != null) {
+      this.maxMemstoreTS = Bytes.toLong(b);
+    }
+
     b = metadataMap.get(MAJOR_COMPACTION_KEY);
     if (b != null) {
       boolean mc = Bytes.toBoolean(b);
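Note: the writer records the largest memstoreTS it saw under MAX_MEMSTORE_TS_KEY in the file info, StoreFile surfaces it when the file is opened, and the region takes the maximum over all non-bulk-load files. A small standalone sketch of that aggregation, mirroring getMaxMemstoreTSInList() with a stand-in type instead of the real StoreFile:

import java.util.Arrays;
import java.util.Collection;

// Aggregation sketch mirroring StoreFile.getMaxMemstoreTSInList() above.
final class MaxMemstoreTs {
  static final class FileMeta {
    final long maxMemstoreTS;
    final boolean bulkLoad;
    FileMeta(long maxMemstoreTS, boolean bulkLoad) {
      this.maxMemstoreTS = maxMemstoreTS;
      this.bulkLoad = bulkLoad;
    }
  }

  static long maxAcrossFiles(Collection<FileMeta> files) {
    long max = 0;  // bulk-loaded files carry no memstoreTS and are ignored
    for (FileMeta f : files) {
      if (!f.bulkLoad) {
        max = Math.max(max, f.maxMemstoreTS);
      }
    }
    return max;
  }

  public static void main(String[] args) {
    // Prints 42: the bulk-loaded file with 99 is skipped.
    System.out.println(maxAcrossFiles(Arrays.asList(
        new FileMeta(17, false), new FileMeta(99, true), new FileMeta(42, false))));
  }
}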
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 6512a54f78f..d1e8a2c102b 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -159,6 +159,15 @@ class StoreScanner extends NonLazyKeyValueScanner
     heap = new KeyValueHeap(scanners, scanInfo.getComparator());
   }
 
+  /**
+   * Advise the StoreScanner if it should enforce the RWCC mechanism
+   * for ignoring newer KVs or not.
+   * @param flag
+   */
+  public void useRWCC(boolean flag) {
+    matcher.useRWCC(flag);
+  }
+
   /*
    * @return List of scanners ordered properly.
    */
diff --git a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
index 0ec7687a7ea..b479e5f0923 100644
--- a/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
+++ b/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
@@ -252,6 +252,12 @@ public class TestAcidGuarantees {
       writers.add(writer);
       ctx.addThread(writer);
     }
+    // Add a flusher
+    ctx.addThread(new RepeatingTestThread(ctx) {
+      public void doAnAction() throws Exception {
+        util.flush();
+      }
+    });
 
     List getters = Lists.newArrayList();
     for (int i = 0; i < numGetters; i++) {
@@ -288,7 +294,6 @@
   }
 
   @Test
-  @Ignore("Currently not passing - see HBASE-2856")
   public void testGetAtomicity() throws Exception {
     util.startMiniCluster(1);
     try {
@@ -299,7 +304,6 @@
   }
 
   @Test
-  @Ignore("Currently not passing - see HBASE-2670")
   public void testScanAtomicity() throws Exception {
     util.startMiniCluster(1);
     try {
@@ -310,7 +314,6 @@
   }
 
   @Test
-  @Ignore("Currently not passing - see HBASE-2670")
   public void testMixedAtomicity() throws Exception {
     util.startMiniCluster(1);
     try {
@@ -324,7 +327,7 @@
     Configuration c = HBaseConfiguration.create();
     TestAcidGuarantees test = new TestAcidGuarantees();
     test.setConf(c);
-    test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
+    test.runTestAtomicity(5000, 50, 2, 2, 3);
   }
 
   private void setConf(Configuration c) {
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 812f11ff1ac..58fd1552879 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -185,9 +185,10 @@ public class TestCacheOnWrite {
     }
 
     LOG.info("Block count by type: " + blockCountByType);
+    String countByType = blockCountByType.toString();
     assertEquals(
-        "{DATA=1367, LEAF_INDEX=172, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
-        blockCountByType.toString());
+        "{DATA=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}",
+        countByType);
 
     reader.close();
   }
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
index b144ce8dd5e..371195fc354 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import static org.junit.Assert.*;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -35,8 +37,13 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -116,10 +123,36 @@
     HFileBlock.FSReader blockReader =
         new HFileBlock.FSReaderV2(fsdis, COMPRESS_ALGO, fileSize);
 
+    // Comparator class name is stored in the trailer in version 2.
+    RawComparator<byte []> comparator = trailer.createComparator();
+    HFileBlockIndex.BlockIndexReader dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
+        trailer.getNumDataIndexLevels());
+    HFileBlockIndex.BlockIndexReader metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
+        Bytes.BYTES_RAWCOMPARATOR, 1);
+
+    HFileBlock.BlockIterator blockIter = blockReader.blockRange(
+        trailer.getLoadOnOpenDataOffset(),
+        fileSize - trailer.getTrailerSize());
+    // Data index. We also read statistics about the block index written after
+    // the root level.
+    dataBlockIndexReader.readMultiLevelIndexRoot(
+        blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+        trailer.getDataIndexCount());
+
+    // Meta index.
+    metaBlockIndexReader.readRootIndex(
+        blockIter.nextBlockAsStream(BlockType.ROOT_INDEX),
+        trailer.getMetaIndexCount());
+    // File info
+    FileInfo fileInfo = new FileInfo();
+    fileInfo.readFields(blockIter.nextBlockAsStream(BlockType.FILE_INFO));
+    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
+    boolean includeMemstoreTS = (keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0);
+
     // Counters for the number of key/value pairs and the number of blocks
     int entriesRead = 0;
     int blocksRead = 0;
+    long memstoreTS = 0;
 
     // Scan blocks the way the reader would scan them
     fsdis.seek(0);
@@ -138,6 +171,15 @@
         byte[] value = new byte[valueLen];
         buf.get(value);
 
+        if (includeMemstoreTS) {
+          ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(),
+              buf.arrayOffset() + buf.position(), buf.remaining());
+          DataInputStream data_input = new DataInputStream(byte_input);
+
+          memstoreTS = WritableUtils.readVLong(data_input);
+          buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
+        }
+
         // A brute-force check to see that all keys and values are correct.
         assertTrue(Bytes.compareTo(key, keys.get(entriesRead)) == 0);
         assertTrue(Bytes.compareTo(value, values.get(entriesRead)) == 0);
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index fee22cecf16..2bc0c59c827 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -818,7 +818,8 @@ public class TestStoreFile extends HBaseTestCase {
     for (int i=numKVs;i>0;i--) {
       KeyValue kv = new KeyValue(b, b, b, i, b);
       kvs.add(kv);
-      totalSize += kv.getLength();
+      // kv has memstoreTS 0, which takes 1 byte to store.
+      totalSize += kv.getLength() + 1;
     }
     int blockSize = totalSize / numBlocks;
     StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize,
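Note: the TestStoreFile tweak accounts for one extra byte per KV because a memstoreTS of 0 is written as a single-byte vlong; larger values grow to at most 9 bytes. A quick check of those sizes using Hadoop's WritableUtils:

import org.apache.hadoop.io.WritableUtils;

// Quick check of the "+1 byte per KV" accounting above: a zero memstoreTS
// encodes as one byte; the encoding tops out at 9 bytes for large longs.
public final class VIntSizeCheck {
  public static void main(String[] args) {
    System.out.println(WritableUtils.getVIntSize(0L));              // 1
    System.out.println(WritableUtils.getVIntSize(127L));            // 1
    System.out.println(WritableUtils.getVIntSize(128L));            // 2
    System.out.println(WritableUtils.getVIntSize(Long.MAX_VALUE));  // 9
  }
}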