From 4da0149ee416a51132f22bc8c6c13cf1cb0a2ae1 Mon Sep 17 00:00:00 2001 From: Jonathan M Hsieh Date: Sat, 16 May 2015 16:45:49 -0700 Subject: [PATCH] HBASE-13531 Flakey failure of TestAcidGuarantees#testMobScanAtomicity (Jingcheng Du) rd point was not properly handled in the mob scanner case. --- .../org/apache/hadoop/hbase/mob/MobFile.java | 14 ++- .../apache/hadoop/hbase/mob/MobFileCache.java | 4 +- .../org/apache/hadoop/hbase/mob/MobUtils.java | 2 +- .../hbase/mob/mapreduce/SweepReducer.java | 2 + .../hadoop/hbase/regionserver/HMobStore.java | 26 ++++- .../hbase/regionserver/MobStoreScanner.java | 2 +- .../regionserver/ReversedMobStoreScanner.java | 2 +- .../hbase/regionserver/StoreScanner.java | 2 +- .../regionserver/TestMobStoreScanner.java | 105 ++++++++++++------ 9 files changed, 113 insertions(+), 46 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java index a120057c89a..09438db9721 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java @@ -71,13 +71,25 @@ public class MobFile { * @throws IOException */ public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException { + return readCell(search, cacheMobBlocks, sf.getMaxMemstoreTS()); + } + + /** + * Reads a cell from the mob file. + * @param search The cell need to be searched in the mob file. + * @param cacheMobBlocks Should this scanner cache blocks. + * @param readPt the read point. + * @return The cell in the mob file. + * @throws IOException + */ + public Cell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException { Cell result = null; StoreFileScanner scanner = null; List sfs = new ArrayList(); sfs.add(sf); try { List sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, - cacheMobBlocks, true, false, null, sf.getMaxMemstoreTS()); + cacheMobBlocks, true, false, null, readPt); if (!sfScanners.isEmpty()) { scanner = sfScanners.get(0); if (scanner.seek(search)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java index cd08a9887d0..7d8c9a58077 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java @@ -196,7 +196,9 @@ public class MobFileCache { */ public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException { if (!isCacheEnabled) { - return MobFile.create(fs, path, conf, cacheConf); + MobFile mobFile = MobFile.create(fs, path, conf, cacheConf); + mobFile.open(); + return mobFile; } else { String fileName = path.getName(); CachedMobFile cached = map.get(fileName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index a11ea88831c..21595973c1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -554,7 +554,7 @@ public class MobUtils { HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount, Compression.Algorithm compression, CacheConfig cacheConfig) throws IOException { HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) - .withIncludesMvcc(false).withIncludesTags(true).withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE) + .withIncludesMvcc(true).withIncludesTags(true).withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE) .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(family.getBlocksize()) .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java index cbefd8a5be4..f1060ba30fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java @@ -339,6 +339,7 @@ public class SweepReducer extends Reducer { MobFile file = MobFile.create(fs, new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig); StoreFileScanner scanner = null; + file.open(); try { scanner = file.getScanner(); scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY)); @@ -356,6 +357,7 @@ public class SweepReducer extends Reducer { if (scanner != null) { scanner.close(); } + file.close(); } toBeDeleted.add(mobFileStat.getFileStatus().getPath()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index bfd5d422732..6016b39f27d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -247,7 +247,7 @@ public class HMobStore extends HStore { Compression.Algorithm compression) throws IOException { final CacheConfig writerCacheConf = mobCacheConfig; HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) - .withIncludesMvcc(false).withIncludesTags(true) + .withIncludesMvcc(true).withIncludesTags(true) .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE) .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM) .withBlockSize(getFamily().getBlocksize()) @@ -305,13 +305,25 @@ public class HMobStore extends HStore { } /** - * Reads the cell from the mob file. + * Reads the cell from the mob file, and the read point does not count. * @param reference The cell found in the HBase, its value is a path to a mob file. * @param cacheBlocks Whether the scanner should cache blocks. * @return The cell found in the mob file. * @throws IOException */ public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException { + return resolve(reference, cacheBlocks, -1); + } + + /** + * Reads the cell from the mob file. + * @param reference The cell found in the HBase, its value is a path to a mob file. + * @param cacheBlocks Whether the scanner should cache blocks. + * @param readPt the read point. + * @return The cell found in the mob file. + * @throws IOException + */ + public Cell resolve(Cell reference, boolean cacheBlocks, long readPt) throws IOException { Cell result = null; if (MobUtils.hasValidMobRefCellValue(reference)) { String fileName = MobUtils.getMobFileName(reference); @@ -336,7 +348,7 @@ public class HMobStore extends HStore { keyLock.releaseLockEntry(lockEntry); } } - result = readCell(locations, fileName, reference, cacheBlocks); + result = readCell(locations, fileName, reference, cacheBlocks, readPt); } } if (result == null) { @@ -363,18 +375,20 @@ public class HMobStore extends HStore { * @param fileName The file to be read. * @param search The cell to be searched. * @param cacheMobBlocks Whether the scanner should cache blocks. + * @param readPt the read point. * @return The found cell. Null if there's no such a cell. * @throws IOException */ - private Cell readCell(List locations, String fileName, Cell search, boolean cacheMobBlocks) - throws IOException { + private Cell readCell(List locations, String fileName, Cell search, boolean cacheMobBlocks, + long readPt) throws IOException { FileSystem fs = getFileSystem(); for (Path location : locations) { MobFile file = null; Path path = new Path(location, fileName); try { file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig); - return file.readCell(search, cacheMobBlocks); + return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search, + cacheMobBlocks); } catch (IOException e) { mobCacheConfig.getMobFileCache().evictFile(fileName); if ((e instanceof FileNotFoundException) || diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java index 5739df1b590..0521cce53c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java @@ -66,7 +66,7 @@ public class MobStoreScanner extends StoreScanner { for (int i = 0; i < outResult.size(); i++) { Cell cell = outResult.get(i); if (MobUtils.isMobReferenceCell(cell)) { - Cell mobCell = mobStore.resolve(cell, cacheMobBlocks); + Cell mobCell = mobStore.resolve(cell, cacheMobBlocks, readPt); mobKVCount++; mobKVSize += mobCell.getValueLength(); outResult.set(i, mobCell); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java index 85be382c802..d1cca9834a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java @@ -66,7 +66,7 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner { for (int i = 0; i < outResult.size(); i++) { Cell cell = outResult.get(i); if (MobUtils.isMobReferenceCell(cell)) { - Cell mobCell = mobStore.resolve(cell, cacheMobBlocks); + Cell mobCell = mobStore.resolve(cell, cacheMobBlocks, readPt); mobKVCount++; mobKVSize += mobCell.getValueLength(); outResult.set(i, mobCell); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index da6ea1cf744..1f203b8e30e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -107,7 +107,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private boolean scanUsePread = false; protected ReentrantLock lock = new ReentrantLock(); - private final long readPt; + protected final long readPt; // used by the injection framework to test race between StoreScanner construction and compaction enum StoreScannerCompactionRace { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java index 3b5a47473de..01b6a5f8ffd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java @@ -31,9 +31,15 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -51,6 +57,7 @@ public class TestMobStoreScanner { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static byte [] row1 = Bytes.toBytes("row1"); + private final static byte [] row2 = Bytes.toBytes("row2"); private final static byte [] family = Bytes.toBytes("family"); private final static byte [] qf1 = Bytes.toBytes("qualifier1"); private final static byte [] qf2 = Bytes.toBytes("qualifier2"); @@ -76,8 +83,8 @@ public class TestMobStoreScanner { TEST_UTIL.shutdownMiniCluster(); } - public void setUp(long threshold, String TN) throws Exception { - desc = new HTableDescriptor(TableName.valueOf(TN)); + public void setUp(long threshold, TableName tn) throws Exception { + desc = new HTableDescriptor(tn); hcd = new HColumnDescriptor(family); hcd.setMobEnabled(true); hcd.setMobThreshold(threshold); @@ -86,7 +93,7 @@ public class TestMobStoreScanner { admin = TEST_UTIL.getHBaseAdmin(); admin.createTable(desc); table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()) - .getTable(TableName.valueOf(TN)); + .getTable(tn); } /** @@ -136,8 +143,7 @@ public class TestMobStoreScanner { @Test(timeout=60000) public void testGetMassive() throws Exception { - String TN = "testGetMassive"; - setUp(defaultThreshold, TN); + setUp(defaultThreshold, TableName.valueOf("testGetMassive")); // Put some data 5 10, 15, 20 mb ok (this would be right below protobuf default max size of 64MB. // 25, 30, 40 fail. these is above protobuf max size of 64MB @@ -154,10 +160,45 @@ public class TestMobStoreScanner { // should not have blown up. } - public void testGetFromFiles(boolean reversed) throws Exception { - String TN = "testGetFromFiles" + reversed; - TableName tn = TableName.valueOf(TN); - setUp(defaultThreshold, TN); + @Test + public void testReadPt() throws Exception { + TableName tn = TableName.valueOf("testReadPt"); + setUp(0L, tn); + long ts = System.currentTimeMillis(); + byte[] value1 = Bytes.toBytes("value1"); + Put put1 = new Put(row1); + put1.addColumn(family, qf1, ts, value1); + table.put(put1); + Put put2 = new Put(row2); + byte[] value2 = Bytes.toBytes("value2"); + put2.addColumn(family, qf1, ts, value2); + table.put(put2); + + Scan scan = new Scan(); + scan.setCaching(1); + ResultScanner rs = table.getScanner(scan); + + Put put3 = new Put(row1); + byte[] value3 = Bytes.toBytes("value3"); + put3.addColumn(family, qf1, ts, value3); + table.put(put3); + Put put4 = new Put(row2); + byte[] value4 = Bytes.toBytes("value4"); + put4.addColumn(family, qf1, ts, value4); + table.put(put4); + Result result = rs.next(); + Cell cell = result.getColumnLatestCell(family, qf1); + Assert.assertEquals("value1", Bytes.toString(cell.getValue())); + + admin.flush(tn); + result = rs.next(); + cell = result.getColumnLatestCell(family, qf1); + Assert.assertEquals("value2", Bytes.toString(cell.getValue())); + } + + private void testGetFromFiles(boolean reversed) throws Exception { + TableName tn = TableName.valueOf("testGetFromFiles" + reversed); + setUp(defaultThreshold, tn); long ts1 = System.currentTimeMillis(); long ts2 = ts1 + 1; long ts3 = ts1 + 2; @@ -189,9 +230,8 @@ public class TestMobStoreScanner { Assert.assertEquals(3, count); } - public void testGetFromMemStore(boolean reversed) throws Exception { - String TN = "testGetFromMemStore" + reversed; - setUp(defaultThreshold, TN); + private void testGetFromMemStore(boolean reversed) throws Exception { + setUp(defaultThreshold, TableName.valueOf("testGetFromMemStore" + reversed)); long ts1 = System.currentTimeMillis(); long ts2 = ts1 + 1; long ts3 = ts1 + 2; @@ -221,10 +261,9 @@ public class TestMobStoreScanner { Assert.assertEquals(3, count); } - public void testGetReferences(boolean reversed) throws Exception { - String TN = "testGetReferences" + reversed; - TableName tn = TableName.valueOf(TN); - setUp(defaultThreshold, TN); + private void testGetReferences(boolean reversed) throws Exception { + TableName tn = TableName.valueOf("testGetReferences" + reversed); + setUp(defaultThreshold, tn); long ts1 = System.currentTimeMillis(); long ts2 = ts1 + 1; long ts3 = ts1 + 2; @@ -247,7 +286,7 @@ public class TestMobStoreScanner { List cells = res.listCells(); for(Cell cell : cells) { // Verify the value - assertIsMobReference(cell, row1, family, value, TN); + assertIsMobReference(cell, row1, family, value, tn); count++; } } @@ -255,10 +294,9 @@ public class TestMobStoreScanner { Assert.assertEquals(3, count); } - public void testMobThreshold(boolean reversed) throws Exception { - String TN = "testMobThreshold" + reversed; - TableName tn = TableName.valueOf(TN); - setUp(defaultThreshold, TN); + private void testMobThreshold(boolean reversed) throws Exception { + TableName tn = TableName.valueOf("testMobThreshold" + reversed); + setUp(defaultThreshold, tn); byte [] valueLess = generateMobValue((int)defaultThreshold-1); byte [] valueEqual = generateMobValue((int)defaultThreshold); byte [] valueGreater = generateMobValue((int)defaultThreshold+1); @@ -302,14 +340,13 @@ public class TestMobStoreScanner { Assert.assertEquals(3, count); assertNotMobReference(cellLess, row1, family, valueLess); assertNotMobReference(cellEqual, row1, family, valueEqual); - assertIsMobReference(cellGreater, row1, family, valueGreater, TN); + assertIsMobReference(cellGreater, row1, family, valueGreater, tn); results.close(); } - public void testGetFromArchive(boolean reversed) throws Exception { - String TN = "testGetFromArchive" + reversed; - TableName tn = TableName.valueOf(TN); - setUp(defaultThreshold, TN); + private void testGetFromArchive(boolean reversed) throws Exception { + TableName tn = TableName.valueOf("testGetFromArchive" + reversed); + setUp(defaultThreshold, tn); long ts1 = System.currentTimeMillis(); long ts2 = ts1 + 1; long ts3 = ts1 + 2; @@ -325,15 +362,15 @@ public class TestMobStoreScanner { // Get the files in the mob path Path mobFamilyPath; - mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), - TableName.valueOf(TN)), hcd.getNameAsString()); + mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), tn), + hcd.getNameAsString()); FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); FileStatus[] files = fs.listStatus(mobFamilyPath); // Get the archive path Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration()); - Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf(TN)); - HRegionInfo regionInfo = MobUtils.getMobRegionInfo(TableName.valueOf(TN)); + Path tableDir = FSUtils.getTableDir(rootDir, tn); + HRegionInfo regionInfo = MobUtils.getMobRegionInfo(tn); Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), regionInfo, tableDir, family); @@ -389,7 +426,7 @@ public class TestMobStoreScanner { * Assert the value is store in mob. */ private static void assertIsMobReference(Cell cell, byte[] row, byte[] family, - byte[] value, String TN) throws IOException { + byte[] value, TableName tn) throws IOException { Assert.assertEquals(Bytes.toString(row), Bytes.toString(CellUtil.cloneRow(cell))); Assert.assertEquals(Bytes.toString(family), @@ -403,7 +440,7 @@ public class TestMobStoreScanner { Assert.assertEquals(value.length, valLen); Path mobFamilyPath; mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), - TableName.valueOf(TN)), hcd.getNameAsString()); + tn), hcd.getNameAsString()); Path targetPath = new Path(mobFamilyPath, fileName); FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); Assert.assertTrue(fs.exists(targetPath));