HBASE-17887 Row-level consistency is broken for read

Chia-Ping Tsai 2017-05-12 19:45:07 +08:00
parent 5e046151d6
commit b34ab5980e
7 changed files with 293 additions and 28 deletions

ChangedReadersObserver.java

@ -21,18 +21,24 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* If the set of MapFile.Readers in a Store changes, implementors are notified.
*/
@InterfaceAudience.Private
public interface ChangedReadersObserver {
/**
* @return the read point of the current scan
*/
long getReadPoint();
/**
* Notify observers.
* @param sfs The new files
* @param memStoreScanners scanners of the current memstore
* @throws IOException e
*/
void updateReaders(List<StoreFile> sfs) throws IOException;
void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException;
}
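
For orientation, here is a minimal sketch of what an implementor of the revised interface now has to supply. The class name and constructor below are illustrative only; StoreScanner, further down in this patch, is the real implementor.

// Illustrative observer (hypothetical name); StoreScanner is the actual implementor in this patch.
@InterfaceAudience.Private
class ExampleReadersObserver implements ChangedReadersObserver {
  private final long readPt;

  ExampleReadersObserver(long readPt) {
    this.readPt = readPt;
  }

  @Override
  public long getReadPoint() {
    // HStore calls this while holding its read lock so it can build memstore scanners
    // at this observer's own read point before handing them over.
    return readPt;
  }

  @Override
  public void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners)
      throws IOException {
    // Receives the newly flushed files together with the pre-built memstore scanners,
    // so the observer never has to re-open the memstore after the flush and cannot miss rows.
  }
}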

CompactingMemStore.java

@ -317,18 +317,24 @@ public class CompactingMemStore extends AbstractMemStore {
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
MutableSegment activeTmp = active;
List<? extends Segment> pipelineList = pipeline.getSegments();
List<? extends Segment> snapshotList = snapshot.getAllSegments();
long order = 1 + pipelineList.size() + snapshotList.size();
// The list of elements in pipeline + the active element + the snapshot segment
// The order is the Segment ordinal
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>((int) order);
order = addToScanners(active, readPt, order, list);
List<KeyValueScanner> list = createList((int) order);
order = addToScanners(activeTmp, readPt, order, list);
order = addToScanners(pipelineList, readPt, order, list);
addToScanners(snapshotList, readPt, order, list);
return list;
}
@VisibleForTesting
protected List<KeyValueScanner> createList(int capacity) {
return new ArrayList<>(capacity);
}
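The new createList hook, together with pushActiveToPipeline becoming protected below, is what lets a test pause getScanners between collecting the pipeline/snapshot scanners and the active-segment scanner while a flush runs concurrently. A condensed sketch of such an override, modeled on MyCompactingMemStore in TestStore later in this patch (the START_TEST guard is omitted and the two CountDownLatch fields are assumed to be declared as in that test):

// Condensed from MyCompactingMemStore in TestStore: pause scanner-list creation until a
// concurrent flush has pushed the active segment into the pipeline and re-created it.
@Override
protected List<KeyValueScanner> createList(int capacity) {
  try {
    getScannerLatch.countDown(); // unblock the flush waiting in pushActiveToPipeline
    snapshotLatch.await();       // resume once the flush has swapped in a new active segment
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
  return new ArrayList<>(capacity);
}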
/**
* Check whether anything needs to be done based on the current active set size.
* The method is invoked upon every addition to the active set.
@ -428,7 +434,7 @@ public class CompactingMemStore extends AbstractMemStore {
}
}
private void pushActiveToPipeline(MutableSegment active) {
protected void pushActiveToPipeline(MutableSegment active) {
if (!active.isEmpty()) {
pipeline.pushHead(active);
resetActive();

HStore.java

@ -105,7 +105,7 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
*/
@InterfaceAudience.Private
public class HStore implements Store {
private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
"hbase.server.compactchecker.interval.multiplier";
public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
@ -244,22 +244,25 @@ public class HStore implements Store {
// Why not just pass an HColumnDescriptor in here altogether? Even if we have
// to clone it?
scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
MemoryCompactionPolicy inMemoryCompaction = family.getInMemoryCompaction();
if(inMemoryCompaction == null) {
inMemoryCompaction = MemoryCompactionPolicy.valueOf(
conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT));
}
String className;
switch (inMemoryCompaction) {
case BASIC :
case EAGER :
className = CompactingMemStore.class.getName();
this.memstore = new CompactingMemStore(conf, this.comparator, this,
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction);
Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
CompactingMemStore.class, CompactingMemStore.class);
className = clz.getName();
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction});
break;
case NONE :
default:
className = DefaultMemStore.class.getName();
this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
}
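Because MEMSTORE_CLASS_NAME is now public and resolved through conf.getClass(), a test (or operator) can substitute a CompactingMemStore subclass. A sketch of the wiring, mirroring what TestStore#testCreateScannerAndSnapshotConcurrently below does; note that in-memory compaction must be BASIC or EAGER for the family, otherwise the NONE branch ignores this setting:

// Sketch mirroring the test setup further down in this patch; 'family' is a column family name.
Configuration conf = HBaseConfiguration.create();
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
HColumnDescriptor hcd = new HColumnDescriptor(family);
// BASIC (or EAGER) in-memory compaction routes memstore construction through MEMSTORE_CLASS_NAME.
hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);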
@ -1149,7 +1152,14 @@ public class HStore implements Store {
*/
private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
for (ChangedReadersObserver o : this.changedReaderObservers) {
o.updateReaders(sfs);
List<KeyValueScanner> memStoreScanners;
this.lock.readLock().lock();
try {
memStoreScanners = this.memstore.getScanners(o.getReadPoint());
} finally {
this.lock.readLock().unlock();
}
o.updateReaders(sfs, memStoreScanners);
}
}

StoreScanner.java

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatc
import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@ -145,6 +146,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private volatile boolean flushed = false;
// generally we get one file from a flush
private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
// generally we get one memstore scanner from a flush
private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
// The current list of scanners
private final List<KeyValueScanner> currentScanners = new ArrayList<>();
// flush update lock
@ -493,6 +496,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.store.deleteChangedReaderObserver(this);
}
if (withHeapClose) {
clearAndClose(memStoreScannersAfterFlush);
for (KeyValueHeap h : this.heapsForDelayedClose) {
h.close();
}
@ -814,13 +818,33 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return true;
}
@Override
public long getReadPoint() {
return this.readPt;
}
private static void clearAndClose(List<KeyValueScanner> scanners) {
for (KeyValueScanner s : scanners) {
s.close();
}
scanners.clear();
}
// Implementation of ChangedReadersObserver
@Override
public void updateReaders(List<StoreFile> sfs) throws IOException {
flushed = true;
public void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException {
if (CollectionUtils.isEmpty(sfs)
&& CollectionUtils.isEmpty(memStoreScanners)) {
return;
}
flushLock.lock();
try {
flushed = true;
flushedStoreFiles.addAll(sfs);
if (!CollectionUtils.isEmpty(memStoreScanners)) {
clearAndClose(memStoreScannersAfterFlush);
memStoreScannersAfterFlush.addAll(memStoreScanners);
}
} finally {
flushLock.unlock();
}
@ -838,10 +862,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
List<KeyValueScanner> scanners;
flushLock.lock();
try {
scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get,
scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
List<KeyValueScanner> allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get,
scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
allScanners.addAll(memStoreScannersAfterFlush);
scanners = selectScannersFrom(allScanners);
// Clear the current set of flushed store files so that they don't get added again
flushedStoreFiles.clear();
memStoreScannersAfterFlush.clear();
} finally {
flushLock.unlock();
}
@ -851,7 +879,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// remove the older memstore scanner
for (int i = 0; i < currentScanners.size(); i++) {
if (!currentScanners.get(i).isFileScanner()) {
currentScanners.remove(i);
currentScanners.remove(i).close();
break;
}
}

TestStore.java

@ -28,7 +28,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
@ -38,9 +37,16 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.IntStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -53,14 +59,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -70,6 +79,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -161,9 +171,14 @@ public class TestStore {
init(methodName, conf, htd, hcd);
}
@SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd) throws IOException {
return init(methodName, conf, htd, hcd, null);
}
@SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
//Setting up a Store
Path basedir = new Path(DIR+methodName);
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@ -185,8 +200,11 @@ public class TestStore {
final WALFactory wals = new WALFactory(walConf, null, methodName);
HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(),
info.getTable().getNamespace()), fs, conf, info, htd, null);
if (hook == null) {
store = new HStore(region, hcd, conf);
} else {
store = new MyStore(region, hcd, conf, hook);
}
return store;
}
@ -939,4 +957,199 @@ public class TestStore {
//ensure that replaceStoreFiles is not called if files are not refreshed
verify(spiedStore, times(0)).replaceStoreFiles(null, null);
}
private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
CellUtil.setSequenceId(c, sequenceId);
return c;
}
@Test
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
init(name.getMethodName(), conf, hcd);
byte[] value = Bytes.toBytes("value");
MemstoreSize memStoreSize = new MemstoreSize();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data which shouldn't be "seen" by the client
store.add(createCell(qf1, ts, seqId, value), memStoreSize);
store.add(createCell(qf2, ts, seqId, value), memStoreSize);
store.add(createCell(qf3, ts, seqId, value), memStoreSize);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
MyCompactingMemStore.START_TEST.set(true);
Runnable flush = () -> {
// this is blocked until we create the first scanner from the pipeline and snapshot -- phase (1/5)
// recreate the active memstore -- phase (4/5)
storeFlushCtx.prepare();
};
ExecutorService service = Executors.newSingleThreadExecutor();
service.submit(flush);
// we get scanners from the pipeline and snapshot, but they are empty. -- phase (2/5)
// this is blocked until we recreate the active memstore -- phase (3/5)
// we get a scanner from the active memstore, but it is empty -- phase (5/5)
InternalScanner scanner = (InternalScanner) store.getScanner(
new Scan(new Get(row)), quals, seqId + 1);
service.shutdown();
service.awaitTermination(20, TimeUnit.SECONDS);
try {
try {
List<Cell> results = new ArrayList<>();
scanner.next(results);
assertEquals(3, results.size());
for (Cell c : results) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value)
+ ", actual:" + Bytes.toStringBinary(actualValue)
, Bytes.equals(actualValue, value));
}
} finally {
scanner.close();
}
} finally {
MyCompactingMemStore.START_TEST.set(false);
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}
}
@Test
public void testScanWithDoubleFlush() throws IOException {
Configuration conf = HBaseConfiguration.create();
// Initialize region
MyStore myStore = initMyStore(name.getMethodName(), conf, (final MyStore store1) -> {
final long tmpId = id++;
ExecutorService s = Executors.newSingleThreadExecutor();
s.submit(() -> {
try {
// flush the store before the StoreScanner updates its scanners from the store.
// The current data will be flushed into files, and the memstore will
// be cleared.
// -- phase (4/4)
flushStore(store1, tmpId);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
});
s.shutdown();
try {
// wait for the flush; the flushing thread will be blocked in HStore#notifyChangedReadersObservers.
s.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
}
});
byte[] oldValue = Bytes.toBytes("oldValue");
byte[] currentValue = Bytes.toBytes("currentValue");
MemstoreSize memStoreSize = new MemstoreSize();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data which shouldn't be "seen" by the client
myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSize);
myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSize);
myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSize);
long snapshotId = id++;
// push older data into snapshot -- phase (1/4)
StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
storeFlushCtx.prepare();
// insert current data into active -- phase (2/4)
myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSize);
myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSize);
myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSize);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);
try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
new Scan(new Get(row)), quals, seqId + 1)) {
// complete the flush -- phase (3/4)
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
List<Cell> results = new ArrayList<>();
scanner.next(results);
assertEquals(3, results.size());
for (Cell c : results) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(currentValue)
+ ", actual:" + Bytes.toStringBinary(actualValue)
, Bytes.equals(actualValue, currentValue));
}
}
}
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMaxVersions(5);
return (MyStore) init(methodName, conf, htd, hcd, hook);
}
private static class MyStore extends HStore {
private final MyScannerHook hook;
MyStore(final HRegion region, final HColumnDescriptor family,
final Configuration confParam, MyScannerHook hook) throws IOException {
super(region, family, confParam);
this.hook = hook;
}
@Override
public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException {
hook.hook(this);
return super.getScanners(files, cacheBlocks, usePread,
isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner);
}
}
private interface MyScannerHook {
void hook(MyStore store) throws IOException;
}
public static class MyCompactingMemStore extends CompactingMemStore {
private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
private final CountDownLatch getScannerLatch = new CountDownLatch(1);
private final CountDownLatch snapshotLatch = new CountDownLatch(1);
public MyCompactingMemStore(Configuration conf, CellComparator c,
HStore store, RegionServicesForStores regionServices,
MemoryCompactionPolicy compactionPolicy) throws IOException {
super(conf, c, store, regionServices, compactionPolicy);
}
@Override
protected List<KeyValueScanner> createList(int capacity) {
if (START_TEST.get()) {
try {
getScannerLatch.countDown();
snapshotLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return new ArrayList<>(capacity);
}
@Override
protected void pushActiveToPipeline(MutableSegment active) {
if (START_TEST.get()) {
try {
getScannerLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
super.pushActiveToPipeline(active);
if (START_TEST.get()) {
snapshotLatch.countDown();
}
}
}
}

TestStoreScanner.java

@ -25,6 +25,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
@ -861,9 +862,9 @@ public class TestStoreScanner {
// normally cause an NPE because scan.store is null. So as long as we get through these
// two calls we are good and the bug was quashed.
scan.updateReaders(new ArrayList<>());
scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
scan.updateReaders(new ArrayList<>());
scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
scan.peek();
}

TestWideScanner.java

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
@ -130,7 +131,7 @@ public class TestWideScanner extends HBaseTestCase {
((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
while (scanners.hasNext()) {
StoreScanner ss = (StoreScanner)scanners.next();
ss.updateReaders(new ArrayList<>());
ss.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
}
} while (more);