From fd5b139a6f60970f575499d5dfb0aa762c590696 Mon Sep 17 00:00:00 2001 From: Ted Yu Date: Mon, 8 Sep 2014 14:49:16 +0000 Subject: [PATCH] HBASE-11772 Bulk load mvcc and seqId issues with native hfiles (Jerry He) --- .../hadoop/hbase/regionserver/StoreFile.java | 24 ++++-- .../hbase/regionserver/StoreFileScanner.java | 2 +- .../regionserver/TestScannerWithBulkload.java | 76 +++++++++++++++++-- 3 files changed, 91 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 27c64f0120b..6a45c47e83b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -315,18 +315,31 @@ public class StoreFile { } /** - * @return true if this storefile was created by HFileOutputFormat - * for a bulk load. + * Check if this storefile was created by bulk load. + * When a hfile is bulk loaded into HBase, we append + * '_SeqId_' to the hfile name, unless + * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is + * explicitly turned off. + * If "hbase.mapreduce.bulkload.assign.sequenceNumbers" + * is turned off, fall back to BULKLOAD_TIME_KEY. + * @return true if this storefile was created by bulk load. */ boolean isBulkLoadResult() { - return metadataMap.containsKey(BULKLOAD_TIME_KEY); + boolean bulkLoadedHFile = false; + String fileName = this.getPath().getName(); + int startPos = fileName.indexOf("SeqId_"); + if (startPos != -1) { + bulkLoadedHFile = true; + } + return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY); } /** * Return the timestamp at which this bulk load file was generated. */ public long getBulkLoadTimestamp() { - return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY)); + byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY); + return (bulkLoadTimestamp == null) ? 
0 : Bytes.toLong(bulkLoadTimestamp); } /** @@ -372,7 +385,8 @@ public class StoreFile { // generate the sequenceId from the fileName // fileName is of the form _SeqId__ String fileName = this.getPath().getName(); - int startPos = fileName.indexOf("SeqId_"); + // Use lastIndexOf() to get the last, most recent bulk load seqId. + int startPos = fileName.lastIndexOf("SeqId_"); if (startPos != -1) { this.sequenceid = Long.parseLong(fileName.substring(startPos + 6, fileName.indexOf('_', startPos + 6))); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index aa351d3d28b..2784845a33a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -200,7 +200,7 @@ public class StoreFileScanner implements KeyValueScanner { protected void setCurrentCell(Cell newVal) throws IOException { this.cur = newVal; - if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) { + if (this.cur != null && this.reader.isBulkLoaded()) { CellUtil.setSequenceId(cur, this.reader.getSequenceID()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index 3ff63944ad1..af4b9c560e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -73,7 +73,8 @@ public class TestScannerWithBulkload { Scan scan = createScan(); final HTable table = init(admin, l, scan, tableName); // use bulkload - final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file"); + final Path hfilePath = 
writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file", + false); Configuration conf = TEST_UTIL.getConfiguration(); conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true); final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); @@ -101,6 +102,7 @@ public class TestScannerWithBulkload { } result = scanner.next(); } + scanner.close(); table.close(); } @@ -121,7 +123,10 @@ public class TestScannerWithBulkload { return result; } - private Path writeToHFile(long l, String hFilePath, String pathStr) throws IOException { + // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file. + // Else, we will set BULKLOAD_TIME_KEY. + private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile) + throws IOException { FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); final Path hfilePath = new Path(hFilePath); fs.mkdirs(hfilePath); @@ -132,10 +137,26 @@ public class TestScannerWithBulkload { HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create(); KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes.toBytes("version2")); + + // Set cell seq id to test bulk load native hfiles. + if (nativeHFile) { + // Set a big seq id. Scan should not look at this seq id in a bulk loaded file. + // Scan should only look at the seq id appended at the bulk load time, and not skip + // this kv. + kv.setSequenceId(9999999); + } + writer.append(kv); - // Add the bulk load time_key. otherwise we cannot ensure that it is a bulk - // loaded file + + if (nativeHFile) { + // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file. + // Scan should only look at the seq id appended at the bulk load time, and not skip its + // kv. 
+ writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999))); + } + else { writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())); + } writer.close(); return hfilePath; } @@ -182,7 +203,7 @@ public class TestScannerWithBulkload { final HTable table = init(admin, l, scan, tableName); // use bulkload final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/", - "/temp/testBulkLoadWithParallelScan/col/file"); + "/temp/testBulkLoadWithParallelScan/col/file", false); Configuration conf = TEST_UTIL.getConfiguration(); conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true); final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); @@ -209,10 +230,55 @@ public class TestScannerWithBulkload { // scanner Result result = scanner.next(); scanAfterBulkLoad(scanner, result, "version1"); + scanner.close(); table.close(); } + @Test + public void testBulkLoadNativeHFile() throws Exception { + String tableName = "testBulkLoadNativeHFile"; + long l = System.currentTimeMillis(); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + createTable(admin, tableName); + Scan scan = createScan(); + final HTable table = init(admin, l, scan, tableName); + // use bulkload + final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/", + "/temp/testBulkLoadNativeHFile/col/file", true); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true); + final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); + bulkload.doBulkLoad(hfilePath, table); + ResultScanner scanner = table.getScanner(scan); + Result result = scanner.next(); + // We had 'version0', 'version1' for 'row1,col:q' in the table. 
+ // Bulk load added 'version2'; scanner should be able to see 'version2' + result = scanAfterBulkLoad(scanner, result, "version2"); + Put put0 = new Put(Bytes.toBytes("row1")); + put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes + .toBytes("version3"))); + table.put(put0); + table.flushCommits(); + admin.flush(tableName); + scanner = table.getScanner(scan); + result = scanner.next(); + while (result != null) { + List kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q")); + for (KeyValue _kv : kvs) { + if (Bytes.toString(_kv.getRow()).equals("row1")) { + System.out.println(Bytes.toString(_kv.getRow())); + System.out.println(Bytes.toString(_kv.getQualifier())); + System.out.println(Bytes.toString(_kv.getValue())); + Assert.assertEquals("version3", Bytes.toString(_kv.getValue())); + } + } + result = scanner.next(); + } + scanner.close(); + table.close(); + } + private Scan createScan() { Scan scan = new Scan(); scan.setMaxVersions(3);