HBASE-1738 Scanner doesn't reset when a snapshot is created, could miss new updates into the 'kvset' (active part)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@805183 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-08-17 22:21:00 +00:00
parent eab24df54c
commit 33c03f67fa
5 changed files with 183 additions and 42 deletions

View File

@ -337,6 +337,8 @@ Release 0.20.0 - Unreleased
storefile problems storefile problems
HBASE-1761 getclosest doesn't understand delete family; manifests as HBASE-1761 getclosest doesn't understand delete family; manifests as
"HRegionInfo was null or empty in .META" A.K.A the BS problem "HRegionInfo was null or empty in .META" A.K.A the BS problem
HBASE-1738 Scanner doesn't reset when a snapshot is created, could miss
new updates into the 'kvset' (active part)
IMPROVEMENTS IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage HBASE-1089 Add count of regions on filesystem to master UI; add percentage

View File

@ -28,7 +28,9 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -78,6 +80,10 @@ public class MemStore implements HeapSize {
// Used to track own heapSize // Used to track own heapSize
final AtomicLong size; final AtomicLong size;
// Scanners add themselves here when created and remove themselves on close;
// every member is told when a snapshot is made or cleared.
// CopyOnWriteArraySet is safe for concurrent add/remove/iteration, so no
// additional external synchronization is required to use it.
final CopyOnWriteArraySet<ChangedMemStoreObserver> changedMemStoreObservers =
new CopyOnWriteArraySet<ChangedMemStoreObserver>();
/** /**
* Default constructor. Used for tests. * Default constructor. Used for tests.
*/ */
@ -123,12 +129,10 @@ public class MemStore implements HeapSize {
LOG.warn("Snapshot called again without clearing previous. " + LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?"); "Doing nothing. Another ongoing flush or did we fail last attempt?");
} else { } else {
// We used to synchronize on the memstore here but we're inside a
// write lock so removed it. Comment is left in case removal was a
// mistake. St.Ack
if (!this.kvset.isEmpty()) { if (!this.kvset.isEmpty()) {
this.snapshot = this.kvset; this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator); this.kvset = new KeyValueSkipListSet(this.comparator);
tellChangedMemStoreObservers();
// Reset heap to not include any keys // Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD); this.size.set(DEEP_OVERHEAD);
} }
@ -138,6 +142,15 @@ public class MemStore implements HeapSize {
} }
} }
/*
 * Walk the registered observers (the outstanding scanners) and let each one
 * know that the memstore has changed.
 */
