diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index d45a8046873..9ad93395a7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -31,7 +31,13 @@ public interface ChangedReadersObserver {
long getReadPoint();
/**
- * Notify observers.
+ * Notify observers.
+ * NOTE: Before this method is invoked, {@link HStoreFile#increaseRefCount} is called for every
+ * {@link HStoreFile} in the 'sfs' input parameter to prevent the {@link HStoreFile}s from being
+ * archived after a concurrent compaction, and {@link HStoreFile#decreaseRefCount} is called
+ * after this method returns. So if you open the {@link StoreFileReader} or
+ * {@link StoreFileScanner} asynchronously in this method, you may need to invoke
+ * {@link HStoreFile#increaseRefCount} and {@link HStoreFile#decreaseRefCount} by yourself to
+ * prevent the {@link HStoreFile}s from being archived.
* @param sfs The new files
* @param memStoreScanners scanner of current memstore
* @throws IOException e
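
The contract documented above matters for observers that defer opening readers to another thread. Below is a minimal sketch of such an observer, assuming it lives in the same org.apache.hadoop.hbase.regionserver package (so the package-private increaseRefCount/decreaseRefCount introduced later in this patch are visible); the AsyncChangedReadersObserver class and its executor are illustrative stand-ins, not part of this patch:

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative observer (not part of this patch): it opens the scanners on
// another thread, so the caller's temporary refCount no longer covers the
// open. It must therefore pin the files itself before updateReaders returns.
class AsyncChangedReadersObserver implements ChangedReadersObserver {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  @Override
  public long getReadPoint() {
    return 0L; // placeholder; a real observer returns its scanner's read point
  }

  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
    throws IOException {
    // Pin the files while the caller's refCount is still in effect.
    sfs.forEach(HStoreFile::increaseRefCount);
    executor.execute(() -> {
      try {
        // ... open StoreFileReaders / StoreFileScanners for sfs here ...
      } finally {
        // Release our pin so the files become archivable again.
        sfs.forEach(HStoreFile::decreaseRefCount);
      }
    });
  }
}
```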
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index fc3dfd340ac..b132a153163 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1195,6 +1195,13 @@ public class HStore
if (snapshotId > 0) {
this.memstore.clearSnapshot(snapshotId);
}
+ // NOTE: here we must increase the refCount for storeFiles because we would open the
+ // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
+ // If we do not increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
+ // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.
+ // Because HStore.requestCompaction runs under the storeEngine lock, we increase the refCount
+ // under the storeEngine lock here as well. See HBASE-27519 for more details.
+ HStoreFile.increaseStoreFilesRefCount(sfs);
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@@ -1204,8 +1211,12 @@ public class HStore
this.lock.writeLock().unlock();
}
- // notify to be called here - only in case of flushes
- notifyChangedReadersObservers(sfs);
+ try {
+ // notify to be called here - only in case of flushes
+ notifyChangedReadersObservers(sfs);
+ } finally {
+ HStoreFile.decreaseStoreFilesRefCount(sfs);
+ }
if (LOG.isTraceEnabled()) {
long totalSize = getTotalSize(sfs);
String traceMessage = "FLUSH time,count,size,store size,store files ["
@@ -1273,7 +1284,13 @@ public class HStore
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow);
memStoreScanners = this.memstore.getScanners(readPt);
- storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
+ // NOTE: here we must increase the refCount for storeFiles because we would open the
+ // storeFiles and get the StoreFileScanners for them. If we do not increase the refCount
+ // here, HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may
+ // archive the storeFiles after a concurrent compaction. Because HStore.requestCompaction
+ // runs under the storeEngine lock, we increase the refCount under the storeEngine lock
+ // here as well. See HBASE-27484 for more details.
+ HStoreFile.increaseStoreFilesRefCount(storeFilesToScan);
} finally {
this.lock.readLock().unlock();
}
@@ -1294,7 +1311,7 @@ public class HStore
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
} finally {
- storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
+ HStoreFile.decreaseStoreFilesRefCount(storeFilesToScan);
}
}
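
Why a raised counter actually blocks archival: the discharger side consults the same counter before deleting anything. In current HBase that check lives in HStore.removeCompactedfiles; the sketch below assumes it is exposed as HStoreFile#isReferencedInReads() reporting fileInfo.refCount > 0, which is a hedged, simplified reconstruction rather than the real method body:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified sketch of the discharger-side check (the real logic lives in
// HStore.removeCompactedfiles). A file pinned via the helpers in this patch
// reports isReferencedInReads() == true, is skipped here, and is retried on
// the next CompactedHFilesDischarger run once the pin has been released.
class DischargerSketch {
  static List<HStoreFile> filesSafeToArchive(Collection<HStoreFile> compactedFiles) {
    List<HStoreFile> archivable = new ArrayList<>();
    for (HStoreFile file : compactedFiles) {
      // Assumed to report fileInfo.refCount > 0, i.e. an in-flight read.
      if (!file.isReferencedInReads()) {
        archivable.add(file);
      }
    }
    return archivable;
  }
}
```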
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 29203b26e77..984b081f774 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -46,6 +47,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@@ -609,4 +612,26 @@ public class HStoreFile implements StoreFile {
Set<String> getCompactedStoreFiles() {
return Collections.unmodifiableSet(this.compactedStoreFiles);
}
+
+ long increaseRefCount() {
+ return this.fileInfo.refCount.incrementAndGet();
+ }
+
+ long decreaseRefCount() {
+ return this.fileInfo.refCount.decrementAndGet();
+ }
+
+ static void increaseStoreFilesRefCount(Collection<HStoreFile> storeFiles) {
+ if (CollectionUtils.isEmpty(storeFiles)) {
+ return;
+ }
+ storeFiles.forEach(HStoreFile::increaseRefCount);
+ }
+
+ static void decreaseStoreFilesRefCount(Collection<HStoreFile> storeFiles) {
+ if (CollectionUtils.isEmpty(storeFiles)) {
+ return;
+ }
+ storeFiles.forEach(HStoreFile::decreaseRefCount);
+ }
}
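
Because CollectionUtils.isEmpty treats null and empty collections alike, the new helpers can be paired in try/finally without null checks, which is exactly how the HStore hunks above use them. A minimal usage sketch follows; the withPinnedFiles wrapper and openScannersFor callback are hypothetical stand-ins, not part of this patch:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;

// Minimal usage sketch for the helpers above: pin, use, always unpin.
class RefCountHelpersUsage {
  static void withPinnedFiles(Collection<HStoreFile> files,
    Consumer<Collection<HStoreFile>> openScannersFor) {
    // No-op when files is null or empty, thanks to CollectionUtils.isEmpty.
    HStoreFile.increaseStoreFilesRefCount(files);
    try {
      openScannersFor.accept(files); // files cannot be archived in here
    } finally {
      HStoreFile.decreaseStoreFilesRefCount(files); // always balanced
    }
  }

  public static void main(String[] args) {
    // Both calls are safe: the helpers simply return on null/empty input.
    withPinnedFiles(Collections.emptyList(), files -> { });
    withPinnedFiles(null, files -> { });
  }
}
```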
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index d2e9ac944da..f8009f85358 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
@@ -44,6 +45,7 @@ import java.util.ListIterator;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -1444,6 +1446,106 @@ public class TestHStore {
}
}
+ /**
+ * This test is for HBASE-27519: while the {@link StoreScanner} is scanning, a flush and a
+ * compaction execute concurrently, and the compaction compacts and archives the flushed
+ * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}. Before HBASE-27519,
+ * {@link StoreScanner#updateReaders} would throw {@link FileNotFoundException}.
+ */
+ @Test
+ public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setBoolean(WALFactory.WAL_ENABLED, false);
+ conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
+ byte[] r0 = Bytes.toBytes("row0");
+ byte[] r1 = Bytes.toBytes("row1");
+ final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+ final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
+ // Initialize region
+ final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
+ @Override
+ public void getScanners(MyStore store) throws IOException {
+ try {
+ // Here this method is called by StoreScanner.updateReaders which is invoked by the
+ // following TestHStore.flushStore
+ if (shouldWaitRef.get()) {
+ // wait for the following compaction task to start
+ cyclicBarrier.await();
+ // wait for the following HStore.closeAndArchiveCompactedFiles to end.
+ cyclicBarrier.await();
+ }
+ } catch (BrokenBarrierException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<>(null);
+ Runnable compactionTask = () -> {
+ try {
+ // compactionTask may start only once the StoreScanner.updateReaders invoked by
+ // TestHStore.flushStore is about to enter MyStore.getScanners.
+ cyclicBarrier.await();
+ region.compactStore(family, new NoLimitThroughputController());
+ myStore.closeAndArchiveCompactedFiles();
+ // Notify StoreScanner.updateReaders that it may now enter MyStore.getScanners.
+ cyclicBarrier.await();
+ } catch (Throwable e) {
+ compactionExceptionRef.set(e);
+ }
+ };
+
+ long ts = EnvironmentEdgeManager.currentTime();
+ long seqId = 100;
+ byte[] value = Bytes.toBytes("value");
+ // older data which shouldn't be "seen" by the client
+ myStore.add(createCell(r0, qf1, ts, seqId, value), null);
+ flushStore(myStore, id++);
+ myStore.add(createCell(r0, qf2, ts, seqId, value), null);
+ flushStore(myStore, id++);
+ myStore.add(createCell(r0, qf3, ts, seqId, value), null);
+ TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ quals.add(qf1);
+ quals.add(qf2);
+ quals.add(qf3);
+
+ myStore.add(createCell(r1, qf1, ts, seqId, value), null);
+ myStore.add(createCell(r1, qf2, ts, seqId, value), null);
+ myStore.add(createCell(r1, qf3, ts, seqId, value), null);
+
+ Thread.currentThread()
+ .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
+ Scan scan = new Scan();
+ scan.withStartRow(r0, true);
+ try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
+ List<Cell> results = new MyList<>(size -> {
+ switch (size) {
+ case 1:
+ shouldWaitRef.set(true);
+ Thread thread = new Thread(compactionTask);
+ thread.setName("MyCompacting Thread.");
+ thread.start();
+ try {
+ flushStore(myStore, id++);
+ thread.join();
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ shouldWaitRef.set(false);
+ break;
+ default:
+ break;
+ }
+ });
+ // Before HBASE-27519, the next call would throw java.io.FileNotFoundException because the
+ // storeFile used by StoreScanner.updateReaders had been deleted by the compactionTask.
+ scanner.next(results);
+ // The results are the cells of row r0.
+ assertEquals(3, results.size());
+ assertTrue(compactionExceptionRef.get() == null);
+ }
+ }
+
@Test
public void testReclaimChunkWhenScaning() throws IOException {
init("testReclaimChunkWhenScaning");
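
The test's choreography rests on reusing one CyclicBarrier(2) for two rendezvous: the first holds the compaction until updateReaders has reached MyStore.getScanners, and the second holds updateReaders until compaction plus archiving have finished, maximizing the window in which the flushed file could disappear. Below is a standalone sketch of that two-rendezvous pattern with no HBase types; the thread names and printed messages are illustrative only:

```java
import java.util.concurrent.CyclicBarrier;

// Standalone sketch of the two-rendezvous pattern used by the test: thread A
// pauses at its critical point, thread B runs to completion, then A resumes.
public class TwoRendezvous {
  public static void main(String[] args) throws InterruptedException {
    CyclicBarrier barrier = new CyclicBarrier(2); // reused for both rendezvous

    Thread flusher = new Thread(() -> {
      try {
        // Critical point: in the test this is MyStore.getScanners, reached
        // from StoreScanner.updateReaders during the flush.
        barrier.await(); // rendezvous 1: let the compactor start
        barrier.await(); // rendezvous 2: wait until archiving has finished
        System.out.println("flusher: resuming; pinned files still readable");
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });

    Thread compactor = new Thread(() -> {
      try {
        barrier.await(); // start only once the flusher is at its critical point
        System.out.println("compactor: compact and archive");
        barrier.await(); // release the flusher
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });

    flusher.start();
    compactor.start();
    flusher.join();
    compactor.join();
  }
}
```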