From dea6480023e78a3facdaf1cfc00ad6cc35ecb3ea Mon Sep 17 00:00:00 2001
From: Ramkrishna
Date: Tue, 26 Aug 2014 17:36:37 +0530
Subject: [PATCH] HBASE-11591 Scanner fails to retrieve KV from bulk loaded
 file with a higher sequence id than the cell's mvcc in a non-bulk loaded
 file (Ram)

---
 .../hadoop/hbase/regionserver/HRegion.java    |  32 ++-
 .../hadoop/hbase/regionserver/StoreFile.java  |  10 +
 .../hbase/regionserver/StoreFileScanner.java  |  40 +++-
 .../regionserver/TestScanWithBloomError.java  |  11 +-
 .../regionserver/TestScannerWithBulkload.java | 226 ++++++++++++++++++
 5 files changed, 296 insertions(+), 23 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 274c1b3c5d6..73b795706a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -74,7 +75,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -120,8 +120,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1741,6 +1741,7 @@ public class HRegion implements HeapSize { // , Writable{
     if (this.memstoreSize.get() <= 0) {
       // Take an update lock because am about to change the sequence id and we want the sequence id
       // to be at the border of the empty memstore.
+      MultiVersionConsistencyControl.WriteEntry w = null;
       this.updatesLock.writeLock().lock();
       try {
         if (this.memstoreSize.get() <= 0) {
@@ -1750,13 +1751,29 @@ public class HRegion implements HeapSize { // , Writable{
           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
           // etc.)
           // wal can be null replaying edits.
-          return wal != null?
-              new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
-                  getNextSequenceId(wal), "Nothing to flush"):
-              new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
+          try {
+            if (wal != null) {
+              w = mvcc.beginMemstoreInsert();
+              long flushSeqId = getNextSequenceId(wal);
+              FlushResult flushResult = new FlushResult(
+                  FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
+              w.setWriteNumber(flushSeqId);
+              mvcc.waitForPreviousTransactionsComplete(w);
+              w = null;
+              return flushResult;
+            } else {
+              return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+                  "Nothing to flush");
+            }
+
+          } finally {
+            this.updatesLock.writeLock().unlock();
+          }
         }
       } finally {
-        this.updatesLock.writeLock().unlock();
+        if (w != null) {
+          mvcc.advanceMemstore(w);
+        }
       }
     }
@@ -1864,6 +1881,7 @@ public class HRegion implements HeapSize { // , Writable{
       // uncommitted transactions from being written into HFiles.
       // We have to block before we start the flush, otherwise keys that
       // were removed via a rollbackMemstore could be written to Hfiles.
+      w.setWriteNumber(flushSeqId);
       mvcc.waitForPreviousTransactionsComplete(w);
       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
       w = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index ec164ca4158..27c64f0120b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -381,6 +381,7 @@ public class StoreFile {
           this.sequenceid += 1;
         }
       }
+      this.reader.setBulkLoaded(true);
     }

     this.reader.setSequenceID(this.sequenceid);
@@ -1009,6 +1010,7 @@ public class StoreFile {
     protected long sequenceID = -1;
     private byte[] lastBloomKey;
     private long deleteFamilyCnt = -1;
+    private boolean bulkLoadResult = false;

     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
         throws IOException {
@@ -1475,6 +1477,14 @@ public class StoreFile {
       this.sequenceID = sequenceID;
     }

+    public void setBulkLoaded(boolean bulkLoadResult) {
+      this.bulkLoadResult = bulkLoadResult;
+    }
+
+    public boolean isBulkLoaded() {
+      return this.bulkLoadResult;
+    }
+
     BloomFilter getGeneralBloomFilter() {
       return generalBloomFilter;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 1b0759404bc..6474e966fc0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -137,9 +137,10 @@ public class StoreFileScanner implements KeyValueScanner {
       // only seek if we aren't at the end. cur == null implies 'end'.
       if (cur != null) {
         hfs.next();
-        cur = hfs.getKeyValue();
-        if (hasMVCCInfo)
+        setCurrentCell(hfs.getKeyValue());
+        if (hasMVCCInfo || this.reader.isBulkLoaded()) {
           skipKVsNewerThanReadpoint();
+        }
       }
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
@@ -157,9 +158,13 @@ public class StoreFileScanner implements KeyValueScanner {
         return false;
       }

-      cur = hfs.getKeyValue();
+      setCurrentCell(hfs.getKeyValue());

-      return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
+      if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
+        return skipKVsNewerThanReadpoint();
+      } else {
+        return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
+      }
     } finally {
       realSeekDone = true;
     }
@@ -177,9 +182,13 @@ public class StoreFileScanner implements KeyValueScanner {
         close();
         return false;
       }
-      cur = hfs.getKeyValue();
+      setCurrentCell(hfs.getKeyValue());

-      return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
+      if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
+        return skipKVsNewerThanReadpoint();
+      } else {
+        return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
+      }
     } finally {
       realSeekDone = true;
     }
