HBASE-12030 Wrong compaction report and assert when MOB compaction switches to minor.

anoopsjohn 2014-09-25 09:47:57 +05:30
parent a7f8035ec5
commit a0d9e18ccf
4 changed files with 108 additions and 31 deletions

View File: HMobStore.java

@@ -322,19 +322,19 @@ public class HMobStore extends HStore {
    * In order to avoid this, we need mutually exclude the running of the major compaction and
    * sweeping in mob files.
    * The minor compaction is not affected.
-   * The major compaction is converted to a minor one when a sweeping is in progress.
+   * The major compaction is marked as retainDeleteMarkers when a sweeping is in progress.
    */
   @Override
   public List<StoreFile> compact(CompactionContext compaction) throws IOException {
     // If it's major compaction, try to find whether there's a sweeper is running
-    // If yes, change the major compaction to a minor one.
-    if (compaction.getRequest().isMajor()) {
+    // If yes, mark the major compaction as retainDeleteMarkers
+    if (compaction.getRequest().isAllFiles()) {
       // Use the Zookeeper to coordinate.
       // 1. Acquire a operation lock.
-      //   1.1. If no, convert the major compaction to a minor one and continue the compaction.
+      //   1.1. If no, mark the major compaction as retainDeleteMarkers and continue the compaction.
       //   1.2. If the lock is obtained, search the node of sweeping.
-      //     1.2.1. If the node is there, the sweeping is in progress, convert the major
-      //            compaction to a minor one and continue the compaction.
+      //     1.2.1. If the node is there, the sweeping is in progress, mark the major
+      //            compaction as retainDeleteMarkers and continue the compaction.
       //     1.2.2. If the node is not there, add a child to the major compaction node, and
       //            run the compaction directly.
       String compactionName = UUID.randomUUID().toString().replaceAll("-", "");
@@ -342,26 +342,27 @@ public class HMobStore extends HStore {
       try {
         zk = MobZookeeper.newInstance(region.getBaseConf(), compactionName);
       } catch (KeeperException e) {
-        LOG.error("Cannot connect to the zookeeper, ready to perform the minor compaction instead",
-          e);
-        // change the major compaction into a minor one
-        compaction.getRequest().setIsMajor(false, false);
+        LOG.error("Cannot connect to the zookeeper, forcing the delete markers to be retained", e);
+        compaction.getRequest().forceRetainDeleteMarkers();
         return super.compact(compaction);
       }
-      boolean major = false;
+      boolean keepDeleteMarkers = true;
+      boolean majorCompactNodeAdded = false;
       try {
         // try to acquire the operation lock.
         if (zk.lockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString())) {
           try {
             LOG.info("Obtain the lock for the store[" + this
-                + "], ready to perform the major compaction");
+                + "], forcing the delete markers to be retained");
             // check the sweeping node to find out whether the sweeping is in progress.
             boolean hasSweeper = zk.isSweeperZNodeExist(getTableName().getNameAsString(),
               getFamily().getNameAsString());
             if (!hasSweeper) {
               // if not, add a child to the major compaction node of this store.
-              major = zk.addMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
-                  .getNameAsString(), compactionName);
+              majorCompactNodeAdded = zk.addMajorCompactionZNode(getTableName().getNameAsString(),
+                  getFamily().getNameAsString(), compactionName);
+              // If we failed to add the major compact node, go with keep delete markers mode.
+              keepDeleteMarkers = !majorCompactNodeAdded;
             }
           } catch (Exception e) {
             LOG.error("Fail to handle the Zookeeper", e);
@@ -371,17 +372,14 @@ public class HMobStore extends HStore {
           }
         }
         try {
-          if (major) {
-            return super.compact(compaction);
-          } else {
-            LOG.warn("Cannot obtain the lock or a sweep tool is running on this store["
-                + this + "], ready to perform the minor compaction instead");
-            // change the major compaction into a minor one
-            compaction.getRequest().setIsMajor(false, false);
-            return super.compact(compaction);
+          if (keepDeleteMarkers) {
+            LOG.warn("Cannot obtain the lock or a sweep tool is running on this store[" + this
+                + "], forcing the delete markers to be retained");
+            compaction.getRequest().forceRetainDeleteMarkers();
           }
+          return super.compact(compaction);
         } finally {
-          if (major) {
+          if (majorCompactNodeAdded) {
             try {
               zk.deleteMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
                   .getNameAsString(), compactionName);
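
Taken together, these three hunks replace the old trick of silently downgrading a major compaction to a minor one (which made the compaction report disagree with the request and tripped asserts) with a flag that only changes how delete markers are handled: the request keeps reporting itself as an all-files compaction. Below is a minimal, self-contained sketch of the resulting decision flow; the booleans and the addMajorCompactionZNode() stub are hypothetical stand-ins for the real MobZookeeper calls, not the actual API.

// Sketch of the decision flow in HMobStore.compact(); ZK interactions are stubbed.
public class MobCompactionDecisionSketch {

  static boolean lockAcquired = false;        // stand-in for zk.lockColumnFamily(...)
  static boolean sweeperRunning = true;       // stand-in for zk.isSweeperZNodeExist(...)

  static boolean addMajorCompactionZNode() {  // stand-in for zk.addMajorCompactionZNode(...)
    return false;
  }

  public static void main(String[] args) {
    // Pessimistic defaults: retain delete markers unless it is provably safe to drop them.
    boolean keepDeleteMarkers = true;
    boolean majorCompactNodeAdded = false;

    if (lockAcquired && !sweeperRunning) {
      majorCompactNodeAdded = addMajorCompactionZNode();
      // Drop delete markers only if this compaction was registered in ZooKeeper.
      keepDeleteMarkers = !majorCompactNodeAdded;
    }

    // The request is never flipped to minor any more, so the compaction is still
    // reported (and asserted) as an all-files one; only delete handling changes.
    System.out.println("retain delete markers: " + keepDeleteMarkers);
    System.out.println("ZK node to clean up in finally: " + majorCompactNodeAdded);
  }
}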

View File: CompactionRequest.java

@@ -59,6 +59,8 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
   private String storeName = "";
   private long totalSize = -1L;
+  private Boolean retainDeleteMarkers = null;
+
 
   /**
    * This ctor should be used by coprocessors that want to subclass CompactionRequest.
    */
@@ -200,6 +202,23 @@ public class CompactionRequest implements Comparable<CompactionRequest> {
         : (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
   }
 
+  /**
+   * Forcefully setting that this compaction has to retain the delete markers in the new compacted
+   * file, whatever be the type of the compaction.<br>
+   * Note : By default HBase drops delete markers when the compaction is on all files.
+   */
+  public void forceRetainDeleteMarkers() {
+    this.retainDeleteMarkers = Boolean.TRUE;
+  }
+
+  /**
+   * @return Whether the compaction has to retain the delete markers or not.
+   */
+  public boolean isRetainDeleteMarkers() {
+    return (this.retainDeleteMarkers != null) ? this.retainDeleteMarkers.booleanValue()
+        : !isAllFiles();
+  }
+
   @Override
   public String toString() {
     String fsList = Joiner.on(", ").join(
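
The new field is deliberately a Boolean rather than a boolean: null means "derive the answer from the compaction type", TRUE means retention was explicitly forced, and nothing in this patch ever sets FALSE. A small self-contained sketch of those tri-state semantics, using a simplified stand-in rather than the real CompactionRequest:

// Simplified stand-in for CompactionRequest, showing the tri-state flag.
public class RetainDeleteMarkersFlagSketch {

  static class Request {
    private Boolean retainDeleteMarkers = null; // null = derive from compaction type
    private final boolean allFiles;

    Request(boolean allFiles) { this.allFiles = allFiles; }

    void forceRetainDeleteMarkers() { this.retainDeleteMarkers = Boolean.TRUE; }

    boolean isRetainDeleteMarkers() {
      // Default mirrors the old behaviour: delete markers are dropped only
      // when all files take part in the compaction.
      return (retainDeleteMarkers != null) ? retainDeleteMarkers.booleanValue() : !allFiles;
    }
  }

  public static void main(String[] args) {
    Request allFiles = new Request(true);
    Request partial = new Request(false);
    System.out.println(allFiles.isRetainDeleteMarkers()); // false: drop delete markers
    System.out.println(partial.isRetainDeleteMarkers());  // true: retain delete markers

    allFiles.forceRetainDeleteMarkers();                  // e.g. a sweeper is running
    System.out.println(allFiles.isRetainDeleteMarkers()); // true: retention forced
  }
}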

View File: DefaultCompactor.java

@@ -59,8 +59,8 @@ public class DefaultCompactor extends Compactor {
     InternalScanner scanner = null;
     try {
       /* Include deletes, unless we are doing a compaction of all files */
-      ScanType scanType =
-          request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
+      ScanType scanType = request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
+          : ScanType.COMPACT_DROP_DELETES;
       scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
       if (scanner == null) {
         scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
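
With the flag unset, the new expression selects exactly the same scan type as the old isAllFiles() check did, so ordinary compactions are unaffected; only a forced request changes the outcome. A quick equivalence check, assuming isRetainDeleteMarkers() defaults to the negation of isAllFiles() as in the hunk above:

// Equivalence check: old vs. new scan-type selection in DefaultCompactor,
// assuming the default isRetainDeleteMarkers() == !isAllFiles().
public class ScanTypeSelectionSketch {

  enum ScanType { COMPACT_RETAIN_DELETES, COMPACT_DROP_DELETES }

  public static void main(String[] args) {
    for (boolean allFiles : new boolean[] { false, true }) {
      boolean retain = !allFiles;                         // default, nothing forced
      ScanType oldType = allFiles ? ScanType.COMPACT_DROP_DELETES
          : ScanType.COMPACT_RETAIN_DELETES;
      ScanType newType = retain ? ScanType.COMPACT_RETAIN_DELETES
          : ScanType.COMPACT_DROP_DELETES;
      System.out.println("allFiles=" + allFiles + ": old=" + oldType + ", new=" + newType);
      // Forcing retention is the only way the two selections can differ:
      System.out.println("allFiles=" + allFiles + ", forced: new="
          + ScanType.COMPACT_RETAIN_DELETES);
    }
  }
}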

View File: TestMobCompaction.java

@@ -37,13 +37,16 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -53,7 +56,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -99,9 +101,11 @@ public class TestMobCompaction {
     UTIL.shutdownMiniCluster();
   }
 
-  private void init(long mobThreshold) throws Exception {
+  private void init(Configuration conf, long mobThreshold) throws Exception {
+    this.conf = conf;
     this.mobCellThreshold = mobThreshold;
-    conf = UTIL.getConfiguration();
+    HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
     htd = UTIL.createTableDescriptor(name.getMethodName());
     hcd = new HColumnDescriptor(COLUMN_FAMILY);
@@ -125,7 +129,7 @@ public class TestMobCompaction {
    */
   @Test
   public void testSmallerValue() throws Exception {
-    init(500);
+    init(UTIL.getConfiguration(), 500);
     byte[] dummyData = makeDummyData(300); // smaller than mob threshold
     HRegionIncommon loader = new HRegionIncommon(region);
     // one hfile per row
@@ -153,7 +157,7 @@ public class TestMobCompaction {
    */
   @Test
   public void testLargerValue() throws Exception {
-    init(200);
+    init(UTIL.getConfiguration(), 200);
     byte[] dummyData = makeDummyData(300); // larger than mob threshold
     HRegionIncommon loader = new HRegionIncommon(region);
     for (int i = 0; i < compactionThreshold; i++) {
@@ -185,7 +189,7 @@ public class TestMobCompaction {
   @Test
   public void testMobCompactionWithBulkload() throws Exception {
     // The following will produce store files of 600.
-    init(300);
+    init(UTIL.getConfiguration(), 300);
     byte[] dummyData = makeDummyData(600);
 
     Path hbaseRootDir = FSUtils.getRootDir(conf);
@@ -216,6 +220,62 @@ public class TestMobCompaction {
     assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
   }
 
+  /**
+   * Tests the major compaction when the zk is not connected.
+   * After that the major compaction will be marked as retainDeleteMarkers, the delete marks
+   * will be retained.
+   * @throws Exception
+   */
+  @Test
+  public void testMajorCompactionWithZKError() throws Exception {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    // use the wrong zk settings
+    conf.setInt("zookeeper.recovery.retry", 0);
+    conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 100);
+    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT,
+        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181) - 1);
+    init(conf, 200);
+    byte[] dummyData = makeDummyData(300); // larger than mob threshold
+    HRegionIncommon loader = new HRegionIncommon(region);
+    byte[] deleteRow = Bytes.toBytes(0);
+    for (int i = 0; i < compactionThreshold - 1; i++) {
+      Put p = new Put(Bytes.toBytes(i));
+      p.setDurability(Durability.SKIP_WAL);
+      p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
+      loader.put(p);
+      loader.flushcache();
+    }
+    Delete delete = new Delete(deleteRow);
+    delete.deleteFamily(COLUMN_FAMILY);
+    region.delete(delete);
+    loader.flushcache();
+
+    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
+    region.compactStores(true);
+    assertEquals("After compaction: store files", 1, countStoreFiles());
+
+    Scan scan = new Scan();
+    scan.setRaw(true);
+    InternalScanner scanner = region.getScanner(scan);
+    List<Cell> results = new ArrayList<Cell>();
+    scanner.next(results);
+    int deleteCount = 0;
+    while (!results.isEmpty()) {
+      for (Cell c : results) {
+        if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
+          deleteCount++;
+          assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow));
+        }
+      }
+      results.clear();
+      scanner.next(results);
+    }
+    // assert the delete mark is retained, the major compaction is marked as
+    // retainDeleteMarkers.
+    assertEquals(1, deleteCount);
+    scanner.close();
+  }
+
   private int countStoreFiles() throws IOException {
     Store store = region.getStore(COLUMN_FAMILY);
     return store.getStorefilesCount();
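
The new test relies on raw scans, which return delete markers as ordinary cells instead of filtering them out. For reference, a stripped-down version of the same counting loop written against the client-side Table API (the method and its parameters are hypothetical; the real test scans the HRegion directly through an InternalScanner):

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RawScanDeleteMarkerCount {

  /** Counts DeleteFamily markers for the given row via a raw scan. */
  static int countDeleteFamilyMarkers(Table table, byte[] row) throws IOException {
    Scan scan = new Scan();
    scan.setRaw(true); // raw scans return delete markers as ordinary cells
    int count = 0;
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result result : scanner) {
        for (Cell c : result.rawCells()) {
          if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
              && Bytes.equals(CellUtil.cloneRow(c), row)) {
            count++;
          }
        }
      }
    }
    return count;
  }
}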