HBASE-19468 FNFE during scans and flushes (Ram)

This commit is contained in:
ramkrish86 2017-12-20 17:06:27 +05:30
parent d53e960c55
commit d28732fd98
2 changed files with 68 additions and 7 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
@ -132,8 +133,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private boolean scanUsePread = false;
// Indicates whether there was flush during the course of the scan
private volatile boolean flushed = false;
// generally we get one file from a flush
private final List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
private final List<KeyValueScanner> flushedstoreFileScanners =
new ArrayList<KeyValueScanner>(1);
// generally we get one memstroe scanner from a flush
private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
// The current list of scanners
@ -463,6 +466,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.closing = true;
clearAndClose(scannersForDelayedClose);
clearAndClose(memStoreScannersAfterFlush);
// clear them at any case. In case scanner.next() was never called
// and there were some lease expiry we need to close all the scanners
// on the flushed files which are open
clearAndClose(flushedstoreFileScanners);
// Under test, we dont have a this.store
if (this.store != null)
this.store.deleteChangedReaderObserver(this);
@ -833,7 +840,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
flushLock.lock();
try {
flushed = true;
flushedStoreFiles.addAll(sfs);
final boolean isCompaction = false;
boolean usePread = get || scanUsePread;
// SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
// calls next(). So its better we create scanners here rather than next() call. Ensure
// these scanners are properly closed() whether or not the scan is completed successfully
// Eagerly creating scanners so that we have the ref counting ticking on the newly created
// store files. In case of stream scanners this eager creation does not induce performance
// penalty because in scans (that uses stream scanners) the next() call is bound to happen.
List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
flushedstoreFileScanners.addAll(scanners);
if (!CollectionUtils.isEmpty(memStoreScanners)) {
clearAndClose(memStoreScannersAfterFlush);
memStoreScannersAfterFlush.addAll(memStoreScanners);
@ -901,13 +918,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
List<KeyValueScanner> scanners = null;
flushLock.lock();
try {
List<KeyValueScanner> allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
List<KeyValueScanner> allScanners =
new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
allScanners.addAll(flushedstoreFileScanners);
allScanners.addAll(memStoreScannersAfterFlush);
scanners = selectScannersFrom(allScanners);
// Clear the current set of flushed store files so that they don't get added again
flushedStoreFiles.clear();
flushedstoreFileScanners.clear();
memStoreScannersAfterFlush.clear();
} finally {
flushLock.unlock();

View File

@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
@ -336,6 +337,49 @@ public class TestCompactedHFilesDischarger {
assertTrue(compactedfiles.size() == 0);
}
@Test
public void testStoreFileMissing() throws Exception {
// Write 3 records and create 3 store files.
write("row1");
region.flush(true);
write("row2");
region.flush(true);
write("row3");
region.flush(true);
Scan scan = new Scan();
scan.setCaching(1);
RegionScanner scanner = region.getScanner(scan);
List<Cell> res = new ArrayList<Cell>();
// Read first item
scanner.next(res);
assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
res.clear();
// Create a new file in between scan nexts
write("row4");
region.flush(true);
// Compact the table
region.compact(true);
// Create the cleaner object
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
cleaner.chore();
// This issues scan next
scanner.next(res);
assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));
scanner.close();
}
private void write(String row1) throws IOException {
byte[] row = Bytes.toBytes(row1);
Put put = new Put(row);
put.addColumn(fam, qual1, row);
region.put(put);
}
protected void countDown() {
// count down 3 times
latch.countDown();
@ -369,7 +413,7 @@ public class TestCompactedHFilesDischarger {
try {
initiateScan(region);
} catch (IOException e) {
// do nothing
e.printStackTrace();
}
}