From 16823ff55e7da4822d82fb2a2108b4a253fd42f9 Mon Sep 17 00:00:00 2001
From: binlijin
Date: Mon, 24 Oct 2016 23:31:14 +0800
Subject: [PATCH] HBASE-16931 Setting cell's seqId to zero in compaction flow
 might cause RS down.

Signed-off-by: Yu Li
---
 .../regionserver/compactions/Compactor.java  | 17 +++-
 .../hbase/regionserver/TestCompaction.java   | 83 ++++++++++++++++++-
 2 files changed, 94 insertions(+), 6 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index c6fc0c6dcb1..b6d145de5d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -26,8 +26,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.io.Closeables;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -58,6 +56,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
+import com.google.common.io.Closeables;
+
 /**
  * A compactor is a compaction algorithm associated a given policy. Base class also contains
  * reusable parts for implementing compactors (what is common and what isn't is evolving).
@@ -75,7 +75,7 @@ public abstract class Compactor {
   protected final Compression.Algorithm compactionCompression;
 
   /** specify how many days to keep MVCC values during major compaction **/
-  protected final int keepSeqIdPeriod;
+  protected int keepSeqIdPeriod;
 
   //TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(final Configuration conf, final Store store) {
@@ -435,9 +435,16 @@ public abstract class Compactor {
           now = EnvironmentEdgeManager.currentTime();
         }
         // output to writer:
+        Cell lastCleanCell = null;
+        long lastCleanCellSeqId = 0;
         for (Cell c : cells) {
           if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
+            lastCleanCell = c;
+            lastCleanCellSeqId = c.getSequenceId();
             CellUtil.setSequenceId(c, 0);
+          } else {
+            lastCleanCell = null;
+            lastCleanCellSeqId = 0;
           }
           writer.append(c);
           int len = KeyValueUtil.length(c);
@@ -459,6 +466,10 @@ public abstract class Compactor {
             }
           }
         }
+        if (lastCleanCell != null) {
+          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
+          CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
+        }
         // Log the progress of long running compactions every minute if
         // logging at DEBUG level
         if (LOG.isDebugEnabled()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index da0bf42f084..6b1382fa706 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -50,10 +50,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
@@ -64,10 +63,12 @@ import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
-import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -98,6 +99,7 @@ public class TestCompaction {
   private int compactionThreshold;
   private byte[] secondRowBytes, thirdRowBytes;
   private static final long MAX_FILES_TO_COMPACT = 10;
+  private final byte[] FAMILY = Bytes.toBytes("cf");
 
   /** constructor */
   public TestCompaction() {
@@ -120,6 +122,15 @@ public class TestCompaction {
   @Before
   public void setUp() throws Exception {
     this.htd = UTIL.createTableDescriptor(name.getMethodName());
+    if (name.getMethodName().equals("testCompactionSeqId")) {
+      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
+      UTIL.getConfiguration().set(
+          DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
+          DummyCompactor.class.getName());
+      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+      hcd.setMaxVersions(65536);
+      this.htd.addFamily(hcd);
+    }
     this.r = UTIL.createLocalHRegion(htd, null, null);
   }
 
@@ -604,6 +615,72 @@ public class TestCompaction {
     cst.interruptIfNecessary();
   }
 
+  /**
+   * First write 10 cells (with different time stamps) to a qualifier and flush
+   * to hfile1, then write 10 cells (with different time stamps) to the same
+   * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the
+   * oldest cell (cell-B) in hfile2 have the same time stamp but different
+   * sequence ids, and will get scanned successively during compaction.
+   * <p>
+   * We set compaction.kv.max to 10 so compaction will scan 10 versions each
+   * round, meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all
+   * 10 versions of hfile2 will be written out with seqId cleaned (set to 0),
+   * including cell-B; when the scanner then goes to cell-A, it will cause a scan
+   * out-of-order assertion error before HBASE-16931.
+   *
+   * @throws Exception
+   *           if error occurs during the test
+   */
+  @Test
+  public void testCompactionSeqId() throws Exception {
+    final byte[] ROW = Bytes.toBytes("row");
+    final byte[] QUALIFIER = Bytes.toBytes("qualifier");
+
+    long timestamp = 10000;
+
+    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
+    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
+    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
+    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
+    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
+    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
+    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
+    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
+    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
+    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
+      r.put(put);
+    }
+    r.flush(true);
+
+    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
+    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
+    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
+    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
+    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
+    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
+    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
+    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
+    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
+    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
+    for (int i = 18; i > 8; i--) {
+      Put put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
+      r.put(put);
+    }
+    r.flush(true);
+    r.compact(true);
+  }
+
+  public static class DummyCompactor extends DefaultCompactor {
+    public DummyCompactor(Configuration conf, Store store) {
+      super(conf, store);
+      this.keepSeqIdPeriod = 0;
+    }
+  }
+
   private static StoreFile createFile() throws Exception {
     StoreFile sf = mock(StoreFile.class);
     when(sf.getPath()).thenReturn(new Path("file"));
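
A note for readers skimming the diff: the stand-alone sketch below condenses the Compactor change above into a runnable toy. It is plain Java, not HBase code; FakeCell, the key strings, and the numeric values are illustrative stand-ins, and the comments reflect one reading (based on the patch's own comment and the test javadoc) of why only the batch's last cell needs its sequence id restored.

import java.util.Arrays;
import java.util.List;

public class SeqIdRestoreSketch {

  /** Illustrative stand-in for an HBase Cell; only the mutable sequence id matters here. */
  static final class FakeCell {
    final String key;
    long seqId;

    FakeCell(String key, long seqId) {
      this.key = key;
      this.seqId = seqId;
    }
  }

  public static void main(String[] args) {
    // Cells at or below this read point may have their seqId zeroed in the output file.
    long smallestReadPoint = 30;

    // One batch handed back by the compaction scanner (at most kv.max cells per call).
    List<FakeCell> batch = Arrays.asList(
        new FakeCell("row/cf:q/ts=10010", 12),
        new FakeCell("row/cf:q/ts=10009", 11)); // last cell of this batch

    FakeCell lastCleanCell = null;
    long lastCleanCellSeqId = 0;
    for (FakeCell c : batch) {
      if (c.seqId <= smallestReadPoint) {
        lastCleanCell = c;            // remember the most recently cleaned cell
        lastCleanCellSeqId = c.seqId; // and its original sequence id
        c.seqId = 0;                  // the zeroed value is what gets persisted
      } else {
        lastCleanCell = null;
        lastCleanCellSeqId = 0;
      }
      // writer.append(c) runs here in the real code, so every cleaned cell is
      // still written out with seqId 0.
    }

    // HBASE-16931: the last cell object of the batch is shared with the scanner,
    // which compares it against the next cell it returns in order to enforce
    // scan order. Restoring the seqId in memory keeps that comparison correct
    // while leaving the persisted data unchanged.
    if (lastCleanCell != null) {
      lastCleanCell.seqId = lastCleanCellSeqId;
    }

    System.out.println("in-memory seqId of last batch cell: " + batch.get(1).seqId); // prints 11
  }
}

Intermediate cells in a batch do not need this treatment: once appended, nothing compares against them again. Only the final cell can still be the scanner's reference point when the next batch is fetched, which appears to be why the patch tracks a single lastCleanCell rather than a list.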