HBASE-27519 Another case for FNFE on StoreFileScanner after a flush followed by a compaction (#4922)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
dd60d7f86e
commit
01f74ac074
|
@ -31,7 +31,13 @@ public interface ChangedReadersObserver {
|
|||
long getReadPoint();
|
||||
|
||||
/**
|
||||
* Notify observers.
|
||||
* Notify observers. <br/>
|
||||
* NOTE: Before we invoke this method, {@link HStoreFile#increaseRefCount} is invoked for every
|
||||
* {@link HStoreFile} in 'sfs' input parameter to prevent {@link HStoreFile} being archived after a
|
||||
* concurrent compaction, and after this method is invoked, {@link HStoreFile#decreaseRefCount} is
|
||||
* invoked. So if you open the {@link StoreFileReader} or {@link StoreFileScanner} asynchronously
|
||||
* in this method, you may need to invoke {@link HStoreFile#increaseRefCount} or
|
||||
* {@link HStoreFile#decreaseRefCount} by yourself to prevent the {@link HStoreFile}s from being archived.
|
||||
* @param sfs The new files
|
||||
* @param memStoreScanners scanner of current memstore
|
||||
* @throws IOException e
|
||||
|
|
|
@ -885,15 +885,29 @@ public class HStore
|
|||
return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
|
||||
}
|
||||
|
||||
private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException {
|
||||
private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
|
||||
// NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
|
||||
// close {@link DefaultMemStore#snapshot}, which may be used by
|
||||
// {@link DefaultMemStore#getScanners}.
|
||||
storeEngine.addStoreFiles(sfs,
|
||||
snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
|
||||
// NOTE: here we must increase the refCount for storeFiles because we would open the
|
||||
// storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
|
||||
// If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
|
||||
// CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction. Because
|
||||
// HStore.requestCompaction is under storeEngine lock, so here we increase the refCount under
|
||||
// storeEngine lock. see HBASE-27519 for more details.
|
||||
snapshotId > 0 ? () -> {
|
||||
this.memstore.clearSnapshot(snapshotId);
|
||||
HStoreFile.increaseStoreFilesRefeCount(sfs);
|
||||
} : () -> {
|
||||
HStoreFile.increaseStoreFilesRefeCount(sfs);
|
||||
});
|
||||
// notify to be called here - only in case of flushes
|
||||
notifyChangedReadersObservers(sfs);
|
||||
try {
|
||||
notifyChangedReadersObservers(sfs);
|
||||
} finally {
|
||||
HStoreFile.decreaseStoreFilesRefeCount(sfs);
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
long totalSize = getTotalSize(sfs);
|
||||
String traceMessage = "FLUSH time,count,size,store size,store files ["
|
||||
|
@ -961,7 +975,13 @@ public class HStore
|
|||
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
|
||||
includeStartRow, stopRow, includeStopRow);
|
||||
memStoreScanners = this.memstore.getScanners(readPt);
|
||||
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
|
||||
// NOTE: here we must increase the refCount for storeFiles because we would open the
|
||||
// storeFiles and get the StoreFileScanners for them. If we don't increase the refCount here,
|
||||
// HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
|
||||
// storeFiles after a concurrent compaction. Because HStore.requestCompaction is under
|
||||
// storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484
|
||||
// for more details.
|
||||
HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
|
||||
} finally {
|
||||
this.storeEngine.readUnlock();
|
||||
}
|
||||
|
@ -982,7 +1002,7 @@ public class HStore
|
|||
clearAndClose(memStoreScanners);
|
||||
throw t instanceof IOException ? (IOException) t : new IOException(t);
|
||||
} finally {
|
||||
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
|
||||
HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
|
@ -48,6 +49,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
||||
/**
|
||||
|
@ -648,4 +651,26 @@ public class HStoreFile implements StoreFile {
|
|||
Set<String> getCompactedStoreFiles() {
|
||||
return Collections.unmodifiableSet(this.compactedStoreFiles);
|
||||
}
|
||||
|
||||
long increaseRefCount() {
|
||||
return this.fileInfo.refCount.incrementAndGet();
|
||||
}
|
||||
|
||||
long decreaseRefCount() {
|
||||
return this.fileInfo.refCount.decrementAndGet();
|
||||
}
|
||||
|
||||
static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
|
||||
if (CollectionUtils.isEmpty(storeFiles)) {
|
||||
return;
|
||||
}
|
||||
storeFiles.forEach(HStoreFile::increaseRefCount);
|
||||
}
|
||||
|
||||
static void decreaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
|
||||
if (CollectionUtils.isEmpty(storeFiles)) {
|
||||
return;
|
||||
}
|
||||
storeFiles.forEach(HStoreFile::decreaseRefCount);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
|
|||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
@ -44,6 +45,7 @@ import java.util.ListIterator;
|
|||
import java.util.NavigableSet;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
@ -1531,6 +1533,106 @@ public class TestHStore {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test is for HBASE-27519, when the {@link StoreScanner} is scanning, the Flush and the
|
||||
* Compaction execute concurrently and the Compaction compacts and archives the flushed
|
||||
* {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}. Before
|
||||
* HBASE-27519, {@link StoreScanner#updateReaders} would throw {@link FileNotFoundException}.
|
||||
*/
|
||||
@Test
|
||||
public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setBoolean(WALFactory.WAL_ENABLED, false);
|
||||
conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
|
||||
byte[] r0 = Bytes.toBytes("row0");
|
||||
byte[] r1 = Bytes.toBytes("row1");
|
||||
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
|
||||
final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
|
||||
// Initialize region
|
||||
final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
|
||||
@Override
|
||||
public void getScanners(MyStore store) throws IOException {
|
||||
try {
|
||||
// Here this method is called by StoreScanner.updateReaders which is invoked by the
|
||||
// following TestHStore.flushStore
|
||||
if (shouldWaitRef.get()) {
|
||||
// wait for the following compaction Task to start
|
||||
cyclicBarrier.await();
|
||||
// wait for the following HStore.closeAndArchiveCompactedFiles to end.
|
||||
cyclicBarrier.await();
|
||||
}
|
||||
} catch (BrokenBarrierException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
|
||||
Runnable compactionTask = () -> {
|
||||
try {
|
||||
// Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
|
||||
// entering the MyStore.getScanners, compactionTask could start.
|
||||
cyclicBarrier.await();
|
||||
region.compactStore(family, new NoLimitThroughputController());
|
||||
myStore.closeAndArchiveCompactedFiles();
|
||||
// Notify StoreScanner.updateReaders could enter MyStore.getScanners.
|
||||
cyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
compactionExceptionRef.set(e);
|
||||
}
|
||||
};
|
||||
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
long seqId = 100;
|
||||
byte[] value = Bytes.toBytes("value");
|
||||
// older data which shouldn't be "seen" by client
|
||||
myStore.add(createCell(r0, qf1, ts, seqId, value), null);
|
||||
flushStore(myStore, id++);
|
||||
myStore.add(createCell(r0, qf2, ts, seqId, value), null);
|
||||
flushStore(myStore, id++);
|
||||
myStore.add(createCell(r0, qf3, ts, seqId, value), null);
|
||||
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
||||
quals.add(qf1);
|
||||
quals.add(qf2);
|
||||
quals.add(qf3);
|
||||
|
||||
myStore.add(createCell(r1, qf1, ts, seqId, value), null);
|
||||
myStore.add(createCell(r1, qf2, ts, seqId, value), null);
|
||||
myStore.add(createCell(r1, qf3, ts, seqId, value), null);
|
||||
|
||||
Thread.currentThread()
|
||||
.setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
|
||||
Scan scan = new Scan();
|
||||
scan.withStartRow(r0, true);
|
||||
try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
|
||||
List<Cell> results = new MyList<>(size -> {
|
||||
switch (size) {
|
||||
case 1:
|
||||
shouldWaitRef.set(true);
|
||||
Thread thread = new Thread(compactionTask);
|
||||
thread.setName("MyCompacting Thread.");
|
||||
thread.start();
|
||||
try {
|
||||
flushStore(myStore, id++);
|
||||
thread.join();
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
shouldWaitRef.set(false);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
});
|
||||
// Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
|
||||
// which is used by StoreScanner.updateReaders is deleted by compactionTask.
|
||||
scanner.next(results);
|
||||
// The results are the r0 row cells.
|
||||
assertEquals(3, results.size());
|
||||
assertTrue(compactionExceptionRef.get() == null);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReclaimChunkWhenScaning() throws IOException {
|
||||
init("testReclaimChunkWhenScaning");
|
||||
|
|
Loading…
Reference in New Issue