HBASE-11772 Bulk load mvcc and seqId issues with native hfiles (Jerry He)

This commit is contained in:
Ted Yu 2014-09-08 14:49:16 +00:00
parent ecb015d9a3
commit fd5b139a6f
3 changed files with 91 additions and 11 deletions

View File

@ -315,18 +315,31 @@ public class StoreFile {
}
/**
* @return true if this storefile was created by HFileOutputFormat
* for a bulk load.
* Check if this storefile was created by bulk load.
* When a hfile is bulk loaded into HBase, we append
* '_SeqId_<id-when-loaded>' to the hfile name, unless
* "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
* explicitly turned off.
* If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
* is turned off, fall back to BULKLOAD_TIME_KEY.
* @return true if this storefile was created by bulk load.
*/
boolean isBulkLoadResult() {
return metadataMap.containsKey(BULKLOAD_TIME_KEY);
boolean bulkLoadedHFile = false;
String fileName = this.getPath().getName();
int startPos = fileName.indexOf("SeqId_");
if (startPos != -1) {
bulkLoadedHFile = true;
}
return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
}
/**
* Return the timestamp at which this bulk load file was generated.
*/
public long getBulkLoadTimestamp() {
return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
}
/**
@ -372,7 +385,8 @@ public class StoreFile {
// generate the sequenceId from the fileName
// fileName is of the form <randomName>_SeqId_<id-when-loaded>_
String fileName = this.getPath().getName();
int startPos = fileName.indexOf("SeqId_");
// Use lastIndexOf() to get the last, most recent bulk load seqId.
int startPos = fileName.lastIndexOf("SeqId_");
if (startPos != -1) {
this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
fileName.indexOf('_', startPos + 6)));

View File

@ -200,7 +200,7 @@ public class StoreFileScanner implements KeyValueScanner {
protected void setCurrentCell(Cell newVal) throws IOException {
this.cur = newVal;
if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
if (this.cur != null && this.reader.isBulkLoaded()) {
CellUtil.setSequenceId(cur, this.reader.getSequenceID());
}
}

View File

@ -73,7 +73,8 @@ public class TestScannerWithBulkload {
Scan scan = createScan();
final HTable table = init(admin, l, scan, tableName);
// use bulkload
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file");
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
false);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
@ -101,6 +102,7 @@ public class TestScannerWithBulkload {
}
result = scanner.next();
}
scanner.close();
table.close();
}
@ -121,7 +123,10 @@ public class TestScannerWithBulkload {
return result;
}
private Path writeToHFile(long l, String hFilePath, String pathStr) throws IOException {
// If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
// Else, we will set BULKLOAD_TIME_KEY.
private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
throws IOException {
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
final Path hfilePath = new Path(hFilePath);
fs.mkdirs(hfilePath);
@ -132,10 +137,26 @@ public class TestScannerWithBulkload {
HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
Bytes.toBytes("version2"));
// Set cell seq id to test bulk load native hfiles.
if (nativeHFile) {
// Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
// Scan should only look at the seq id appended at the bulk load time, and not skip
// this kv.
kv.setSequenceId(9999999);
}
writer.append(kv);
// Add the bulk load time_key. otherwise we cannot ensure that it is a bulk
// loaded file
if (nativeHFile) {
// Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
// Scan should only look at the seq id appended at the bulk load time, and not skip its
// kv.
writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999)));
}
else {
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
}
writer.close();
return hfilePath;
}
@ -182,7 +203,7 @@ public class TestScannerWithBulkload {
final HTable table = init(admin, l, scan, tableName);
// use bulkload
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
"/temp/testBulkLoadWithParallelScan/col/file");
"/temp/testBulkLoadWithParallelScan/col/file", false);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
@ -209,10 +230,55 @@ public class TestScannerWithBulkload {
// scanner
Result result = scanner.next();
scanAfterBulkLoad(scanner, result, "version1");
scanner.close();
table.close();
}
@Test
public void testBulkLoadNativeHFile() throws Exception {
String tableName = "testBulkLoadNativeHFile";
long l = System.currentTimeMillis();
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
createTable(admin, tableName);
Scan scan = createScan();
final HTable table = init(admin, l, scan, tableName);
// use bulkload
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
"/temp/testBulkLoadNativeHFile/col/file", true);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
bulkload.doBulkLoad(hfilePath, table);
ResultScanner scanner = table.getScanner(scan);
Result result = scanner.next();
// We had 'version0', 'version1' for 'row1,col:q' in the table.
// Bulk load added 'version2' scanner should be able to see 'version2'
result = scanAfterBulkLoad(scanner, result, "version2");
Put put0 = new Put(Bytes.toBytes("row1"));
put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
.toBytes("version3")));
table.put(put0);
table.flushCommits();
admin.flush(tableName);
scanner = table.getScanner(scan);
result = scanner.next();
while (result != null) {
List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
for (KeyValue _kv : kvs) {
if (Bytes.toString(_kv.getRow()).equals("row1")) {
System.out.println(Bytes.toString(_kv.getRow()));
System.out.println(Bytes.toString(_kv.getQualifier()));
System.out.println(Bytes.toString(_kv.getValue()));
Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
}
}
result = scanner.next();
}
scanner.close();
table.close();
}
private Scan createScan() {
Scan scan = new Scan();
scan.setMaxVersions(3);