HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882)
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
This commit is contained in:
parent
843c6fcbc1
commit
b01fad97fb
|
@ -961,10 +961,10 @@ public class HStore
|
|||
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
|
||||
includeStartRow, stopRow, includeStopRow);
|
||||
memStoreScanners = this.memstore.getScanners(readPt);
|
||||
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
|
||||
} finally {
|
||||
this.storeEngine.readUnlock();
|
||||
}
|
||||
|
||||
try {
|
||||
// First the store file scanners
|
||||
|
||||
|
@ -981,6 +981,8 @@ public class HStore
|
|||
} catch (Throwable t) {
|
||||
clearAndClose(memStoreScanners);
|
||||
throw t instanceof IOException ? (IOException) t : new IOException(t);
|
||||
} finally {
|
||||
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
@ -41,12 +42,14 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -104,7 +107,9 @@ import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
|
|||
import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
|
@ -1114,6 +1119,68 @@ public class TestHStore {
|
|||
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanWithCompactionAfterFlush() throws Exception {
|
||||
TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
|
||||
EverythingPolicy.class.getName());
|
||||
init(name.getMethodName());
|
||||
|
||||
assertEquals(0, this.store.getStorefilesCount());
|
||||
|
||||
KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
|
||||
// add some data, flush
|
||||
this.store.add(kv, null);
|
||||
flush(1);
|
||||
kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
|
||||
// add some data, flush
|
||||
this.store.add(kv, null);
|
||||
flush(2);
|
||||
kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
|
||||
// add some data, flush
|
||||
this.store.add(kv, null);
|
||||
flush(3);
|
||||
|
||||
ExecutorService service = Executors.newFixedThreadPool(2);
|
||||
|
||||
Scan scan = new Scan(new Get(row));
|
||||
Future<KeyValueScanner> scanFuture = service.submit(() -> {
|
||||
try {
|
||||
LOG.info(">>>> creating scanner");
|
||||
return this.store.createScanner(scan,
|
||||
new ScanInfo(HBaseConfiguration.create(),
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
|
||||
Long.MAX_VALUE, 0, CellComparator.getInstance()),
|
||||
scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Future compactFuture = service.submit(() -> {
|
||||
try {
|
||||
LOG.info(">>>>>> starting compaction");
|
||||
Optional<CompactionContext> opCompaction = this.store.requestCompaction();
|
||||
assertTrue(opCompaction.isPresent());
|
||||
store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
|
||||
LOG.info(">>>>>> Compaction is finished");
|
||||
this.store.closeAndArchiveCompactedFiles();
|
||||
LOG.info(">>>>>> Compacted files deleted");
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
KeyValueScanner kvs = scanFuture.get();
|
||||
compactFuture.get();
|
||||
((StoreScanner) kvs).currentScanners.forEach(s -> {
|
||||
if (s instanceof StoreFileScanner) {
|
||||
assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
|
||||
}
|
||||
});
|
||||
kvs.seek(kv);
|
||||
service.shutdownNow();
|
||||
}
|
||||
|
||||
private long countMemStoreScanner(StoreScanner scanner) {
|
||||
if (scanner.currentScanners == null) {
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue