diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index f4f9aa68827..915d62fdaf1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -27,10 +27,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +36,10 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 /**
  * Default implementation of StoreFileManager. Not thread-safe.
  */
@@ -116,6 +116,14 @@ class DefaultStoreFileManager implements StoreFileManager {
     return storefiles.size();
   }
 
+  @Override
+  public final int getCompactedFilesCount() {
+    if (compactedfiles == null) {
+      return 0;
+    }
+    return compactedfiles.size();
+  }
+
   @Override
   public void addCompactionResults(
       Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 721299e545b..03c31af0111 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -726,6 +727,11 @@ public class HStore implements Store {
     return this.storeEngine.getStoreFileManager().getStorefiles();
   }
 
+  @Override
+  public Collection<StoreFile> getCompactedFiles() {
+    return this.storeEngine.getStoreFileManager().getCompactedfiles();
+  }
+
   /**
    * This throws a WrongRegionException if the HFile does not fit in this region, or an
    * InvalidHFileException if the HFile is not valid.
@@ -1926,6 +1932,41 @@ public class HStore implements Store {
     return scanner;
   }
 
+  @Override
+  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
+      boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+      byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+      boolean includeMemstoreScanner) throws IOException {
+    this.lock.readLock().lock();
+    try {
+      Map<String, StoreFile> name2File =
+          new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
+      for (StoreFile file : getStorefiles()) {
+        name2File.put(file.getFileInfo().getActiveFileName(), file);
+      }
+      if (getCompactedFiles() != null) {
+        for (StoreFile file : getCompactedFiles()) {
+          name2File.put(file.getFileInfo().getActiveFileName(), file);
+        }
+      }
+      List<StoreFile> filesToReopen = new ArrayList<>();
+      for (KeyValueScanner kvs : currentFileScanners) {
+        assert kvs.isFileScanner();
+        if (kvs.peek() == null) {
+          continue;
+        }
+        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
+      }
+      if (filesToReopen.isEmpty()) {
+        return null;
+      }
+      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
+        includeStartRow, stopRow, includeStopRow, readPt, false);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
   @Override
   public String toString() {
     return this.getColumnFamilyName();
@@ -1936,6 +1977,11 @@ public class HStore implements Store {
     return this.storeEngine.getStoreFileManager().getStorefileCount();
   }
 
+  @Override
+  public int getCompactedFilesCount() {
+    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
+  }
+
   @Override
   public long getMaxStoreFileAge() {
     long earliestTS = Long.MAX_VALUE;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 76595f3622f..766b562c477 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -61,6 +61,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
 
   Collection<StoreFile> getStorefiles();
 
+  Collection<StoreFile> getCompactedFiles();
+
   /**
    * Close all the readers We don't need to worry about subsequent requests because the Region
    * holds a write lock that will prevent any more reads or writes.
@@ -115,6 +117,27 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
       ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow,
       boolean includeStopRow, long readPt) throws IOException;
 
+  /**
+   * Recreates the scanners on the current list of active store file scanners.
+   * @param currentFileScanners the current set of active store file scanners
+   * @param cacheBlocks cache the blocks or not
+   * @param usePread use pread or not
+   * @param isCompaction is the scanner for compaction
+   * @param matcher the scan query matcher
+   * @param startRow the scan's start row
+   * @param includeStartRow should the scan include the start row
+   * @param stopRow the scan's stop row
+   * @param includeStopRow should the scan include the stop row
+   * @param readPt the read point of the current scan
+   * @param includeMemstoreScanner whether the scanners should include a memstore scanner
+   * @return list of scanners recreated on the current file scanners
+   * @throws IOException
+   */
+  List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
+      boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+      byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+      boolean includeMemstoreScanner) throws IOException;
+
   /**
    * Create scanners on the given files and if needed on the memstore with no filtering based on TTL
    * (that happens further down the line).
@@ -366,6 +389,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    */
   int getStorefilesCount();
 
+  /**
+   * @return Count of compacted store files
+   */
+  int getCompactedFilesCount();
+
   /**
    * @return Max age of store files in this store
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 933849cfb0f..eb760edd80c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -103,6 +103,12 @@ public interface StoreFileManager {
    */
   int getStorefileCount();
 
+  /**
+   * Returns the number of compacted files.
+   * @return The number of compacted files.
+   */
+  int getCompactedFilesCount();
+
   /**
    * Gets the store files to scan for a Scan or Get request.
    * @param startRow Start row of the request.
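Taken together, the interface additions above define a small contract that the StoreScanner change below relies on: recreateScanners resolves each still-live file scanner back to its store file by name, consulting the compacted files as well as the active ones, and returns null when every scanner is exhausted. What follows is a minimal, self-contained sketch of that lookup logic, not the patch itself; SketchFile and SketchScanner are hypothetical stand-ins for StoreFile and KeyValueScanner.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RecreateScannersSketch {
      // Hypothetical stand-ins for StoreFile and KeyValueScanner (illustration only).
      static final class SketchFile {
        final String name;
        SketchFile(String name) { this.name = name; }
      }

      interface SketchScanner {
        String fileName();   // models kvs.getFilePath().getName()
        boolean exhausted(); // models kvs.peek() == null
      }

      // Models the core of HStore.recreateScanners: resolve each live file
      // scanner back to its file by name, looking at both active and compacted
      // files, and return null when there is nothing to reopen.
      static List<SketchFile> filesToReopen(List<SketchScanner> current,
          List<SketchFile> active, List<SketchFile> compacted) {
        Map<String, SketchFile> name2File = new HashMap<>(active.size() + compacted.size());
        for (SketchFile f : active) {
          name2File.put(f.name, f);
        }
        for (SketchFile f : compacted) {
          // compacted-away files may still back open scanners
          name2File.put(f.name, f);
        }
        List<SketchFile> toReopen = new ArrayList<>();
        for (SketchScanner s : current) {
          if (!s.exhausted()) {
            toReopen.add(name2File.get(s.fileName()));
          }
        }
        return toReopen.isEmpty() ? null : toReopen;
      }
    }

Returning null rather than an empty list lets the caller distinguish "nothing worth reopening, keep the current scanners" from a successful reopen, which is exactly how StoreScanner uses it below.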
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 9849c9363eb..11301d865b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -966,7 +966,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     return heap.reseek(kv);
   }
 
-  private void trySwitchToStreamRead() {
+  @VisibleForTesting
+  void trySwitchToStreamRead() {
     if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
         || bytesRead < preadMaxBytes) {
       return;
@@ -977,34 +978,27 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
     scanUsePread = false;
     Cell lastTop = heap.peek();
-    Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
-    for (StoreFile file : store.getStorefiles()) {
-      name2File.put(file.getFileInfo().getActiveFileName(), file);
-    }
-    List<StoreFile> filesToReopen = new ArrayList<>();
     List<KeyValueScanner> memstoreScanners = new ArrayList<>();
     List<KeyValueScanner> scannersToClose = new ArrayList<>();
     for (KeyValueScanner kvs : currentScanners) {
       if (!kvs.isFileScanner()) {
+        // collect memstore scanners here
         memstoreScanners.add(kvs);
       } else {
         scannersToClose.add(kvs);
-        if (kvs.peek() == null) {
-          continue;
-        }
-        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
       }
     }
-    if (filesToReopen.isEmpty()) {
-      return;
-    }
     List<KeyValueScanner> fileScanners = null;
     List<KeyValueScanner> newCurrentScanners;
     KeyValueHeap newHeap;
     try {
-      fileScanners =
-          store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
-            scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
+      // recreate the scanners on the current file scanners
+      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
+        matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
+        scan.includeStopRow(), readPt, false);
+      if (fileScanners == null) {
+        return;
+      }
       seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
       newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
       newCurrentScanners.addAll(fileScanners);
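A note on why the lookup moved into HStore: trySwitchToStreamRead previously built its name-to-file map from the active store files only. Once a compaction commits, a still-open scanner can be backed by a file that exists only in the compacted set until the compaction discharger archives it, so that lookup could come back null mid-switch. Store.recreateScanners performs the same lookup under the store's read lock, indexes compacted files as well, and its null return replaces the old filesToReopen.isEmpty() early-out. The switch still only fires for Scan.ReadType.DEFAULT scans once bytesRead reaches the pread threshold; a sketch of how the new test below forces an immediate switch:

    // Zero the pread byte threshold so the first eligible trySwitchToStreamRead
    // call switches to stream read immediately. STORESCANNER_PREAD_MAX_BYTES
    // backs the "hbase.storescanner.pread.max.bytes" setting.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
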
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index 3c7469eccb1..18a6eec99fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -146,6 +146,11 @@ public class StripeStoreFileManager
     return state.allCompactedFilesCached;
   }
 
+  @Override
+  public int getCompactedFilesCount() {
+    return state.allCompactedFilesCached.size();
+  }
+
   @Override
   public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 22539c57f6b..2318414d983 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import org.apache.commons.logging.Log;
@@ -104,7 +106,6 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Test class for the Store
@@ -161,19 +162,19 @@ public class TestStore {
     init(methodName, TEST_UTIL.getConfiguration());
   }
 
-  private void init(String methodName, Configuration conf)
+  private Store init(String methodName, Configuration conf)
       throws IOException {
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     // some of the tests write 4 versions and then flush
     // (with HBASE-4241, lower versions are collected on flush)
     hcd.setMaxVersions(4);
-    init(methodName, conf, hcd);
+    return init(methodName, conf, hcd);
   }
 
-  private void init(String methodName, Configuration conf,
+  private Store init(String methodName, Configuration conf,
       HColumnDescriptor hcd) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
-    init(methodName, conf, htd, hcd);
+    return init(methodName, conf, htd, hcd);
  }
 
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
@@ -184,6 +185,11 @@ public class TestStore {
 
   @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
       HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
+    return init(methodName, conf, htd, hcd, hook, false);
+  }
+
+  @SuppressWarnings("deprecation")
+  private Store init(String methodName, Configuration conf, HTableDescriptor htd,
+      HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -198,7 +204,8 @@ public class TestStore {
     } else {
       htd.addFamily(hcd);
     }
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, basedir);
@@ -208,7 +215,7 @@ public class TestStore {
     if (hook == null) {
       store = new HStore(region, hcd, conf);
     } else {
-      store = new MyStore(region, hcd, conf, hook);
+      store = new MyStore(region, hcd, conf, hook, switchToPread);
     }
     return store;
   }
@@ -833,9 +840,10 @@ public class TestStore {
   public static class DummyStoreEngine extends DefaultStoreEngine {
     public static DefaultCompactor lastCreatedCompactor = null;
+
     @Override
-    protected void createComponents(
-        Configuration conf, Store store, CellComparator comparator) throws IOException {
+    protected void createComponents(Configuration conf, Store store, CellComparator comparator)
+        throws IOException {
       super.createComponents(conf, store, comparator);
       lastCreatedCompactor = this.compactor;
     }
@@ -1039,6 +1047,13 @@ public class TestStore {
     return c;
   }
 
+  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
+      throws IOException {
+    Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+    CellUtil.setSequenceId(c, sequenceId);
+    return c;
+  }
+
   @Test
   public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
     Configuration conf = HBaseConfiguration.create();
@@ -1269,35 +1284,130 @@ public class TestStore {
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
   }
 
-  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
+  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook)
+      throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setMaxVersions(5);
     return (MyStore) init(methodName, conf, htd, hcd, hook);
   }
 
-  private static class MyStore extends HStore {
+  class MyStore extends HStore {
     private final MyScannerHook hook;
-    MyStore(final HRegion region, final HColumnDescriptor family,
-        final Configuration confParam, MyScannerHook hook) throws IOException {
+
+    MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam,
+        MyScannerHook hook, boolean switchToPread) throws IOException {
       super(region, family, confParam);
       this.hook = hook;
     }
 
     @Override
     public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
-        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
-        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
-        boolean includeMemstoreScanner) throws IOException {
+      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+      boolean includeMemstoreScanner) throws IOException {
       hook.hook(this);
-      return super.getScanners(files, cacheBlocks, usePread,
-          isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner);
+      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
+        stopRow, false, readPt, includeMemstoreScanner);
     }
   }
 
   private interface MyScannerHook {
     void hook(MyStore store) throws IOException;
   }
 
+  @Test
+  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
+    int flushSize = 500;
+    Configuration conf = HBaseConfiguration.create();
+    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
+    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
+    // Set the lower threshold to invoke the "MERGE" policy
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+    MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
+      @Override
+      public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore store)
+          throws IOException {
+      }
+    });
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = System.currentTimeMillis();
+    long seqID = 1L;
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+    for (int i = 11; i < 20; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+    for (int i = 21; i < 30; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+
+    assertEquals(3, store.getStorefilesCount());
+    ScanInfo scanInfo = store.getScanInfo();
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    Collection<StoreFile> storefiles2 = store.getStorefiles();
+    ArrayList<StoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
+    StoreScanner storeScanner =
+        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
+    // get the current heap
+    KeyValueHeap heap = storeScanner.heap;
+    // create more store files
+    for (int i = 31; i < 40; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+
+    for (int i = 41; i < 50; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+    storefiles2 = store.getStorefiles();
+    ArrayList<StoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
+    actualStorefiles1.removeAll(actualStorefiles);
+    // Do compaction
+    List<Exception> exceptions = new ArrayList<>();
+    MyThread thread = new MyThread(storeScanner);
+    thread.start();
+    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
+    thread.join();
+    KeyValueHeap heap2 = thread.getHeap();
+    assertFalse(heap.equals(heap2));
+  }
+
+  private static class MyThread extends Thread {
+    private StoreScanner scanner;
+    private KeyValueHeap heap;
+
+    public MyThread(StoreScanner scanner) {
+      this.scanner = scanner;
+    }
+
+    public KeyValueHeap getHeap() {
+      return this.heap;
+    }
+
+    public void run() {
+      scanner.trySwitchToStreamRead();
+      heap = scanner.heap;
+    }
+  }
+
   private static class MyMemStoreCompactor extends MemStoreCompactor {
     private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
     private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);