private void tellChangedMemStoreObservers() {
  final Iterator<ChangedMemStoreObserver> it =
    this.changedMemStoreObservers.iterator();
  while (it.hasNext()) {
    it.next().changedMemStore();
  }
}
/** /**
* Return the current snapshot. * Return the current snapshot.
* Called by flusher to get current snapshot made by a previous * Called by flusher to get current snapshot made by a previous
@ -168,6 +181,7 @@ public class MemStore implements HeapSize {
// create a new snapshot and let the old one go. // create a new snapshot and let the old one go.
if (!ss.isEmpty()) { if (!ss.isEmpty()) {
this.snapshot = new KeyValueSkipListSet(this.comparator); this.snapshot = new KeyValueSkipListSet(this.comparator);
tellChangedMemStoreObservers();
} }
} finally { } finally {
this.lock.writeLock().unlock(); this.lock.writeLock().unlock();
@ -445,9 +459,8 @@ public class MemStore implements HeapSize {
KeyValueScanner [] getScanners() { KeyValueScanner [] getScanners() {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
KeyValueScanner [] scanners = new KeyValueScanner[2]; KeyValueScanner [] scanners = new KeyValueScanner[1];
scanners[0] = new MemStoreScanner(this.kvset); scanners[0] = new MemStoreScanner(this.changedMemStoreObservers);
scanners[1] = new MemStoreScanner(this.snapshot);
return scanners; return scanners;
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
@ -521,18 +534,22 @@ public class MemStore implements HeapSize {
/* /*
* MemStoreScanner implements the KeyValueScanner. * MemStoreScanner implements the KeyValueScanner.
* It lets the caller scan the contents of a memstore. * It lets the caller scan the contents of a memstore -- both current
* This behaves as if it were a real scanner but does not maintain position * map and snapshot.
* in the passed memstore tree. * This behaves as if it were a real scanner but does not maintain position.
*/ */
protected class MemStoreScanner implements KeyValueScanner { protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver {
private final NavigableSet<KeyValue> kvs;
private KeyValue current = null;
private List<KeyValue> result = new ArrayList<KeyValue>(); private List<KeyValue> result = new ArrayList<KeyValue>();
private int idx = 0; private int idx = 0;
// Make access atomic.
private FirstOnRow firstOnNextRow = new FirstOnRow();
// Keep reference to Set so can remove myself when closed.
private final Set<ChangedMemStoreObserver> observers;
MemStoreScanner(final NavigableSet<KeyValue> s) { MemStoreScanner(final Set<ChangedMemStoreObserver> observers) {
this.kvs = s; super();
this.observers = observers;
this.observers.add(this);
} }
public boolean seek(KeyValue key) { public boolean seek(KeyValue key) {
@ -541,7 +558,7 @@ public class MemStore implements HeapSize {
close(); close();
return false; return false;
} }
this.current = key; this.firstOnNextRow.set(key);
return cacheNextRow(); return cacheNextRow();
} catch(Exception e) { } catch(Exception e) {
close(); close();
@ -570,47 +587,117 @@ public class MemStore implements HeapSize {
} }
/** /**
* @return True if we successfully cached a NavigableSet aligned on * @return True if successfully cached a next row.
* next row.
*/ */
boolean cacheNextRow() { boolean cacheNextRow() {
SortedSet<KeyValue> keys; // Prevent snapshot being cleared while caching a row.
lock.readLock().lock();
this.result.clear();
this.idx = 0;
try { try {
keys = this.kvs.tailSet(this.current); // Look at each set, kvset and snapshot.
} catch (Exception e) { // Both look for matching entries for this.current row returning what
close(); // they
return false; // have as next row after this.current (or null if nothing in set or if
} // nothing follows.
if (keys == null || keys.isEmpty()) { KeyValue kvsetNextRow = cacheNextRow(kvset);
close(); KeyValue snapshotNextRow = cacheNextRow(snapshot);
return false; if (kvsetNextRow == null && snapshotNextRow == null) {
} // Nothing more in memstore but we might have gotten current row
this.current = null; // results
byte [] row = keys.first().getRow(); // Indicate at end of store by setting next row to null.
for (KeyValue kv: keys) { this.firstOnNextRow.set(null);
if (comparator.compareRows(kv, row) != 0) { return !this.result.isEmpty();
this.current = kv; } else if (kvsetNextRow != null && snapshotNextRow != null) {
break; // Set current at the lowest of the two values.
} int compare = comparator.compare(kvsetNextRow, snapshotNextRow);
result.add(kv); this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow);
} else {
this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow);
} }
return true; return true;
} finally {
lock.readLock().unlock();
}
}
/*
* Collect from <code>set</code> the entries of the first row found at or
* after <code>this.firstOnNextRow</code> and append them to
* <code>this.result</code>.
* @param set Set to examine
* @return First KeyValue found in <code>set</code> on the row following the
* one just cached, or null if <code>firstOnNextRow</code> is unset, the
* passed <code>set</code> is empty, or nothing in it follows the cached row.
*
* NOTE(review): the row cached is whichever row heads the tail of the passed
* set. When this is called for two sets whose tails start on different rows,
* entries from both rows are appended to <code>result</code> -- confirm the
* caller tolerates or prevents this.
* NOTE(review): <code>firstOnNextRow.get()</code> is read twice below; if
* close() ran on another thread between the reads, tailSet would be handed
* null -- confirm scanners are never closed concurrently with a scan.
*/
private KeyValue cacheNextRow(final NavigableSet<KeyValue> set) {
if (this.firstOnNextRow.get() == null || set.isEmpty()) return null;
SortedSet<KeyValue> tail = set.tailSet(this.firstOnNextRow.get());
if (tail == null || tail.isEmpty()) return null;
// The first entry of the tail defines the row being cached.
KeyValue first = tail.first();
KeyValue nextRow = null;
for (KeyValue kv: tail) {
// Stop at the first entry belonging to a different row; it is returned
// to the caller as the candidate start of the next row.
if (comparator.compareRows(first, kv) != 0) {
nextRow = kv;
break;
}
this.result.add(kv);
}
return nextRow;
} }
public void close() { public void close() {
current = null; this.firstOnNextRow.set(null);
idx = 0; idx = 0;
if (!result.isEmpty()) { if (!result.isEmpty()) {
result.clear(); result.clear();
} }
this.observers.remove(this);
}
public void changedMemStore() {
this.firstOnNextRow.reset();
}
}
/*
* Private class that holds firstOnRow and utility.
* Usually firstOnRow is the first KeyValue we find on next row rather than
* the absolute minimal first key (empty column, Type.Maximum, maximum ts).
* Usually it's ok being sloppy with firstOnRow letting it be the first thing
* found on next row -- this works -- but if the memstore changes on us, reset
* firstOnRow to be the ultimate firstOnRow. We play sloppy with firstOnRow
* usually so we don't have to allocate a new KeyValue each time firstOnRow
* is updated.
* Every accessor is synchronized on this holder instance.
*/
private static class FirstOnRow {
// Key the scanner resumes from; null when nothing is held (the scanner sets
// null on close and when it reaches the end of the store).
private KeyValue firstOnRow = null;
FirstOnRow() {
super();
}
/* Replace the held key. Pass null to clear it.
*/
synchronized void set(final KeyValue kv) {
this.firstOnRow = kv;
}
/* Reset firstOnRow to a 'clean', absolute firstOnRow: a new KeyValue built
* from the held key's row and HConstants.LATEST_TIMESTAMP. Does nothing when
* no key is held.
*/
synchronized void reset() {
if (this.firstOnRow == null) return;
this.firstOnRow =
new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP);
}
/* Return the held key, or null if none is held.
*/
synchronized KeyValue get() {
return this.firstOnRow;
} }
} }
public final static long FIXED_OVERHEAD = ClassSize.align( public final static long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (7 * ClassSize.REFERENCE)); ClassSize.OBJECT + (8 * ClassSize.REFERENCE));
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
(2 * ClassSize.CONCURRENT_SKIPLISTMAP)); (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
/* /*
@ -682,4 +769,16 @@ public class MemStore implements HeapSize {
} }
LOG.info("Exiting."); LOG.info("Exiting.");
} }
/**
* Observers want to know about MemStore changes.
* Called when snapshot is cleared and when we make one.
*/
interface ChangedMemStoreObserver {
/**
* Invoked on each registered observer after a snapshot has been made or
* cleared. Declares no checked exceptions.
*/
void changedMemStore();
}
} }

