From e7fb30c371261d0c10ebb85cf7c07f39305371c4 Mon Sep 17 00:00:00 2001 From: Nicolas Spiegelberg Date: Fri, 18 Nov 2011 02:19:37 +0000 Subject: [PATCH] HBASE-4544 change RWCC to MVCC git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1203468 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/io/hfile/HFileWriterV2.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 72 ++++++++-------- .../hadoop/hbase/regionserver/MemStore.java | 2 +- ...va => MultiVersionConsistencyControl.java} | 26 +++--- .../hadoop/hbase/regionserver/Store.java | 4 +- .../hbase/regionserver/StoreFileScanner.java | 10 +-- .../hadoop/hbase/HBaseTestingUtility.java | 4 +- .../hbase/regionserver/TestHRegion.java | 4 +- .../hbase/regionserver/TestMemStore.java | 82 +++++++++---------- .../TestReadWriteConsistencyControl.java | 18 ++-- 10 files changed, 112 insertions(+), 112 deletions(-) rename src/main/java/org/apache/hadoop/hbase/regionserver/{ReadWriteConsistencyControl.java => MultiVersionConsistencyControl.java} (90%) diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 0d3d59d3300..bc61a3ed4fc 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -49,7 +49,7 @@ import org.apache.hadoop.io.WritableUtils; public class HFileWriterV2 extends AbstractHFileWriter { static final Log LOG = LogFactory.getLog(HFileWriterV2.class); - /** Max memstore (rwcc) timestamp in FileInfo */ + /** Max memstore (mvcc) timestamp in FileInfo */ public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); /** KeyValue version in FileInfo */ public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 06b08fda452..5160ae17461 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -215,7 +215,7 @@ public class HRegion implements HeapSize { // , Writable{ private ConcurrentHashMap scannerReadPoints; /* - * @return The smallest rwcc readPoint across all the scanners in this + * @return The smallest mvcc readPoint across all the scanners in this * region. Writes older than this readPoint, are included in every * read operation. */ @@ -225,7 +225,7 @@ public class HRegion implements HeapSize { // , Writable{ // no new RegionScanners can grab a readPoint that we are unaware of. // We achieve this by synchronizing on the scannerReadPoints object. synchronized(scannerReadPoints) { - minimumReadPoint = rwcc.memstoreReadPoint(); + minimumReadPoint = mvcc.memstoreReadPoint(); for (Long readPoint: this.scannerReadPoints.values()) { if (readPoint < minimumReadPoint) { @@ -291,8 +291,8 @@ public class HRegion implements HeapSize { // , Writable{ private boolean splitRequest; private byte[] explicitSplitPoint = null; - private final ReadWriteConsistencyControl rwcc = - new ReadWriteConsistencyControl(); + private final MultiVersionConsistencyControl mvcc = + new MultiVersionConsistencyControl(); // Coprocessor host private RegionCoprocessorHost coprocessorHost; @@ -538,7 +538,7 @@ public class HRegion implements HeapSize { // , Writable{ maxMemstoreTS = maxStoreMemstoreTS; } } - rwcc.initialize(maxMemstoreTS + 1); + mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( this.regiondir, minSeqId, reporter, status)); @@ -747,8 +747,8 @@ public class HRegion implements HeapSize { // , Writable{ } } - public ReadWriteConsistencyControl getRWCC() { - return rwcc; + public MultiVersionConsistencyControl getMVCC() { + return mvcc; } /** @@ -1271,7 +1271,7 @@ public class HRegion implements HeapSize { // , Writable{ // during the flush long sequenceId = -1L; long completeSequenceId = -1L; - ReadWriteConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.WriteEntry w = null; // We have to take a write lock during snapshot, or else a write could // end up in both snapshot and memstore (makes it difficult to do atomic @@ -1282,9 +1282,9 @@ public class HRegion implements HeapSize { // , Writable{ long currentMemStoreSize = 0; List storeFlushers = new ArrayList(stores.size()); try { - // Record the rwcc for all transactions in progress. - w = rwcc.beginMemstoreInsert(); - rwcc.advanceMemstore(w); + // Record the mvcc for all transactions in progress. + w = mvcc.beginMemstoreInsert(); + mvcc.advanceMemstore(w); sequenceId = (wal == null)? myseqid : wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes()); @@ -1301,15 +1301,15 @@ public class HRegion implements HeapSize { // , Writable{ } finally { this.updatesLock.writeLock().unlock(); } - status.setStatus("Waiting for rwcc"); - LOG.debug("Finished snapshotting, commencing waiting for rwcc"); + status.setStatus("Waiting for mvcc"); + LOG.debug("Finished snapshotting, commencing waiting for mvcc"); // wait for all in-progress transactions to commit to HLog before // we can start the flush. This prevents // uncommitted transactions from being written into HFiles. // We have to block before we start the flush, otherwise keys that // were removed via a rollbackMemstore could be written to Hfiles. - rwcc.waitForRead(w); + mvcc.waitForRead(w); status.setStatus("Flushing stores"); LOG.debug("Finished snapshotting, commencing flushing stores"); @@ -1827,7 +1827,7 @@ public class HRegion implements HeapSize { // , Writable{ } } - ReadWriteConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.WriteEntry w = null; long txid = 0; boolean walSyncSuccessful = false; boolean locked = false; @@ -1917,17 +1917,17 @@ public class HRegion implements HeapSize { // , Writable{ // // ------------------------------------ - // Acquire the latest rwcc number + // Acquire the latest mvcc number // ---------------------------------- - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); // ------------------------------------ // STEP 3. Write back to memstore // Write to memstore. It is ok to write to memstore // first without updating the HLog because we do not roll - // forward the memstore RWCC. The RWCC will be moved up when + // forward the memstore MVCC. The MVCC will be moved up when // the complete operation is done. These changes are not yet - // visible to scanners till we update the RWCC. The RWCC is + // visible to scanners till we update the MVCC. The MVCC is // moved only when the sync is complete. // ---------------------------------- long addedSize = 0; @@ -1985,10 +1985,10 @@ public class HRegion implements HeapSize { // , Writable{ } walSyncSuccessful = true; // ------------------------------------------------------------------ - // STEP 8. Advance rwcc. This will make this put visible to scanners and getters. + // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ if (w != null) { - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); w = null; } @@ -2016,7 +2016,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!walSyncSuccessful) { rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); } - if (w != null) rwcc.completeMemstoreInsert(w); + if (w != null) mvcc.completeMemstoreInsert(w); if (locked) { this.updatesLock.readLock().unlock(); @@ -2288,20 +2288,20 @@ public class HRegion implements HeapSize { // , Writable{ * not check the families for validity. * * @param familyMap Map of kvs per family - * @param localizedWriteEntry The WriteEntry of the RWCC for this transaction. - * If null, then this method internally creates a rwcc transaction. + * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction. + * If null, then this method internally creates a mvcc transaction. * @return the additional memory usage of the memstore caused by the * new entries. */ private long applyFamilyMapToMemstore(Map> familyMap, - ReadWriteConsistencyControl.WriteEntry localizedWriteEntry) { + MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) { long size = 0; - boolean freerwcc = false; + boolean freemvcc = false; try { if (localizedWriteEntry == null) { - localizedWriteEntry = rwcc.beginMemstoreInsert(); - freerwcc = true; + localizedWriteEntry = mvcc.beginMemstoreInsert(); + freemvcc = true; } for (Map.Entry> e : familyMap.entrySet()) { @@ -2315,8 +2315,8 @@ public class HRegion implements HeapSize { // , Writable{ } } } finally { - if (freerwcc) { - rwcc.completeMemstoreInsert(localizedWriteEntry); + if (freemvcc) { + mvcc.completeMemstoreInsert(localizedWriteEntry); } } @@ -3013,7 +3013,7 @@ public class HRegion implements HeapSize { // , Writable{ // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. synchronized(scannerReadPoints) { - this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); scannerReadPoints.put(this, this.readPt); } @@ -3057,7 +3057,7 @@ public class HRegion implements HeapSize { // , Writable{ try { // This could be a new thread from the last time we called next(). - ReadWriteConsistencyControl.setThreadReadPoint(this.readPt); + MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); results.clear(); @@ -3884,7 +3884,7 @@ public class HRegion implements HeapSize { // , Writable{ */ public Result append(Append append, Integer lockid, boolean writeToWAL) throws IOException { - // TODO: Use RWCC to make this set of appends atomic to reads + // TODO: Use MVCC to make this set of appends atomic to reads byte[] row = append.getRow(); checkRow(row, "append"); boolean flush = false; @@ -4024,7 +4024,7 @@ public class HRegion implements HeapSize { // , Writable{ public Result increment(Increment increment, Integer lockid, boolean writeToWAL) throws IOException { - // TODO: Use RWCC to make this set of increments atomic to reads + // TODO: Use MVCC to make this set of increments atomic to reads byte [] row = increment.getRow(); checkRow(row, "increment"); TimeRange tr = increment.getTimeRange(); @@ -4244,7 +4244,7 @@ public class HRegion implements HeapSize { // , Writable{ ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock ClassSize.ARRAYLIST + // recentFlushes - ReadWriteConsistencyControl.FIXED_SIZE // rwcc + MultiVersionConsistencyControl.FIXED_SIZE // mvcc ; @Override @@ -4253,7 +4253,7 @@ public class HRegion implements HeapSize { // , Writable{ for(Store store : this.stores.values()) { heapSize += store.heapSize(); } - // this does not take into account row locks, recent flushes, rwcc entries + // this does not take into account row locks, recent flushes, mvcc entries return heapSize; } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 6727d850f07..5c2d72cf50f 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -713,7 +713,7 @@ public class MemStore implements HeapSize { } protected KeyValue getNext(Iterator it) { - long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); while (it.hasNext()) { KeyValue v = it.next(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java similarity index 90% rename from src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java rename to src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index e7f9386360f..6b28f03436f 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -33,7 +33,7 @@ import org.apache.commons.logging.Log; * a mechanism for writers to obtain new write numbers, then "commit" * the new writes for readers to read (thus forming atomic transactions). */ -public class ReadWriteConsistencyControl { +public class MultiVersionConsistencyControl { private volatile long memstoreRead = 0; private volatile long memstoreWrite = 0; @@ -55,7 +55,7 @@ public class ReadWriteConsistencyControl { /** * Default constructor. Initializes the memstoreRead/Write points to 0. */ - public ReadWriteConsistencyControl() { + public MultiVersionConsistencyControl() { this.memstoreRead = this.memstoreWrite = 0; } @@ -66,7 +66,7 @@ public class ReadWriteConsistencyControl { public void initialize(long startPoint) { synchronized (writeQueue) { if (this.memstoreWrite != this.memstoreRead) { - throw new RuntimeException("Already used this rwcc. Too late to initialize"); + throw new RuntimeException("Already used this mvcc. Too late to initialize"); } this.memstoreRead = this.memstoreWrite = startPoint; @@ -83,7 +83,7 @@ public class ReadWriteConsistencyControl { } /** - * Set the thread read point to the given value. The thread RWCC + * Set the thread read point to the given value. The thread MVCC * is used by the Memstore scanner so it knows which values to skip. * Give it a value of 0 if you want everything. */ @@ -92,16 +92,16 @@ public class ReadWriteConsistencyControl { } /** - * Set the thread RWCC read point to whatever the current read point is in - * this particular instance of RWCC. Returns the new thread read point value. + * Set the thread MVCC read point to whatever the current read point is in + * this particular instance of MVCC. Returns the new thread read point value. */ - public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) { - perThreadReadPoint.set(rwcc.memstoreReadPoint()); + public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) { + perThreadReadPoint.set(mvcc.memstoreReadPoint()); return getThreadReadPoint(); } /** - * Set the thread RWCC read point to 0 (include everything). + * Set the thread MVCC read point to 0 (include everything). */ public static void resetThreadReadPoint() { perThreadReadPoint.set(0L); @@ -204,10 +204,10 @@ public class ReadWriteConsistencyControl { return this.writeNumber; } } - + public static final long FIXED_SIZE = ClassSize.align( - ClassSize.OBJECT + - 2 * Bytes.SIZEOF_LONG + + ClassSize.OBJECT + + 2 * Bytes.SIZEOF_LONG + 2 * ClassSize.REFERENCE); - + } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 9b9856ae342..52056f276ad 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -1285,7 +1285,7 @@ public class Store extends SchemaConfigured implements HeapSize { StoreFile.Writer writer = null; // Find the smallest read point across all the Scanners. long smallestReadPoint = region.getSmallestReadPoint(); - ReadWriteConsistencyControl.setThreadReadPoint(smallestReadPoint); + MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); try { InternalScanner scanner = null; try { @@ -1942,7 +1942,7 @@ public class Store extends SchemaConfigured implements HeapSize { throws IOException { this.lock.readLock().lock(); try { - // TODO: Make this operation atomic w/ RWCC + // TODO: Make this operation atomic w/ MVCC return this.memstore.upsert(kvs); } finally { this.lock.readLock().unlock(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 301273e5ab5..97d55de4f31 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -52,7 +52,7 @@ class StoreFileScanner implements KeyValueScanner { private boolean delayedReseek; private KeyValue delayedSeekKV; - private boolean enforceRWCC = false; + private boolean enforceMVCC = false; //The variable, realSeekDone, may cheat on store file scanner for the // multi-column bloom-filter optimization. @@ -67,10 +67,10 @@ class StoreFileScanner implements KeyValueScanner { * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param hfs HFile scanner */ - public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useRWCC) { + public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC) { this.reader = reader; this.hfs = hfs; - this.enforceRWCC = useRWCC; + this.enforceMVCC = useMVCC; } /** @@ -182,11 +182,11 @@ class StoreFileScanner implements KeyValueScanner { } protected boolean skipKVsNewerThanReadpoint() throws IOException { - long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); // We want to ignore all key-values that are newer than our current // readPoint - while(enforceRWCC + while(enforceMVCC && cur != null && (cur.getMemstoreTS() > readPoint)) { hfs.next(); diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 3de109b8e3a..d1b7647cb27 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl; +import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.security.User; @@ -1525,7 +1525,7 @@ public class HBaseTestingUtility { */ public static List getFromStoreFile(Store store, Get get) throws IOException { - ReadWriteConsistencyControl.resetThreadReadPoint(); + MultiVersionConsistencyControl.resetThreadReadPoint(); Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getFamily().getName())); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index a8a548caa0a..5daa02b1dfa 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1444,12 +1444,12 @@ public class TestHRegion extends HBaseTestCase { scan.addFamily(fam2); scan.addFamily(fam4); is = (RegionScannerImpl) region.getScanner(scan); - ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC()); assertEquals(1, ((RegionScannerImpl)is).storeHeap.getHeap().size()); scan = new Scan(); is = (RegionScannerImpl) region.getScanner(scan); - ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC()); assertEquals(families.length -1, ((RegionScannerImpl)is).storeHeap.getHeap().size()); } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 925b385ee46..be75cb157d7 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -55,12 +55,12 @@ public class TestMemStore extends TestCase { private static final byte [] CONTENTS = Bytes.toBytes("contents"); private static final byte [] BASIC = Bytes.toBytes("basic"); private static final String CONTENTSTR = "contentstr"; - private ReadWriteConsistencyControl rwcc; + private MultiVersionConsistencyControl mvcc; @Override public void setUp() throws Exception { super.setUp(); - this.rwcc = new ReadWriteConsistencyControl(); + this.mvcc = new MultiVersionConsistencyControl(); this.memstore = new MemStore(); } @@ -86,7 +86,7 @@ public class TestMemStore extends TestCase { List memstorescanners = this.memstore.getScanners(); Scan scan = new Scan(); List result = new ArrayList(); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false, this.memstore.comparator); ScanType scanType = ScanType.USER_SCAN; @@ -108,7 +108,7 @@ public class TestMemStore extends TestCase { scanner.close(); } - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); memstorescanners = this.memstore.getScanners(); // Now assert can count same number even if a snapshot mid-scan. s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); @@ -198,7 +198,7 @@ public class TestMemStore extends TestCase { private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); List memstorescanners = this.memstore.getScanners(); assertEquals(1, memstorescanners.size()); final KeyValueScanner scanner = memstorescanners.get(0); @@ -233,35 +233,35 @@ public class TestMemStore extends TestCase { final byte[] q2 = Bytes.toBytes("q2"); final byte[] v = Bytes.toBytes("value"); - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); KeyValue kv1 = new KeyValue(row, f, q1, v); kv1.setMemstoreTS(w.getWriteNumber()); memstore.add(kv1); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{}); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); KeyValue kv2 = new KeyValue(row, f, q2, v); kv2.setMemstoreTS(w.getWriteNumber()); memstore.add(kv2); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1}); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1, kv2}); } @@ -281,8 +281,8 @@ public class TestMemStore extends TestCase { final byte[] v2 = Bytes.toBytes("value2"); // INSERT 1: Write both columns val1 - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMemstoreTS(w.getWriteNumber()); @@ -291,15 +291,15 @@ public class TestMemStore extends TestCase { KeyValue kv12 = new KeyValue(row, f, q2, v1); kv12.setMemstoreTS(w.getWriteNumber()); memstore.add(kv12); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); KeyValue kv21 = new KeyValue(row, f, q1, v2); kv21.setMemstoreTS(w.getWriteNumber()); memstore.add(kv21); @@ -309,17 +309,17 @@ public class TestMemStore extends TestCase { memstore.add(kv22); // BEFORE COMPLETING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE INSERT 2 - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. // See HBASE-1485 for discussion about what we should do with // the duplicate-TS inserts - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); } @@ -336,8 +336,8 @@ public class TestMemStore extends TestCase { final byte[] q2 = Bytes.toBytes("q2"); final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMemstoreTS(w.getWriteNumber()); @@ -346,30 +346,30 @@ public class TestMemStore extends TestCase { KeyValue kv12 = new KeyValue(row, f, q2, v1); kv12.setMemstoreTS(w.getWriteNumber()); memstore.add(kv12); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); kvDel.setMemstoreTS(w.getWriteNumber()); memstore.add(kvDel); // BEFORE COMPLETING DELETE, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE DELETE - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // NOW WE SHOULD SEE DELETE - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); } @@ -383,7 +383,7 @@ public class TestMemStore extends TestCase { final byte[] f = Bytes.toBytes("family"); final byte[] q1 = Bytes.toBytes("q1"); - final ReadWriteConsistencyControl rwcc; + final MultiVersionConsistencyControl mvcc; final MemStore memstore; AtomicReference caughtException; @@ -391,10 +391,10 @@ public class TestMemStore extends TestCase { public ReadOwnWritesTester(int id, MemStore memstore, - ReadWriteConsistencyControl rwcc, + MultiVersionConsistencyControl mvcc, AtomicReference caughtException) { - this.rwcc = rwcc; + this.mvcc = mvcc; this.memstore = memstore; this.caughtException = caughtException; row = Bytes.toBytes(id); @@ -410,8 +410,8 @@ public class TestMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); @@ -419,10 +419,10 @@ public class TestMemStore extends TestCase { KeyValue kv = new KeyValue(row, f, q1, i, v); kv.setMemstoreTS(w.getWriteNumber()); memstore.add(kv); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // Assert that we can read back - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); s.seek(kv); @@ -443,7 +443,7 @@ public class TestMemStore extends TestCase { AtomicReference caught = new AtomicReference(); for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught); + threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught); threads[i].start(); } @@ -531,7 +531,7 @@ public class TestMemStore extends TestCase { * @throws InterruptedException */ public void testGetNextRow() throws Exception { - ReadWriteConsistencyControl.resetThreadReadPoint(); + MultiVersionConsistencyControl.resetThreadReadPoint(); addRows(this.memstore); // Add more versions to make it a little more interesting. Thread.sleep(1); @@ -947,7 +947,7 @@ public class TestMemStore extends TestCase { } public static void main(String [] args) throws IOException { - ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl(); MemStore ms = new MemStore(); long n1 = System.nanoTime(); @@ -957,7 +957,7 @@ public class TestMemStore extends TestCase { System.out.println("foo"); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); for (int i = 0 ; i < 50 ; i++) doScan(ms, i); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java index ea6d6a51c1b..5c09a7e560b 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java @@ -31,12 +31,12 @@ import java.util.concurrent.atomic.AtomicLong; public class TestReadWriteConsistencyControl extends TestCase { static class Writer implements Runnable { final AtomicBoolean finished; - final ReadWriteConsistencyControl rwcc; + final MultiVersionConsistencyControl mvcc; final AtomicBoolean status; - Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) { + Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) { this.finished = finished; - this.rwcc = rwcc; + this.mvcc = mvcc; this.status = status; } private Random rnd = new Random(); @@ -44,7 +44,7 @@ public class TestReadWriteConsistencyControl extends TestCase { public void run() { while (!finished.get()) { - ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert(); // System.out.println("Begin write: " + e.getWriteNumber()); // 10 usec - 500usec (including 0) int sleepTime = rnd.nextInt(500); @@ -56,7 +56,7 @@ public class TestReadWriteConsistencyControl extends TestCase { } catch (InterruptedException e1) { } try { - rwcc.completeMemstoreInsert(e); + mvcc.completeMemstoreInsert(e); } catch (RuntimeException ex) { // got failure System.out.println(ex.toString()); @@ -70,7 +70,7 @@ public class TestReadWriteConsistencyControl extends TestCase { } public void testParallelism() throws Exception { - final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl(); final AtomicBoolean finished = new AtomicBoolean(false); @@ -79,9 +79,9 @@ public class TestReadWriteConsistencyControl extends TestCase { final AtomicLong failedAt = new AtomicLong(); Runnable reader = new Runnable() { public void run() { - long prev = rwcc.memstoreReadPoint(); + long prev = mvcc.memstoreReadPoint(); while (!finished.get()) { - long newPrev = rwcc.memstoreReadPoint(); + long newPrev = mvcc.memstoreReadPoint(); if (newPrev < prev) { // serious problem. System.out.println("Reader got out of order, prev: " + @@ -103,7 +103,7 @@ public class TestReadWriteConsistencyControl extends TestCase { for (int i = 0 ; i < n ; ++i ) { statuses[i] = new AtomicBoolean(true); - writers[i] = new Thread(new Writer(finished, rwcc, statuses[i])); + writers[i] = new Thread(new Writer(finished, mvcc, statuses[i])); writers[i].start(); } readThread.start();