HBASE-27484 FNFE on StoreFileScanner after a flush followed by a compaction (#4882)

Signed-off-by: Peter Somogyi <psomogyi@apache.org>
This commit is contained in:
Wellington Ramos Chevreuil 2022-11-25 21:58:23 +00:00 committed by GitHub
parent 1ddb5bb43c
commit a5ff289d7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 1 deletions

View File

@ -961,10 +961,10 @@ public class HStore
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow);
memStoreScanners = this.memstore.getScanners(readPt);
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
} finally {
this.storeEngine.readUnlock();
}
try {
// First the store file scanners
@ -981,6 +981,8 @@ public class HStore
} catch (Throwable t) {
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
} finally {
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -41,12 +42,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -103,7 +106,9 @@ import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.EverythingPolicy;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@ -1113,6 +1118,68 @@ public class TestHStore {
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any(), any());
}
@Test
public void testScanWithCompactionAfterFlush() throws Exception {
TEST_UTIL.getConfiguration().set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
EverythingPolicy.class.getName());
init(name.getMethodName());
assertEquals(0, this.store.getStorefilesCount());
KeyValue kv = new KeyValue(row, family, qf1, 1, (byte[]) null);
// add some data, flush
this.store.add(kv, null);
flush(1);
kv = new KeyValue(row, family, qf2, 1, (byte[]) null);
// add some data, flush
this.store.add(kv, null);
flush(2);
kv = new KeyValue(row, family, qf3, 1, (byte[]) null);
// add some data, flush
this.store.add(kv, null);
flush(3);
ExecutorService service = Executors.newFixedThreadPool(2);
Scan scan = new Scan(new Get(row));
Future<KeyValueScanner> scanFuture = service.submit(() -> {
try {
LOG.info(">>>> creating scanner");
return this.store.createScanner(scan,
new ScanInfo(HBaseConfiguration.create(),
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build(),
Long.MAX_VALUE, 0, CellComparator.getInstance()),
scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 0);
} catch (IOException e) {
e.printStackTrace();
return null;
}
});
Future compactFuture = service.submit(() -> {
try {
LOG.info(">>>>>> starting compaction");
Optional<CompactionContext> opCompaction = this.store.requestCompaction();
assertTrue(opCompaction.isPresent());
store.compact(opCompaction.get(), new NoLimitThroughputController(), User.getCurrent());
LOG.info(">>>>>> Compaction is finished");
this.store.closeAndArchiveCompactedFiles();
LOG.info(">>>>>> Compacted files deleted");
} catch (IOException e) {
e.printStackTrace();
}
});
KeyValueScanner kvs = scanFuture.get();
compactFuture.get();
((StoreScanner) kvs).currentScanners.forEach(s -> {
if (s instanceof StoreFileScanner) {
assertEquals(1, ((StoreFileScanner) s).getReader().getRefCount());
}
});
kvs.seek(kv);
service.shutdownNow();
}
private long countMemStoreScanner(StoreScanner scanner) {
if (scanner.currentScanners == null) {
return 0;