View File

@ -92,6 +92,12 @@ public class ClassSize {
/** Overhead for AtomicBoolean */ /** Overhead for AtomicBoolean */
public static int ATOMIC_BOOLEAN = 0; public static int ATOMIC_BOOLEAN = 0;
/** Overhead for CopyOnWriteArraySet */
public static int COPYONWRITE_ARRAYSET = 0;
/** Overhead for CopyOnWriteArrayList */
public static int COPYONWRITE_ARRAYLIST = 0;
private static final String THIRTY_TWO = "32"; private static final String THIRTY_TWO = "32";
/** /**
@ -151,6 +157,9 @@ public class ClassSize {
ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN); ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN);
COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE);
COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY);
} }
/** /**

View File

@ -6,6 +6,8 @@ import java.util.ArrayList;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -164,6 +166,25 @@ public class TestHeapSize extends TestCase {
assertEquals(expected, actual); assertEquals(expected, actual);
} }
// CopyOnWriteArraySet
cl = CopyOnWriteArraySet.class;
expected = ClassSize.estimateBase(cl, false);
actual = ClassSize.COPYONWRITE_ARRAYSET;
if(expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
}
// CopyOnWriteArrayList
cl = CopyOnWriteArrayList.class;
expected = ClassSize.estimateBase(cl, false);
actual = ClassSize.COPYONWRITE_ARRAYLIST;
if(expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
}
} }
/** /**
@ -240,11 +261,15 @@ public class TestHeapSize extends TestCase {
expected += ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
expected += ClassSize.estimateBase(CopyOnWriteArraySet.class, false);
expected += ClassSize.estimateBase(CopyOnWriteArrayList.class, false);
if(expected != actual) { if(expected != actual) {
ClassSize.estimateBase(cl, true); ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(ReentrantReadWriteLock.class, true); ClassSize.estimateBase(ReentrantReadWriteLock.class, true);
ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
ClassSize.estimateBase(CopyOnWriteArraySet.class, true);
ClassSize.estimateBase(CopyOnWriteArrayList.class, true);
assertEquals(expected, actual); assertEquals(expected, actual);
} }

View File

@ -90,6 +90,10 @@ public class TestMemStore extends TestCase {
s.close(); s.close();
} }
assertEquals(rowCount, count); assertEquals(rowCount, count);
// Close every scanner from the previous pass (closing removes it from the
// memstore's observer set) before fresh scanners are requested below.
// Index with i: the original always closed element 0, which only worked
// because getScanners() currently returns a single-element array.
for (int i = 0; i < memstorescanners.length; i++) {
  memstorescanners[i].close();
}
memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan. // Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memstore.comparator, null, memstorescanners); this.memstore.comparator, null, memstorescanners);
@ -112,6 +116,10 @@ public class TestMemStore extends TestCase {
s.close(); s.close();
} }
assertEquals(rowCount, count); assertEquals(rowCount, count);
// Close every scanner from the previous pass (closing removes it from the
// memstore's observer set) before fresh scanners are requested below.
// Index with i: the original always closed element 0, which only worked
// because getScanners() currently returns a single-element array.
for (int i = 0; i < memstorescanners.length; i++) {
  memstorescanners[i].close();
}
memstorescanners = this.memstore.getScanners();
// Assert that new values are seen in kvset as we scan. // Assert that new values are seen in kvset as we scan.
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@ -124,8 +132,7 @@ public class TestMemStore extends TestCase {
// Assert the stuff is coming out in right order. // Assert the stuff is coming out in right order.
assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0); assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0);
// Row count is same as column count. // Row count is same as column count.
// TODO PUTBACK assertEquals("count=" + count + ", result=" + result, assertEquals("count=" + count + ", result=" + result, rowCount, result.size());
// rowCount, result.size());
count++; count++;
if (count == snapshotIndex) { if (count == snapshotIndex) {
this.memstore.snapshot(); this.memstore.snapshot();
@ -408,7 +415,6 @@ public class TestMemStore extends TestCase {
} }
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Delete tests // Delete tests
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////