HBASE-17887 Row-level consistency is broken for read

This commit is contained in:
Chia-Ping Tsai 2017-05-12 19:44:16 +08:00
parent 51cb53776d
commit f81486445c
11 changed files with 401 additions and 213 deletions

View File

@ -30,9 +30,16 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
public interface ChangedReadersObserver {
/**
* @return the read point of the current scan
*/
long getReadPoint();
/**
* Notify observers.
* @param sfs The new files
* @param memStoreScanners scanner of current memstore
* @throws IOException e
*/
void updateReaders(List<StoreFile> sfs) throws IOException;
void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException;
}

View File

@ -71,38 +71,26 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public class DefaultMemStore implements MemStore {
private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
@VisibleForTesting
static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
private static final boolean USEMSLAB_DEFAULT = true;
static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
private static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
private Configuration conf;
// MemStore. Use a CellSkipListSet rather than SkipListSet because of the
// better semantics. The Map will overwrite if passed a key it already had
// whereas the Set will not add new Cell if key is same though value might be
// different. Value is not important -- just make sure always same
// reference passed.
volatile CellSkipListSet cellSet;
// Snapshot of memstore. Made for flusher.
volatile CellSkipListSet snapshot;
@VisibleForTesting
final KeyValue.KVComparator comparator;
// Used to track own heapSize
final AtomicLong size;
private volatile long snapshotSize;
// Used to track when to flush
volatile long timeOfOldestEdit = Long.MAX_VALUE;
private volatile long timeOfOldestEdit = Long.MAX_VALUE;
TimeRangeTracker timeRangeTracker;
TimeRangeTracker snapshotTimeRangeTracker;
private volatile long snapshotId;
private volatile boolean tagsPresent;
volatile MemStoreLAB allocator;
volatile MemStoreLAB snapshotAllocator;
volatile long snapshotId;
volatile boolean tagsPresent;
@VisibleForTesting
volatile Section activeSection;
@VisibleForTesting
volatile Section snapshotSection;
/**
* Default constructor. Used for tests.
@ -119,28 +107,8 @@ public class DefaultMemStore implements MemStore {
final KeyValue.KVComparator c) {
this.conf = conf;
this.comparator = c;
this.cellSet = new CellSkipListSet(c);
this.snapshot = new CellSkipListSet(c);
timeRangeTracker = new TimeRangeTracker();
snapshotTimeRangeTracker = new TimeRangeTracker();
this.size = new AtomicLong(DEEP_OVERHEAD);
this.snapshotSize = 0;
if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
new Class[] { Configuration.class }, new Object[] { conf });
} else {
this.allocator = null;
}
}
void dump() {
for (Cell cell: this.cellSet) {
LOG.info(cell);
}
for (Cell cell: this.snapshot) {
LOG.info(cell);
}
this.activeSection = Section.newActiveSection(comparator, conf);
this.snapshotSection = Section.newSnapshotSection(comparator);
}
/**
@ -151,31 +119,22 @@ public class DefaultMemStore implements MemStore {
public MemStoreSnapshot snapshot() {
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
if (!this.snapshot.isEmpty()) {
if (!snapshotSection.getCellSkipListSet().isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
this.snapshotId = EnvironmentEdgeManager.currentTime();
this.snapshotSize = keySize();
if (!this.cellSet.isEmpty()) {
this.snapshot = this.cellSet;
this.cellSet = new CellSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = this.timeRangeTracker;
this.timeRangeTracker = new TimeRangeTracker();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
this.snapshotAllocator = this.allocator;
// Reset allocator so we get a fresh buffer for the new memstore
if (allocator != null) {
String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
new Class[] { Configuration.class }, new Object[] { conf });
}
if (!activeSection.getCellSkipListSet().isEmpty()) {
snapshotSection = activeSection;
activeSection = Section.newActiveSection(comparator, conf);
snapshotSection.getHeapSize().addAndGet(-DEEP_OVERHEAD);
timeOfOldestEdit = Long.MAX_VALUE;
}
}
MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId,
snapshotSection.getCellSkipListSet().size(), snapshotSection.getHeapSize().get(),
snapshotSection.getTimeRangeTracker(),
new CollectionBackedScanner(snapshotSection.getCellSkipListSet(), this.comparator),
this.tagsPresent);
this.tagsPresent = false;
return memStoreSnapshot;
@ -189,37 +148,29 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public void clearSnapshot(long id) throws UnexpectedStateException {
MemStoreLAB tmpAllocator = null;
if (this.snapshotId == -1) return; // already cleared
if (this.snapshotId != id) {
throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
+ id);
}
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
if (!this.snapshot.isEmpty()) {
this.snapshot = new CellSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = new TimeRangeTracker();
}
this.snapshotSize = 0;
this.snapshotId = -1;
if (this.snapshotAllocator != null) {
tmpAllocator = this.snapshotAllocator;
this.snapshotAllocator = null;
}
// OK. Passed in snapshot is same as current snapshot.
MemStoreLAB tmpAllocator = snapshotSection.getMemStoreLAB();
snapshotSection = Section.newSnapshotSection(comparator);
if (tmpAllocator != null) {
tmpAllocator.close();
}
this.snapshotId = -1;
}
@Override
public long getFlushableSize() {
return this.snapshotSize > 0 ? this.snapshotSize : keySize();
long snapshotSize = snapshotSection.getHeapSize().get();
return snapshotSize > 0 ? snapshotSize : keySize();
}
@Override
public long getSnapshotSize() {
return this.snapshotSize;
return snapshotSection.getHeapSize().get();
}
/**
@ -249,7 +200,7 @@ public class DefaultMemStore implements MemStore {
}
private boolean addToCellSet(Cell e) {
boolean b = this.cellSet.add(e);
boolean b = this.activeSection.getCellSkipListSet().add(e);
// In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
// When we use ACL CP or Visibility CP which deals with Tags during
// mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
@ -262,7 +213,7 @@ public class DefaultMemStore implements MemStore {
}
private boolean removeFromCellSet(Cell e) {
boolean b = this.cellSet.remove(e);
boolean b = this.activeSection.getCellSkipListSet().remove(e);
setOldestEditTimeToNow();
return b;
}
@ -291,8 +242,8 @@ public class DefaultMemStore implements MemStore {
if (!notPresent && mslabUsed) {
s += getCellLength(toAdd);
}
timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
activeSection.getTimeRangeTracker().includeTimestamp(toAdd);
activeSection.getHeapSize().addAndGet(s);
return s;
}
@ -305,12 +256,12 @@ public class DefaultMemStore implements MemStore {
}
private Cell maybeCloneWithAllocator(Cell cell) {
if (allocator == null) {
if (activeSection.getMemStoreLAB() == null) {
return cell;
}
int len = getCellLength(cell);
ByteRange alloc = allocator.allocateBytes(len);
ByteRange alloc = activeSection.getMemStoreLAB().allocateBytes(len);
if (alloc == null) {
// The allocation was too large, allocator decided
// not to do anything with it.
@ -338,18 +289,19 @@ public class DefaultMemStore implements MemStore {
// not the snapshot. The flush of this snapshot to disk has not
// yet started because Store.flush() waits for all rwcc transactions to
// commit before starting the flush to disk.
Cell found = this.snapshot.get(cell);
Cell found = snapshotSection.getCellSkipListSet().get(cell);
if (found != null && found.getSequenceId() == cell.getSequenceId()) {
this.snapshot.remove(cell);
snapshotSection.getCellSkipListSet().remove(cell);
long sz = heapSizeChange(cell, true);
this.snapshotSize -= sz;
snapshotSection.getHeapSize().addAndGet(-sz);
}
// If the key is in the memstore, delete it. Update this.size.
found = this.cellSet.get(cell);
found = activeSection.getCellSkipListSet().get(cell);
if (found != null && found.getSequenceId() == cell.getSequenceId()) {
removeFromCellSet(found);
long s = heapSizeChange(found, true);
this.size.addAndGet(-s);
long sz = heapSizeChange(found, true);
activeSection.getHeapSize().addAndGet(-sz);
}
}
@ -371,7 +323,8 @@ public class DefaultMemStore implements MemStore {
* @return Next row or null if none found.
*/
Cell getNextRow(final Cell cell) {
return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
return getLowest(getNextRow(cell, activeSection.getCellSkipListSet()),
getNextRow(cell, snapshotSection.getCellSkipListSet()));
}
/*
@ -416,8 +369,8 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
getRowKeyAtOrBefore(cellSet, state);
getRowKeyAtOrBefore(snapshot, state);
getRowKeyAtOrBefore(activeSection.getCellSkipListSet(), state);
getRowKeyAtOrBefore(snapshotSection.getCellSkipListSet(), state);
}
/*
@ -515,7 +468,7 @@ public class DefaultMemStore implements MemStore {
long now) {
Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
// Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
SortedSet<Cell> snSs = snapshotSection.getCellSkipListSet().tailSet(firstCell);
if (!snSs.isEmpty()) {
Cell snc = snSs.first();
// is there a matching Cell in the snapshot?
@ -533,7 +486,7 @@ public class DefaultMemStore implements MemStore {
// so we cant add the new Cell w/o knowing what's there already, but we also
// want to take this chance to delete some cells. So two loops (sad)
SortedSet<Cell> ss = cellSet.tailSet(firstCell);
SortedSet<Cell> ss = activeSection.getCellSkipListSet().tailSet(firstCell);
for (Cell cell : ss) {
// if this isnt the row we are interested in, then bail:
if (!CellUtil.matchingColumn(cell, family, qualifier)
@ -612,7 +565,7 @@ public class DefaultMemStore implements MemStore {
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
SortedSet<Cell> ss = cellSet.tailSet(firstCell);
SortedSet<Cell> ss = activeSection.getCellSkipListSet().tailSet(firstCell);
Iterator<Cell> it = ss.iterator();
// versions visible to oldest scanner
int versionsVisible = 0;
@ -635,7 +588,7 @@ public class DefaultMemStore implements MemStore {
// false means there was a change, so give us the size.
long delta = heapSizeChange(cur, true);
addedSize -= delta;
this.size.addAndGet(-delta);
activeSection.getHeapSize().addAndGet(-delta);
if (removedCells != null) {
removedCells.add(cur);
}
@ -694,7 +647,8 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public List<KeyValueScanner> getScanners(long readPt) {
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
return Collections.<KeyValueScanner> singletonList(
new MemStoreScanner(activeSection, snapshotSection, readPt, comparator));
}
/**
@ -705,14 +659,29 @@ public class DefaultMemStore implements MemStore {
* @return False if the key definitely does not exist in this Memstore
*/
public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) {
return shouldSeek(activeSection.getTimeRangeTracker(),
snapshotSection.getTimeRangeTracker(), scan, store, oldestUnexpiredTS);
}
/**
* Check if this memstore may contain the required keys
* @param activeTimeRangeTracker the tracker of active data
* @param snapshotTimeRangeTracker the tracker of snapshot data
* @param scan scan
* @param store holds reference to cf
* @param oldestUnexpiredTS
* @return False if the key definitely does not exist in this Memstore
*/
private static boolean shouldSeek(TimeRangeTracker activeTimeRangeTracker,
TimeRangeTracker snapshotTimeRangeTracker, Scan scan, Store store, long oldestUnexpiredTS) {
byte[] cf = store.getFamily().getName();
TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
if (timeRange == null) {
timeRange = scan.getTimeRange();
}
return (timeRangeTracker.includesTimeRange(timeRange) ||
return (activeTimeRangeTracker.includesTimeRange(timeRange) ||
snapshotTimeRangeTracker.includesTimeRange(timeRange)) &&
(Math.max(timeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax()) >= oldestUnexpiredTS);
(Math.max(activeTimeRangeTracker.getMax(), snapshotTimeRangeTracker.getMax()) >= oldestUnexpiredTS);
}
/*
@ -721,7 +690,7 @@ public class DefaultMemStore implements MemStore {
* map and snapshot.
* This behaves as if it were a real scanner but does not maintain position.
*/
protected class MemStoreScanner extends NonLazyKeyValueScanner {
protected static class MemStoreScanner extends NonLazyKeyValueScanner {
// Next row information for either cellSet or snapshot
private Cell cellSetNextRow = null;
private Cell snapshotNextRow = null;
@ -735,22 +704,18 @@ public class DefaultMemStore implements MemStore {
private Iterator<Cell> snapshotIt;
// The cellSet and snapshot at the time of creating this scanner
private CellSkipListSet cellSetAtCreation;
private CellSkipListSet snapshotAtCreation;
private final Section activeAtCreation;
private final Section snapshotAtCreation;
// the pre-calculated Cell to be returned by peek() or next()
private Cell theNext;
// The allocator and snapshot allocator at the time of creating this scanner
volatile MemStoreLAB allocatorAtCreation;
volatile MemStoreLAB snapshotAllocatorAtCreation;
// A flag represents whether could stop skipping Cells for MVCC
// if have encountered the next row. Only used for reversed scan
private boolean stopSkippingCellsIfNextRow = false;
private long readPoint;
private final long readPoint;
private final KeyValue.KVComparator comparator;
/*
Some notes...
@ -772,19 +737,16 @@ public class DefaultMemStore implements MemStore {
the adds to kvset in the MemStoreScanner.
*/
MemStoreScanner(long readPoint) {
super();
MemStoreScanner(Section activeSection, Section snapshotSection, long readPoint, final KeyValue.KVComparator c) {
this.readPoint = readPoint;
cellSetAtCreation = cellSet;
snapshotAtCreation = snapshot;
if (allocator != null) {
this.allocatorAtCreation = allocator;
this.allocatorAtCreation.incScannerCount();
this.comparator = c;
activeAtCreation = activeSection;
snapshotAtCreation = snapshotSection;
if (activeAtCreation.getMemStoreLAB() != null) {
activeAtCreation.getMemStoreLAB().incScannerCount();
}
if (snapshotAllocator != null) {
this.snapshotAllocatorAtCreation = snapshotAllocator;
this.snapshotAllocatorAtCreation.incScannerCount();
if (snapshotAtCreation.getMemStoreLAB() != null) {
snapshotAtCreation.getMemStoreLAB().incScannerCount();
}
if (Trace.isTracing() && Trace.currentSpan() != null) {
Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
@ -839,8 +801,8 @@ public class DefaultMemStore implements MemStore {
}
// kvset and snapshot will never be null.
// if tailSet can't find anything, SortedSet is empty (not null).
cellSetIt = cellSetAtCreation.tailSet(key).iterator();
snapshotIt = snapshotAtCreation.tailSet(key).iterator();
cellSetIt = activeAtCreation.getCellSkipListSet().tailSet(key).iterator();
snapshotIt = snapshotAtCreation.getCellSkipListSet().tailSet(key).iterator();
cellSetItRow = null;
snapshotItRow = null;
@ -882,8 +844,8 @@ public class DefaultMemStore implements MemStore {
get it. So we remember the last keys we iterated to and restore
the reseeked set to at least that point.
*/
cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
cellSetIt = activeAtCreation.getCellSkipListSet().tailSet(getHighest(key, cellSetItRow)).iterator();
snapshotIt = snapshotAtCreation.getCellSkipListSet().tailSet(getHighest(key, snapshotItRow)).iterator();
return seekInSubLists(key);
}
@ -951,20 +913,19 @@ public class DefaultMemStore implements MemStore {
return (first != null ? first : second);
}
@Override
public synchronized void close() {
this.cellSetNextRow = null;
this.snapshotNextRow = null;
this.cellSetIt = null;
this.snapshotIt = null;
if (allocatorAtCreation != null) {
this.allocatorAtCreation.decScannerCount();
this.allocatorAtCreation = null;
if (activeAtCreation != null && activeAtCreation.getMemStoreLAB() != null) {
activeAtCreation.getMemStoreLAB().decScannerCount();
}
if (snapshotAllocatorAtCreation != null) {
this.snapshotAllocatorAtCreation.decScannerCount();
this.snapshotAllocatorAtCreation = null;
if (snapshotAtCreation != null && snapshotAtCreation.getMemStoreLAB() != null) {
snapshotAtCreation.getMemStoreLAB().decScannerCount();
}
this.cellSetItRow = null;
@ -983,7 +944,8 @@ public class DefaultMemStore implements MemStore {
@Override
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
return shouldSeek(scan, store, oldestUnexpiredTS);
return shouldSeek(activeAtCreation.getTimeRangeTracker(),
snapshotAtCreation.getTimeRangeTracker(), scan, store, oldestUnexpiredTS);
}
/**
@ -1012,9 +974,9 @@ public class DefaultMemStore implements MemStore {
do {
Cell firstKeyOnRow = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
key.getRowLength());
SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
SortedSet<Cell> cellHead = activeAtCreation.getCellSkipListSet().headSet(firstKeyOnRow);
Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
SortedSet<Cell> snapshotHead = snapshotAtCreation
SortedSet<Cell> snapshotHead = snapshotAtCreation.getCellSkipListSet()
.headSet(firstKeyOnRow);
Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
.last();
@ -1042,10 +1004,10 @@ public class DefaultMemStore implements MemStore {
@Override
public synchronized boolean seekToLastRow() {
Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
.last();
Cell second = snapshotAtCreation.isEmpty() ? null
: snapshotAtCreation.last();
Cell first = activeAtCreation.getCellSkipListSet().isEmpty() ? null
: activeAtCreation.getCellSkipListSet().last();
Cell second = snapshotAtCreation.getCellSkipListSet().isEmpty() ? null
: snapshotAtCreation.getCellSkipListSet().last();
Cell higherCell = getHighest(first, second);
if (higherCell == null) {
return false;
@ -1062,10 +1024,10 @@ public class DefaultMemStore implements MemStore {
}
public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
+ (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
(2 * ClassSize.ATOMIC_LONG) + (2 * ClassSize.TIMERANGE_TRACKER) +
(2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
/*
@ -1090,7 +1052,7 @@ public class DefaultMemStore implements MemStore {
*/
@Override
public long heapSize() {
return size.get();
return activeSection.getHeapSize().get();
}
@Override
@ -1140,4 +1102,63 @@ public class DefaultMemStore implements MemStore {
LOG.info("Exiting.");
}
/**
* Contains the fields which are useful to MemStoreScanner.
*/
@InterfaceAudience.Private
@VisibleForTesting
static class Section {
/**
* MemStore. Use a CellSkipListSet rather than SkipListSet because of the
* better semantics. The Map will overwrite if passed a key it already had
* whereas the Set will not add new Cell if key is same though value might be
* different. Value is not important -- just make sure always same reference passed.
*/
private final CellSkipListSet cellSet;
private final TimeRangeTracker tracker = new TimeRangeTracker();
/**
* Used to track own heapSize.
*/
private final AtomicLong heapSize;
private final MemStoreLAB allocator;
static Section newSnapshotSection(final KeyValue.KVComparator c) {
return new Section(c, null, 0);
}
static Section newActiveSection(final KeyValue.KVComparator c,
final Configuration conf) {
return new Section(c, conf, DEEP_OVERHEAD);
}
private Section(final KeyValue.KVComparator c,
final Configuration conf, long initHeapSize) {
this.cellSet = new CellSkipListSet(c);
this.heapSize = new AtomicLong(initHeapSize);
if (conf != null && conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
new Class[]{Configuration.class}, new Object[]{conf});
} else {
this.allocator = null;
}
}
CellSkipListSet getCellSkipListSet() {
return cellSet;
}
TimeRangeTracker getTimeRangeTracker() {
return tracker;
}
AtomicLong getHeapSize() {
return heapSize;
}
MemStoreLAB getMemStoreLAB() {
return allocator;
}
}
}

View File

@ -1152,7 +1152,14 @@ public class HStore implements Store {
*/
private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
for (ChangedReadersObserver o : this.changedReaderObservers) {
o.updateReaders(sfs);
List<KeyValueScanner> memStoreScanners;
this.lock.readLock().lock();
try {
memStoreScanners = this.memstore.getScanners(o.getReadPoint());
} finally {
this.lock.readLock().unlock();
}
o.updateReaders(sfs, memStoreScanners);
}
}

View File

@ -50,8 +50,8 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@ -128,9 +128,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Indicates whether there was flush during the course of the scan
private volatile boolean flushed = false;
// generally we get one file from a flush
private List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
private final List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
// generally we get one memstroe scanner from a flush
private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
// The current list of scanners
private List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
private final List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
// flush update lock
private ReentrantLock flushLock = new ReentrantLock();
@ -453,6 +455,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
public void close() {
if (this.closing) return;
this.closing = true;
clearAndClose(memStoreScannersAfterFlush);
// Under test, we dont have a this.store
if (this.store != null)
this.store.deleteChangedReaderObserver(this);
@ -770,13 +773,33 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return true;
}
@Override
public long getReadPoint() {
return readPt;
}
private static void clearAndClose(List<KeyValueScanner> scanners) {
for (KeyValueScanner s : scanners) {
s.close();
}
scanners.clear();
}
// Implementation of ChangedReadersObserver
@Override
public void updateReaders(List<StoreFile> sfs) throws IOException {
flushed = true;
public void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException {
if (CollectionUtils.isEmpty(sfs)
&& CollectionUtils.isEmpty(memStoreScanners)) {
return;
}
flushLock.lock();
try {
flushed = true;
flushedStoreFiles.addAll(sfs);
if (!CollectionUtils.isEmpty(memStoreScanners)) {
clearAndClose(memStoreScannersAfterFlush);
memStoreScannersAfterFlush.addAll(memStoreScanners);
}
} finally {
flushLock.unlock();
}
@ -836,12 +859,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
final boolean isCompaction = false;
boolean usePread = get || scanUsePread;
List<KeyValueScanner> scanners = null;
flushLock.lock();
try {
flushLock.lock();
scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
List<KeyValueScanner> allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
allScanners.addAll(memStoreScannersAfterFlush);
scanners = selectScannersFrom(allScanners);
// Clear the current set of flushed store files so that they don't get added again
flushedStoreFiles.clear();
memStoreScannersAfterFlush.clear();
} finally {
flushLock.unlock();
}
@ -851,7 +878,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// remove the older memstore scanner
for (int i = 0; i < currentScanners.size(); i++) {
if (!currentScanners.get(i).isFileScanner()) {
currentScanners.remove(i);
currentScanners.remove(i).close();
break;
}
}

View File

@ -304,13 +304,14 @@ public class TestHeapSize {
// DefaultMemStore Deep Overhead
actual = DefaultMemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += (2 * ClassSize.estimateBase(AtomicLong.class, false));
expected += (2 * ClassSize.estimateBase(CellSkipListSet.class, false));
expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
if(expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(CellSkipListSet.class, true);
ClassSize.estimateBase(CellSkipListSet.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);

View File

@ -87,8 +87,8 @@ public class TestDefaultMemStore extends TestCase {
byte [] other = Bytes.toBytes("somethingelse");
KeyValue samekey = new KeyValue(bytes, bytes, bytes, other);
this.memstore.add(samekey);
Cell found = this.memstore.cellSet.first();
assertEquals(1, this.memstore.cellSet.size());
Cell found = this.memstore.activeSection.getCellSkipListSet().first();
assertEquals(1, this.memstore.activeSection.getCellSkipListSet().size());
assertTrue(Bytes.toString(found.getValue()), CellUtil.matchingValue(samekey, found));
}
@ -99,13 +99,13 @@ public class TestDefaultMemStore extends TestCase {
long sizeChangeForSecondCell = this.memstore.add(kv);
// make sure memstore size increase won't double-count MSLAB chunk size
assertEquals(DefaultMemStore.heapSizeChange(kv, true), sizeChangeForFirstCell);
if (this.memstore.allocator != null) {
if (this.memstore.activeSection.getMemStoreLAB() != null) {
// make sure memstore size increased when using MSLAB
assertEquals(memstore.getCellLength(kv), sizeChangeForSecondCell);
// make sure chunk size increased even when writing the same cell, if using MSLAB
if (this.memstore.allocator instanceof HeapMemStoreLAB) {
if (this.memstore.activeSection.getMemStoreLAB() instanceof HeapMemStoreLAB) {
assertEquals(2 * memstore.getCellLength(kv),
((HeapMemStoreLAB) this.memstore.allocator).getCurrentChunk().getNextFreeOffset());
((HeapMemStoreLAB) this.memstore.activeSection.getMemStoreLAB()).getCurrentChunk().getNextFreeOffset());
}
} else {
// make sure no memstore size change w/o MSLAB
@ -493,7 +493,7 @@ public class TestDefaultMemStore extends TestCase {
for (int i = 0; i < snapshotCount; i++) {
addRows(this.memstore);
runSnapshot(this.memstore);
assertEquals("History not being cleared", 0, this.memstore.snapshot.size());
assertEquals("History not being cleared", 0, this.memstore.snapshotSection.getCellSkipListSet().size());
}
}
@ -514,7 +514,7 @@ public class TestDefaultMemStore extends TestCase {
m.add(key2);
assertTrue("Expected memstore to hold 3 values, actually has " +
m.cellSet.size(), m.cellSet.size() == 3);
m.activeSection.getCellSkipListSet().size(), m.activeSection.getCellSkipListSet().size() == 3);
}
//////////////////////////////////////////////////////////////////////////////
@ -588,12 +588,12 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(new KeyValue(row, fam ,qf3, val));
//Creating a snapshot
memstore.snapshot();
assertEquals(3, memstore.snapshot.size());
assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size());
//Adding value to "new" memstore
assertEquals(0, memstore.cellSet.size());
assertEquals(0, memstore.activeSection.getCellSkipListSet().size());
memstore.add(new KeyValue(row, fam ,qf4, val));
memstore.add(new KeyValue(row, fam ,qf5, val));
assertEquals(2, memstore.cellSet.size());
assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
}
//////////////////////////////////////////////////////////////////////////////
@ -615,7 +615,7 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(put2);
memstore.add(put3);
assertEquals(3, memstore.cellSet.size());
assertEquals(3, memstore.activeSection.getCellSkipListSet().size());
KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val);
memstore.delete(del2);
@ -626,9 +626,9 @@ public class TestDefaultMemStore extends TestCase {
expected.add(put2);
expected.add(put1);
assertEquals(4, memstore.cellSet.size());
assertEquals(4, memstore.activeSection.getCellSkipListSet().size());
int i = 0;
for(Cell cell : memstore.cellSet) {
for(Cell cell : memstore.activeSection.getCellSkipListSet()) {
assertEquals(expected.get(i++), cell);
}
}
@ -649,7 +649,7 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(put2);
memstore.add(put3);
assertEquals(3, memstore.cellSet.size());
assertEquals(3, memstore.activeSection.getCellSkipListSet().size());
KeyValue del2 =
new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val);
@ -662,9 +662,9 @@ public class TestDefaultMemStore extends TestCase {
expected.add(put1);
assertEquals(4, memstore.cellSet.size());
assertEquals(4, memstore.activeSection.getCellSkipListSet().size());
int i = 0;
for (Cell cell: memstore.cellSet) {
for (Cell cell: memstore.activeSection.getCellSkipListSet()) {
assertEquals(expected.get(i++), cell);
}
}
@ -702,9 +702,9 @@ public class TestDefaultMemStore extends TestCase {
assertEquals(5, memstore.cellSet.size());
assertEquals(5, memstore.activeSection.getCellSkipListSet().size());
int i = 0;
for (Cell cell: memstore.cellSet) {
for (Cell cell: memstore.activeSection.getCellSkipListSet()) {
assertEquals(expected.get(i++), cell);
}
}
@ -718,8 +718,8 @@ public class TestDefaultMemStore extends TestCase {
memstore.add(new KeyValue(row, fam, qf, ts, val));
KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
memstore.delete(delete);
assertEquals(2, memstore.cellSet.size());
assertEquals(delete, memstore.cellSet.first());
assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
}
public void testRetainsDeleteVersion() throws IOException {
@ -731,8 +731,8 @@ public class TestDefaultMemStore extends TestCase {
"row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
memstore.delete(delete);
assertEquals(2, memstore.cellSet.size());
assertEquals(delete, memstore.cellSet.first());
assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
}
public void testRetainsDeleteColumn() throws IOException {
// add a put to memstore
@ -743,8 +743,8 @@ public class TestDefaultMemStore extends TestCase {
KeyValue.Type.DeleteColumn, "dont-care");
memstore.delete(delete);
assertEquals(2, memstore.cellSet.size());
assertEquals(delete, memstore.cellSet.first());
assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
}
public void testRetainsDeleteFamily() throws IOException {
// add a put to memstore
@ -755,8 +755,8 @@ public class TestDefaultMemStore extends TestCase {
KeyValue.Type.DeleteFamily, "dont-care");
memstore.delete(delete);
assertEquals(2, memstore.cellSet.size());
assertEquals(delete, memstore.cellSet.first());
assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
assertEquals(delete, memstore.activeSection.getCellSkipListSet().first());
}
////////////////////////////////////
@ -856,7 +856,7 @@ public class TestDefaultMemStore extends TestCase {
public void testUpsertMemstoreSize() throws Exception {
Configuration conf = HBaseConfiguration.create();
memstore = new DefaultMemStore(conf, KeyValue.COMPARATOR);
long oldSize = memstore.size.get();
long oldSize = memstore.activeSection.getHeapSize().get();
List<Cell> l = new ArrayList<Cell>();
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
@ -867,18 +867,18 @@ public class TestDefaultMemStore extends TestCase {
l.add(kv1); l.add(kv2); l.add(kv3);
this.memstore.upsert(l, 2, null);// readpoint is 2
long newSize = this.memstore.size.get();
long newSize = this.memstore.activeSection.getHeapSize().get();
assert(newSize > oldSize);
//The kv1 should be removed.
assert(memstore.cellSet.size() == 2);
assert(memstore.activeSection.getCellSkipListSet().size() == 2);
KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
kv4.setSequenceId(1);
l.clear(); l.add(kv4);
this.memstore.upsert(l, 3, null);
assertEquals(newSize, this.memstore.size.get());
assertEquals(newSize, this.memstore.activeSection.getHeapSize().get());
//The kv2 should be removed.
assert(memstore.cellSet.size() == 2);
assert(memstore.activeSection.getCellSkipListSet().size() == 2);
//this.memstore = null;
}
@ -1039,10 +1039,10 @@ public class TestDefaultMemStore extends TestCase {
private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedStateException {
// Save off old state.
int oldHistorySize = hmc.snapshot.size();
int oldHistorySize = hmc.snapshotSection.getCellSkipListSet().size();
MemStoreSnapshot snapshot = hmc.snapshot();
// Make some assertions about what just happened.
assertTrue("History size has not increased", oldHistorySize < hmc.snapshot.size());
assertTrue("History size has not increased", oldHistorySize < hmc.snapshotSection.getCellSkipListSet().size());
long t = memstore.timeOfOldestEdit();
assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
hmc.clearSnapshot(snapshot.getId());

View File

@ -2555,10 +2555,10 @@ public class TestHRegion {
// This is kinda hacky, but better than nothing...
long now = System.currentTimeMillis();
DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore;
Cell firstCell = memstore.cellSet.first();
Cell firstCell = memstore.activeSection.getCellSkipListSet().first();
assertTrue(firstCell.getTimestamp() <= now);
now = firstCell.getTimestamp();
for (Cell cell : memstore.cellSet) {
for (Cell cell : memstore.activeSection.getCellSkipListSet()) {
assertTrue(cell.getTimestamp() <= now);
now = cell.getTimestamp();
}

View File

@ -116,13 +116,13 @@ public class TestMemStoreChunkPool {
// Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
assertEquals(3, memstore.snapshot.size());
assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size());
// Adding value to "new" memstore
assertEquals(0, memstore.cellSet.size());
assertEquals(0, memstore.activeSection.getCellSkipListSet().size());
memstore.add(new KeyValue(row, fam, qf4, val));
memstore.add(new KeyValue(row, fam, qf5, val));
assertEquals(2, memstore.cellSet.size());
assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
@ -153,13 +153,13 @@ public class TestMemStoreChunkPool {
// Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
assertEquals(3, memstore.snapshot.size());
assertEquals(3, memstore.snapshotSection.getCellSkipListSet().size());
// Adding value to "new" memstore
assertEquals(0, memstore.cellSet.size());
assertEquals(0, memstore.activeSection.getCellSkipListSet().size());
memstore.add(new KeyValue(row, fam, qf4, val));
memstore.add(new KeyValue(row, fam, qf5, val));
assertEquals(2, memstore.cellSet.size());
assertEquals(2, memstore.activeSection.getCellSkipListSet().size());
// opening scanner before clear the snapshot
List<KeyValueScanner> scanners = memstore.getScanners(0);

View File

@ -19,24 +19,31 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -57,9 +64,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -69,16 +76,18 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Assert;
@ -89,9 +98,6 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
import java.util.Arrays;
/**
* Test class for the Store
*/
@ -163,9 +169,14 @@ public class TestStore {
init(methodName, conf, htd, hcd);
}
@SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd) throws IOException {
return init(methodName, conf, htd, hcd, null);
}
@SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
//Setting up a Store
Path basedir = new Path(DIR+methodName);
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@ -186,8 +197,11 @@ public class TestStore {
final WALFactory wals = new WALFactory(walConf, null, methodName);
HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(),
info.getTable().getNamespace()), fs, conf, info, htd, null);
store = new HStore(region, hcd, conf);
if (hook == null) {
store = new HStore(region, hcd, conf);
} else {
store = new MyStore(region, hcd, conf, hook);
}
return store;
}
@ -576,7 +590,7 @@ public class TestStore {
this.store.snapshot();
flushStore(store, id++);
Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size());
}
private void assertCheck() {
@ -621,7 +635,7 @@ public class TestStore {
flushStore(store, id++);
Assert.assertEquals(1, this.store.getStorefiles().size());
// from the one we inserted up there, and a new one
Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size());
Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size());
// how many key/values for this row are there?
Get get = new Get(row);
@ -690,7 +704,7 @@ public class TestStore {
}
long computedSize=0;
for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) {
for (Cell cell : ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet()) {
long kvsize = DefaultMemStore.heapSizeChange(cell, true);
//System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
computedSize += kvsize;
@ -722,7 +736,7 @@ public class TestStore {
// then flush.
flushStore(store, id++);
Assert.assertEquals(1, this.store.getStorefiles().size());
Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size());
Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).activeSection.getCellSkipListSet().size());
// now increment again:
newValue += 1;
@ -1136,4 +1150,113 @@ public class TestStore {
//ensure that replaceStoreFiles is not called if files are not refreshed
verify(spiedStore, times(0)).replaceStoreFiles(null, null);
}
}
private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
CellUtil.setSequenceId(c, sequenceId);
return c;
}
@Test
public void testScanWithDoubleFlush() throws IOException {
Configuration conf = HBaseConfiguration.create();
// Initialize region
MyStore myStore = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
@Override
public void hook(final MyStore store) throws IOException {
final long tmpId = id++;
ExecutorService s = Executors.newSingleThreadExecutor();
s.submit(new Runnable() {
@Override
public void run() {
try {
// flush the store before storescanner updates the scanners from store.
// The current data will be flushed into files and the memstore will
// be clear.
// -- phase (4/4)
flushStore(store, tmpId);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
});
s.shutdown();
try {
// wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
s.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
}
}
});
byte[] oldValue = Bytes.toBytes("oldValue");
byte[] currentValue = Bytes.toBytes("currentValue");
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data whihc shouldn't be "seen" by client
myStore.add(createCell(qf1, ts, seqId, oldValue));
myStore.add(createCell(qf2, ts, seqId, oldValue));
myStore.add(createCell(qf3, ts, seqId, oldValue));
long snapshotId = id++;
// push older data into snapshot -- phase (1/4)
StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
storeFlushCtx.prepare();
// insert current data into active -- phase (2/4)
myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue));
myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue));
myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue));
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);
try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
new Scan(new Get(row)), quals, seqId + 1)) {
// complete the flush -- phase (3/4)
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
List<Cell> results = new ArrayList<>();
scanner.next(results);
assertEquals(3, results.size());
for (Cell c : results) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(currentValue)
+ ", actual:" + Bytes.toStringBinary(actualValue),
Bytes.equals(actualValue, currentValue));
}
}
}
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMaxVersions(5);
return (MyStore) init(methodName, conf, htd, hcd, hook);
}
private static class MyStore extends HStore {
private final MyScannerHook hook;
MyStore(final HRegion region, final HColumnDescriptor family,
final Configuration confParam, MyScannerHook hook) throws IOException {
super(region, family, confParam);
this.hook = hook;
}
@Override
public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
hook.hook(this);
return super.getScanners(files, cacheBlocks, isGet, usePread,
isCompaction, matcher, startRow, stopRow, readPt, includeMemstoreScanner);
}
}
private interface MyScannerHook {
void hook(MyStore store) throws IOException;
}
}

View File

@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixtu
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
@ -831,9 +832,9 @@ public class TestStoreScanner extends TestCase {
// normally cause an NPE because scan.store is null. So as long as we get through these
// two calls we are good and the bug was quashed.
scan.updateReaders(new ArrayList<StoreFile>());
scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
scan.updateReaders(new ArrayList<StoreFile>());
scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
scan.peek();
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
@ -125,7 +126,7 @@ public class TestWideScanner extends HBaseTestCase {
((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
while (scanners.hasNext()) {
StoreScanner ss = (StoreScanner)scanners.next();
ss.updateReaders(new ArrayList<StoreFile>());
ss.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
}
} while (more);