HBASE-11591 Scanner fails to retrieve KV from bulk loaded file with a higher sequence id than the cell's mvcc in a non-bulk loaded file (Ram)
Ramkrishna 2014-08-26 17:36:37 +05:30
parent 582123cd8f
commit dea6480023
5 changed files with 296 additions and 23 deletions

HRegion.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -74,7 +75,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -120,8 +120,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1741,6 +1741,7 @@ public class HRegion implements HeapSize { // , Writable{
if (this.memstoreSize.get() <= 0) {
// Take an update lock because am about to change the sequence id and we want the sequence id
// to be at the border of the empty memstore.
MultiVersionConsistencyControl.WriteEntry w = null;
this.updatesLock.writeLock().lock();
try {
if (this.memstoreSize.get() <= 0) {
@@ -1750,13 +1751,29 @@ public class HRegion implements HeapSize { // , Writable{
// sure just beyond the last appended region edit (useful as a marker when bulk loading,
// etc.)
// wal can be null replaying edits.
return wal != null?
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
getNextSequenceId(wal), "Nothing to flush"):
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
try {
if (wal != null) {
w = mvcc.beginMemstoreInsert();
long flushSeqId = getNextSequenceId(wal);
FlushResult flushResult = new FlushResult(
FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
w.setWriteNumber(flushSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
w = null;
return flushResult;
} else {
return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
"Nothing to flush");
}
} finally {
this.updatesLock.writeLock().unlock();
}
}
} finally {
this.updatesLock.writeLock().unlock();
if (w != null) {
mvcc.advanceMemstore(w);
}
}
}
@@ -1864,6 +1881,7 @@ public class HRegion implements HeapSize { // , Writable{
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
// were removed via a rollbackMemstore could be written to Hfiles.
w.setWriteNumber(flushSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
// set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
w = null;

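A condensed sketch of the MVCC bracket the HRegion hunks above add around the "nothing to flush" path, using only names that appear in the diff (simplified and illustrative, not the literal patch): a write entry is opened under the updates lock, tied to the next WAL sequence id, and completed so that the region's read point advances to the sequence id a bulk load is assigned.

MultiVersionConsistencyControl.WriteEntry w = null;
this.updatesLock.writeLock().lock();
try {
  w = mvcc.beginMemstoreInsert();               // open an MVCC write entry
  long flushSeqId = getNextSequenceId(wal);     // reserve the next WAL sequence id
  w.setWriteNumber(flushSeqId);                 // tie the entry to that sequence id
  mvcc.waitForPreviousTransactionsComplete(w);  // read point now reaches flushSeqId
  w = null;                                     // entry completed; skip the cleanup below
} finally {
  this.updatesLock.writeLock().unlock();
  if (w != null) {
    mvcc.advanceMemstore(w);                    // release the entry if we exited early
  }
}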
StoreFile.java

@@ -381,6 +381,7 @@ public class StoreFile {
this.sequenceid += 1;
}
}
this.reader.setBulkLoaded(true);
}
this.reader.setSequenceID(this.sequenceid);
@@ -1009,6 +1010,7 @@ public class StoreFile {
protected long sequenceID = -1;
private byte[] lastBloomKey;
private long deleteFamilyCnt = -1;
private boolean bulkLoadResult = false;
public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
throws IOException {
@@ -1475,6 +1477,14 @@ public class StoreFile {
this.sequenceID = sequenceID;
}
public void setBulkLoaded(boolean bulkLoadResult) {
this.bulkLoadResult = bulkLoadResult;
}
public boolean isBulkLoaded() {
return this.bulkLoadResult;
}
BloomFilter getGeneralBloomFilter() {
return generalBloomFilter;
}

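The new flag on StoreFile.Reader is set while the store file is being opened (the first hunk above, where bulk-load metadata is detected) and consumed on the scan path. A minimal caller-side sketch; createReader() and the local variable names here are assumptions for illustration, not part of this patch:

StoreFile.Reader reader = storeFile.createReader();  // assumed accessor, illustrative only
if (reader.isBulkLoaded()) {
  // Bulk load bypasses the memstore write path, so cells in this file carry no
  // per-cell mvcc; the file-level sequence id assigned at load time stands in for it.
  long effectiveMvcc = reader.getSequenceID();
}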
StoreFileScanner.java

@@ -137,9 +137,10 @@ public class StoreFileScanner implements KeyValueScanner {
// only seek if we aren't at the end. cur == null implies 'end'.
if (cur != null) {
hfs.next();
cur = hfs.getKeyValue();
if (hasMVCCInfo)
setCurrentCell(hfs.getKeyValue());
if (hasMVCCInfo || this.reader.isBulkLoaded()) {
skipKVsNewerThanReadpoint();
}
}
} catch(IOException e) {
throw new IOException("Could not iterate " + this, e);
@@ -157,9 +158,13 @@ public class StoreFileScanner implements KeyValueScanner {
return false;
}
cur = hfs.getKeyValue();
setCurrentCell(hfs.getKeyValue());
return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
return skipKVsNewerThanReadpoint();
} else {
return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
}
} finally {
realSeekDone = true;
}
@@ -177,9 +182,13 @@ public class StoreFileScanner implements KeyValueScanner {
close();
return false;
}
cur = hfs.getKeyValue();
setCurrentCell(hfs.getKeyValue());
return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
return skipKVsNewerThanReadpoint();
} else {
return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
}
} finally {
realSeekDone = true;
}
@@ -189,6 +198,15 @@ public class StoreFileScanner implements KeyValueScanner {
}
}
protected void setCurrentCell(Cell newVal) {
this.cur = newVal;
if(this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
KeyValue curKV = KeyValueUtil.ensureKeyValue(cur);
curKV.setSequenceId(this.reader.getSequenceID());
cur = curKV;
}
}
protected boolean skipKVsNewerThanReadpoint() throws IOException {
// We want to ignore all key-values that are newer than our current
// readPoint
@@ -197,7 +215,7 @@ public class StoreFileScanner implements KeyValueScanner {
&& cur != null
&& (cur.getMvccVersion() > readPt)) {
hfs.next();
cur = hfs.getKeyValue();
setCurrentCell(hfs.getKeyValue());
if (this.stopSkippingKVsIfNextRow
&& getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
@@ -325,7 +343,7 @@ public class StoreFileScanner implements KeyValueScanner {
// a higher timestamp than the max timestamp in this file. We know that
// the next point when we have to consider this file again is when we
// pass the max timestamp of this file (with the same row/column).
cur = KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile);
setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
} else {
// This will be the case e.g. when we need to seek to the next
// row/column, and we don't know exactly what they are, so we set the
@@ -343,13 +361,13 @@ public class StoreFileScanner implements KeyValueScanner {
// key/value and the store scanner will progress to the next column. This
// is obviously not a "real real" seek, but unlike the fake KV earlier in
// this method, we want this to be propagated to ScanQueryMatcher.
cur = KeyValueUtil.createLastOnRowCol(kv);
setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
realSeekDone = true;
return true;
}
Reader getReaderForTesting() {
Reader getReader() {
return reader;
}
@@ -420,7 +438,7 @@ public class StoreFileScanner implements KeyValueScanner {
return false;
}
cur = hfs.getKeyValue();
setCurrentCell(hfs.getKeyValue());
this.stopSkippingKVsIfNextRow = true;
boolean resultOfSkipKVs;
try {

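Taken together, the StoreFileScanner hunks route every cell the scanner picks up through setCurrentCell() and then apply the same read-point filtering to bulk-loaded files as to files with per-cell mvcc. A condensed sketch of that flow, using only names from the diff (illustrative, not the literal patch):

// next()/seek()/reseek() and the other paths above now call setCurrentCell()
// instead of assigning hfs.getKeyValue() to cur directly.
setCurrentCell(hfs.getKeyValue());

// Inside setCurrentCell(): a cell from a bulk-loaded file has no per-cell mvcc
// (sequence id <= 0), so it borrows the file-level sequence id of its reader.
if (cur != null && reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
  KeyValue kv = KeyValueUtil.ensureKeyValue(cur);
  kv.setSequenceId(reader.getSequenceID());
  cur = kv;
}

// With the sequence id in place, bulk-loaded files take the same read-point
// check as files that carry mvcc info; cells whose sequence id is still beyond
// the scanner's readPt are skipped until the read point catches up.
if (hasMVCCInfo || reader.isBulkLoaded()) {
  skipKVsNewerThanReadpoint();
}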
TestScanWithBloomError.java

@@ -18,6 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -53,8 +56,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import static org.junit.Assert.*;
/**
* Test a multi-column scanner when there is a Bloom filter false-positive.
* This is needed for the multi-column Bloom filter optimization.
@@ -132,8 +133,8 @@ public class TestScanWithBloomError {
Collections.sort(scanners, new Comparator<StoreFileScanner>() {
@Override
public int compare(StoreFileScanner s1, StoreFileScanner s2) {
Path p1 = s1.getReaderForTesting().getHFileReader().getPath();
Path p2 = s2.getReaderForTesting().getHFileReader().getPath();
Path p1 = s1.getReader().getHFileReader().getPath();
Path p2 = s2.getReader().getHFileReader().getPath();
long t1, t2;
try {
t1 = fs.getFileStatus(p1).getModificationTime();
@@ -147,7 +148,7 @@ public class TestScanWithBloomError {
StoreFile.Reader lastStoreFileReader = null;
for (StoreFileScanner sfScanner : scanners)
lastStoreFileReader = sfScanner.getReaderForTesting();
lastStoreFileReader = sfScanner.getReader();
new HFilePrettyPrinter().run(new String[]{ "-m", "-p", "-f",
lastStoreFileReader.getHFileReader().getPath().toString()});

TestScannerWithBulkload.java

@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestScannerWithBulkload {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
}
private static void createTable(HBaseAdmin admin, String tableName) throws IOException {
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor("col");
hcd.setMaxVersions(3);
desc.addFamily(hcd);
admin.createTable(desc);
}
@Test
public void testBulkLoad() throws Exception {
String tableName = "testBulkLoad";
long l = System.currentTimeMillis();
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
createTable(admin, tableName);
Scan scan = createScan();
final HTable table = init(admin, l, scan, tableName);
// use bulkload
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file");
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
bulkload.doBulkLoad(hfilePath, table);
ResultScanner scanner = table.getScanner(scan);
Result result = scanner.next();
result = scanAfterBulkLoad(scanner, result, "version2");
Put put0 = new Put(Bytes.toBytes("row1"));
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version3")));
table.put(put0);
table.flushCommits();
admin.flush(tableName);
scanner = table.getScanner(scan);
result = scanner.next();
while (result != null) {
List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
for (KeyValue _kv : kvs) {
if (Bytes.toString(_kv.getRow()).equals("row1")) {
System.out.println(Bytes.toString(_kv.getRow()));
System.out.println(Bytes.toString(_kv.getQualifier()));
System.out.println(Bytes.toString(_kv.getValue()));
Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
}
}
result = scanner.next();
}
table.close();
}
private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expctedVal)
throws IOException {
while (result != null) {
List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
for (KeyValue _kv : kvs) {
if (Bytes.toString(_kv.getRow()).equals("row1")) {
System.out.println(Bytes.toString(_kv.getRow()));
System.out.println(Bytes.toString(_kv.getQualifier()));
System.out.println(Bytes.toString(_kv.getValue()));
Assert.assertEquals(expctedVal, Bytes.toString(_kv.getValue()));
}
}
result = scanner.next();
}
return result;
}
private Path writeToHFile(long l, String hFilePath, String pathStr) throws IOException {
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
final Path hfilePath = new Path(hFilePath);
fs.mkdirs(hfilePath);
Path path = new Path(pathStr);
HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
Assert.assertNotNull(wf);
HFileContext context = new HFileContext();
HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
Bytes.toBytes("version2"));
writer.append(kv);
// Add the bulk load time_key. otherwise we cannot ensure that it is a bulk
// loaded file
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
writer.close();
return hfilePath;
}
private HTable init(HBaseAdmin admin, long l, Scan scan, String tableName) throws Exception {
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
Put put0 = new Put(Bytes.toBytes("row1"));
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version0")));
table.put(put0);
table.flushCommits();
admin.flush(tableName);
Put put1 = new Put(Bytes.toBytes("row2"));
put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version0")));
table.put(put1);
table.flushCommits();
admin.flush(tableName);
admin.close();
put0 = new Put(Bytes.toBytes("row1"));
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version1")));
table.put(put0);
table.flushCommits();
admin.flush(tableName);
admin.compact(tableName);
ResultScanner scanner = table.getScanner(scan);
Result result = scanner.next();
List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
Assert.assertEquals(1, kvs.size());
Assert.assertEquals("version1", Bytes.toString(kvs.get(0).getValue()));
scanner.close();
return table;
}
@Test
public void testBulkLoadWithParallelScan() throws Exception {
String tableName = "testBulkLoadWithParallelScan";
final long l = System.currentTimeMillis();
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
createTable(admin, tableName);
Scan scan = createScan();
final HTable table = init(admin, l, scan, tableName);
// use bulkload
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
"/temp/testBulkLoadWithParallelScan/col/file");
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
ResultScanner scanner = table.getScanner(scan);
// Create a scanner and then do bulk load
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
public void run() {
try {
Put put1 = new Put(Bytes.toBytes("row5"));
put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
Bytes.toBytes("version0")));
table.put(put1);
table.flushCommits();
bulkload.doBulkLoad(hfilePath, table);
latch.countDown();
} catch (TableNotFoundException e) {
} catch (IOException e) {
}
}
}.start();
latch.await();
// By the time we do next() the bulk loaded files are also added to the kv
// scanner
Result result = scanner.next();
scanAfterBulkLoad(scanner, result, "version1");
table.close();
}
private Scan createScan() {
Scan scan = new Scan();
scan.setMaxVersions(3);
return scan;
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
}