HBASE-18221 Switch from pread to stream should happen under HStore's

reentrant lock (Ram)
This commit is contained in:
Ramkrishna 2017-06-23 10:32:29 +05:30
parent 7cc458e129
commit d092008766
7 changed files with 235 additions and 38 deletions

View File

@ -27,10 +27,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -40,6 +36,10 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
* Default implementation of StoreFileManager. Not thread-safe.
*/
@ -116,6 +116,14 @@ class DefaultStoreFileManager implements StoreFileManager {
return storefiles.size();
}
@Override
public final int getCompactedFilesCount() {
if (compactedfiles == null) {
return 0;
}
return compactedfiles.size();
}
@Override
public void addCompactionResults(
Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {

View File

@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.Callable;
@ -727,6 +728,11 @@ public class HStore implements Store {
return this.storeEngine.getStoreFileManager().getStorefiles();
}
@Override
public Collection<StoreFile> getCompactedFiles() {
return this.storeEngine.getStoreFileManager().getCompactedfiles();
}
/**
* This throws a WrongRegionException if the HFile does not fit in this region, or an
* InvalidHFileException if the HFile is not valid.
@ -1927,6 +1933,41 @@ public class HStore implements Store {
return scanner;
}
@Override
public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException {
this.lock.readLock().lock();
try {
Map<String, StoreFile> name2File =
new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
for (StoreFile file : getStorefiles()) {
name2File.put(file.getFileInfo().getActiveFileName(), file);
}
if (getCompactedFiles() != null) {
for (StoreFile file : getCompactedFiles()) {
name2File.put(file.getFileInfo().getActiveFileName(), file);
}
}
List<StoreFile> filesToReopen = new ArrayList<>();
for (KeyValueScanner kvs : currentFileScanners) {
assert kvs.isFileScanner();
if (kvs.peek() == null) {
continue;
}
filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
}
if (filesToReopen.isEmpty()) {
return null;
}
return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
includeStartRow, stopRow, includeStopRow, readPt, false);
} finally {
this.lock.readLock().unlock();
}
}
@Override
public String toString() {
return this.getColumnFamilyName();
@ -1937,6 +1978,11 @@ public class HStore implements Store {
return this.storeEngine.getStoreFileManager().getStorefileCount();
}
@Override
public int getCompactedFilesCount() {
return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
}
@Override
public long getMaxStoreFileAge() {
long earliestTS = Long.MAX_VALUE;

View File

@ -61,6 +61,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
Collection<StoreFile> getStorefiles();
Collection<StoreFile> getCompactedFiles();
/**
* Close all the readers We don't need to worry about subsequent requests because the Region
* holds a write lock that will prevent any more reads or writes.
@ -115,6 +117,27 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow,
boolean includeStopRow, long readPt) throws IOException;
/**
* Recreates the scanners on the current list of active store file scanners
* @param currentFileScanners the current set of active store file scanners
* @param cacheBlocks cache the blocks or not
* @param usePread use pread or not
* @param isCompaction is the scanner for compaction
* @param matcher the scan query matcher
* @param startRow the scan's start row
* @param includeStartRow should the scan include the start row
* @param stopRow the scan's stop row
* @param includeStopRow should the scan include the stop row
* @param readPt the read point of the current scane
* @param includeMemstoreScanner whether the current scanner should include memstorescanner
* @return list of scanners recreated on the current Scanners
* @throws IOException
*/
List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException;
/**
* Create scanners on the given files and if needed on the memstore with no filtering based on TTL
* (that happens further down the line).
@ -366,6 +389,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/
int getStorefilesCount();
/**
* @return Count of compacted store files
*/
int getCompactedFilesCount();
/**
* @return Max age of store files in this store
*/

View File

@ -103,6 +103,12 @@ public interface StoreFileManager {
*/
int getStorefileCount();
/**
* Returns the number of compacted files.
* @return The number of files.
*/
int getCompactedFilesCount();
/**
* Gets the store files to scan for a Scan or Get request.
* @param startRow Start row of the request.

View File

@ -966,7 +966,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return heap.reseek(kv);
}
private void trySwitchToStreamRead() {
@VisibleForTesting
void trySwitchToStreamRead() {
if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||
bytesRead < preadMaxBytes) {
return;
@ -977,34 +978,27 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
scanUsePread = false;
Cell lastTop = heap.peek();
Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
for (StoreFile file : store.getStorefiles()) {
name2File.put(file.getFileInfo().getActiveFileName(), file);
}
List<StoreFile> filesToReopen = new ArrayList<>();
List<KeyValueScanner> memstoreScanners = new ArrayList<>();
List<KeyValueScanner> scannersToClose = new ArrayList<>();
for (KeyValueScanner kvs : currentScanners) {
if (!kvs.isFileScanner()) {
// collect memstorescanners here
memstoreScanners.add(kvs);
} else {
scannersToClose.add(kvs);
if (kvs.peek() == null) {
continue;
}
filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
}
}
if (filesToReopen.isEmpty()) {
return;
}
List<KeyValueScanner> fileScanners = null;
List<KeyValueScanner> newCurrentScanners;
KeyValueHeap newHeap;
try {
fileScanners =
store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
// recreate the scanners on the current file scanners
fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
scan.includeStopRow(), readPt, false);
if (fileScanners == null) {
return;
}
seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
newCurrentScanners.addAll(fileScanners);

View File

@ -146,6 +146,11 @@ public class StripeStoreFileManager
return state.allCompactedFilesCached;
}
@Override
public int getCompactedFilesCount() {
return state.allCompactedFilesCached.size();
}
@Override
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@ -45,6 +46,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
@ -104,7 +106,6 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test class for the Store
@ -161,19 +162,19 @@ public class TestStore {
init(methodName, TEST_UTIL.getConfiguration());
}
private void init(String methodName, Configuration conf)
private Store init(String methodName, Configuration conf)
throws IOException {
HColumnDescriptor hcd = new HColumnDescriptor(family);
// some of the tests write 4 versions and then flush
// (with HBASE-4241, lower versions are collected on flush)
hcd.setMaxVersions(4);
init(methodName, conf, hcd);
return init(methodName, conf, hcd);
}
private void init(String methodName, Configuration conf,
private Store init(String methodName, Configuration conf,
HColumnDescriptor hcd) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
init(methodName, conf, htd, hcd);
return init(methodName, conf, htd, hcd);
}
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
@ -184,6 +185,11 @@ public class TestStore {
@SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
return init(methodName, conf, htd, hcd, hook, false);
}
@SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException {
//Setting up a Store
Path basedir = new Path(DIR+methodName);
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@ -198,7 +204,8 @@ public class TestStore {
} else {
htd.addFamily(hcd);
}
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, basedir);
@ -208,7 +215,7 @@ public class TestStore {
if (hook == null) {
store = new HStore(region, hcd, conf);
} else {
store = new MyStore(region, hcd, conf, hook);
store = new MyStore(region, hcd, conf, hook, switchToPread);
}
return store;
}
@ -833,9 +840,10 @@ public class TestStore {
public static class DummyStoreEngine extends DefaultStoreEngine {
public static DefaultCompactor lastCreatedCompactor = null;
@Override
protected void createComponents(
Configuration conf, Store store, CellComparator comparator) throws IOException {
protected void createComponents(Configuration conf, Store store, CellComparator comparator)
throws IOException {
super.createComponents(conf, store, comparator);
lastCreatedCompactor = this.compactor;
}
@ -1039,6 +1047,13 @@ public class TestStore {
return c;
}
private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
throws IOException {
Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
CellUtil.setSequenceId(c, sequenceId);
return c;
}
@Test
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
@ -1269,17 +1284,19 @@ public class TestStore {
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMaxVersions(5);
return (MyStore) init(methodName, conf, htd, hcd, hook);
}
private static class MyStore extends HStore {
class MyStore extends HStore {
private final MyScannerHook hook;
MyStore(final HRegion region, final HColumnDescriptor family,
final Configuration confParam, MyScannerHook hook) throws IOException {
MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam,
MyScannerHook hook, boolean switchToPread) throws IOException {
super(region, family, confParam);
this.hook = hook;
}
@ -1290,14 +1307,107 @@ public class TestStore {
boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
boolean includeMemstoreScanner) throws IOException {
hook.hook(this);
return super.getScanners(files, cacheBlocks, usePread,
isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner);
return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
stopRow, false, readPt, includeMemstoreScanner);
}
}
private interface MyScannerHook {
void hook(MyStore store) throws IOException;
}
@Test
public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
int flushSize = 500;
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
// Set the lower threshold to invoke the "MERGE" policy
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
@Override
public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore store)
throws IOException {
}
});
MemstoreSize memStoreSize = new MemstoreSize();
long ts = System.currentTimeMillis();
long seqID = 1l;
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
}
// flush them
flushStore(store, seqID);
for (int i = 11; i < 20; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
}
// flush them
flushStore(store, seqID);
for (int i = 21; i < 30; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
}
// flush them
flushStore(store, seqID);
assertEquals(3, store.getStorefilesCount());
ScanInfo scanInfo = store.getScanInfo();
Scan scan = new Scan();
scan.addFamily(family);
Collection<StoreFile> storefiles2 = store.getStorefiles();
ArrayList<StoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
StoreScanner storeScanner =
(StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
// get the current heap
KeyValueHeap heap = storeScanner.heap;
// create more store files
for (int i = 31; i < 40; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
}
// flush them
flushStore(store, seqID);
for (int i = 41; i < 50; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
}
// flush them
flushStore(store, seqID);
storefiles2 = store.getStorefiles();
ArrayList<StoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
actualStorefiles1.removeAll(actualStorefiles);
// Do compaction
List<Exception> exceptions = new ArrayList<Exception>();
MyThread thread = new MyThread(storeScanner);
thread.start();
store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
thread.join();
KeyValueHeap heap2 = thread.getHeap();
assertFalse(heap.equals(heap2));
}
private static class MyThread extends Thread {
private StoreScanner scanner;
private KeyValueHeap heap;
public MyThread(StoreScanner scanner) {
this.scanner = scanner;
}
public KeyValueHeap getHeap() {
return this.heap;
}
public void run() {
scanner.trySwitchToStreamRead();
heap = scanner.heap;
}
}
private static class MyMemStoreCompactor extends MemStoreCompactor {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);