diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java index 0bc75e7d078..4421ac5a08c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java @@ -30,9 +30,16 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public interface ChangedReadersObserver { + + /** + * @return the read point of the current scan + */ + long getReadPoint(); /** * Notify observers. + * @param sfs The new files + * @param memStoreScanners scanner of current memstore * @throws IOException e */ - void updateReaders(List sfs) throws IOException; + void updateReaders(List sfs, List memStoreScanners) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 7b7446ab2f0..b1e9f325e42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -71,38 +71,26 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private public class DefaultMemStore implements MemStore { private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); + @VisibleForTesting static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; private static final boolean USEMSLAB_DEFAULT = true; - static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; + private static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; private Configuration conf; - // MemStore. Use a CellSkipListSet rather than SkipListSet because of the - // better semantics. The Map will overwrite if passed a key it already had - // whereas the Set will not add new Cell if key is same though value might be - // different. Value is not important -- just make sure always same - // reference passed. - volatile CellSkipListSet cellSet; - - // Snapshot of memstore. Made for flusher. - volatile CellSkipListSet snapshot; - + @VisibleForTesting final KeyValue.KVComparator comparator; - // Used to track own heapSize - final AtomicLong size; - private volatile long snapshotSize; - // Used to track when to flush - volatile long timeOfOldestEdit = Long.MAX_VALUE; + private volatile long timeOfOldestEdit = Long.MAX_VALUE; - TimeRangeTracker timeRangeTracker; - TimeRangeTracker snapshotTimeRangeTracker; + private volatile long snapshotId; + private volatile boolean tagsPresent; - volatile MemStoreLAB allocator; - volatile MemStoreLAB snapshotAllocator; - volatile long snapshotId; - volatile boolean tagsPresent; + @VisibleForTesting + volatile Section activeSection; + @VisibleForTesting + volatile Section snapshotSection; /** * Default constructor. Used for tests. @@ -119,28 +107,8 @@ public class DefaultMemStore implements MemStore { final KeyValue.KVComparator c) { this.conf = conf; this.comparator = c; - this.cellSet = new CellSkipListSet(c); - this.snapshot = new CellSkipListSet(c); - timeRangeTracker = new TimeRangeTracker(); - snapshotTimeRangeTracker = new TimeRangeTracker(); - this.size = new AtomicLong(DEEP_OVERHEAD); - this.snapshotSize = 0; - if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { - String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); - this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, - new Class[] { Configuration.class }, new Object[] { conf }); - } else { - this.allocator = null; - } - } - - void dump() { - for (Cell cell: this.cellSet) { - LOG.info(cell); - } - for (Cell cell: this.snapshot) { - LOG.info(cell); - } + this.activeSection = Section.newActiveSection(comparator, conf); + this.snapshotSection = Section.newSnapshotSection(comparator); } /** @@ -151,31 +119,22 @@ public class DefaultMemStore implements MemStore { public MemStoreSnapshot snapshot() { // If snapshot currently has entries, then flusher failed or didn't call // cleanup. Log a warning. - if (!this.snapshot.isEmpty()) { + if (!snapshotSection.getCellSkipListSet().isEmpty()) { LOG.warn("Snapshot called again without clearing previous. " + "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { this.snapshotId = EnvironmentEdgeManager.currentTime(); - this.snapshotSize = keySize(); - if (!this.cellSet.isEmpty()) { - this.snapshot = this.cellSet; - this.cellSet = new CellSkipListSet(this.comparator); - this.snapshotTimeRangeTracker = this.timeRangeTracker; - this.timeRangeTracker = new TimeRangeTracker(); - // Reset heap to not include any keys - this.size.set(DEEP_OVERHEAD); - this.snapshotAllocator = this.allocator; - // Reset allocator so we get a fresh buffer for the new memstore - if (allocator != null) { - String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); - this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, - new Class[] { Configuration.class }, new Object[] { conf }); - } + if (!activeSection.getCellSkipListSet().isEmpty()) { + snapshotSection = activeSection; + activeSection = Section.newActiveSection(comparator, conf); + snapshotSection.getHeapSize().addAndGet(-DEEP_OVERHEAD); timeOfOldestEdit = Long.MAX_VALUE; } } - MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize, - this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator), + MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, + snapshotSection.getCellSkipListSet().size(), snapshotSection.getHeapSize().get(), + snapshotSection.getTimeRangeTracker(), + new CollectionBackedScanner(snapshotSection.getCellSkipListSet(), this.comparator), this.tagsPresent); this.tagsPresent = false; return memStoreSnapshot; @@ -189,37 +148,29 @@ public class DefaultMemStore implements MemStore { */ @Override public void clearSnapshot(long id) throws UnexpectedStateException { - MemStoreLAB tmpAllocator = null; if (this.snapshotId == -1) return; // already cleared if (this.snapshotId != id) { throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " + id); } - // OK. Passed in snapshot is same as current snapshot. If not-empty, - // create a new snapshot and let the old one go. - if (!this.snapshot.isEmpty()) { - this.snapshot = new CellSkipListSet(this.comparator); - this.snapshotTimeRangeTracker = new TimeRangeTracker(); - } - this.snapshotSize = 0; - this.snapshotId = -1; - if (this.snapshotAllocator != null) { - tmpAllocator = this.snapshotAllocator; - this.snapshotAllocator = null; - } + // OK. Passed in snapshot is same as current snapshot. + MemStoreLAB tmpAllocator = snapshotSection.getMemStoreLAB(); + snapshotSection = Section.newSnapshotSection(comparator); if (tmpAllocator != null) { tmpAllocator.close(); } + this.snapshotId = -1; } @Override public long getFlushableSize() { - return this.snapshotSize > 0 ? this.snapshotSize : keySize(); + long snapshotSize = snapshotSection.getHeapSize().get(); + return snapshotSize > 0 ? snapshotSize : keySize(); } @Override public long getSnapshotSize() { - return this.snapshotSize; + return snapshotSection.getHeapSize().get(); } /** @@ -249,7 +200,7 @@ public class DefaultMemStore implements MemStore { } private boolean addToCellSet(Cell e) { - boolean b = this.cellSet.add(e); + boolean b = this.activeSection.getCellSkipListSet().add(e); // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. // When we use ACL CP or Visibility CP which deals with Tags during // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not @@ -262,7 +213,7 @@ public class DefaultMemStore implements MemStore { } private boolean removeFromCellSet(Cell e) { - boolean b = this.cellSet.remove(e); + boolean b = this.activeSection.getCellSkipListSet().remove(e); setOldestEditTimeToNow(); return b; } @@ -291,8 +242,8 @@ public class DefaultMemStore implements MemStore { if (!notPresent && mslabUsed) { s += getCellLength(toAdd); } - timeRangeTracker.includeTimestamp(toAdd); - this.size.addAndGet(s); + activeSection.getTimeRangeTracker().includeTimestamp(toAdd); + activeSection.getHeapSize().addAndGet(s); return s; } @@ -305,12 +256,12 @@ public class DefaultMemStore implements MemStore { } private Cell maybeCloneWithAllocator(Cell cell) { - if (allocator == null) { + if (activeSection.getMemStoreLAB() == null) { return cell; } int len = getCellLength(cell); - ByteRange alloc = allocator.allocateBytes(len); + ByteRange alloc = activeSection.getMemStoreLAB().allocateBytes(len); if (alloc == null) { // The allocation was too large, allocator decided // not to do anything with it. @@ -338,18 +289,19 @@ public class DefaultMemStore implements MemStore { // not the snapshot. The flush of this snapshot to disk has not // yet started because Store.flush() waits for all rwcc transactions to // commit before starting the flush to disk. - Cell found = this.snapshot.get(cell); + Cell found = snapshotSection.getCellSkipListSet().get(cell); if (found != null && found.getSequenceId() == cell.getSequenceId()) { - this.snapshot.remove(cell); + snapshotSection.getCellSkipListSet().remove(cell); long sz = heapSizeChange(cell, true); - this.snapshotSize -= sz; + snapshotSection.getHeapSize().addAndGet(-sz); } + // If the key is in the memstore, delete it. Update this.size. - found = this.cellSet.get(cell); + found = activeSection.getCellSkipListSet().get(cell); if (found != null && found.getSequenceId() == cell.getSequenceId()) { removeFromCellSet(found); - long s = heapSizeChange(found, true); - this.size.addAndGet(-s); + long sz = heapSizeChange(found, true); + activeSection.getHeapSize().addAndGet(-sz); } } @@ -371,7 +323,8 @@ public class DefaultMemStore implements MemStore { * @return Next row or null if none found. */ Cell getNextRow(final Cell cell) { - return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot)); + return getLowest(getNextRow(cell, activeSection.getCellSkipListSet()), + getNextRow(cell, snapshotSection.getCellSkipListSet())); } /* @@ -416,8 +369,8 @@ public class DefaultMemStore implements MemStore { */ @Override public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) { - getRowKeyAtOrBefore(cellSet, state); - getRowKeyAtOrBefore(snapshot, state); + getRowKeyAtOrBefore(activeSection.getCellSkipListSet(), state); + getRowKeyAtOrBefore(snapshotSection.getCellSkipListSet(), state); } /* @@ -515,7 +468,7 @@ public class DefaultMemStore implements MemStore { long now) { Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier); // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. - SortedSet snSs = snapshot.tailSet(firstCell); + SortedSet snSs = snapshotSection.getCellSkipListSet().tailSet(firstCell); if (!snSs.isEmpty()) { Cell snc = snSs.first(); // is there a matching Cell in the snapshot? @@ -533,7 +486,7 @@ public class DefaultMemStore implements MemStore { // so we cant add the new Cell w/o knowing what's there already, but we also // want to take this chance to delete some cells. So two loops (sad) - SortedSet ss = cellSet.tailSet(firstCell); + SortedSet ss = activeSection.getCellSkipListSet().tailSet(firstCell); for (Cell cell : ss) { // if this isnt the row we are interested in, then bail: if (!CellUtil.matchingColumn(cell, family, qualifier) @@ -612,7 +565,7 @@ public class DefaultMemStore implements MemStore { cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - SortedSet ss = cellSet.tailSet(firstCell); + SortedSet ss = activeSection.getCellSkipListSet().tailSet(firstCell); Iterator it = ss.iterator(); // versions visible to oldest scanner int versionsVisible = 0; @@ -635,7 +588,7 @@ public class DefaultMemStore implements MemStore { // false means there was a change, so give us the size. long delta = heapSizeChange(cur, true); addedSize -= delta; - this.size.addAndGet(-delta); + activeSection.getHeapSize().addAndGet(-delta); if (removedCells != null) { removedCells.add(cur); } @@ -694,7 +647,8 @@ public class DefaultMemStore implements MemStore { */ @Override public List getScanners(long readPt) { - return Collections. singletonList(new MemStoreScanner(readPt)); + return Collections. singletonList( + new MemStoreScanner(activeSection, snapshotSection, readPt, comparator)); } /** @@ -705,14 +659,29 @@ public class DefaultMemStore implements MemStore { * @return False if the key definitely does not exist in this Memstore */ public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) { + return shouldSeek(activeSection.getTimeRangeTracker(), + snapshotSection.getTimeRangeTracker(), scan, store, oldestUnexpiredTS); + } + + /** + * Check if this memstore may contain the required keys + * @param activeTimeRangeTracker the tracker of active data + * @param snapshotTimeRangeTracker the tracker of snapshot data + * @param scan scan + * @param store holds reference to cf + * @param oldestUnexpiredTS + * @return False if the key definitely does not exist in this Memstore + */ + private static boolean shouldSeek(TimeRangeTracker activeTimeRangeTracker, + TimeRangeTracker snapshotTimeRangeTracker, Scan scan, Store store, long oldestUnexpiredTS) { byte[] cf = store.getFamily().getName(); TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf); if (timeRange == null) { timeRange = scan.getTimeRange(); } - return (timeRangeTracker.includesTimeRange(timeRange) || + return (activeTimeRangeTracker.includesTimeRange(timeRange) || snapshotTimeRangeTracker.includesTimeRange(timeRange)) && - (Math.max(timeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax()) >= oldestUnexpiredTS); + (Math.max(activeTimeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax()) >= oldestUnexpiredTS); } /* @@ -721,7 +690,7 @@ public class DefaultMemStore implements MemStore { * map and snapshot. * This behaves as if it were a real scanner but does not maintain position. */ - protected class MemStoreScanner extends NonLazyKeyValueScanner { + protected static class MemStoreScanner extends NonLazyKeyValueScanner { // Next row information for either cellSet or snapshot private Cell cellSetNextRow = null; private Cell snapshotNextRow = null; @@ -735,22 +704,18 @@ public class DefaultMemStore implements MemStore { private Iterator snapshotIt; // The cellSet and snapshot at the time of creating this scanner - private CellSkipListSet cellSetAtCreation; - private CellSkipListSet snapshotAtCreation; + private final Section activeAtCreation; + private final Section snapshotAtCreation; // the pre-calculated Cell to be returned by peek() or next() private Cell theNext; - // The allocator and snapshot allocator at the time of creating this scanner - volatile MemStoreLAB allocatorAtCreation; - volatile MemStoreLAB snapshotAllocatorAtCreation; - // A flag represents whether could stop skipping Cells for MVCC // if have encountered the next row. Only used for reversed scan private boolean stopSkippingCellsIfNextRow = false; - private long readPoint; - + private final long readPoint; + private final KeyValue.KVComparator comparator; /* Some notes... @@ -772,19 +737,16 @@ public class DefaultMemStore implements MemStore { the adds to kvset in the MemStoreScanner. */ - MemStoreScanner(long readPoint) { - super(); - + MemStoreScanner(Section activeSection, Section snapshotSection, long readPoint, final KeyValue.KVComparator c) { this.readPoint = readPoint; - cellSetAtCreation = cellSet; - snapshotAtCreation = snapshot; - if (allocator != null) { - this.allocatorAtCreation = allocator; - this.allocatorAtCreation.incScannerCount(); + this.comparator = c; + activeAtCreation = activeSection; + snapshotAtCreation = snapshotSection; + if (activeAtCreation.getMemStoreLAB() != null) { + activeAtCreation.getMemStoreLAB().incScannerCount(); } - if (snapshotAllocator != null) { - this.snapshotAllocatorAtCreation = snapshotAllocator; - this.snapshotAllocatorAtCreation.incScannerCount(); + if (snapshotAtCreation.getMemStoreLAB() != null) { + snapshotAtCreation.getMemStoreLAB().incScannerCount(); } if (Trace.isTracing() && Trace.currentSpan() != null) { Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); @@ -839,8 +801,8 @@ public class DefaultMemStore implements MemStore { } // kvset and snapshot will never be null. // if tailSet can't find anything, SortedSet is empty (not null). - cellSetIt = cellSetAtCreation.tailSet(key).iterator(); - snapshotIt = snapshotAtCreation.tailSet(key).iterator(); + cellSetIt = activeAtCreation.getCellSkipListSet().tailSet(key).iterator(); + snapshotIt = snapshotAtCreation.getCellSkipListSet().tailSet(key).iterator(); cellSetItRow = null; snapshotItRow = null; @@ -882,8 +844,8 @@ public class DefaultMemStore implements MemStore { get it. So we remember the last keys we iterated to and restore the reseeked set to at least that point. */ - cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator(); - snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator(); + cellSetIt = activeAtCreation.getCellSkipListSet().tailSet(getHighest(key, cellSetItRow)).iterator(); + snapshotIt = snapshotAtCreation.getCellSkipListSet().tailSet(getHighest(key, snapshotItRow)).iterator(); return seekInSubLists(key); } @@ -951,20 +913,19 @@ public class DefaultMemStore implements MemStore { return (first != null ? first : second); } + @Override public synchronized void close() { this.cellSetNextRow = null; this.snapshotNextRow = null; this.cellSetIt = null; this.snapshotIt = null; - - if (allocatorAtCreation != null) { - this.allocatorAtCreation.decScannerCount(); - this.allocatorAtCreation = null; + + if (activeAtCreation != null && activeAtCreation.getMemStoreLAB() != null) { + activeAtCreation.getMemStoreLAB().decScannerCount(); } - if (snapshotAllocatorAtCreation != null) { - this.snapshotAllocatorAtCreation.decScannerCount(); - this.snapshotAllocatorAtCreation = null; + if (snapshotAtCreation != null && snapshotAtCreation.getMemStoreLAB() != null) { + snapshotAtCreation.getMemStoreLAB().decScannerCount(); } this.cellSetItRow = null; @@ -983,7 +944,8 @@ public class DefaultMemStore implements MemStore { @Override public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - return shouldSeek(scan, store, oldestUnexpiredTS); + return shouldSeek(activeAtCreation.getTimeRangeTracker(), + snapshotAtCreation.getTimeRangeTracker(), scan, store, oldestUnexpiredTS); } /** @@ -1012,9 +974,9 @@ public class DefaultMemStore implements MemStore { do { Cell firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(), key.getRowLength()); - SortedSet cellHead = cellSetAtCreation.headSet(firstKeyOnRow); + SortedSet cellHead = activeAtCreation.getCellSkipListSet().headSet(firstKeyOnRow); Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last(); - SortedSet snapshotHead = snapshotAtCreation + SortedSet snapshotHead = snapshotAtCreation.getCellSkipListSet() .headSet(firstKeyOnRow); Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead .last(); @@ -1042,10 +1004,10 @@ public class DefaultMemStore implements MemStore { @Override public synchronized boolean seekToLastRow() { - Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation - .last(); - Cell second = snapshotAtCreation.isEmpty() ? null - : snapshotAtCreation.last(); + Cell first = activeAtCreation.getCellSkipListSet().isEmpty() ? null + : activeAtCreation.getCellSkipListSet().last(); + Cell second = snapshotAtCreation.getCellSkipListSet().isEmpty() ? null + : snapshotAtCreation.getCellSkipListSet().last(); Cell higherCell = getHighest(first, second); if (higherCell == null) { return false; @@ -1062,10 +1024,10 @@ public class DefaultMemStore implements MemStore { } public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT - + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); + + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + - ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) + + (2 * ClassSize.ATOMIC_LONG) + (2 * ClassSize.TIMERANGE_TRACKER) + (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP)); /* @@ -1090,7 +1052,7 @@ public class DefaultMemStore implements MemStore { */ @Override public long heapSize() { - return size.get(); + return activeSection.getHeapSize().get(); } @Override @@ -1140,4 +1102,63 @@ public class DefaultMemStore implements MemStore { LOG.info("Exiting."); } + /** + * Contains the fields which are useful to MemStoreScanner. + */ + @InterfaceAudience.Private + @VisibleForTesting + static class Section { + /** + * MemStore. Use a CellSkipListSet rather than SkipListSet because of the + * better semantics. The Map will overwrite if passed a key it already had + * whereas the Set will not add new Cell if key is same though value might be + * different. Value is not important -- just make sure always same reference passed. + */ + private final CellSkipListSet cellSet; + private final TimeRangeTracker tracker = new TimeRangeTracker(); + /** + * Used to track own heapSize. + */ + private final AtomicLong heapSize; + private final MemStoreLAB allocator; + + static Section newSnapshotSection(final KeyValue.KVComparator c) { + return new Section(c, null, 0); + } + + static Section newActiveSection(final KeyValue.KVComparator c, + final Configuration conf) { + return new Section(c, conf, DEEP_OVERHEAD); + } + + private Section(final KeyValue.KVComparator c, + final Configuration conf, long initHeapSize) { + this.cellSet = new CellSkipListSet(c); + this.heapSize = new AtomicLong(initHeapSize); + if (conf != null && conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { + String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); + this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, + new Class[]{Configuration.class}, new Object[]{conf}); + } else { + this.allocator = null; + } + } + + CellSkipListSet getCellSkipListSet() { + return cellSet; + } + + TimeRangeTracker getTimeRangeTracker() { + return tracker; + } + + AtomicLong getHeapSize() { + return heapSize; + } + + MemStoreLAB getMemStoreLAB() { + return allocator; + } + + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 4032a195196..c211736c1a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1152,7 +1152,14 @@ public class HStore implements Store { */ private void notifyChangedReadersObservers(List sfs) throws IOException { for (ChangedReadersObserver o : this.changedReaderObservers) { - o.updateReaders(sfs); + List memStoreScanners; + this.lock.readLock().lock(); + try { + memStoreScanners = this.memstore.getScanners(o.getReadPoint()); + } finally { + this.lock.readLock().unlock(); + } + o.updateReaders(sfs, memStoreScanners); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index fa33326911a..05cfe24d77a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -50,8 +50,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; -import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; +import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -128,9 +128,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Indicates whether there was flush during the course of the scan private volatile boolean flushed = false; // generally we get one file from a flush - private List flushedStoreFiles = new ArrayList(1); + private final List flushedStoreFiles = new ArrayList(1); + // generally we get one memstroe scanner from a flush + private final List memStoreScannersAfterFlush = new ArrayList<>(1); // The current list of scanners - private List currentScanners = new ArrayList(); + private final List currentScanners = new ArrayList(); // flush update lock private ReentrantLock flushLock = new ReentrantLock(); @@ -453,6 +455,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner public void close() { if (this.closing) return; this.closing = true; + clearAndClose(memStoreScannersAfterFlush); // Under test, we dont have a this.store if (this.store != null) this.store.deleteChangedReaderObserver(this); @@ -770,13 +773,33 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return true; } + @Override + public long getReadPoint() { + return readPt; + } + + private static void clearAndClose(List scanners) { + for (KeyValueScanner s : scanners) { + s.close(); + } + scanners.clear(); + } + // Implementation of ChangedReadersObserver @Override - public void updateReaders(List sfs) throws IOException { - flushed = true; + public void updateReaders(List sfs, List memStoreScanners) throws IOException { + if (CollectionUtils.isEmpty(sfs) + && CollectionUtils.isEmpty(memStoreScanners)) { + return; + } flushLock.lock(); try { + flushed = true; flushedStoreFiles.addAll(sfs); + if (!CollectionUtils.isEmpty(memStoreScanners)) { + clearAndClose(memStoreScannersAfterFlush); + memStoreScannersAfterFlush.addAll(memStoreScanners); + } } finally { flushLock.unlock(); } @@ -836,12 +859,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner final boolean isCompaction = false; boolean usePread = get || scanUsePread; List scanners = null; + flushLock.lock(); try { - flushLock.lock(); - scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); + List allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size()); + allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread, + isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false)); + allScanners.addAll(memStoreScannersAfterFlush); + scanners = selectScannersFrom(allScanners); // Clear the current set of flushed store files so that they don't get added again flushedStoreFiles.clear(); + memStoreScannersAfterFlush.clear(); } finally { flushLock.unlock(); } @@ -851,7 +878,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // remove the older memstore scanner for (int i = 0; i < currentScanners.size(); i++) { if (!currentScanners.get(i).isFileScanner()) { - currentScanners.remove(i); + currentScanners.remove(i).close(); break; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 27dd96cf600..1ea65fa1878 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -304,13 +304,14 @@ public class TestHeapSize { // DefaultMemStore Deep Overhead actual = DefaultMemStore.DEEP_OVERHEAD; expected = ClassSize.estimateBase(cl, false); - expected += ClassSize.estimateBase(AtomicLong.class, false); + expected += (2 * ClassSize.estimateBase(AtomicLong.class, false)); expected += (2 * ClassSize.estimateBase(CellSkipListSet.class, false)); expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false)); expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false)); if(expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicLong.class, true); + ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(CellSkipListSet.class, true); ClassSize.estimateBase(CellSkipListSet.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 7cb74b6e63a..6e53a64af27 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -87,8 +87,8 @@ public class TestDefaultMemStore extends TestCase { byte [] other = Bytes.toBytes("somethingelse"); KeyValue samekey = new KeyValue(bytes, bytes, bytes, other); this.memstore.add(samekey); - Cell found = this.memstore.cellSet.first(); - assertEquals(1, this.memstore.cellSet.size()); + Cell found = this.memstore.activeSection.getCellSkipListSet().first(); + assertEquals(1, this.memstore.activeSection.getCellSkipListSet().size()); assertTrue(Bytes.toString(found.getValue()), CellUtil.matchingValue(samekey, found)); } @@ -99,13 +99,13 @@ public class TestDefaultMemStore extends TestCase { long sizeChangeForSecondCell = this.memstore.add(kv); // make sure memstore size increase won't double-count MSLAB chunk size assertEquals(DefaultMemStore.heapSizeChange(kv, true), sizeChangeForFirstCell); - if (this.memstore.allocator != null) { + if (this.memstore.activeSection.getMemStoreLAB() != null) { // make sure memstore size increased when using MSLAB assertEquals(memstore.getCellLength(kv), sizeChangeForSecondCell); // make sure chunk size increased even when writing the same cell, if using MSLAB - if (this.memstore.allocator instanceof HeapMemStoreLAB) { + if (this.memstore.activeSection.getMemStoreLAB() instanceof HeapMemStoreLAB) { assertEquals(2 * memstore.getCellLength(kv), - ((HeapMemStoreLAB) this.memstore.allocator).getCurrentChunk().getNextFreeOffset()); + ((HeapMemStoreLAB) this.memstore.activeSection.getMemStoreLAB()).getCurrentChunk().getNextFreeOffset()); } } else { // make sure no memstore size change w/o MSLAB @@ -493,7 +493,7 @@ public class TestDefaultMemStore extends TestCase { for (int i = 0; i < snapshotCount; i++) { addRows(this.memstore); runSnapshot(this.memstore); - assertEquals("History not being cleared", 0, this.memstore.snapshot.size()); + assertEquals("History not being cleared", 0, this.memstore.snapshotSection.getCellSkipListSet().size()); } } @@ -514,7 +514,7 @@ public class TestDefaultMemStore extends TestCase { m.add(key2); assertTrue("Expected memstore to hold 3 values, actually has " + - m.cellSet.size(), m.cellSet.size() == 3); + m.activeSection.getCellSkipListSet().size(), m.activeSection.getCellSkipListSet().size() == 3); } ////////////////////////////////////////////////////////////////////////////// @@ -588,12 +588,12 @@ public class TestDefaultMemStore extends TestCase { memstore.add(new KeyValue(row, fam ,qf3, val)); //Creating a snapshot memstore.snapshot(); - assertEquals(3, memstore.snapshot.size()); + assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size()); //Adding value to "new" memstore - assertEquals(0, memstore.cellSet.size()); + assertEquals(0, memstore.activeSection.getCellSkipListSet().size()); memstore.add(new KeyValue(row, fam ,qf4, val)); memstore.add(new KeyValue(row, fam ,qf5, val)); - assertEquals(2, memstore.cellSet.size()); + assertEquals(2, memstore.activeSection.getCellSkipListSet().size()); } ////////////////////////////////////////////////////////////////////////////// @@ -615,7 +615,7 @@ public class TestDefaultMemStore extends TestCase { memstore.add(put2); memstore.add(put3); - assertEquals(3, memstore.cellSet.size()); + assertEquals(3, memstore.activeSection.getCellSkipListSet().size()); KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val); memstore.delete(del2); @@ -626,9 +626,9 @@ public class TestDefaultMemStore extends TestCase { expected.add(put2); expected.add(put1); - assertEquals(4, memstore.cellSet.size()); + assertEquals(4, memstore.activeSection.getCellSkipListSet().size()); int i = 0; - for(Cell cell : memstore.cellSet) { + for(Cell cell : memstore.activeSection.getCellSkipListSet()) { assertEquals(expected.get(i++), cell); } } @@ -649,7 +649,7 @@ public class TestDefaultMemStore extends TestCase { memstore.add(put2); memstore.add(put3); - assertEquals(3, memstore.cellSet.size()); + assertEquals(3, memstore.activeSection.getCellSkipListSet().size()); KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val); @@ -662,9 +662,9 @@ public class TestDefaultMemStore extends TestCase { expected.add(put1); - assertEquals(4, memstore.cellSet.size()); + assertEquals(4, memstore.activeSection.getCellSkipListSet().size()); int i = 0; - for (Cell cell: memstore.cellSet) { + for (Cell cell: memstore.activeSection.getCellSkipListSet()) { assertEquals(expected.get(i++), cell); } } @@ -702,9 +702,9 @@ public class TestDefaultMemStore extends TestCase { - assertEquals(5, memstore.cellSet.size()); + assertEquals(5, memstore.activeSection.getCellSkipListSet().size()); int i = 0; - for (Cell cell: memstore.cellSet) { + for (Cell cell: memstore.activeSection.getCellSkipListSet()) { assertEquals(expected.get(i++), cell); } } @@ -718,8 +718,8 @@ public class TestDefaultMemStore extends TestCase { memstore.add(new KeyValue(row, fam, qf, ts, val)); KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val); memstore.delete(delete); - assertEquals(2, memstore.cellSet.size()); - assertEquals(delete, memstore.cellSet.first()); + assertEquals(2, memstore.activeSection.getCellSkipListSet().size()); + assertEquals(delete, memstore.activeSection.getCellSkipListSet().first()); } public void testRetainsDeleteVersion() throws IOException { @@ -731,8 +731,8 @@ public class TestDefaultMemStore extends TestCase { "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care"); memstore.delete(delete); - assertEquals(2, memstore.cellSet.size()); - assertEquals(delete, memstore.cellSet.first()); + assertEquals(2, memstore.activeSection.getCellSkipListSet().size()); + assertEquals(delete, memstore.activeSection.getCellSkipListSet().first()); } public void testRetainsDeleteColumn() throws IOException { // add a put to memstore @@ -743,8 +743,8 @@ public class TestDefaultMemStore extends TestCase { KeyValue.Type.DeleteColumn, "dont-care"); memstore.delete(delete); - assertEquals(2, memstore.cellSet.size()); - assertEquals(delete, memstore.cellSet.first()); + assertEquals(2, memstore.activeSection.getCellSkipListSet().size()); + assertEquals(delete, memstore.activeSection.getCellSkipListSet().first()); } public void testRetainsDeleteFamily() throws IOException { // add a put to memstore @@ -755,8 +755,8 @@ public class TestDefaultMemStore extends TestCase { KeyValue.Type.DeleteFamily, "dont-care"); memstore.delete(delete); - assertEquals(2, memstore.cellSet.size()); - assertEquals(delete, memstore.cellSet.first()); + assertEquals(2, memstore.activeSection.getCellSkipListSet().size()); + assertEquals(delete, memstore.activeSection.getCellSkipListSet().first()); } //////////////////////////////////// @@ -856,7 +856,7 @@ public class TestDefaultMemStore extends TestCase { public void testUpsertMemstoreSize() throws Exception { Configuration conf = HBaseConfiguration.create(); memstore = new DefaultMemStore(conf, KeyValue.COMPARATOR); - long oldSize = memstore.size.get(); + long oldSize = memstore.activeSection.getHeapSize().get(); List l = new ArrayList(); KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); @@ -867,18 +867,18 @@ public class TestDefaultMemStore extends TestCase { l.add(kv1); l.add(kv2); l.add(kv3); this.memstore.upsert(l, 2, null);// readpoint is 2 - long newSize = this.memstore.size.get(); + long newSize = this.memstore.activeSection.getHeapSize().get(); assert(newSize > oldSize); //The kv1 should be removed. - assert(memstore.cellSet.size() == 2); - + assert(memstore.activeSection.getCellSkipListSet().size() == 2); + KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); kv4.setSequenceId(1); l.clear(); l.add(kv4); this.memstore.upsert(l, 3, null); - assertEquals(newSize, this.memstore.size.get()); + assertEquals(newSize, this.memstore.activeSection.getHeapSize().get()); //The kv2 should be removed. - assert(memstore.cellSet.size() == 2); + assert(memstore.activeSection.getCellSkipListSet().size() == 2); //this.memstore = null; } @@ -1039,10 +1039,10 @@ public class TestDefaultMemStore extends TestCase { private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedStateException { // Save off old state. - int oldHistorySize = hmc.snapshot.size(); + int oldHistorySize = hmc.snapshotSection.getCellSkipListSet().size(); MemStoreSnapshot snapshot = hmc.snapshot(); // Make some assertions about what just happened. - assertTrue("History size has not increased", oldHistorySize < hmc.snapshot.size()); + assertTrue("History size has not increased", oldHistorySize < hmc.snapshotSection.getCellSkipListSet().size()); long t = memstore.timeOfOldestEdit(); assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); hmc.clearSnapshot(snapshot.getId()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 3c9abfaaa6a..03a19262b19 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -2555,10 +2555,10 @@ public class TestHRegion { // This is kinda hacky, but better than nothing... long now = System.currentTimeMillis(); DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore; - Cell firstCell = memstore.cellSet.first(); + Cell firstCell = memstore.activeSection.getCellSkipListSet().first(); assertTrue(firstCell.getTimestamp() <= now); now = firstCell.getTimestamp(); - for (Cell cell : memstore.cellSet) { + for (Cell cell : memstore.activeSection.getCellSkipListSet()) { assertTrue(cell.getTimestamp() <= now); now = cell.getTimestamp(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 4a72b4af049..f0262ed8123 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -116,13 +116,13 @@ public class TestMemStoreChunkPool { // Creating a snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.snapshot.size()); + assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size()); // Adding value to "new" memstore - assertEquals(0, memstore.cellSet.size()); + assertEquals(0, memstore.activeSection.getCellSkipListSet().size()); memstore.add(new KeyValue(row, fam, qf4, val)); memstore.add(new KeyValue(row, fam, qf5, val)); - assertEquals(2, memstore.cellSet.size()); + assertEquals(2, memstore.activeSection.getCellSkipListSet().size()); memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -153,13 +153,13 @@ public class TestMemStoreChunkPool { // Creating a snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.snapshot.size()); + assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size()); // Adding value to "new" memstore - assertEquals(0, memstore.cellSet.size()); + assertEquals(0, memstore.activeSection.getCellSkipListSet().size()); memstore.add(new KeyValue(row, fam, qf4, val)); memstore.add(new KeyValue(row, fam, qf5, val)); - assertEquals(2, memstore.cellSet.size()); + assertEquals(2, memstore.activeSection.getCellSkipListSet().size()); // opening scanner before clear the snapshot List scanners = memstore.getScanners(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 5b7f8e85064..1dc6ae5df4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -19,24 +19,31 @@ package org.apache.hadoop.hbase.regionserver; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.google.common.collect.Lists; import java.io.IOException; import java.lang.ref.SoftReference; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; +import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -57,9 +64,9 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -69,16 +76,18 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; -import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.util.Progressable; import org.junit.After; import org.junit.Assert; @@ -89,9 +98,6 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.mockito.Mockito; -import com.google.common.collect.Lists; -import java.util.Arrays; - /** * Test class for the Store */ @@ -163,9 +169,14 @@ public class TestStore { init(methodName, conf, htd, hcd); } - @SuppressWarnings("deprecation") private Store init(String methodName, Configuration conf, HTableDescriptor htd, HColumnDescriptor hcd) throws IOException { + return init(methodName, conf, htd, hcd, null); + } + + @SuppressWarnings("deprecation") + private Store init(String methodName, Configuration conf, HTableDescriptor htd, + HColumnDescriptor hcd, MyScannerHook hook) throws IOException { //Setting up a Store Path basedir = new Path(DIR+methodName); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); @@ -186,8 +197,11 @@ public class TestStore { final WALFactory wals = new WALFactory(walConf, null, methodName); HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()), fs, conf, info, htd, null); - - store = new HStore(region, hcd, conf); + if (hook == null) { + store = new HStore(region, hcd, conf); + } else { + store = new MyStore(region, hcd, conf, hook); + } return store; } @@ -576,7 +590,7 @@ public class TestStore { this.store.snapshot(); flushStore(store, id++); Assert.assertEquals(storeFilessize, this.store.getStorefiles().size()); - Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size()); + Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size()); } private void assertCheck() { @@ -621,7 +635,7 @@ public class TestStore { flushStore(store, id++); Assert.assertEquals(1, this.store.getStorefiles().size()); // from the one we inserted up there, and a new one - Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size()); + Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size()); // how many key/values for this row are there? Get get = new Get(row); @@ -690,7 +704,7 @@ public class TestStore { } long computedSize=0; - for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) { + for (Cell cell : ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet()) { long kvsize = DefaultMemStore.heapSizeChange(cell, true); //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize()); computedSize += kvsize; @@ -722,7 +736,7 @@ public class TestStore { // then flush. flushStore(store, id++); Assert.assertEquals(1, this.store.getStorefiles().size()); - Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size()); + Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size()); // now increment again: newValue += 1; @@ -1136,4 +1150,113 @@ public class TestStore { //ensure that replaceStoreFiles is not called if files are not refreshed verify(spiedStore, times(0)).replaceStoreFiles(null, null); } -} + + private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException { + Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value); + CellUtil.setSequenceId(c, sequenceId); + return c; + } + + @Test + public void testScanWithDoubleFlush() throws IOException { + Configuration conf = HBaseConfiguration.create(); + // Initialize region + MyStore myStore = initMyStore(name.getMethodName(), conf, new MyScannerHook() { + @Override + public void hook(final MyStore store) throws IOException { + final long tmpId = id++; + ExecutorService s = Executors.newSingleThreadExecutor(); + s.submit(new Runnable() { + @Override + public void run() { + try { + // flush the store before storescanner updates the scanners from store. + // The current data will be flushed into files and the memstore will + // be clear. + // -- phase (4/4) + flushStore(store, tmpId); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + }); + s.shutdown(); + try { + // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers. + s.awaitTermination(500, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + } + } + }); + byte[] oldValue = Bytes.toBytes("oldValue"); + byte[] currentValue = Bytes.toBytes("currentValue"); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + // older data whihc shouldn't be "seen" by client + myStore.add(createCell(qf1, ts, seqId, oldValue)); + myStore.add(createCell(qf2, ts, seqId, oldValue)); + myStore.add(createCell(qf3, ts, seqId, oldValue)); + long snapshotId = id++; + // push older data into snapshot -- phase (1/4) + StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId); + storeFlushCtx.prepare(); + + // insert current data into active -- phase (2/4) + myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue)); + myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue)); + myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue)); + TreeSet quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); + quals.add(qf1); + quals.add(qf2); + quals.add(qf3); + try (InternalScanner scanner = (InternalScanner) myStore.getScanner( + new Scan(new Get(row)), quals, seqId + 1)) { + // complete the flush -- phase (3/4) + storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); + storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); + + List results = new ArrayList<>(); + scanner.next(results); + assertEquals(3, results.size()); + for (Cell c : results) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(currentValue) + + ", actual:" + Bytes.toStringBinary(actualValue), + Bytes.equals(actualValue, currentValue)); + } + } + + } + + private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException { + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setMaxVersions(5); + return (MyStore) init(methodName, conf, htd, hcd, hook); + } + + private static class MyStore extends HStore { + + private final MyScannerHook hook; + + MyStore(final HRegion region, final HColumnDescriptor family, + final Configuration confParam, MyScannerHook hook) throws IOException { + super(region, family, confParam); + this.hook = hook; + } + + @Override + public List getScanners(List files, boolean cacheBlocks, + boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, + byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException { + hook.hook(this); + return super.getScanners(files, cacheBlocks, isGet, usePread, + isCompaction, matcher, startRow, stopRow, readPt, includeMemstoreScanner); + } + } + + private interface MyScannerHook { + + void hook(MyStore store) throws IOException; + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index e592321d055..1b257448486 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixtu import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; @@ -831,9 +832,9 @@ public class TestStoreScanner extends TestCase { // normally cause an NPE because scan.store is null. So as long as we get through these // two calls we are good and the bug was quashed. - scan.updateReaders(new ArrayList()); + scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST); - scan.updateReaders(new ArrayList()); + scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST); scan.peek(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java index 7e866323e40..e379e85f1d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Random; @@ -125,7 +126,7 @@ public class TestWideScanner extends HBaseTestCase { ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator(); while (scanners.hasNext()) { StoreScanner ss = (StoreScanner)scanners.next(); - ss.updateReaders(new ArrayList()); + ss.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST); } } while (more);