HBASE-16578 Mob data loss after mob compaction and normal compaction

commit 67f1ac1f8e
parent acc606571b
@@ -453,7 +453,12 @@ public class StoreFile {
       // loaded to hbase, these cells have the same seqIds with the old ones. We do not want
       // to reset new seqIds for them since this might make a mess of the visibility of cells that
       // have the same row key but different seqIds.
-      this.reader.setSkipResetSeqId(isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)));
+      boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
+      if (skipResetSeqId) {
+        // increase the seqId when it is a bulk loaded file from mob compaction.
+        this.sequenceid += 1;
+      }
+      this.reader.setSkipResetSeqId(skipResetSeqId);
       this.reader.setBulkLoaded(true);
     }
     this.reader.setSequenceID(this.sequenceid);
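Why the extra increment matters: the reference hfiles produced by a mob compaction are bulk loaded with their original per-cell seqIds kept (skipResetSeqId), so their cells tie with the old cells that share the same row key and seqId, and only the file-level sequence id can decide which copy a scanner returns. Below is a minimal sketch in plain Java, not HBase's actual Cell or StoreFile comparator, of the tie-break that the added `this.sequenceid += 1` is intended to win; all class and field names here are hypothetical.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SeqIdTieBreakSketch {
  // Hypothetical stand-in for a cell plus the sequence id of the store file it came from.
  static final class Entry {
    final String rowKey;
    final long cellSeqId;  // per-cell seqId, identical after bulk load with skipResetSeqId
    final long fileSeqId;  // file-level sequence id used to order files in a scan
    final String value;

    Entry(String rowKey, long cellSeqId, long fileSeqId, String value) {
      this.rowKey = rowKey;
      this.cellSeqId = cellSeqId;
      this.fileSeqId = fileSeqId;
      this.value = value;
    }
  }

  public static void main(String[] args) {
    // Same row key, same per-cell seqId: the old cell and the re-written mob reference tie.
    Entry oldCell = new Entry("r1", 5L, 10L, "old-mob-ref");
    Entry bulkLoadedCell = new Entry("r1", 5L, 10L + 1, "new-mob-ref"); // file seqId bumped by 1

    // Newest-first ordering: fall back to the file-level sequence id when cell seqIds tie.
    Comparator<Entry> newestFirst =
        Comparator.comparingLong((Entry e) -> e.cellSeqId)
            .thenComparingLong(e -> e.fileSeqId)
            .reversed();

    List<Entry> cells = new ArrayList<>(List.of(oldCell, bulkLoadedCell));
    cells.sort(newestFirst);
    System.out.println(cells.get(0).value); // prints "new-mob-ref"
  }
}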
@@ -276,7 +276,11 @@ public abstract class Compactor<T extends CellSink> {
       // HFiles, and their readers
       readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
       for (StoreFile f : request.getFiles()) {
-        readersToClose.add(f.cloneForReader());
+        StoreFile clonedStoreFile = f.cloneForReader();
+        // create the reader after the store file is cloned in case
+        // the sequence id is used for sorting in scanners
+        clonedStoreFile.createReader();
+        readersToClose.add(clonedStoreFile);
       }
       scanners = createFileScanners(readersToClose, smallestReadPoint,
         store.throttleCompaction(request.getSize()));
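The Compactor change follows the same theme. The sketch below, with hypothetical names rather than the real StoreFile or StoreFileScanner API, illustrates why the reader is created only after the store file is cloned: when the per-compaction copy is opened itself, it derives its own sort key (the sequence id, including the bulk-load adjustment above) from the file metadata instead of inheriting whatever state the original happened to hold.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CloneThenOpenSketch {
  // Hypothetical stand-in for a store file: open() derives a sort key from file metadata.
  static class FileHandle {
    long sortKey;    // filled in by open(); scanners would order files on this
    boolean opened;

    void open() {
      // pretend this reads metadata and applies adjustments (e.g. a seqId bump)
      this.sortKey = 42L;
      this.opened = true;
    }

    FileHandle copyForReader() {
      // fresh copy: nothing derived from open() is carried over
      return new FileHandle();
    }
  }

  public static void main(String[] args) {
    List<FileHandle> readersToClose = new ArrayList<>();
    FileHandle original = new FileHandle();

    FileHandle clone = original.copyForReader(); // clone first ...
    clone.open();                                // ... then open, so the clone computes its own sort key
    readersToClose.add(clone);

    readersToClose.sort(Comparator.comparingLong(h -> h.sortKey)); // ordering now sees the real key
    System.out.println(readersToClose.get(0).sortKey + " opened=" + readersToClose.get(0).opened);
  }
}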
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.mob.compactions;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.security.Key;
@@ -63,17 +64,23 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.io.crypto.aes.AES;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -118,6 +125,9 @@ public class TestMobCompactor {
     TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
       KeyProviderForTesting.class.getName());
     TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
+    TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1);
+    TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100);
     TEST_UTIL.startMiniCluster(1);
     pool = createThreadPool(TEST_UTIL.getConfiguration());
     conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool);
@@ -428,33 +438,100 @@ public class TestMobCompactor {
       .getName());
   }
 
+  /**
+   * This case tests the following mob compaction and normal compaction scenario,
+   * after mob compaction, the mob reference in new bulkloaded hfile will win even after it
+   * is compacted with some other normal hfiles. This is to make sure the mvcc is included
+   * after compaction for mob enabled store files.
+   */
   @Test(timeout = 300000)
-  public void testScannerAfterCompactions() throws Exception {
+  public void testGetAfterCompaction() throws Exception {
     resetConf();
-    setUp("testScannerAfterCompactions");
-    long ts = EnvironmentEdgeManager.currentTime();
-    byte[] key0 = Bytes.toBytes("k0");
-    byte[] key1 = Bytes.toBytes("k1");
-    String value = "mobValue"; // larger than threshold
-    String newValue = "new";
-    Put put0 = new Put(key0);
-    put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value));
-    loadData(admin, bufMut, tableName, new Put[] { put0 });
-    Put put1 = new Put(key1);
-    put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value));
-    loadData(admin, bufMut, tableName, new Put[] { put1 });
-    put1 = new Put(key1);
-    put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue));
-    loadData(admin, bufMut, tableName, new Put[] { put1 }); // now two mob files
-    admin.majorCompact(tableName);
-    waitUntilCompactionFinished(tableName);
-    admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB);
-    waitUntilMobCompactionFinished(tableName);
-    // read the latest cell of key1.
-    Get get = new Get(key1);
-    Result result = table.get(get);
-    Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
-    assertEquals("After compaction: mob value", "new", Bytes.toString(CellUtil.cloneValue(cell)));
+    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
+    String famStr = "f1";
+    byte[] fam = Bytes.toBytes(famStr);
+    byte[] qualifier = Bytes.toBytes("q1");
+    byte[] mobVal = Bytes.toBytes("01234567890");
+    HTableDescriptor hdt = new HTableDescriptor(TableName.valueOf("testGetAfterCompaction"));
+    hdt.addCoprocessor(CompactTwoLatestHfilesCopro.class.getName());
+    HColumnDescriptor hcd = new HColumnDescriptor(fam);
+    hcd.setMobEnabled(true);
+    hcd.setMobThreshold(10);
+    hcd.setMaxVersions(1);
+    hdt.addFamily(hcd);
+    try {
+      Table table = TEST_UTIL.createTable(hdt, null);
+      HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(hdt.getTableName()).get(0);
+      Put p = new Put(Bytes.toBytes("r1"));
+      p.addColumn(fam, qualifier, mobVal);
+      table.put(p);
+      // Create mob file mob1 and reference file ref1
+      TEST_UTIL.flush(table.getName());
+      // Make sure that it is flushed.
+      FileSystem fs = r.getRegionFileSystem().getFileSystem();
+      Path path = r.getRegionFileSystem().getStoreDir(famStr);
+      waitUntilFilesShowup(fs, path, 1);
+
+      p = new Put(Bytes.toBytes("r2"));
+      p.addColumn(fam, qualifier, mobVal);
+      table.put(p);
+      // Create mob file mob2 and reference file ref2
+      TEST_UTIL.flush(table.getName());
+      waitUntilFilesShowup(fs, path, 2);
+      // Do mob compaction to create mob3 and ref3
+      TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam, CompactType.MOB);
+      waitUntilFilesShowup(fs, path, 3);
+
+      // Compact ref3 and ref2 into ref4
+      TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam);
+      waitUntilFilesShowup(fs, path, 2);
+
+      // Sleep for some time, since TimeToLiveHFileCleaner is 0, the next run of
+      // clean chore is guaranteed to clean up files in archive
+      Thread.sleep(100);
+      // Run cleaner to make sure that files in archive directory are cleaned up
+      TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
+
+      // Get "r2"
+      Get get = new Get(Bytes.toBytes("r2"));
+      try {
+        Result result = table.get(get);
+        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
+      } catch (IOException e) {
+        assertTrue("The MOB file doesn't exist", false);
+      }
+    } finally {
+      TEST_UTIL.deleteTable(hdt.getTableName());
+    }
+  }
+
+  private void waitUntilFilesShowup(final FileSystem fs, final Path path, final int num)
+      throws InterruptedException, IOException {
+    FileStatus[] fileList = fs.listStatus(path);
+    while (fileList.length != num) {
+      Thread.sleep(50);
+      fileList = fs.listStatus(path);
+    }
+  }
+
+  /**
+   * This copro overwrites the default compaction policy. It always chooses two latest
+   * hfiles and compacts them into a new one.
+   */
+  public static class CompactTwoLatestHfilesCopro extends BaseRegionObserver {
+    @Override
+    public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
+        final Store store, final List<StoreFile> candidates, final CompactionRequest request)
+        throws IOException {
+
+      int count = candidates.size();
+      if (count >= 2) {
+        for (int i = 0; i < count - 2; i++) {
+          candidates.remove(0);
+        }
+        c.bypass();
+      }
+    }
   }
 
   private void waitUntilCompactionFinished(TableName tableName) throws IOException,
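A side note on the new helper: waitUntilFilesShowup polls the store directory with no bound of its own and relies on the JUnit @Test(timeout = 300000) to stop a run that never reaches the expected file count. A bounded variant might look like the sketch below; this is only an illustration with a hypothetical helper name and plain Java APIs, not part of the patch.

import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public final class BoundedWait {
  // Polls actualCount until it returns expected, or fails after timeoutMs milliseconds.
  public static void waitForCount(LongSupplier actualCount, long expected, long timeoutMs)
      throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    long seen = actualCount.getAsLong();
    while (seen != expected) {
      if (System.nanoTime() > deadline) {
        throw new AssertionError("expected " + expected + " files but still see " + seen
            + " after " + timeoutMs + " ms");
      }
      Thread.sleep(50);
      seen = actualCount.getAsLong();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Example: in the test this supplier would be something like () -> fs.listStatus(path).length.
    waitForCount(() -> 2L, 2L, 1000L);
    System.out.println("condition met");
  }
}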