HBASE-4544 change RWCC to MVCC

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1203468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Nicolas Spiegelberg 2011-11-18 02:19:37 +00:00
parent 1e68440431
commit e7fb30c371
10 changed files with 112 additions and 112 deletions

View File

@ -49,7 +49,7 @@ import org.apache.hadoop.io.WritableUtils;
public class HFileWriterV2 extends AbstractHFileWriter { public class HFileWriterV2 extends AbstractHFileWriter {
static final Log LOG = LogFactory.getLog(HFileWriterV2.class); static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
/** Max memstore (rwcc) timestamp in FileInfo */ /** Max memstore (mvcc) timestamp in FileInfo */
public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
/** KeyValue version in FileInfo */ /** KeyValue version in FileInfo */
public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

View File

@ -215,7 +215,7 @@ public class HRegion implements HeapSize { // , Writable{
private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints; private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
/* /*
* @return The smallest rwcc readPoint across all the scanners in this * @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every * region. Writes older than this readPoint, are included in every
* read operation. * read operation.
*/ */
@ -225,7 +225,7 @@ public class HRegion implements HeapSize { // , Writable{
// no new RegionScanners can grab a readPoint that we are unaware of. // no new RegionScanners can grab a readPoint that we are unaware of.
// We achieve this by synchronizing on the scannerReadPoints object. // We achieve this by synchronizing on the scannerReadPoints object.
synchronized(scannerReadPoints) { synchronized(scannerReadPoints) {
minimumReadPoint = rwcc.memstoreReadPoint(); minimumReadPoint = mvcc.memstoreReadPoint();
for (Long readPoint: this.scannerReadPoints.values()) { for (Long readPoint: this.scannerReadPoints.values()) {
if (readPoint < minimumReadPoint) { if (readPoint < minimumReadPoint) {
@ -291,8 +291,8 @@ public class HRegion implements HeapSize { // , Writable{
private boolean splitRequest; private boolean splitRequest;
private byte[] explicitSplitPoint = null; private byte[] explicitSplitPoint = null;
private final ReadWriteConsistencyControl rwcc = private final MultiVersionConsistencyControl mvcc =
new ReadWriteConsistencyControl(); new MultiVersionConsistencyControl();
// Coprocessor host // Coprocessor host
private RegionCoprocessorHost coprocessorHost; private RegionCoprocessorHost coprocessorHost;
@ -538,7 +538,7 @@ public class HRegion implements HeapSize { // , Writable{
maxMemstoreTS = maxStoreMemstoreTS; maxMemstoreTS = maxStoreMemstoreTS;
} }
} }
rwcc.initialize(maxMemstoreTS + 1); mvcc.initialize(maxMemstoreTS + 1);
// Recover any edits if available. // Recover any edits if available.
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
this.regiondir, minSeqId, reporter, status)); this.regiondir, minSeqId, reporter, status));
@ -747,8 +747,8 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
public ReadWriteConsistencyControl getRWCC() { public MultiVersionConsistencyControl getMVCC() {
return rwcc; return mvcc;
} }
/** /**
@ -1271,7 +1271,7 @@ public class HRegion implements HeapSize { // , Writable{
// during the flush // during the flush
long sequenceId = -1L; long sequenceId = -1L;
long completeSequenceId = -1L; long completeSequenceId = -1L;
ReadWriteConsistencyControl.WriteEntry w = null; MultiVersionConsistencyControl.WriteEntry w = null;
// We have to take a write lock during snapshot, or else a write could // We have to take a write lock during snapshot, or else a write could
// end up in both snapshot and memstore (makes it difficult to do atomic // end up in both snapshot and memstore (makes it difficult to do atomic
@ -1282,9 +1282,9 @@ public class HRegion implements HeapSize { // , Writable{
long currentMemStoreSize = 0; long currentMemStoreSize = 0;
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size()); List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try { try {
// Record the rwcc for all transactions in progress. // Record the mvcc for all transactions in progress.
w = rwcc.beginMemstoreInsert(); w = mvcc.beginMemstoreInsert();
rwcc.advanceMemstore(w); mvcc.advanceMemstore(w);
sequenceId = (wal == null)? myseqid : sequenceId = (wal == null)? myseqid :
wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes()); wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
@ -1301,15 +1301,15 @@ public class HRegion implements HeapSize { // , Writable{
} finally { } finally {
this.updatesLock.writeLock().unlock(); this.updatesLock.writeLock().unlock();
} }
status.setStatus("Waiting for rwcc"); status.setStatus("Waiting for mvcc");
LOG.debug("Finished snapshotting, commencing waiting for rwcc"); LOG.debug("Finished snapshotting, commencing waiting for mvcc");
// wait for all in-progress transactions to commit to HLog before // wait for all in-progress transactions to commit to HLog before
// we can start the flush. This prevents // we can start the flush. This prevents
// uncommitted transactions from being written into HFiles. // uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that // We have to block before we start the flush, otherwise keys that
// were removed via a rollbackMemstore could be written to Hfiles. // were removed via a rollbackMemstore could be written to Hfiles.
rwcc.waitForRead(w); mvcc.waitForRead(w);
status.setStatus("Flushing stores"); status.setStatus("Flushing stores");
LOG.debug("Finished snapshotting, commencing flushing stores"); LOG.debug("Finished snapshotting, commencing flushing stores");
@ -1827,7 +1827,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
ReadWriteConsistencyControl.WriteEntry w = null; MultiVersionConsistencyControl.WriteEntry w = null;
long txid = 0; long txid = 0;
boolean walSyncSuccessful = false; boolean walSyncSuccessful = false;
boolean locked = false; boolean locked = false;
@ -1917,17 +1917,17 @@ public class HRegion implements HeapSize { // , Writable{
// //
// ------------------------------------ // ------------------------------------
// Acquire the latest rwcc number // Acquire the latest mvcc number
// ---------------------------------- // ----------------------------------
w = rwcc.beginMemstoreInsert(); w = mvcc.beginMemstoreInsert();
// ------------------------------------ // ------------------------------------
// STEP 3. Write back to memstore // STEP 3. Write back to memstore
// Write to memstore. It is ok to write to memstore // Write to memstore. It is ok to write to memstore
// first without updating the HLog because we do not roll // first without updating the HLog because we do not roll
// forward the memstore RWCC. The RWCC will be moved up when // forward the memstore MVCC. The MVCC will be moved up when
// the complete operation is done. These changes are not yet // the complete operation is done. These changes are not yet
// visible to scanners till we update the RWCC. The RWCC is // visible to scanners till we update the MVCC. The MVCC is
// moved only when the sync is complete. // moved only when the sync is complete.
// ---------------------------------- // ----------------------------------
long addedSize = 0; long addedSize = 0;
@ -1985,10 +1985,10 @@ public class HRegion implements HeapSize { // , Writable{
} }
walSyncSuccessful = true; walSyncSuccessful = true;
// ------------------------------------------------------------------ // ------------------------------------------------------------------
// STEP 8. Advance rwcc. This will make this put visible to scanners and getters. // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------ // ------------------------------------------------------------------
if (w != null) { if (w != null) {
rwcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
w = null; w = null;
} }
@ -2016,7 +2016,7 @@ public class HRegion implements HeapSize { // , Writable{
if (!walSyncSuccessful) { if (!walSyncSuccessful) {
rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
} }
if (w != null) rwcc.completeMemstoreInsert(w); if (w != null) mvcc.completeMemstoreInsert(w);
if (locked) { if (locked) {
this.updatesLock.readLock().unlock(); this.updatesLock.readLock().unlock();
@ -2288,20 +2288,20 @@ public class HRegion implements HeapSize { // , Writable{
* <b>not</b> check the families for validity. * <b>not</b> check the families for validity.
* *
* @param familyMap Map of kvs per family * @param familyMap Map of kvs per family
* @param localizedWriteEntry The WriteEntry of the RWCC for this transaction. * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
* If null, then this method internally creates a rwcc transaction. * If null, then this method internally creates a mvcc transaction.
* @return the additional memory usage of the memstore caused by the * @return the additional memory usage of the memstore caused by the
* new entries. * new entries.
*/ */
private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap, private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
ReadWriteConsistencyControl.WriteEntry localizedWriteEntry) { MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
long size = 0; long size = 0;
boolean freerwcc = false; boolean freemvcc = false;
try { try {
if (localizedWriteEntry == null) { if (localizedWriteEntry == null) {
localizedWriteEntry = rwcc.beginMemstoreInsert(); localizedWriteEntry = mvcc.beginMemstoreInsert();
freerwcc = true; freemvcc = true;
} }
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
@ -2315,8 +2315,8 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
} finally { } finally {
if (freerwcc) { if (freemvcc) {
rwcc.completeMemstoreInsert(localizedWriteEntry); mvcc.completeMemstoreInsert(localizedWriteEntry);
} }
} }
@ -3013,7 +3013,7 @@ public class HRegion implements HeapSize { // , Writable{
// synchronize on scannerReadPoints so that nobody calculates // synchronize on scannerReadPoints so that nobody calculates
// getSmallestReadPoint, before scannerReadPoints is updated. // getSmallestReadPoint, before scannerReadPoints is updated.
synchronized(scannerReadPoints) { synchronized(scannerReadPoints) {
this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
scannerReadPoints.put(this, this.readPt); scannerReadPoints.put(this, this.readPt);
} }
@ -3057,7 +3057,7 @@ public class HRegion implements HeapSize { // , Writable{
try { try {
// This could be a new thread from the last time we called next(). // This could be a new thread from the last time we called next().
ReadWriteConsistencyControl.setThreadReadPoint(this.readPt); MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
results.clear(); results.clear();
@ -3884,7 +3884,7 @@ public class HRegion implements HeapSize { // , Writable{
*/ */
public Result append(Append append, Integer lockid, boolean writeToWAL) public Result append(Append append, Integer lockid, boolean writeToWAL)
throws IOException { throws IOException {
// TODO: Use RWCC to make this set of appends atomic to reads // TODO: Use MVCC to make this set of appends atomic to reads
byte[] row = append.getRow(); byte[] row = append.getRow();
checkRow(row, "append"); checkRow(row, "append");
boolean flush = false; boolean flush = false;
@ -4024,7 +4024,7 @@ public class HRegion implements HeapSize { // , Writable{
public Result increment(Increment increment, Integer lockid, public Result increment(Increment increment, Integer lockid,
boolean writeToWAL) boolean writeToWAL)
throws IOException { throws IOException {
// TODO: Use RWCC to make this set of increments atomic to reads // TODO: Use MVCC to make this set of increments atomic to reads
byte [] row = increment.getRow(); byte [] row = increment.getRow();
checkRow(row, "increment"); checkRow(row, "increment");
TimeRange tr = increment.getTimeRange(); TimeRange tr = increment.getTimeRange();
@ -4244,7 +4244,7 @@ public class HRegion implements HeapSize { // , Writable{
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
ClassSize.ARRAYLIST + // recentFlushes ClassSize.ARRAYLIST + // recentFlushes
ReadWriteConsistencyControl.FIXED_SIZE // rwcc MultiVersionConsistencyControl.FIXED_SIZE // mvcc
; ;
@Override @Override
@ -4253,7 +4253,7 @@ public class HRegion implements HeapSize { // , Writable{
for(Store store : this.stores.values()) { for(Store store : this.stores.values()) {
heapSize += store.heapSize(); heapSize += store.heapSize();
} }
// this does not take into account row locks, recent flushes, rwcc entries // this does not take into account row locks, recent flushes, mvcc entries
return heapSize; return heapSize;
} }

View File

@ -713,7 +713,7 @@ public class MemStore implements HeapSize {
} }
protected KeyValue getNext(Iterator<KeyValue> it) { protected KeyValue getNext(Iterator<KeyValue> it) {
long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
while (it.hasNext()) { while (it.hasNext()) {
KeyValue v = it.next(); KeyValue v = it.next();

View File

@ -33,7 +33,7 @@ import org.apache.commons.logging.Log;
* a mechanism for writers to obtain new write numbers, then "commit" * a mechanism for writers to obtain new write numbers, then "commit"
* the new writes for readers to read (thus forming atomic transactions). * the new writes for readers to read (thus forming atomic transactions).
*/ */
public class ReadWriteConsistencyControl { public class MultiVersionConsistencyControl {
private volatile long memstoreRead = 0; private volatile long memstoreRead = 0;
private volatile long memstoreWrite = 0; private volatile long memstoreWrite = 0;
@ -55,7 +55,7 @@ public class ReadWriteConsistencyControl {
/** /**
* Default constructor. Initializes the memstoreRead/Write points to 0. * Default constructor. Initializes the memstoreRead/Write points to 0.
*/ */
public ReadWriteConsistencyControl() { public MultiVersionConsistencyControl() {
this.memstoreRead = this.memstoreWrite = 0; this.memstoreRead = this.memstoreWrite = 0;
} }
@ -66,7 +66,7 @@ public class ReadWriteConsistencyControl {
public void initialize(long startPoint) { public void initialize(long startPoint) {
synchronized (writeQueue) { synchronized (writeQueue) {
if (this.memstoreWrite != this.memstoreRead) { if (this.memstoreWrite != this.memstoreRead) {
throw new RuntimeException("Already used this rwcc. Too late to initialize"); throw new RuntimeException("Already used this mvcc. Too late to initialize");
} }
this.memstoreRead = this.memstoreWrite = startPoint; this.memstoreRead = this.memstoreWrite = startPoint;
@ -83,7 +83,7 @@ public class ReadWriteConsistencyControl {
} }
/** /**
* Set the thread read point to the given value. The thread RWCC * Set the thread read point to the given value. The thread MVCC
* is used by the Memstore scanner so it knows which values to skip. * is used by the Memstore scanner so it knows which values to skip.
* Give it a value of 0 if you want everything. * Give it a value of 0 if you want everything.
*/ */
@ -92,16 +92,16 @@ public class ReadWriteConsistencyControl {
} }
/** /**
* Set the thread RWCC read point to whatever the current read point is in * Set the thread MVCC read point to whatever the current read point is in
* this particular instance of RWCC. Returns the new thread read point value. * this particular instance of MVCC. Returns the new thread read point value.
*/ */
public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) { public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) {
perThreadReadPoint.set(rwcc.memstoreReadPoint()); perThreadReadPoint.set(mvcc.memstoreReadPoint());
return getThreadReadPoint(); return getThreadReadPoint();
} }
/** /**
* Set the thread RWCC read point to 0 (include everything). * Set the thread MVCC read point to 0 (include everything).
*/ */
public static void resetThreadReadPoint() { public static void resetThreadReadPoint() {
perThreadReadPoint.set(0L); perThreadReadPoint.set(0L);
@ -204,10 +204,10 @@ public class ReadWriteConsistencyControl {
return this.writeNumber; return this.writeNumber;
} }
} }
public static final long FIXED_SIZE = ClassSize.align( public static final long FIXED_SIZE = ClassSize.align(
ClassSize.OBJECT + ClassSize.OBJECT +
2 * Bytes.SIZEOF_LONG + 2 * Bytes.SIZEOF_LONG +
2 * ClassSize.REFERENCE); 2 * ClassSize.REFERENCE);
} }

View File

@ -1285,7 +1285,7 @@ public class Store extends SchemaConfigured implements HeapSize {
StoreFile.Writer writer = null; StoreFile.Writer writer = null;
// Find the smallest read point across all the Scanners. // Find the smallest read point across all the Scanners.
long smallestReadPoint = region.getSmallestReadPoint(); long smallestReadPoint = region.getSmallestReadPoint();
ReadWriteConsistencyControl.setThreadReadPoint(smallestReadPoint); MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
try { try {
InternalScanner scanner = null; InternalScanner scanner = null;
try { try {
@ -1942,7 +1942,7 @@ public class Store extends SchemaConfigured implements HeapSize {
throws IOException { throws IOException {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
// TODO: Make this operation atomic w/ RWCC // TODO: Make this operation atomic w/ MVCC
return this.memstore.upsert(kvs); return this.memstore.upsert(kvs);
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();

View File

@ -52,7 +52,7 @@ class StoreFileScanner implements KeyValueScanner {
private boolean delayedReseek; private boolean delayedReseek;
private KeyValue delayedSeekKV; private KeyValue delayedSeekKV;
private boolean enforceRWCC = false; private boolean enforceMVCC = false;
//The variable, realSeekDone, may cheat on store file scanner for the //The variable, realSeekDone, may cheat on store file scanner for the
// multi-column bloom-filter optimization. // multi-column bloom-filter optimization.
@ -67,10 +67,10 @@ class StoreFileScanner implements KeyValueScanner {
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
* @param hfs HFile scanner * @param hfs HFile scanner
*/ */
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useRWCC) { public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC) {
this.reader = reader; this.reader = reader;
this.hfs = hfs; this.hfs = hfs;
this.enforceRWCC = useRWCC; this.enforceMVCC = useMVCC;
} }
/** /**
@ -182,11 +182,11 @@ class StoreFileScanner implements KeyValueScanner {
} }
protected boolean skipKVsNewerThanReadpoint() throws IOException { protected boolean skipKVsNewerThanReadpoint() throws IOException {
long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
// We want to ignore all key-values that are newer than our current // We want to ignore all key-values that are newer than our current
// readPoint // readPoint
while(enforceRWCC while(enforceMVCC
&& cur != null && cur != null
&& (cur.getMemstoreTS() > readPoint)) { && (cur.getMemstoreTS() > readPoint)) {
hfs.next(); hfs.next();

View File

@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -1525,7 +1525,7 @@ public class HBaseTestingUtility {
*/ */
public static List<KeyValue> getFromStoreFile(Store store, public static List<KeyValue> getFromStoreFile(Store store,
Get get) throws IOException { Get get) throws IOException {
ReadWriteConsistencyControl.resetThreadReadPoint(); MultiVersionConsistencyControl.resetThreadReadPoint();
Scan scan = new Scan(get); Scan scan = new Scan(get);
InternalScanner scanner = (InternalScanner) store.getScanner(scan, InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName())); scan.getFamilyMap().get(store.getFamily().getName()));

View File

@ -1444,12 +1444,12 @@ public class TestHRegion extends HBaseTestCase {
scan.addFamily(fam2); scan.addFamily(fam2);
scan.addFamily(fam4); scan.addFamily(fam4);
is = (RegionScannerImpl) region.getScanner(scan); is = (RegionScannerImpl) region.getScanner(scan);
ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC());
assertEquals(1, ((RegionScannerImpl)is).storeHeap.getHeap().size()); assertEquals(1, ((RegionScannerImpl)is).storeHeap.getHeap().size());
scan = new Scan(); scan = new Scan();
is = (RegionScannerImpl) region.getScanner(scan); is = (RegionScannerImpl) region.getScanner(scan);
ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC());
assertEquals(families.length -1, assertEquals(families.length -1,
((RegionScannerImpl)is).storeHeap.getHeap().size()); ((RegionScannerImpl)is).storeHeap.getHeap().size());
} }

View File

@ -55,12 +55,12 @@ public class TestMemStore extends TestCase {
private static final byte [] CONTENTS = Bytes.toBytes("contents"); private static final byte [] CONTENTS = Bytes.toBytes("contents");
private static final byte [] BASIC = Bytes.toBytes("basic"); private static final byte [] BASIC = Bytes.toBytes("basic");
private static final String CONTENTSTR = "contentstr"; private static final String CONTENTSTR = "contentstr";
private ReadWriteConsistencyControl rwcc; private MultiVersionConsistencyControl mvcc;
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
this.rwcc = new ReadWriteConsistencyControl(); this.mvcc = new MultiVersionConsistencyControl();
this.memstore = new MemStore(); this.memstore = new MemStore();
} }
@ -86,7 +86,7 @@ public class TestMemStore extends TestCase {
List<KeyValueScanner> memstorescanners = this.memstore.getScanners(); List<KeyValueScanner> memstorescanners = this.memstore.getScanners();
Scan scan = new Scan(); Scan scan = new Scan();
List<KeyValue> result = new ArrayList<KeyValue>(); List<KeyValue> result = new ArrayList<KeyValue>();
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false, ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false,
this.memstore.comparator); this.memstore.comparator);
ScanType scanType = ScanType.USER_SCAN; ScanType scanType = ScanType.USER_SCAN;
@ -108,7 +108,7 @@ public class TestMemStore extends TestCase {
scanner.close(); scanner.close();
} }
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
memstorescanners = this.memstore.getScanners(); memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan. // Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
@ -198,7 +198,7 @@ public class TestMemStore extends TestCase {
private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
throws IOException { throws IOException {
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
List<KeyValueScanner> memstorescanners = this.memstore.getScanners(); List<KeyValueScanner> memstorescanners = this.memstore.getScanners();
assertEquals(1, memstorescanners.size()); assertEquals(1, memstorescanners.size());
final KeyValueScanner scanner = memstorescanners.get(0); final KeyValueScanner scanner = memstorescanners.get(0);
@ -233,35 +233,35 @@ public class TestMemStore extends TestCase {
final byte[] q2 = Bytes.toBytes("q2"); final byte[] q2 = Bytes.toBytes("q2");
final byte[] v = Bytes.toBytes("value"); final byte[] v = Bytes.toBytes("value");
ReadWriteConsistencyControl.WriteEntry w = MultiVersionConsistencyControl.WriteEntry w =
rwcc.beginMemstoreInsert(); mvcc.beginMemstoreInsert();
KeyValue kv1 = new KeyValue(row, f, q1, v); KeyValue kv1 = new KeyValue(row, f, q1, v);
kv1.setMemstoreTS(w.getWriteNumber()); kv1.setMemstoreTS(w.getWriteNumber());
memstore.add(kv1); memstore.add(kv1);
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
KeyValueScanner s = this.memstore.getScanners().get(0); KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{}); assertScannerResults(s, new KeyValue[]{});
rwcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
s = this.memstore.getScanners().get(0); s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv1}); assertScannerResults(s, new KeyValue[]{kv1});
w = rwcc.beginMemstoreInsert(); w = mvcc.beginMemstoreInsert();
KeyValue kv2 = new KeyValue(row, f, q2, v); KeyValue kv2 = new KeyValue(row, f, q2, v);
kv2.setMemstoreTS(w.getWriteNumber()); kv2.setMemstoreTS(w.getWriteNumber());
memstore.add(kv2); memstore.add(kv2);
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
s = this.memstore.getScanners().get(0); s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv1}); assertScannerResults(s, new KeyValue[]{kv1});
rwcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
s = this.memstore.getScanners().get(0); s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv1, kv2}); assertScannerResults(s, new KeyValue[]{kv1, kv2});
} }
@ -281,8 +281,8 @@ public class TestMemStore extends TestCase {
final byte[] v2 = Bytes.toBytes("value2"); final byte[] v2 = Bytes.toBytes("value2");
// INSERT 1: Write both columns val1 // INSERT 1: Write both columns val1
ReadWriteConsistencyControl.WriteEntry w = MultiVersionConsistencyControl.WriteEntry w =
rwcc.beginMemstoreInsert(); mvcc.beginMemstoreInsert();
KeyValue kv11 = new KeyValue(row, f, q1, v1); KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMemstoreTS(w.getWriteNumber()); kv11.setMemstoreTS(w.getWriteNumber());
@ -291,15 +291,15 @@ public class TestMemStore extends TestCase {
KeyValue kv12 = new KeyValue(row, f, q2, v1); KeyValue kv12 = new KeyValue(row, f, q2, v1);
kv12.setMemstoreTS(w.getWriteNumber()); kv12.setMemstoreTS(w.getWriteNumber());
memstore.add(kv12); memstore.add(kv12);
rwcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS // BEFORE STARTING INSERT 2, SEE FIRST KVS
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
KeyValueScanner s = this.memstore.getScanners().get(0); KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START INSERT 2: Write both columns val2 // START INSERT 2: Write both columns val2
w = rwcc.beginMemstoreInsert(); w = mvcc.beginMemstoreInsert();
KeyValue kv21 = new KeyValue(row, f, q1, v2); KeyValue kv21 = new KeyValue(row, f, q1, v2);
kv21.setMemstoreTS(w.getWriteNumber()); kv21.setMemstoreTS(w.getWriteNumber());
memstore.add(kv21); memstore.add(kv21);
@ -309,17 +309,17 @@ public class TestMemStore extends TestCase {
memstore.add(kv22); memstore.add(kv22);
// BEFORE COMPLETING INSERT 2, SEE FIRST KVS // BEFORE COMPLETING INSERT 2, SEE FIRST KVS
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
s = this.memstore.getScanners().get(0); s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE INSERT 2 // COMPLETE INSERT 2
rwcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
// NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS.
// See HBASE-1485 for discussion about what we should do with // See HBASE-1485 for discussion about what we should do with
// the duplicate-TS inserts // the duplicate-TS inserts
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
s = this.memstore.getScanners().get(0); s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12});
} }
@ -336,8 +336,8 @@ public class TestMemStore extends TestCase {
final byte[] q2 = Bytes.toBytes("q2"); final byte[] q2 = Bytes.toBytes("q2");
final byte[] v1 = Bytes.toBytes("value1"); final byte[] v1 = Bytes.toBytes("value1");
// INSERT 1: Write both columns val1 // INSERT 1: Write both columns val1
ReadWriteConsistencyControl.WriteEntry w = MultiVersionConsistencyControl.WriteEntry w =
rwcc.beginMemstoreInsert(); mvcc.beginMemstoreInsert();
KeyValue kv11 = new KeyValue(row, f, q1, v1); KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMemstoreTS(w.getWriteNumber()); kv11.setMemstoreTS(w.getWriteNumber());
@ -346,30 +346,30 @@ public class TestMemStore extends TestCase {
KeyValue kv12 = new KeyValue(row, f, q2, v1); KeyValue kv12 = new KeyValue(row, f, q2, v1);
kv12.setMemstoreTS(w.getWriteNumber()); kv12.setMemstoreTS(w.getWriteNumber());
memstore.add(kv12); memstore.add(kv12);
rwcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
// BEFORE STARTING INSERT 2, SEE FIRST KVS // BEFORE STARTING INSERT 2, SEE FIRST KVS
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
KeyValueScanner s = this.memstore.getScanners().get(0); KeyValueScanner s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kv12});
// START DELETE: Insert delete for one of the columns // START DELETE: Insert delete for one of the columns
w = rwcc.beginMemstoreInsert(); w = mvcc.beginMemstoreInsert();
KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
KeyValue.Type.DeleteColumn); KeyValue.Type.DeleteColumn);
kvDel.setMemstoreTS(w.getWriteNumber()); kvDel.setMemstoreTS(w.getWriteNumber());
memstore.add(kvDel); memstore.add(kvDel);
// BEFORE COMPLETING DELETE, SEE FIRST KVS // BEFORE COMPLETING DELETE, SEE FIRST KVS
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
s = this.memstore.getScanners().get(0); s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kv12});
// COMPLETE DELETE // COMPLETE DELETE
rwcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
// NOW WE SHOULD SEE DELETE // NOW WE SHOULD SEE DELETE
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
s = this.memstore.getScanners().get(0); s = this.memstore.getScanners().get(0);
assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12});
} }
@ -383,7 +383,7 @@ public class TestMemStore extends TestCase {
final byte[] f = Bytes.toBytes("family"); final byte[] f = Bytes.toBytes("family");
final byte[] q1 = Bytes.toBytes("q1"); final byte[] q1 = Bytes.toBytes("q1");
final ReadWriteConsistencyControl rwcc; final MultiVersionConsistencyControl mvcc;
final MemStore memstore; final MemStore memstore;
AtomicReference<Throwable> caughtException; AtomicReference<Throwable> caughtException;
@ -391,10 +391,10 @@ public class TestMemStore extends TestCase {
public ReadOwnWritesTester(int id, public ReadOwnWritesTester(int id,
MemStore memstore, MemStore memstore,
ReadWriteConsistencyControl rwcc, MultiVersionConsistencyControl mvcc,
AtomicReference<Throwable> caughtException) AtomicReference<Throwable> caughtException)
{ {
this.rwcc = rwcc; this.mvcc = mvcc;
this.memstore = memstore; this.memstore = memstore;
this.caughtException = caughtException; this.caughtException = caughtException;
row = Bytes.toBytes(id); row = Bytes.toBytes(id);
@ -410,8 +410,8 @@ public class TestMemStore extends TestCase {
private void internalRun() throws IOException { private void internalRun() throws IOException {
for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
ReadWriteConsistencyControl.WriteEntry w = MultiVersionConsistencyControl.WriteEntry w =
rwcc.beginMemstoreInsert(); mvcc.beginMemstoreInsert();
// Insert the sequence value (i) // Insert the sequence value (i)
byte[] v = Bytes.toBytes(i); byte[] v = Bytes.toBytes(i);
@ -419,10 +419,10 @@ public class TestMemStore extends TestCase {
KeyValue kv = new KeyValue(row, f, q1, i, v); KeyValue kv = new KeyValue(row, f, q1, i, v);
kv.setMemstoreTS(w.getWriteNumber()); kv.setMemstoreTS(w.getWriteNumber());
memstore.add(kv); memstore.add(kv);
rwcc.completeMemstoreInsert(w); mvcc.completeMemstoreInsert(w);
// Assert that we can read back // Assert that we can read back
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
KeyValueScanner s = this.memstore.getScanners().get(0); KeyValueScanner s = this.memstore.getScanners().get(0);
s.seek(kv); s.seek(kv);
@ -443,7 +443,7 @@ public class TestMemStore extends TestCase {
AtomicReference<Throwable> caught = new AtomicReference<Throwable>(); AtomicReference<Throwable> caught = new AtomicReference<Throwable>();
for (int i = 0; i < NUM_THREADS; i++) { for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught); threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught);
threads[i].start(); threads[i].start();
} }
@ -531,7 +531,7 @@ public class TestMemStore extends TestCase {
* @throws InterruptedException * @throws InterruptedException
*/ */
public void testGetNextRow() throws Exception { public void testGetNextRow() throws Exception {
ReadWriteConsistencyControl.resetThreadReadPoint(); MultiVersionConsistencyControl.resetThreadReadPoint();
addRows(this.memstore); addRows(this.memstore);
// Add more versions to make it a little more interesting. // Add more versions to make it a little more interesting.
Thread.sleep(1); Thread.sleep(1);
@ -947,7 +947,7 @@ public class TestMemStore extends TestCase {
} }
public static void main(String [] args) throws IOException { public static void main(String [] args) throws IOException {
ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
MemStore ms = new MemStore(); MemStore ms = new MemStore();
long n1 = System.nanoTime(); long n1 = System.nanoTime();
@ -957,7 +957,7 @@ public class TestMemStore extends TestCase {
System.out.println("foo"); System.out.println("foo");
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
for (int i = 0 ; i < 50 ; i++) for (int i = 0 ; i < 50 ; i++)
doScan(ms, i); doScan(ms, i);

View File

@ -31,12 +31,12 @@ import java.util.concurrent.atomic.AtomicLong;
public class TestReadWriteConsistencyControl extends TestCase { public class TestReadWriteConsistencyControl extends TestCase {
static class Writer implements Runnable { static class Writer implements Runnable {
final AtomicBoolean finished; final AtomicBoolean finished;
final ReadWriteConsistencyControl rwcc; final MultiVersionConsistencyControl mvcc;
final AtomicBoolean status; final AtomicBoolean status;
Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) { Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) {
this.finished = finished; this.finished = finished;
this.rwcc = rwcc; this.mvcc = mvcc;
this.status = status; this.status = status;
} }
private Random rnd = new Random(); private Random rnd = new Random();
@ -44,7 +44,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
public void run() { public void run() {
while (!finished.get()) { while (!finished.get()) {
ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert(); MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert();
// System.out.println("Begin write: " + e.getWriteNumber()); // System.out.println("Begin write: " + e.getWriteNumber());
// 10 usec - 500usec (including 0) // 10 usec - 500usec (including 0)
int sleepTime = rnd.nextInt(500); int sleepTime = rnd.nextInt(500);
@ -56,7 +56,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
} }
try { try {
rwcc.completeMemstoreInsert(e); mvcc.completeMemstoreInsert(e);
} catch (RuntimeException ex) { } catch (RuntimeException ex) {
// got failure // got failure
System.out.println(ex.toString()); System.out.println(ex.toString());
@ -70,7 +70,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
} }
public void testParallelism() throws Exception { public void testParallelism() throws Exception {
final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
final AtomicBoolean finished = new AtomicBoolean(false); final AtomicBoolean finished = new AtomicBoolean(false);
@ -79,9 +79,9 @@ public class TestReadWriteConsistencyControl extends TestCase {
final AtomicLong failedAt = new AtomicLong(); final AtomicLong failedAt = new AtomicLong();
Runnable reader = new Runnable() { Runnable reader = new Runnable() {
public void run() { public void run() {
long prev = rwcc.memstoreReadPoint(); long prev = mvcc.memstoreReadPoint();
while (!finished.get()) { while (!finished.get()) {
long newPrev = rwcc.memstoreReadPoint(); long newPrev = mvcc.memstoreReadPoint();
if (newPrev < prev) { if (newPrev < prev) {
// serious problem. // serious problem.
System.out.println("Reader got out of order, prev: " + System.out.println("Reader got out of order, prev: " +
@ -103,7 +103,7 @@ public class TestReadWriteConsistencyControl extends TestCase {
for (int i = 0 ; i < n ; ++i ) { for (int i = 0 ; i < n ; ++i ) {
statuses[i] = new AtomicBoolean(true); statuses[i] = new AtomicBoolean(true);
writers[i] = new Thread(new Writer(finished, rwcc, statuses[i])); writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
writers[i].start(); writers[i].start();
} }
readThread.start(); readThread.start();