diff --git a/CHANGES.txt b/CHANGES.txt index b2d6ec494d0..a729eba8695 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -337,6 +337,8 @@ Release 0.20.0 - Unreleased storefile problems HBASE-1761 getclosest doesn't understand delete family; manifests as "HRegionInfo was null or empty in .META" A.K.A the BS problem + HBASE-1738 Scanner doesnt reset when a snapshot is created, could miss + new updates into the 'kvset' (active part) IMPROVEMENTS HBASE-1089 Add count of regions on filesystem to master UI; add percentage diff --git a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 16c4818d0e6..fa5ede2e3dd 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -28,7 +28,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.SortedSet; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -78,6 +80,10 @@ public class MemStore implements HeapSize { // Used to track own heapSize final AtomicLong size; + // All access must be synchronized. + final CopyOnWriteArraySet changedMemStoreObservers = + new CopyOnWriteArraySet(); + /** * Default constructor. Used for tests. */ @@ -123,12 +129,10 @@ public class MemStore implements HeapSize { LOG.warn("Snapshot called again without clearing previous. " + "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { - // We used to synchronize on the memstore here but we're inside a - // write lock so removed it. Comment is left in case removal was a - // mistake. St.Ack if (!this.kvset.isEmpty()) { this.snapshot = this.kvset; this.kvset = new KeyValueSkipListSet(this.comparator); + tellChangedMemStoreObservers(); // Reset heap to not include any keys this.size.set(DEEP_OVERHEAD); } @@ -138,6 +142,15 @@ public class MemStore implements HeapSize { } } + /* + * Tell outstanding scanners that memstore has changed. + */ + private void tellChangedMemStoreObservers() { + for (ChangedMemStoreObserver o: this.changedMemStoreObservers) { + o.changedMemStore(); + } + } + /** * Return the current snapshot. * Called by flusher to get current snapshot made by a previous @@ -168,6 +181,7 @@ public class MemStore implements HeapSize { // create a new snapshot and let the old one go. if (!ss.isEmpty()) { this.snapshot = new KeyValueSkipListSet(this.comparator); + tellChangedMemStoreObservers(); } } finally { this.lock.writeLock().unlock(); @@ -445,9 +459,8 @@ public class MemStore implements HeapSize { KeyValueScanner [] getScanners() { this.lock.readLock().lock(); try { - KeyValueScanner [] scanners = new KeyValueScanner[2]; - scanners[0] = new MemStoreScanner(this.kvset); - scanners[1] = new MemStoreScanner(this.snapshot); + KeyValueScanner [] scanners = new KeyValueScanner[1]; + scanners[0] = new MemStoreScanner(this.changedMemStoreObservers); return scanners; } finally { this.lock.readLock().unlock(); @@ -521,18 +534,22 @@ public class MemStore implements HeapSize { /* * MemStoreScanner implements the KeyValueScanner. - * It lets the caller scan the contents of a memstore. - * This behaves as if it were a real scanner but does not maintain position - * in the passed memstore tree. + * It lets the caller scan the contents of a memstore -- both current + * map and snapshot. + * This behaves as if it were a real scanner but does not maintain position. */ - protected class MemStoreScanner implements KeyValueScanner { - private final NavigableSet kvs; - private KeyValue current = null; + protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver { private List result = new ArrayList(); private int idx = 0; + // Make access atomic. + private FirstOnRow firstOnNextRow = new FirstOnRow(); + // Keep reference to Set so can remove myself when closed. + private final Set observers; - MemStoreScanner(final NavigableSet s) { - this.kvs = s; + MemStoreScanner(final Set observers) { + super(); + this.observers = observers; + this.observers.add(this); } public boolean seek(KeyValue key) { @@ -541,7 +558,7 @@ public class MemStore implements HeapSize { close(); return false; } - this.current = key; + this.firstOnNextRow.set(key); return cacheNextRow(); } catch(Exception e) { close(); @@ -570,47 +587,117 @@ public class MemStore implements HeapSize { } /** - * @return True if we successfully cached a NavigableSet aligned on - * next row. + * @return True if successfully cached a next row. */ boolean cacheNextRow() { - SortedSet keys; + // Prevent snapshot being cleared while caching a row. + lock.readLock().lock(); + this.result.clear(); + this.idx = 0; try { - keys = this.kvs.tailSet(this.current); - } catch (Exception e) { - close(); - return false; + // Look at each set, kvset and snapshot. + // Both look for matching entries for this.current row returning what + // they + // have as next row after this.current (or null if nothing in set or if + // nothing follows. + KeyValue kvsetNextRow = cacheNextRow(kvset); + KeyValue snapshotNextRow = cacheNextRow(snapshot); + if (kvsetNextRow == null && snapshotNextRow == null) { + // Nothing more in memstore but we might have gotten current row + // results + // Indicate at end of store by setting next row to null. + this.firstOnNextRow.set(null); + return !this.result.isEmpty(); + } else if (kvsetNextRow != null && snapshotNextRow != null) { + // Set current at the lowest of the two values. + int compare = comparator.compare(kvsetNextRow, snapshotNextRow); + this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow); + } else { + this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow); + } + return true; + } finally { + lock.readLock().unlock(); } - if (keys == null || keys.isEmpty()) { - close(); - return false; - } - this.current = null; - byte [] row = keys.first().getRow(); - for (KeyValue kv: keys) { - if (comparator.compareRows(kv, row) != 0) { - this.current = kv; + } + + /* + * See if set has entries for the this.current row. If so, + * add them to this.result. + * @param set Set to examine + * @return Next row in passed set or null if nothing in this + * passed set + */ + private KeyValue cacheNextRow(final NavigableSet set) { + if (this.firstOnNextRow.get() == null || set.isEmpty()) return null; + SortedSet tail = set.tailSet(this.firstOnNextRow.get()); + if (tail == null || tail.isEmpty()) return null; + KeyValue first = tail.first(); + KeyValue nextRow = null; + for (KeyValue kv: tail) { + if (comparator.compareRows(first, kv) != 0) { + nextRow = kv; break; } - result.add(kv); + this.result.add(kv); } - return true; + return nextRow; } public void close() { - current = null; + this.firstOnNextRow.set(null); idx = 0; if (!result.isEmpty()) { result.clear(); } + this.observers.remove(this); + } + + public void changedMemStore() { + this.firstOnNextRow.reset(); } } - + + /* + * Private class that holds firstOnRow and utility. + * Usually firstOnRow is the first KeyValue we find on next row rather than + * the absolute minimal first key (empty column, Type.Maximum, maximum ts). + * Usually its ok being sloppy with firstOnRow letting it be the first thing + * found on next row -- this works -- but if the memstore changes on us, reset + * firstOnRow to be the ultimate firstOnRow. We play sloppy with firstOnRow + * usually so we don't have to allocate a new KeyValue each time firstOnRow + * is updated. + */ + private static class FirstOnRow { + private KeyValue firstOnRow = null; + + FirstOnRow() { + super(); + } + + synchronized void set(final KeyValue kv) { + this.firstOnRow = kv; + } + + /* Reset firstOnRow to a 'clean', absolute firstOnRow. + */ + synchronized void reset() { + if (this.firstOnRow == null) return; + this.firstOnRow = + new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP); + } + + synchronized KeyValue get() { + return this.firstOnRow; + } + } + public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (7 * ClassSize.REFERENCE)); + ClassSize.OBJECT + (8 * ClassSize.REFERENCE)); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + + ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST + (2 * ClassSize.CONCURRENT_SKIPLISTMAP)); /* @@ -682,4 +769,16 @@ public class MemStore implements HeapSize { } LOG.info("Exiting."); } -} + + /** + * Observers want to know about MemStore changes. + * Called when snapshot is cleared and when we make one. + */ + interface ChangedMemStoreObserver { + /** + * Notify observers. + * @throws IOException + */ + void changedMemStore(); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/util/ClassSize.java b/src/java/org/apache/hadoop/hbase/util/ClassSize.java index a8cf26d78a7..81f4bfff4d5 100755 --- a/src/java/org/apache/hadoop/hbase/util/ClassSize.java +++ b/src/java/org/apache/hadoop/hbase/util/ClassSize.java @@ -92,6 +92,12 @@ public class ClassSize { /** Overhead for AtomicBoolean */ public static int ATOMIC_BOOLEAN = 0; + /** Overhead for CopyOnWriteArraySet */ + public static int COPYONWRITE_ARRAYSET = 0; + + /** Overhead for CopyOnWriteArrayList */ + public static int COPYONWRITE_ARRAYLIST = 0; + private static final String THIRTY_TWO = "32"; /** @@ -151,6 +157,9 @@ public class ClassSize { ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN); + COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE); + + COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY); } /** diff --git a/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java b/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java index 39d3d6336c0..88de39d3683 100644 --- a/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -6,6 +6,8 @@ import java.util.ArrayList; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -164,6 +166,25 @@ public class TestHeapSize extends TestCase { assertEquals(expected, actual); } + // CopyOnWriteArraySet + cl = CopyOnWriteArraySet.class; + expected = ClassSize.estimateBase(cl, false); + actual = ClassSize.COPYONWRITE_ARRAYSET; + if(expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + + // CopyOnWriteArrayList + cl = CopyOnWriteArrayList.class; + expected = ClassSize.estimateBase(cl, false); + actual = ClassSize.COPYONWRITE_ARRAYLIST; + if(expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + + } /** @@ -240,11 +261,15 @@ public class TestHeapSize extends TestCase { expected += ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); + expected += ClassSize.estimateBase(CopyOnWriteArraySet.class, false); + expected += ClassSize.estimateBase(CopyOnWriteArrayList.class, false); if(expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(ReentrantReadWriteLock.class, true); ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true); + ClassSize.estimateBase(CopyOnWriteArraySet.class, true); + ClassSize.estimateBase(CopyOnWriteArrayList.class, true); assertEquals(expected, actual); } diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 37df316436f..1e602e7affa 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -90,6 +90,10 @@ public class TestMemStore extends TestCase { s.close(); } assertEquals(rowCount, count); + for (int i = 0; i < memstorescanners.length; i++) { + memstorescanners[0].close(); + } + memstorescanners = this.memstore.getScanners(); // Now assert can count same number even if a snapshot mid-scan. s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, this.memstore.comparator, null, memstorescanners); @@ -112,6 +116,10 @@ public class TestMemStore extends TestCase { s.close(); } assertEquals(rowCount, count); + for (int i = 0; i < memstorescanners.length; i++) { + memstorescanners[0].close(); + } + memstorescanners = this.memstore.getScanners(); // Assert that new values are seen in kvset as we scan. long ts = System.currentTimeMillis(); s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, @@ -124,8 +132,7 @@ public class TestMemStore extends TestCase { // Assert the stuff is coming out in right order. assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0); // Row count is same as column count. - // TODO PUTBACK assertEquals("count=" + count + ", result=" + result, - // rowCount, result.size()); + assertEquals("count=" + count + ", result=" + result, rowCount, result.size()); count++; if (count == snapshotIndex) { this.memstore.snapshot(); @@ -407,8 +414,7 @@ public class TestMemStore extends TestCase { assertEquals(expected.get(i), result.get(i)); } } - - + ////////////////////////////////////////////////////////////////////////////// // Delete tests ////////////////////////////////////////////////////////////////////////////// @@ -637,4 +643,4 @@ public class TestMemStore extends TestCase { return new KeyValue(row, Bytes.toBytes("test_col:"), HConstants.LATEST_TIMESTAMP, value); } -} \ No newline at end of file +}