HBASE-19468 FNFE during scans and flushes (Ram)
This commit is contained in:
parent
e67a67d444
commit
d2534fc570
|
@ -148,7 +148,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
// Indicates whether there was flush during the course of the scan
|
// Indicates whether there was flush during the course of the scan
|
||||||
private volatile boolean flushed = false;
|
private volatile boolean flushed = false;
|
||||||
// generally we get one file from a flush
|
// generally we get one file from a flush
|
||||||
private final List<HStoreFile> flushedStoreFiles = new ArrayList<>(1);
|
private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
|
||||||
// Since CompactingMemstore is now default, we get three memstore scanners from a flush
|
// Since CompactingMemstore is now default, we get three memstore scanners from a flush
|
||||||
private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
|
private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
|
||||||
// The current list of scanners
|
// The current list of scanners
|
||||||
|
@ -479,6 +479,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
if (withDelayedScannersClose) {
|
if (withDelayedScannersClose) {
|
||||||
clearAndClose(scannersForDelayedClose);
|
clearAndClose(scannersForDelayedClose);
|
||||||
clearAndClose(memStoreScannersAfterFlush);
|
clearAndClose(memStoreScannersAfterFlush);
|
||||||
|
clearAndClose(flushedstoreFileScanners);
|
||||||
if (this.heap != null) {
|
if (this.heap != null) {
|
||||||
this.heap.close();
|
this.heap.close();
|
||||||
this.currentScanners.clear();
|
this.currentScanners.clear();
|
||||||
|
@ -842,7 +843,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
flushLock.lock();
|
flushLock.lock();
|
||||||
try {
|
try {
|
||||||
flushed = true;
|
flushed = true;
|
||||||
flushedStoreFiles.addAll(sfs);
|
final boolean isCompaction = false;
|
||||||
|
boolean usePread = get || scanUsePread;
|
||||||
|
// SEE HBASE-19468 where the flushed files are getting compacted even before a scanner
|
||||||
|
// calls next(). So its better we create scanners here rather than next() call. Ensure
|
||||||
|
// these scanners are properly closed() whether or not the scan is completed successfully
|
||||||
|
// Eagerly creating scanners so that we have the ref counting ticking on the newly created
|
||||||
|
// store files. In case of stream scanners this eager creation does not induce performance
|
||||||
|
// penalty because in scans (that uses stream scanners) the next() call is bound to happen.
|
||||||
|
List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
|
||||||
|
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
|
||||||
|
flushedstoreFileScanners.addAll(scanners);
|
||||||
if (!CollectionUtils.isEmpty(memStoreScanners)) {
|
if (!CollectionUtils.isEmpty(memStoreScanners)) {
|
||||||
clearAndClose(memStoreScannersAfterFlush);
|
clearAndClose(memStoreScannersAfterFlush);
|
||||||
memStoreScannersAfterFlush.addAll(memStoreScanners);
|
memStoreScannersAfterFlush.addAll(memStoreScanners);
|
||||||
|
@ -865,13 +876,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
List<KeyValueScanner> scanners;
|
List<KeyValueScanner> scanners;
|
||||||
flushLock.lock();
|
flushLock.lock();
|
||||||
try {
|
try {
|
||||||
List<KeyValueScanner> allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
|
List<KeyValueScanner> allScanners =
|
||||||
allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get,
|
new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
|
||||||
scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
|
allScanners.addAll(flushedstoreFileScanners);
|
||||||
allScanners.addAll(memStoreScannersAfterFlush);
|
allScanners.addAll(memStoreScannersAfterFlush);
|
||||||
scanners = selectScannersFrom(store, allScanners);
|
scanners = selectScannersFrom(store, allScanners);
|
||||||
// Clear the current set of flushed store files so that they don't get added again
|
// Clear the current set of flushed store files scanners so that they don't get added again
|
||||||
flushedStoreFiles.clear();
|
flushedstoreFileScanners.clear();
|
||||||
memStoreScannersAfterFlush.clear();
|
memStoreScannersAfterFlush.clear();
|
||||||
} finally {
|
} finally {
|
||||||
flushLock.unlock();
|
flushLock.unlock();
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
@ -333,6 +334,49 @@ public class TestCompactedHFilesDischarger {
|
||||||
assertTrue(compactedfiles.isEmpty());
|
assertTrue(compactedfiles.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStoreFileMissing() throws Exception {
|
||||||
|
// Write 3 records and create 3 store files.
|
||||||
|
write("row1");
|
||||||
|
region.flush(true);
|
||||||
|
write("row2");
|
||||||
|
region.flush(true);
|
||||||
|
write("row3");
|
||||||
|
region.flush(true);
|
||||||
|
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setCaching(1);
|
||||||
|
RegionScanner scanner = region.getScanner(scan);
|
||||||
|
List<Cell> res = new ArrayList<Cell>();
|
||||||
|
// Read first item
|
||||||
|
scanner.next(res);
|
||||||
|
assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
|
||||||
|
res.clear();
|
||||||
|
// Create a new file in between scan nexts
|
||||||
|
write("row4");
|
||||||
|
region.flush(true);
|
||||||
|
|
||||||
|
// Compact the table
|
||||||
|
region.compact(true);
|
||||||
|
|
||||||
|
// Create the cleaner object
|
||||||
|
CompactedHFilesDischarger cleaner =
|
||||||
|
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
|
||||||
|
cleaner.chore();
|
||||||
|
// This issues scan next
|
||||||
|
scanner.next(res);
|
||||||
|
assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));
|
||||||
|
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void write(String row1) throws IOException {
|
||||||
|
byte[] row = Bytes.toBytes(row1);
|
||||||
|
Put put = new Put(row);
|
||||||
|
put.addColumn(fam, qual1, row);
|
||||||
|
region.put(put);
|
||||||
|
}
|
||||||
|
|
||||||
protected void countDown() {
|
protected void countDown() {
|
||||||
// count down 3 times
|
// count down 3 times
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
|
@ -366,7 +410,7 @@ public class TestCompactedHFilesDischarger {
|
||||||
try {
|
try {
|
||||||
initiateScan(region);
|
initiateScan(region);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// do nothing
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue