HBASE-18221 Switch from pread to stream should happen under HStore's
reentrant lock (Ram)
This commit is contained in:
parent
0357c2266a
commit
49825aec6b
|
@ -27,10 +27,6 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.google.common.collect.ImmutableCollection;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -40,6 +36,10 @@ import org.apache.hadoop.hbase.KeyValue;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||
|
||||
import com.google.common.collect.ImmutableCollection;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Default implementation of StoreFileManager. Not thread-safe.
|
||||
*/
|
||||
|
@ -116,6 +116,14 @@ class DefaultStoreFileManager implements StoreFileManager {
|
|||
return storefiles.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int getCompactedFilesCount() {
|
||||
if (compactedfiles == null) {
|
||||
return 0;
|
||||
}
|
||||
return compactedfiles.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addCompactionResults(
|
||||
Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -726,6 +727,11 @@ public class HStore implements Store {
|
|||
return this.storeEngine.getStoreFileManager().getStorefiles();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<StoreFile> getCompactedFiles() {
|
||||
return this.storeEngine.getStoreFileManager().getCompactedfiles();
|
||||
}
|
||||
|
||||
/**
|
||||
* This throws a WrongRegionException if the HFile does not fit in this region, or an
|
||||
* InvalidHFileException if the HFile is not valid.
|
||||
|
@ -1926,6 +1932,41 @@ public class HStore implements Store {
|
|||
return scanner;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
|
||||
boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
|
||||
byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
|
||||
boolean includeMemstoreScanner) throws IOException {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
Map<String, StoreFile> name2File =
|
||||
new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
|
||||
for (StoreFile file : getStorefiles()) {
|
||||
name2File.put(file.getFileInfo().getActiveFileName(), file);
|
||||
}
|
||||
if (getCompactedFiles() != null) {
|
||||
for (StoreFile file : getCompactedFiles()) {
|
||||
name2File.put(file.getFileInfo().getActiveFileName(), file);
|
||||
}
|
||||
}
|
||||
List<StoreFile> filesToReopen = new ArrayList<>();
|
||||
for (KeyValueScanner kvs : currentFileScanners) {
|
||||
assert kvs.isFileScanner();
|
||||
if (kvs.peek() == null) {
|
||||
continue;
|
||||
}
|
||||
filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
|
||||
}
|
||||
if (filesToReopen.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
|
||||
includeStartRow, stopRow, includeStopRow, readPt, false);
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.getColumnFamilyName();
|
||||
|
@ -1936,6 +1977,11 @@ public class HStore implements Store {
|
|||
return this.storeEngine.getStoreFileManager().getStorefileCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCompactedFilesCount() {
|
||||
return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxStoreFileAge() {
|
||||
long earliestTS = Long.MAX_VALUE;
|
||||
|
|
|
@ -61,6 +61,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
|
|||
|
||||
Collection<StoreFile> getStorefiles();
|
||||
|
||||
Collection<StoreFile> getCompactedFiles();
|
||||
|
||||
/**
|
||||
* Close all the readers We don't need to worry about subsequent requests because the Region
|
||||
* holds a write lock that will prevent any more reads or writes.
|
||||
|
@ -115,6 +117,27 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
|
|||
ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow,
|
||||
boolean includeStopRow, long readPt) throws IOException;
|
||||
|
||||
/**
|
||||
* Recreates the scanners on the current list of active store file scanners
|
||||
* @param currentFileScanners the current set of active store file scanners
|
||||
* @param cacheBlocks cache the blocks or not
|
||||
* @param usePread use pread or not
|
||||
* @param isCompaction is the scanner for compaction
|
||||
* @param matcher the scan query matcher
|
||||
* @param startRow the scan's start row
|
||||
* @param includeStartRow should the scan include the start row
|
||||
* @param stopRow the scan's stop row
|
||||
* @param includeStopRow should the scan include the stop row
|
||||
* @param readPt the read point of the current scane
|
||||
* @param includeMemstoreScanner whether the current scanner should include memstorescanner
|
||||
* @return list of scanners recreated on the current Scanners
|
||||
* @throws IOException
|
||||
*/
|
||||
List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
|
||||
boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
|
||||
byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
|
||||
boolean includeMemstoreScanner) throws IOException;
|
||||
|
||||
/**
|
||||
* Create scanners on the given files and if needed on the memstore with no filtering based on TTL
|
||||
* (that happens further down the line).
|
||||
|
@ -366,6 +389,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
|
|||
*/
|
||||
int getStorefilesCount();
|
||||
|
||||
/**
|
||||
* @return Count of compacted store files
|
||||
*/
|
||||
int getCompactedFilesCount();
|
||||
|
||||
/**
|
||||
* @return Max age of store files in this store
|
||||
*/
|
||||
|
|
|
@ -103,6 +103,12 @@ public interface StoreFileManager {
|
|||
*/
|
||||
int getStorefileCount();
|
||||
|
||||
/**
|
||||
* Returns the number of compacted files.
|
||||
* @return The number of files.
|
||||
*/
|
||||
int getCompactedFilesCount();
|
||||
|
||||
/**
|
||||
* Gets the store files to scan for a Scan or Get request.
|
||||
* @param startRow Start row of the request.
|
||||
|
|
|
@ -966,7 +966,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
return heap.reseek(kv);
|
||||
}
|
||||
|
||||
private void trySwitchToStreamRead() {
|
||||
@VisibleForTesting
|
||||
void trySwitchToStreamRead() {
|
||||
if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||
|
||||
bytesRead < preadMaxBytes) {
|
||||
return;
|
||||
|
@ -977,34 +978,27 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
}
|
||||
scanUsePread = false;
|
||||
Cell lastTop = heap.peek();
|
||||
Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
|
||||
for (StoreFile file : store.getStorefiles()) {
|
||||
name2File.put(file.getFileInfo().getActiveFileName(), file);
|
||||
}
|
||||
List<StoreFile> filesToReopen = new ArrayList<>();
|
||||
List<KeyValueScanner> memstoreScanners = new ArrayList<>();
|
||||
List<KeyValueScanner> scannersToClose = new ArrayList<>();
|
||||
for (KeyValueScanner kvs : currentScanners) {
|
||||
if (!kvs.isFileScanner()) {
|
||||
// collect memstorescanners here
|
||||
memstoreScanners.add(kvs);
|
||||
} else {
|
||||
scannersToClose.add(kvs);
|
||||
if (kvs.peek() == null) {
|
||||
continue;
|
||||
}
|
||||
filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
|
||||
}
|
||||
}
|
||||
if (filesToReopen.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<KeyValueScanner> fileScanners = null;
|
||||
List<KeyValueScanner> newCurrentScanners;
|
||||
KeyValueHeap newHeap;
|
||||
try {
|
||||
fileScanners =
|
||||
store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
|
||||
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
|
||||
// recreate the scanners on the current file scanners
|
||||
fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
|
||||
matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
|
||||
scan.includeStopRow(), readPt, false);
|
||||
if (fileScanners == null) {
|
||||
return;
|
||||
}
|
||||
seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
|
||||
newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
|
||||
newCurrentScanners.addAll(fileScanners);
|
||||
|
|
|
@ -146,6 +146,11 @@ public class StripeStoreFileManager
|
|||
return state.allCompactedFilesCached;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCompactedFilesCount() {
|
||||
return state.allCompactedFilesCached.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
|
||||
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
|
@ -45,6 +46,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -104,7 +106,6 @@ import org.junit.rules.TestName;
|
|||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Test class for the Store
|
||||
|
@ -161,19 +162,19 @@ public class TestStore {
|
|||
init(methodName, TEST_UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
private void init(String methodName, Configuration conf)
|
||||
private Store init(String methodName, Configuration conf)
|
||||
throws IOException {
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
// some of the tests write 4 versions and then flush
|
||||
// (with HBASE-4241, lower versions are collected on flush)
|
||||
hcd.setMaxVersions(4);
|
||||
init(methodName, conf, hcd);
|
||||
return init(methodName, conf, hcd);
|
||||
}
|
||||
|
||||
private void init(String methodName, Configuration conf,
|
||||
private Store init(String methodName, Configuration conf,
|
||||
HColumnDescriptor hcd) throws IOException {
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
||||
init(methodName, conf, htd, hcd);
|
||||
return init(methodName, conf, htd, hcd);
|
||||
}
|
||||
|
||||
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
|
||||
|
@ -184,6 +185,11 @@ public class TestStore {
|
|||
@SuppressWarnings("deprecation")
|
||||
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
|
||||
HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
|
||||
return init(methodName, conf, htd, hcd, hook, false);
|
||||
}
|
||||
@SuppressWarnings("deprecation")
|
||||
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
|
||||
HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException {
|
||||
//Setting up a Store
|
||||
Path basedir = new Path(DIR+methodName);
|
||||
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
|
||||
|
@ -198,7 +204,8 @@ public class TestStore {
|
|||
} else {
|
||||
htd.addFamily(hcd);
|
||||
}
|
||||
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
|
||||
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
|
||||
MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
|
||||
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
|
||||
final Configuration walConf = new Configuration(conf);
|
||||
FSUtils.setRootDir(walConf, basedir);
|
||||
|
@ -208,7 +215,7 @@ public class TestStore {
|
|||
if (hook == null) {
|
||||
store = new HStore(region, hcd, conf);
|
||||
} else {
|
||||
store = new MyStore(region, hcd, conf, hook);
|
||||
store = new MyStore(region, hcd, conf, hook, switchToPread);
|
||||
}
|
||||
return store;
|
||||
}
|
||||
|
@ -833,9 +840,10 @@ public class TestStore {
|
|||
|
||||
public static class DummyStoreEngine extends DefaultStoreEngine {
|
||||
public static DefaultCompactor lastCreatedCompactor = null;
|
||||
|
||||
@Override
|
||||
protected void createComponents(
|
||||
Configuration conf, Store store, CellComparator comparator) throws IOException {
|
||||
protected void createComponents(Configuration conf, Store store, CellComparator comparator)
|
||||
throws IOException {
|
||||
super.createComponents(conf, store, comparator);
|
||||
lastCreatedCompactor = this.compactor;
|
||||
}
|
||||
|
@ -1039,6 +1047,13 @@ public class TestStore {
|
|||
return c;
|
||||
}
|
||||
|
||||
private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
|
||||
throws IOException {
|
||||
Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
|
||||
CellUtil.setSequenceId(c, sequenceId);
|
||||
return c;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
|
@ -1269,35 +1284,130 @@ public class TestStore {
|
|||
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
|
||||
}
|
||||
|
||||
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
|
||||
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook)
|
||||
throws IOException {
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setMaxVersions(5);
|
||||
return (MyStore) init(methodName, conf, htd, hcd, hook);
|
||||
}
|
||||
|
||||
private static class MyStore extends HStore {
|
||||
class MyStore extends HStore {
|
||||
private final MyScannerHook hook;
|
||||
MyStore(final HRegion region, final HColumnDescriptor family,
|
||||
final Configuration confParam, MyScannerHook hook) throws IOException {
|
||||
|
||||
MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam,
|
||||
MyScannerHook hook, boolean switchToPread) throws IOException {
|
||||
super(region, family, confParam);
|
||||
this.hook = hook;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
|
||||
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
|
||||
boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
|
||||
boolean includeMemstoreScanner) throws IOException {
|
||||
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
|
||||
boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
|
||||
boolean includeMemstoreScanner) throws IOException {
|
||||
hook.hook(this);
|
||||
return super.getScanners(files, cacheBlocks, usePread,
|
||||
isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner);
|
||||
return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
|
||||
stopRow, false, readPt, includeMemstoreScanner);
|
||||
}
|
||||
}
|
||||
private interface MyScannerHook {
|
||||
void hook(MyStore store) throws IOException;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
|
||||
int flushSize = 500;
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
|
||||
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
|
||||
// Set the lower threshold to invoke the "MERGE" policy
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
|
||||
MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
|
||||
@Override
|
||||
public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore store)
|
||||
throws IOException {
|
||||
}
|
||||
});
|
||||
MemstoreSize memStoreSize = new MemstoreSize();
|
||||
long ts = System.currentTimeMillis();
|
||||
long seqID = 1l;
|
||||
// Add some data to the region and do some flushes
|
||||
for (int i = 1; i < 10; i++) {
|
||||
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
|
||||
memStoreSize);
|
||||
}
|
||||
// flush them
|
||||
flushStore(store, seqID);
|
||||
for (int i = 11; i < 20; i++) {
|
||||
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
|
||||
memStoreSize);
|
||||
}
|
||||
// flush them
|
||||
flushStore(store, seqID);
|
||||
for (int i = 21; i < 30; i++) {
|
||||
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
|
||||
memStoreSize);
|
||||
}
|
||||
// flush them
|
||||
flushStore(store, seqID);
|
||||
|
||||
assertEquals(3, store.getStorefilesCount());
|
||||
ScanInfo scanInfo = store.getScanInfo();
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(family);
|
||||
Collection<StoreFile> storefiles2 = store.getStorefiles();
|
||||
ArrayList<StoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
|
||||
StoreScanner storeScanner =
|
||||
(StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
|
||||
// get the current heap
|
||||
KeyValueHeap heap = storeScanner.heap;
|
||||
// create more store files
|
||||
for (int i = 31; i < 40; i++) {
|
||||
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
|
||||
memStoreSize);
|
||||
}
|
||||
// flush them
|
||||
flushStore(store, seqID);
|
||||
|
||||
for (int i = 41; i < 50; i++) {
|
||||
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
|
||||
memStoreSize);
|
||||
}
|
||||
// flush them
|
||||
flushStore(store, seqID);
|
||||
storefiles2 = store.getStorefiles();
|
||||
ArrayList<StoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
|
||||
actualStorefiles1.removeAll(actualStorefiles);
|
||||
// Do compaction
|
||||
List<Exception> exceptions = new ArrayList<Exception>();
|
||||
MyThread thread = new MyThread(storeScanner);
|
||||
thread.start();
|
||||
store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
|
||||
thread.join();
|
||||
KeyValueHeap heap2 = thread.getHeap();
|
||||
assertFalse(heap.equals(heap2));
|
||||
}
|
||||
|
||||
private static class MyThread extends Thread {
|
||||
private StoreScanner scanner;
|
||||
private KeyValueHeap heap;
|
||||
|
||||
public MyThread(StoreScanner scanner) {
|
||||
this.scanner = scanner;
|
||||
}
|
||||
|
||||
public KeyValueHeap getHeap() {
|
||||
return this.heap;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
scanner.trySwitchToStreamRead();
|
||||
heap = scanner.heap;
|
||||
}
|
||||
}
|
||||
|
||||
private static class MyMemStoreCompactor extends MemStoreCompactor {
|
||||
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
|
||||
private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
|
||||
|
|
Loading…
Reference in New Issue