@@ -189,6 +198,15 @@ public class StoreFileScanner implements KeyValueScanner {
     }
   }

+  protected void setCurrentCell(Cell newVal) {
+    this.cur = newVal;
+    if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
+      KeyValue curKV = KeyValueUtil.ensureKeyValue(cur);
+      curKV.setSequenceId(this.reader.getSequenceID());
+      cur = curKV;
+    }
+  }
+
   protected boolean skipKVsNewerThanReadpoint() throws IOException {
     // We want to ignore all key-values that are newer than our current
     // readPoint
@@ -197,7 +215,7 @@ public class StoreFileScanner implements KeyValueScanner {
         && cur != null
         && (cur.getMvccVersion() > readPt)) {
       hfs.next();
-      cur = hfs.getKeyValue();
+      setCurrentCell(hfs.getKeyValue());
       if (this.stopSkippingKVsIfNextRow
           && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
               cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
@@ -325,7 +343,7 @@ public class StoreFileScanner implements KeyValueScanner {
       // a higher timestamp than the max timestamp in this file. We know that
       // the next point when we have to consider this file again is when we
       // pass the max timestamp of this file (with the same row/column).
-      cur = KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile);
+      setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
     } else {
       // This will be the case e.g. when we need to seek to the next
       // row/column, and we don't know exactly what they are, so we set the
@@ -343,13 +361,13 @@ public class StoreFileScanner implements KeyValueScanner {
     // key/value and the store scanner will progress to the next column. This
     // is obviously not a "real real" seek, but unlike the fake KV earlier in
     // this method, we want this to be propagated to ScanQueryMatcher.
-    cur = KeyValueUtil.createLastOnRowCol(kv);
+    setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
     realSeekDone = true;
     return true;
   }

-  Reader getReaderForTesting() {
+  Reader getReader() {
     return reader;
   }

@@ -420,7 +438,7 @@ public class StoreFileScanner implements KeyValueScanner {
       return false;
     }

-    cur = hfs.getKeyValue();
+    setCurrentCell(hfs.getKeyValue());
     this.stopSkippingKVsIfNextRow = true;
     boolean resultOfSkipKVs;
     try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
index 011de0f4781..54e8517ae84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;

+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,8 +56,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;

-import static org.junit.Assert.*;
-
 /**
  * Test a multi-column scanner when there is a Bloom filter false-positive.
  * This is needed for the multi-column Bloom filter optimization.
@@ -132,8 +133,8 @@ public class TestScanWithBloomError {
     Collections.sort(scanners, new Comparator<StoreFileScanner>() {
       @Override
       public int compare(StoreFileScanner s1, StoreFileScanner s2) {
-        Path p1 = s1.getReaderForTesting().getHFileReader().getPath();
-        Path p2 = s2.getReaderForTesting().getHFileReader().getPath();
+        Path p1 = s1.getReader().getHFileReader().getPath();
+        Path p2 = s2.getReader().getHFileReader().getPath();
         long t1, t2;
         try {
           t1 = fs.getFileStatus(p1).getModificationTime();
@@ -147,7 +148,7 @@ public class TestScanWithBloomError {

     StoreFile.Reader lastStoreFileReader = null;
     for (StoreFileScanner sfScanner : scanners)
-      lastStoreFileReader = sfScanner.getReaderForTesting();
+      lastStoreFileReader = sfScanner.getReader();

     new HFilePrettyPrinter().run(new String[]{ "-m", "-p", "-f",
         lastStoreFileReader.getHFileReader().getPath().toString()});
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
new file mode 100644
index 00000000000..3ff63944ad1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestScannerWithBulkload {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  private static void createTable(HBaseAdmin admin, String tableName) throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("col");
+    hcd.setMaxVersions(3);
+    desc.addFamily(hcd);
+    admin.createTable(desc);
+  }
+
+  @Test
+  public void testBulkLoad() throws Exception {
+    String tableName = "testBulkLoad";
+    long l = System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    createTable(admin, tableName);
+    Scan scan = createScan();
+    final HTable table = init(admin, l, scan, tableName);
+    // use bulkload
+    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file");
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
+    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+    bulkload.doBulkLoad(hfilePath, table);
+    ResultScanner scanner = table.getScanner(scan);
+    Result result = scanner.next();
+    result = scanAfterBulkLoad(scanner, result, "version2");
+    Put put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
+        .toBytes("version3")));
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    scanner = table.getScanner(scan);
+    result = scanner.next();
+    while (result != null) {
+      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+      for (KeyValue _kv : kvs) {
+        if (Bytes.toString(_kv.getRow()).equals("row1")) {
+          System.out.println(Bytes.toString(_kv.getRow()));
+          System.out.println(Bytes.toString(_kv.getQualifier()));
+          System.out.println(Bytes.toString(_kv.getValue()));
+          Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
+        }
+      }
+      result = scanner.next();
+    }
+    table.close();
+  }
+
+  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
+      throws IOException {
+    while (result != null) {
+      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q")); + for (KeyValue _kv : kvs) { + if (Bytes.toString(_kv.getRow()).equals("row1")) { + System.out.println(Bytes.toString(_kv.getRow())); + System.out.println(Bytes.toString(_kv.getQualifier())); + System.out.println(Bytes.toString(_kv.getValue())); + Assert.assertEquals(expctedVal, Bytes.toString(_kv.getValue())); + } + } + result = scanner.next(); + } + return result; + } + + private Path writeToHFile(long l, String hFilePath, String pathStr) throws IOException { + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + final Path hfilePath = new Path(hFilePath); + fs.mkdirs(hfilePath); + Path path = new Path(pathStr); + HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration()); + Assert.assertNotNull(wf); + HFileContext context = new HFileContext(); + HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create(); + KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, + Bytes.toBytes("version2")); + writer.append(kv); + // Add the bulk load time_key. otherwise we cannot ensure that it is a bulk + // loaded file + writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis())); + writer.close(); + return hfilePath; + } + + private HTable init(HBaseAdmin admin, long l, Scan scan, String tableName) throws Exception { + HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); + Put put0 = new Put(Bytes.toBytes("row1")); + put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes + .toBytes("version0"))); + table.put(put0); + table.flushCommits(); + admin.flush(tableName); + Put put1 = new Put(Bytes.toBytes("row2")); + put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes + .toBytes("version0"))); + table.put(put1); + table.flushCommits(); + admin.flush(tableName); + admin.close(); + put0 = new Put(Bytes.toBytes("row1")); + put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes + .toBytes("version1"))); + table.put(put0); + table.flushCommits(); + admin.flush(tableName); + admin.compact(tableName); + + ResultScanner scanner = table.getScanner(scan); + Result result = scanner.next(); + List kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q")); + Assert.assertEquals(1, kvs.size()); + Assert.assertEquals("version1", Bytes.toString(kvs.get(0).getValue())); + scanner.close(); + return table; + } + + @Test + public void testBulkLoadWithParallelScan() throws Exception { + String tableName = "testBulkLoadWithParallelScan"; + final long l = System.currentTimeMillis(); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + createTable(admin, tableName); + Scan scan = createScan(); + final HTable table = init(admin, l, scan, tableName); + // use bulkload + final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/", + "/temp/testBulkLoadWithParallelScan/col/file"); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true); + final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); + ResultScanner scanner = table.getScanner(scan); + // Create a scanner and then do bulk load + final CountDownLatch latch = new CountDownLatch(1); + new Thread() { + public void run() { + try { + Put put1 = new Put(Bytes.toBytes("row5")); + put1.add(new KeyValue(Bytes.toBytes("row5"), 
Bytes.toBytes("col"), Bytes.toBytes("q"), l, + Bytes.toBytes("version0"))); + table.put(put1); + table.flushCommits(); + bulkload.doBulkLoad(hfilePath, table); + latch.countDown(); + } catch (TableNotFoundException e) { + } catch (IOException e) { + } + } + }.start(); + latch.await(); + // By the time we do next() the bulk loaded files are also added to the kv + // scanner + Result result = scanner.next(); + scanAfterBulkLoad(scanner, result, "version1"); + table.close(); + + } + + private Scan createScan() { + Scan scan = new Scan(); + scan.setMaxVersions(3); + return scan; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } +}