HBASE-27519 Another case for FNFE on StoreFileScanner after a flush followed by a compaction (#4923)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
chenglei 2022-12-10 12:41:05 +08:00 committed by GitHub
parent ccd1d8a293
commit bc45f1584e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 155 additions and 5 deletions

View File

@ -31,7 +31,13 @@ public interface ChangedReadersObserver {
long getReadPoint();
/**
* Notify observers.
* Notify observers. <br/>
* NOTE:Before we invoke this method,{@link HStoreFile#increaseRefCount} is invoked for every
* {@link HStoreFile} in 'sfs' input parameter to prevent {@link HStoreFile} is archived after a
* concurrent compaction, and after this method is invoked,{@link HStoreFile#decreaseRefCount} is
* invoked.So if you open the {@link StoreFileReader} or {@link StoreFileScanner} asynchronously
* in this method,you may need to invoke {@link HStoreFile#increaseRefCount} or
* {@link HStoreFile#decreaseRefCount} by yourself to prevent the {@link HStoreFile}s be archived.
* @param sfs The new files
* @param memStoreScanners scanner of current memstore
* @throws IOException e

View File

@ -1195,6 +1195,13 @@ public class HStore
if (snapshotId > 0) {
this.memstore.clearSnapshot(snapshotId);
}
// NOTE: here we must increase the refCount for storeFiles because we would open the
// storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
// If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
// CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.Because
// HStore.requestCompaction is under storeEngine lock, so here we increase the refCount under
// storeEngine lock. see HBASE-27519 for more details.
HStoreFile.increaseStoreFilesRefeCount(sfs);
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@ -1204,8 +1211,12 @@ public class HStore
this.lock.writeLock().unlock();
}
// notify to be called here - only in case of flushes
notifyChangedReadersObservers(sfs);
try {
// notify to be called here - only in case of flushes
notifyChangedReadersObservers(sfs);
} finally {
HStoreFile.decreaseStoreFilesRefeCount(sfs);
}
if (LOG.isTraceEnabled()) {
long totalSize = getTotalSize(sfs);
String traceMessage = "FLUSH time,count,size,store size,store files ["
@ -1273,7 +1284,13 @@ public class HStore
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow);
memStoreScanners = this.memstore.getScanners(readPt);
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
// NOTE: here we must increase the refCount for storeFiles because we would open the
// storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here,
// HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
// storeFiles after a concurrent compaction.Because HStore.requestCompaction is under
// storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484
// for more details.
HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
} finally {
this.lock.readLock().unlock();
}
@ -1294,7 +1311,7 @@ public class HStore
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
} finally {
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@ -46,6 +47,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@ -609,4 +612,26 @@ public class HStoreFile implements StoreFile {
Set<String> getCompactedStoreFiles() {
return Collections.unmodifiableSet(this.compactedStoreFiles);
}
long increaseRefCount() {
return this.fileInfo.refCount.incrementAndGet();
}
long decreaseRefCount() {
return this.fileInfo.refCount.decrementAndGet();
}
static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
if (CollectionUtils.isEmpty(storeFiles)) {
return;
}
storeFiles.forEach(HStoreFile::increaseRefCount);
}
static void decreaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
if (CollectionUtils.isEmpty(storeFiles)) {
return;
}
storeFiles.forEach(HStoreFile::decreaseRefCount);
}
}

View File

@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
@ -44,6 +45,7 @@ import java.util.ListIterator;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@ -1444,6 +1446,106 @@ public class TestHStore {
}
}
/**
* This test is for HBASE-27519, when the {@link StoreScanner} is scanning,the Flush and the
* Compaction execute concurrently and theCcompaction compact and archive the flushed
* {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}.Before
* HBASE-27519,{@link StoreScanner.updateReaders} would throw {@link FileNotFoundException}.
*/
@Test
public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(WALFactory.WAL_ENABLED, false);
conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
byte[] r0 = Bytes.toBytes("row0");
byte[] r1 = Bytes.toBytes("row1");
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
// Initialize region
final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
@Override
public void getScanners(MyStore store) throws IOException {
try {
// Here this method is called by StoreScanner.updateReaders which is invoked by the
// following TestHStore.flushStore
if (shouldWaitRef.get()) {
// wait the following compaction Task start
cyclicBarrier.await();
// wait the following HStore.closeAndArchiveCompactedFiles end.
cyclicBarrier.await();
}
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
}
});
final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
Runnable compactionTask = () -> {
try {
// Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
// entering the MyStore.getScanners, compactionTask could start.
cyclicBarrier.await();
region.compactStore(family, new NoLimitThroughputController());
myStore.closeAndArchiveCompactedFiles();
// Notify StoreScanner.updateReaders could enter MyStore.getScanners.
cyclicBarrier.await();
} catch (Throwable e) {
compactionExceptionRef.set(e);
}
};
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
byte[] value = Bytes.toBytes("value");
// older data whihc shouldn't be "seen" by client
myStore.add(createCell(r0, qf1, ts, seqId, value), null);
flushStore(myStore, id++);
myStore.add(createCell(r0, qf2, ts, seqId, value), null);
flushStore(myStore, id++);
myStore.add(createCell(r0, qf3, ts, seqId, value), null);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);
myStore.add(createCell(r1, qf1, ts, seqId, value), null);
myStore.add(createCell(r1, qf2, ts, seqId, value), null);
myStore.add(createCell(r1, qf3, ts, seqId, value), null);
Thread.currentThread()
.setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
Scan scan = new Scan();
scan.withStartRow(r0, true);
try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
List<Cell> results = new MyList<>(size -> {
switch (size) {
case 1:
shouldWaitRef.set(true);
Thread thread = new Thread(compactionTask);
thread.setName("MyCompacting Thread.");
thread.start();
try {
flushStore(myStore, id++);
thread.join();
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
shouldWaitRef.set(false);
break;
default:
break;
}
});
// Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
// which used by StoreScanner.updateReaders is deleted by compactionTask.
scanner.next(results);
// The results is r0 row cells.
assertEquals(3, results.size());
assertTrue(compactionExceptionRef.get() == null);
}
}
@Test
public void testReclaimChunkWhenScaning() throws IOException {
init("testReclaimChunkWhenScaning");