HBASE-13531 Flakey failure of TestAcidGuarantees#testMobScanAtomicity (Jingcheng Du)
rd point was not properly handled in the mob scanner case.
This commit is contained in:
parent
0e20bbf6a3
commit
4da0149ee4
@ -71,13 +71,25 @@ public class MobFile {
|
||||
* @throws IOException
|
||||
*/
|
||||
public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
|
||||
return readCell(search, cacheMobBlocks, sf.getMaxMemstoreTS());
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a cell from the mob file.
|
||||
* @param search The cell need to be searched in the mob file.
|
||||
* @param cacheMobBlocks Should this scanner cache blocks.
|
||||
* @param readPt the read point.
|
||||
* @return The cell in the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Cell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException {
|
||||
Cell result = null;
|
||||
StoreFileScanner scanner = null;
|
||||
List<StoreFile> sfs = new ArrayList<StoreFile>();
|
||||
sfs.add(sf);
|
||||
try {
|
||||
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
|
||||
cacheMobBlocks, true, false, null, sf.getMaxMemstoreTS());
|
||||
cacheMobBlocks, true, false, null, readPt);
|
||||
if (!sfScanners.isEmpty()) {
|
||||
scanner = sfScanners.get(0);
|
||||
if (scanner.seek(search)) {
|
||||
|
@ -196,7 +196,9 @@ public class MobFileCache {
|
||||
*/
|
||||
public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException {
|
||||
if (!isCacheEnabled) {
|
||||
return MobFile.create(fs, path, conf, cacheConf);
|
||||
MobFile mobFile = MobFile.create(fs, path, conf, cacheConf);
|
||||
mobFile.open();
|
||||
return mobFile;
|
||||
} else {
|
||||
String fileName = path.getName();
|
||||
CachedMobFile cached = map.get(fileName);
|
||||
|
@ -554,7 +554,7 @@ public class MobUtils {
|
||||
HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
|
||||
Compression.Algorithm compression, CacheConfig cacheConfig) throws IOException {
|
||||
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
|
||||
.withIncludesMvcc(false).withIncludesTags(true).withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
|
||||
.withIncludesMvcc(true).withIncludesTags(true).withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(family.getBlocksize())
|
||||
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
|
||||
|
||||
|
@ -339,6 +339,7 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
|
||||
MobFile file = MobFile.create(fs,
|
||||
new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
|
||||
StoreFileScanner scanner = null;
|
||||
file.open();
|
||||
try {
|
||||
scanner = file.getScanner();
|
||||
scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
|
||||
@ -356,6 +357,7 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
|
||||
if (scanner != null) {
|
||||
scanner.close();
|
||||
}
|
||||
file.close();
|
||||
}
|
||||
toBeDeleted.add(mobFileStat.getFileStatus().getPath());
|
||||
}
|
||||
|
@ -247,7 +247,7 @@ public class HMobStore extends HStore {
|
||||
Compression.Algorithm compression) throws IOException {
|
||||
final CacheConfig writerCacheConf = mobCacheConfig;
|
||||
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
|
||||
.withIncludesMvcc(false).withIncludesTags(true)
|
||||
.withIncludesMvcc(true).withIncludesTags(true)
|
||||
.withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||
.withBlockSize(getFamily().getBlocksize())
|
||||
@ -305,13 +305,25 @@ public class HMobStore extends HStore {
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the cell from the mob file.
|
||||
* Reads the cell from the mob file, and the read point does not count.
|
||||
* @param reference The cell found in the HBase, its value is a path to a mob file.
|
||||
* @param cacheBlocks Whether the scanner should cache blocks.
|
||||
* @return The cell found in the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
|
||||
return resolve(reference, cacheBlocks, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the cell from the mob file.
|
||||
* @param reference The cell found in the HBase, its value is a path to a mob file.
|
||||
* @param cacheBlocks Whether the scanner should cache blocks.
|
||||
* @param readPt the read point.
|
||||
* @return The cell found in the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Cell resolve(Cell reference, boolean cacheBlocks, long readPt) throws IOException {
|
||||
Cell result = null;
|
||||
if (MobUtils.hasValidMobRefCellValue(reference)) {
|
||||
String fileName = MobUtils.getMobFileName(reference);
|
||||
@ -336,7 +348,7 @@ public class HMobStore extends HStore {
|
||||
keyLock.releaseLockEntry(lockEntry);
|
||||
}
|
||||
}
|
||||
result = readCell(locations, fileName, reference, cacheBlocks);
|
||||
result = readCell(locations, fileName, reference, cacheBlocks, readPt);
|
||||
}
|
||||
}
|
||||
if (result == null) {
|
||||
@ -363,18 +375,20 @@ public class HMobStore extends HStore {
|
||||
* @param fileName The file to be read.
|
||||
* @param search The cell to be searched.
|
||||
* @param cacheMobBlocks Whether the scanner should cache blocks.
|
||||
* @param readPt the read point.
|
||||
* @return The found cell. Null if there's no such a cell.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks)
|
||||
throws IOException {
|
||||
private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks,
|
||||
long readPt) throws IOException {
|
||||
FileSystem fs = getFileSystem();
|
||||
for (Path location : locations) {
|
||||
MobFile file = null;
|
||||
Path path = new Path(location, fileName);
|
||||
try {
|
||||
file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
|
||||
return file.readCell(search, cacheMobBlocks);
|
||||
return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search,
|
||||
cacheMobBlocks);
|
||||
} catch (IOException e) {
|
||||
mobCacheConfig.getMobFileCache().evictFile(fileName);
|
||||
if ((e instanceof FileNotFoundException) ||
|
||||
|
@ -66,7 +66,7 @@ public class MobStoreScanner extends StoreScanner {
|
||||
for (int i = 0; i < outResult.size(); i++) {
|
||||
Cell cell = outResult.get(i);
|
||||
if (MobUtils.isMobReferenceCell(cell)) {
|
||||
Cell mobCell = mobStore.resolve(cell, cacheMobBlocks);
|
||||
Cell mobCell = mobStore.resolve(cell, cacheMobBlocks, readPt);
|
||||
mobKVCount++;
|
||||
mobKVSize += mobCell.getValueLength();
|
||||
outResult.set(i, mobCell);
|
||||
|
@ -66,7 +66,7 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
|
||||
for (int i = 0; i < outResult.size(); i++) {
|
||||
Cell cell = outResult.get(i);
|
||||
if (MobUtils.isMobReferenceCell(cell)) {
|
||||
Cell mobCell = mobStore.resolve(cell, cacheMobBlocks);
|
||||
Cell mobCell = mobStore.resolve(cell, cacheMobBlocks, readPt);
|
||||
mobKVCount++;
|
||||
mobKVSize += mobCell.getValueLength();
|
||||
outResult.set(i, mobCell);
|
||||
|
@ -107,7 +107,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||
private boolean scanUsePread = false;
|
||||
protected ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private final long readPt;
|
||||
protected final long readPt;
|
||||
|
||||
// used by the injection framework to test race between StoreScanner construction and compaction
|
||||
enum StoreScannerCompactionRace {
|
||||
|
@ -31,9 +31,15 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
@ -51,6 +57,7 @@ public class TestMobStoreScanner {
|
||||
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private final static byte [] row1 = Bytes.toBytes("row1");
|
||||
private final static byte [] row2 = Bytes.toBytes("row2");
|
||||
private final static byte [] family = Bytes.toBytes("family");
|
||||
private final static byte [] qf1 = Bytes.toBytes("qualifier1");
|
||||
private final static byte [] qf2 = Bytes.toBytes("qualifier2");
|
||||
@ -76,8 +83,8 @@ public class TestMobStoreScanner {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
public void setUp(long threshold, String TN) throws Exception {
|
||||
desc = new HTableDescriptor(TableName.valueOf(TN));
|
||||
public void setUp(long threshold, TableName tn) throws Exception {
|
||||
desc = new HTableDescriptor(tn);
|
||||
hcd = new HColumnDescriptor(family);
|
||||
hcd.setMobEnabled(true);
|
||||
hcd.setMobThreshold(threshold);
|
||||
@ -86,7 +93,7 @@ public class TestMobStoreScanner {
|
||||
admin = TEST_UTIL.getHBaseAdmin();
|
||||
admin.createTable(desc);
|
||||
table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
|
||||
.getTable(TableName.valueOf(TN));
|
||||
.getTable(tn);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -136,8 +143,7 @@ public class TestMobStoreScanner {
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testGetMassive() throws Exception {
|
||||
String TN = "testGetMassive";
|
||||
setUp(defaultThreshold, TN);
|
||||
setUp(defaultThreshold, TableName.valueOf("testGetMassive"));
|
||||
|
||||
// Put some data 5 10, 15, 20 mb ok (this would be right below protobuf default max size of 64MB.
|
||||
// 25, 30, 40 fail. these is above protobuf max size of 64MB
|
||||
@ -154,10 +160,45 @@ public class TestMobStoreScanner {
|
||||
// should not have blown up.
|
||||
}
|
||||
|
||||
public void testGetFromFiles(boolean reversed) throws Exception {
|
||||
String TN = "testGetFromFiles" + reversed;
|
||||
TableName tn = TableName.valueOf(TN);
|
||||
setUp(defaultThreshold, TN);
|
||||
@Test
|
||||
public void testReadPt() throws Exception {
|
||||
TableName tn = TableName.valueOf("testReadPt");
|
||||
setUp(0L, tn);
|
||||
long ts = System.currentTimeMillis();
|
||||
byte[] value1 = Bytes.toBytes("value1");
|
||||
Put put1 = new Put(row1);
|
||||
put1.addColumn(family, qf1, ts, value1);
|
||||
table.put(put1);
|
||||
Put put2 = new Put(row2);
|
||||
byte[] value2 = Bytes.toBytes("value2");
|
||||
put2.addColumn(family, qf1, ts, value2);
|
||||
table.put(put2);
|
||||
|
||||
Scan scan = new Scan();
|
||||
scan.setCaching(1);
|
||||
ResultScanner rs = table.getScanner(scan);
|
||||
|
||||
Put put3 = new Put(row1);
|
||||
byte[] value3 = Bytes.toBytes("value3");
|
||||
put3.addColumn(family, qf1, ts, value3);
|
||||
table.put(put3);
|
||||
Put put4 = new Put(row2);
|
||||
byte[] value4 = Bytes.toBytes("value4");
|
||||
put4.addColumn(family, qf1, ts, value4);
|
||||
table.put(put4);
|
||||
Result result = rs.next();
|
||||
Cell cell = result.getColumnLatestCell(family, qf1);
|
||||
Assert.assertEquals("value1", Bytes.toString(cell.getValue()));
|
||||
|
||||
admin.flush(tn);
|
||||
result = rs.next();
|
||||
cell = result.getColumnLatestCell(family, qf1);
|
||||
Assert.assertEquals("value2", Bytes.toString(cell.getValue()));
|
||||
}
|
||||
|
||||
private void testGetFromFiles(boolean reversed) throws Exception {
|
||||
TableName tn = TableName.valueOf("testGetFromFiles" + reversed);
|
||||
setUp(defaultThreshold, tn);
|
||||
long ts1 = System.currentTimeMillis();
|
||||
long ts2 = ts1 + 1;
|
||||
long ts3 = ts1 + 2;
|
||||
@ -189,9 +230,8 @@ public class TestMobStoreScanner {
|
||||
Assert.assertEquals(3, count);
|
||||
}
|
||||
|
||||
public void testGetFromMemStore(boolean reversed) throws Exception {
|
||||
String TN = "testGetFromMemStore" + reversed;
|
||||
setUp(defaultThreshold, TN);
|
||||
private void testGetFromMemStore(boolean reversed) throws Exception {
|
||||
setUp(defaultThreshold, TableName.valueOf("testGetFromMemStore" + reversed));
|
||||
long ts1 = System.currentTimeMillis();
|
||||
long ts2 = ts1 + 1;
|
||||
long ts3 = ts1 + 2;
|
||||
@ -221,10 +261,9 @@ public class TestMobStoreScanner {
|
||||
Assert.assertEquals(3, count);
|
||||
}
|
||||
|
||||
public void testGetReferences(boolean reversed) throws Exception {
|
||||
String TN = "testGetReferences" + reversed;
|
||||
TableName tn = TableName.valueOf(TN);
|
||||
setUp(defaultThreshold, TN);
|
||||
private void testGetReferences(boolean reversed) throws Exception {
|
||||
TableName tn = TableName.valueOf("testGetReferences" + reversed);
|
||||
setUp(defaultThreshold, tn);
|
||||
long ts1 = System.currentTimeMillis();
|
||||
long ts2 = ts1 + 1;
|
||||
long ts3 = ts1 + 2;
|
||||
@ -247,7 +286,7 @@ public class TestMobStoreScanner {
|
||||
List<Cell> cells = res.listCells();
|
||||
for(Cell cell : cells) {
|
||||
// Verify the value
|
||||
assertIsMobReference(cell, row1, family, value, TN);
|
||||
assertIsMobReference(cell, row1, family, value, tn);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
@ -255,10 +294,9 @@ public class TestMobStoreScanner {
|
||||
Assert.assertEquals(3, count);
|
||||
}
|
||||
|
||||
public void testMobThreshold(boolean reversed) throws Exception {
|
||||
String TN = "testMobThreshold" + reversed;
|
||||
TableName tn = TableName.valueOf(TN);
|
||||
setUp(defaultThreshold, TN);
|
||||
private void testMobThreshold(boolean reversed) throws Exception {
|
||||
TableName tn = TableName.valueOf("testMobThreshold" + reversed);
|
||||
setUp(defaultThreshold, tn);
|
||||
byte [] valueLess = generateMobValue((int)defaultThreshold-1);
|
||||
byte [] valueEqual = generateMobValue((int)defaultThreshold);
|
||||
byte [] valueGreater = generateMobValue((int)defaultThreshold+1);
|
||||
@ -302,14 +340,13 @@ public class TestMobStoreScanner {
|
||||
Assert.assertEquals(3, count);
|
||||
assertNotMobReference(cellLess, row1, family, valueLess);
|
||||
assertNotMobReference(cellEqual, row1, family, valueEqual);
|
||||
assertIsMobReference(cellGreater, row1, family, valueGreater, TN);
|
||||
assertIsMobReference(cellGreater, row1, family, valueGreater, tn);
|
||||
results.close();
|
||||
}
|
||||
|
||||
public void testGetFromArchive(boolean reversed) throws Exception {
|
||||
String TN = "testGetFromArchive" + reversed;
|
||||
TableName tn = TableName.valueOf(TN);
|
||||
setUp(defaultThreshold, TN);
|
||||
private void testGetFromArchive(boolean reversed) throws Exception {
|
||||
TableName tn = TableName.valueOf("testGetFromArchive" + reversed);
|
||||
setUp(defaultThreshold, tn);
|
||||
long ts1 = System.currentTimeMillis();
|
||||
long ts2 = ts1 + 1;
|
||||
long ts3 = ts1 + 2;
|
||||
@ -325,15 +362,15 @@ public class TestMobStoreScanner {
|
||||
|
||||
// Get the files in the mob path
|
||||
Path mobFamilyPath;
|
||||
mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
|
||||
TableName.valueOf(TN)), hcd.getNameAsString());
|
||||
mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(), tn),
|
||||
hcd.getNameAsString());
|
||||
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
|
||||
FileStatus[] files = fs.listStatus(mobFamilyPath);
|
||||
|
||||
// Get the archive path
|
||||
Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
|
||||
Path tableDir = FSUtils.getTableDir(rootDir, TableName.valueOf(TN));
|
||||
HRegionInfo regionInfo = MobUtils.getMobRegionInfo(TableName.valueOf(TN));
|
||||
Path tableDir = FSUtils.getTableDir(rootDir, tn);
|
||||
HRegionInfo regionInfo = MobUtils.getMobRegionInfo(tn);
|
||||
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(),
|
||||
regionInfo, tableDir, family);
|
||||
|
||||
@ -389,7 +426,7 @@ public class TestMobStoreScanner {
|
||||
* Assert the value is store in mob.
|
||||
*/
|
||||
private static void assertIsMobReference(Cell cell, byte[] row, byte[] family,
|
||||
byte[] value, String TN) throws IOException {
|
||||
byte[] value, TableName tn) throws IOException {
|
||||
Assert.assertEquals(Bytes.toString(row),
|
||||
Bytes.toString(CellUtil.cloneRow(cell)));
|
||||
Assert.assertEquals(Bytes.toString(family),
|
||||
@ -403,7 +440,7 @@ public class TestMobStoreScanner {
|
||||
Assert.assertEquals(value.length, valLen);
|
||||
Path mobFamilyPath;
|
||||
mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
|
||||
TableName.valueOf(TN)), hcd.getNameAsString());
|
||||
tn), hcd.getNameAsString());
|
||||
Path targetPath = new Path(mobFamilyPath, fileName);
|
||||
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
|
||||
Assert.assertTrue(fs.exists(targetPath));
|
||||
|
Loading…
x
Reference in New Issue
Block a user