HBASE-11772 Bulk load mvcc and seqId issues with native hfiles (Jerry He)
This commit is contained in:
parent
0a9bfcaf74
commit
f9bea36146
|
@ -315,18 +315,31 @@ public class StoreFile {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return true if this storefile was created by HFileOutputFormat
|
||||
* for a bulk load.
|
||||
* Check if this storefile was created by bulk load.
|
||||
* When a hfile is bulk loaded into HBase, we append
|
||||
* '_SeqId_<id-when-loaded>' to the hfile name, unless
|
||||
* "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
|
||||
* explicitly turned off.
|
||||
* If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
|
||||
* is turned off, fall back to BULKLOAD_TIME_KEY.
|
||||
* @return true if this storefile was created by bulk load.
|
||||
*/
|
||||
boolean isBulkLoadResult() {
|
||||
return metadataMap.containsKey(BULKLOAD_TIME_KEY);
|
||||
boolean bulkLoadedHFile = false;
|
||||
String fileName = this.getPath().getName();
|
||||
int startPos = fileName.indexOf("SeqId_");
|
||||
if (startPos != -1) {
|
||||
bulkLoadedHFile = true;
|
||||
}
|
||||
return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the timestamp at which this bulk load file was generated.
|
||||
*/
|
||||
public long getBulkLoadTimestamp() {
|
||||
return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
|
||||
byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
|
||||
return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -372,7 +385,8 @@ public class StoreFile {
|
|||
// generate the sequenceId from the fileName
|
||||
// fileName is of the form <randomName>_SeqId_<id-when-loaded>_
|
||||
String fileName = this.getPath().getName();
|
||||
int startPos = fileName.indexOf("SeqId_");
|
||||
// Use lastIndexOf() to get the last, most recent bulk load seqId.
|
||||
int startPos = fileName.lastIndexOf("SeqId_");
|
||||
if (startPos != -1) {
|
||||
this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
|
||||
fileName.indexOf('_', startPos + 6)));
|
||||
|
|
|
@ -200,7 +200,7 @@ public class StoreFileScanner implements KeyValueScanner {
|
|||
|
||||
protected void setCurrentCell(Cell newVal) throws IOException {
|
||||
this.cur = newVal;
|
||||
if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
|
||||
if (this.cur != null && this.reader.isBulkLoaded()) {
|
||||
CellUtil.setSequenceId(cur, this.reader.getSequenceID());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,7 +73,8 @@ public class TestScannerWithBulkload {
|
|||
Scan scan = createScan();
|
||||
final HTable table = init(admin, l, scan, tableName);
|
||||
// use bulkload
|
||||
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file");
|
||||
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
|
||||
false);
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
|
||||
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
|
||||
|
@ -101,6 +102,7 @@ public class TestScannerWithBulkload {
|
|||
}
|
||||
result = scanner.next();
|
||||
}
|
||||
scanner.close();
|
||||
table.close();
|
||||
}
|
||||
|
||||
|
@ -121,7 +123,10 @@ public class TestScannerWithBulkload {
|
|||
return result;
|
||||
}
|
||||
|
||||
private Path writeToHFile(long l, String hFilePath, String pathStr) throws IOException {
|
||||
// If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
|
||||
// Else, we will set BULKLOAD_TIME_KEY.
|
||||
private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
|
||||
throws IOException {
|
||||
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
|
||||
final Path hfilePath = new Path(hFilePath);
|
||||
fs.mkdirs(hfilePath);
|
||||
|
@ -132,10 +137,26 @@ public class TestScannerWithBulkload {
|
|||
HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
|
||||
KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
|
||||
Bytes.toBytes("version2"));
|
||||
|
||||
// Set cell seq id to test bulk load native hfiles.
|
||||
if (nativeHFile) {
|
||||
// Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
|
||||
// Scan should only look at the seq id appended at the bulk load time, and not skip
|
||||
// this kv.
|
||||
kv.setSequenceId(9999999);
|
||||
}
|
||||
|
||||
writer.append(kv);
|
||||
// Add the bulk load time_key. otherwise we cannot ensure that it is a bulk
|
||||
// loaded file
|
||||
|
||||
if (nativeHFile) {
|
||||
// Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
|
||||
// Scan should only look at the seq id appended at the bulk load time, and not skip its
|
||||
// kv.
|
||||
writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999)));
|
||||
}
|
||||
else {
|
||||
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
|
||||
}
|
||||
writer.close();
|
||||
return hfilePath;
|
||||
}
|
||||
|
@ -182,7 +203,7 @@ public class TestScannerWithBulkload {
|
|||
final HTable table = init(admin, l, scan, tableName);
|
||||
// use bulkload
|
||||
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
|
||||
"/temp/testBulkLoadWithParallelScan/col/file");
|
||||
"/temp/testBulkLoadWithParallelScan/col/file", false);
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
|
||||
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
|
||||
|
@ -209,10 +230,55 @@ public class TestScannerWithBulkload {
|
|||
// scanner
|
||||
Result result = scanner.next();
|
||||
scanAfterBulkLoad(scanner, result, "version1");
|
||||
scanner.close();
|
||||
table.close();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBulkLoadNativeHFile() throws Exception {
|
||||
String tableName = "testBulkLoadNativeHFile";
|
||||
long l = System.currentTimeMillis();
|
||||
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
|
||||
createTable(admin, tableName);
|
||||
Scan scan = createScan();
|
||||
final HTable table = init(admin, l, scan, tableName);
|
||||
// use bulkload
|
||||
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
|
||||
"/temp/testBulkLoadNativeHFile/col/file", true);
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
|
||||
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
|
||||
bulkload.doBulkLoad(hfilePath, table);
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
Result result = scanner.next();
|
||||
// We had 'version0', 'version1' for 'row1,col:q' in the table.
|
||||
// Bulk load added 'version2' scanner should be able to see 'version2'
|
||||
result = scanAfterBulkLoad(scanner, result, "version2");
|
||||
Put put0 = new Put(Bytes.toBytes("row1"));
|
||||
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
|
||||
.toBytes("version3")));
|
||||
table.put(put0);
|
||||
table.flushCommits();
|
||||
admin.flush(tableName);
|
||||
scanner = table.getScanner(scan);
|
||||
result = scanner.next();
|
||||
while (result != null) {
|
||||
List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
|
||||
for (KeyValue _kv : kvs) {
|
||||
if (Bytes.toString(_kv.getRow()).equals("row1")) {
|
||||
System.out.println(Bytes.toString(_kv.getRow()));
|
||||
System.out.println(Bytes.toString(_kv.getQualifier()));
|
||||
System.out.println(Bytes.toString(_kv.getValue()));
|
||||
Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
|
||||
}
|
||||
}
|
||||
result = scanner.next();
|
||||
}
|
||||
scanner.close();
|
||||
table.close();
|
||||
}
|
||||
|
||||
private Scan createScan() {
|
||||
Scan scan = new Scan();
|
||||
scan.setMaxVersions(3);
|
||||
|
|
Loading…
Reference in New Issue