From 01f74ac0749a3ebd6a6ed760256e743e06eb8b20 Mon Sep 17 00:00:00 2001 From: chenglei Date: Sat, 10 Dec 2022 11:19:08 +0800 Subject: [PATCH] HBASE-27519 Another case for FNFE on StoreFileScanner after a flush followed by a compaction (#4922) Signed-off-by: Wellington Chevreuil --- .../regionserver/ChangedReadersObserver.java | 8 +- .../hadoop/hbase/regionserver/HStore.java | 30 +++++- .../hadoop/hbase/regionserver/HStoreFile.java | 25 +++++ .../hadoop/hbase/regionserver/TestHStore.java | 102 ++++++++++++++++++ 4 files changed, 159 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java index d45a8046873..9ad93395a7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java @@ -31,7 +31,13 @@ public interface ChangedReadersObserver { long getReadPoint(); /** - * Notify observers. + * Notify observers.
+ * NOTE:Before we invoke this method,{@link HStoreFile#increaseRefCount} is invoked for every + * {@link HStoreFile} in 'sfs' input parameter to prevent {@link HStoreFile} is archived after a + * concurrent compaction, and after this method is invoked,{@link HStoreFile#decreaseRefCount} is + * invoked.So if you open the {@link StoreFileReader} or {@link StoreFileScanner} asynchronously + * in this method,you may need to invoke {@link HStoreFile#increaseRefCount} or + * {@link HStoreFile#decreaseRefCount} by yourself to prevent the {@link HStoreFile}s be archived. * @param sfs The new files * @param memStoreScanners scanner of current memstore * @throws IOException e diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index a41c8da1607..ec47ebf8da8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -885,15 +885,29 @@ public class HStore return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum(); } - private boolean completeFlush(List sfs, long snapshotId) throws IOException { + private boolean completeFlush(final List sfs, long snapshotId) throws IOException { // NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may // close {@link DefaultMemStore#snapshot}, which may be used by // {@link DefaultMemStore#getScanners}. storeEngine.addStoreFiles(sfs, - snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> { + // NOTE: here we must increase the refCount for storeFiles because we would open the + // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers. + // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by + // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.Because + // HStore.requestCompaction is under storeEngine lock, so here we increase the refCount under + // storeEngine lock. see HBASE-27519 for more details. + snapshotId > 0 ? () -> { + this.memstore.clearSnapshot(snapshotId); + HStoreFile.increaseStoreFilesRefeCount(sfs); + } : () -> { + HStoreFile.increaseStoreFilesRefeCount(sfs); }); // notify to be called here - only in case of flushes - notifyChangedReadersObservers(sfs); + try { + notifyChangedReadersObservers(sfs); + } finally { + HStoreFile.decreaseStoreFilesRefeCount(sfs); + } if (LOG.isTraceEnabled()) { long totalSize = getTotalSize(sfs); String traceMessage = "FLUSH time,count,size,store size,store files [" @@ -961,7 +975,13 @@ public class HStore storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow, includeStartRow, stopRow, includeStopRow); memStoreScanners = this.memstore.getScanners(readPt); - storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet()); + // NOTE: here we must increase the refCount for storeFiles because we would open the + // storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here, + // HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the + // storeFiles after a concurrent compaction.Because HStore.requestCompaction is under + // storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484 + // for more details. + HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan); } finally { this.storeEngine.readUnlock(); } @@ -982,7 +1002,7 @@ public class HStore clearAndClose(memStoreScanners); throw t instanceof IOException ? (IOException) t : new IOException(t); } finally { - storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet()); + HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index 2b1acb86400..58d97a8743d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -48,6 +49,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** @@ -648,4 +651,26 @@ public class HStoreFile implements StoreFile { Set getCompactedStoreFiles() { return Collections.unmodifiableSet(this.compactedStoreFiles); } + + long increaseRefCount() { + return this.fileInfo.refCount.incrementAndGet(); + } + + long decreaseRefCount() { + return this.fileInfo.refCount.decrementAndGet(); + } + + static void increaseStoreFilesRefeCount(Collection storeFiles) { + if (CollectionUtils.isEmpty(storeFiles)) { + return; + } + storeFiles.forEach(HStoreFile::increaseRefCount); + } + + static void decreaseStoreFilesRefeCount(Collection storeFiles) { + if (CollectionUtils.isEmpty(storeFiles)) { + return; + } + storeFiles.forEach(HStoreFile::decreaseRefCount); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index a052520e54d..86187172569 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.FileNotFoundException; import java.io.IOException; import java.lang.ref.SoftReference; import java.security.PrivilegedExceptionAction; @@ -44,6 +45,7 @@ import java.util.ListIterator; import java.util.NavigableSet; import java.util.Optional; import java.util.TreeSet; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -1531,6 +1533,106 @@ public class TestHStore { } } + /** + * This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the + * Compaction execute concurrently and theCcompaction compact and archive the flushed + * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before + * HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}. + */ + @Test + public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(WALFactory.WAL_ENABLED, false); + conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName()); + byte[] r0 = Bytes.toBytes("row0"); + byte[] r1 = Bytes.toBytes("row1"); + final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + final AtomicBoolean shouldWaitRef = new AtomicBoolean(false); + // Initialize region + final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() { + @Override + public void getScanners(MyStore store) throws IOException { + try { + // Here this method is called by StoreScanner.updateReaders which is invoked by the + // following TestHStore.flushStore + if (shouldWaitRef.get()) { + // wait the following compaction Task start + cyclicBarrier.await(); + // wait the following HStore.closeAndArchiveCompactedFiles end. + cyclicBarrier.await(); + } + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + final AtomicReference compactionExceptionRef = new AtomicReference(null); + Runnable compactionTask = () -> { + try { + // Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for + // entering the MyStore.getScanners, compactionTask could start. + cyclicBarrier.await(); + region.compactStore(family, new NoLimitThroughputController()); + myStore.closeAndArchiveCompactedFiles(); + // Notify StoreScanner.updateReaders could enter MyStore.getScanners. + cyclicBarrier.await(); + } catch (Throwable e) { + compactionExceptionRef.set(e); + } + }; + + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + byte[] value = Bytes.toBytes("value"); + // older data whihc shouldn't be "seen" by client + myStore.add(createCell(r0, qf1, ts, seqId, value), null); + flushStore(myStore, id++); + myStore.add(createCell(r0, qf2, ts, seqId, value), null); + flushStore(myStore, id++); + myStore.add(createCell(r0, qf3, ts, seqId, value), null); + TreeSet quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); + quals.add(qf1); + quals.add(qf2); + quals.add(qf3); + + myStore.add(createCell(r1, qf1, ts, seqId, value), null); + myStore.add(createCell(r1, qf2, ts, seqId, value), null); + myStore.add(createCell(r1, qf3, ts, seqId, value), null); + + Thread.currentThread() + .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread"); + Scan scan = new Scan(); + scan.withStartRow(r0, true); + try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) { + List results = new MyList<>(size -> { + switch (size) { + case 1: + shouldWaitRef.set(true); + Thread thread = new Thread(compactionTask); + thread.setName("MyCompacting Thread."); + thread.start(); + try { + flushStore(myStore, id++); + thread.join(); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + shouldWaitRef.set(false); + break; + default: + break; + } + }); + // Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile + // which used by StoreScanner.updateReaders is deleted by compactionTask. + scanner.next(results); + // The results is r0 row cells. + assertEquals(3, results.size()); + assertTrue(compactionExceptionRef.get() == null); + } + } + @Test public void testReclaimChunkWhenScaning() throws IOException { init("testReclaimChunkWhenScaning");