HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882)
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
parent 59f92a8f60
commit 8a76451a9b
@@ -961,10 +961,10 @@ public class HStore
       storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
         includeStartRow, stopRow, includeStopRow);
       memStoreScanners = this.memstore.getScanners(readPt);
+      storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
     } finally {
       this.storeEngine.readUnlock();
     }

     try {
       // First the store file scanners
@@ -981,6 +981,8 @@ public class HStore
     } catch (Throwable t) {
       clearAndClose(memStoreScanners);
       throw t instanceof IOException ? (IOException) t : new IOException(t);
+    } finally {
+      storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
     }
   }
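Why pinning works: closeAndArchiveCompactedFiles only removes store files whose reader reference count is zero, and the increment above happens while the storeEngine read lock is still held, so a compaction that finishes between readUnlock() and the scanner open can no longer archive the files out from under the scanner — which is what produced the FileNotFoundException in the issue title. Below is a minimal, self-contained sketch of that pin/unpin pattern; StoreFileStub and archiveUnreferenced are hypothetical stand-ins for illustration, not the real HBase API.

  import java.util.List;
  import java.util.concurrent.atomic.AtomicInteger;

  public class RefCountPinSketch {
    // Hypothetical stand-in for HStoreFile: a ref count plus an archived flag.
    static final class StoreFileStub {
      final AtomicInteger refCount = new AtomicInteger();
      volatile boolean archived;
    }

    // Stand-in for closeAndArchiveCompactedFiles: only unreferenced files
    // may be physically removed.
    static void archiveUnreferenced(List<StoreFileStub> compactedFiles) {
      for (StoreFileStub f : compactedFiles) {
        if (f.refCount.get() == 0) {
          f.archived = true;
        }
      }
    }

    // Shape of the patched getScanners(): pin the files first, unpin in a
    // finally block so a failure while opening scanners cannot leak a pin.
    static void openScanners(List<StoreFileStub> filesToScan) {
      filesToScan.forEach(f -> f.refCount.incrementAndGet()); // pin
      try {
        for (StoreFileStub f : filesToScan) {
          if (f.archived) { // before the fix this surfaced as an FNFE
            throw new IllegalStateException("file archived during scanner open");
          }
        }
      } finally {
        filesToScan.forEach(f -> f.refCount.decrementAndGet()); // unpin
      }
    }

    public static void main(String[] args) {
      List<StoreFileStub> files = List.of(new StoreFileStub());
      files.get(0).refCount.incrementAndGet(); // a scanner holds a pin...
      archiveUnreferenced(files); // ...so a concurrent archive skips the file
      System.out.println("archived while pinned: " + files.get(0).archived); // false
      files.get(0).refCount.decrementAndGet();
      archiveUnreferenced(files); // once unpinned, archiving may proceed
      System.out.println("archived once unpinned: " + files.get(0).archived); // true
    }
  }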
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;

+import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -41,12 +42,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -104,7 +107,9 @@ import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
 import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -1114,6 +1119,68 @@ public class TestHStore {
     verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
   }

+  @Test
+  public void testScanWithCompactionAfterFlush() throws Exception {
+    TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      EverythingPolicy.class.getName());
+    init(name.getMethodName());
+
+    assertEquals(0, this.store.getStorefilesCount());
+
+    KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
+    // add some data, flush
+    this.store.add(kv, null);
+    flush(1);
+    kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
+    // add some data, flush
+    this.store.add(kv, null);
+    flush(2);
+    kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
+    // add some data, flush
+    this.store.add(kv, null);
+    flush(3);
+
+    ExecutorService service = Executors.newFixedThreadPool(2);
+
+    Scan scan = new Scan(new Get(row));
+    Future<KeyValueScanner> scanFuture = service.submit(() -> {
+      try {
+        LOG.info(">>>> creating scanner");
+        return this.store.createScanner(scan,
+          new ScanInfo(HBaseConfiguration.create(),
+            ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
+            Long.MAX_VALUE, 0, CellComparator.getInstance()),
+          scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
+      } catch (IOException e) {
+        e.printStackTrace();
+        return null;
+      }
+    });
+    Future compactFuture = service.submit(() -> {
+      try {
+        LOG.info(">>>>>> starting compaction");
+        Optional<CompactionContext> opCompaction = this.store.requestCompaction();
+        assertTrue(opCompaction.isPresent());
+        store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
+        LOG.info(">>>>>> Compaction is finished");
+        this.store.closeAndArchiveCompactedFiles();
+        LOG.info(">>>>>> Compacted files deleted");
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    KeyValueScanner kvs = scanFuture.get();
+    compactFuture.get();
+    ((StoreScanner) kvs).currentScanners.forEach(s -> {
+      if (s instanceof StoreFileScanner) {
+        assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
+      }
+    });
+    kvs.seek(kv);
+    service.shutdownNow();
+  }
+
   private long countMemStoreScanner(StoreScanner scanner) {
     if (scanner.currentScanners == null) {
       return 0;
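The test creates the scanner on one thread while a second thread compacts all three flushed files (EverythingPolicy makes the compaction select every file) and immediately archives the compacted ones. It then asserts that each StoreFileScanner still holds its reader (getRefCount() == 1) and that a subsequent seek succeeds; before the fix the seek could fail with the FileNotFoundException from the issue title. Assuming the standard HBase module layout, the test can be run on its own with:

  mvn test -Dtest=TestHStore#testScanWithCompactionAfterFlush -pl hbase-server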