diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 4e362c7d8a3..a5a48f3db44 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -77,7 +77,7 @@ public abstract class Compactor {
   protected final Compression.Algorithm compactionCompression;

   /** specify how many days to keep MVCC values during major compaction **/
-  protected final int keepSeqIdPeriod;
+  protected int keepSeqIdPeriod;

   //TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(final Configuration conf, final Store store) {
@@ -419,9 +419,16 @@ public abstract class Compactor {
         now = EnvironmentEdgeManager.currentTime();
       }
       // output to writer:
+      Cell lastCleanCell = null;
+      long lastCleanCellSeqId = 0;
       for (Cell c : cells) {
         if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
+          lastCleanCell = c;
+          lastCleanCellSeqId = c.getSequenceId();
           CellUtil.setSequenceId(c, 0);
+        } else {
+          lastCleanCell = null;
+          lastCleanCellSeqId = 0;
         }
         writer.append(c);
         int len = KeyValueUtil.length(c);
@@ -444,6 +451,12 @@
           }
         }
         if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
+          if (lastCleanCell != null) {
+            // HBASE-16931, restore the sequence id to avoid unexpectedly affecting scan order.
+            // ShipperListener will clone the last cells it refers to, so the sequence id must
+            // be restored before ShipperListener.beforeShipped
+            CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
+          }
           // Clone the cells that are in the writer so that they are freed of references,
           // if they are holding any.
           ((ShipperListener)writer).beforeShipped();
@@ -457,6 +470,10 @@
           bytesWrittenProgressForShippedCall = 0;
         }
       }
+      if (lastCleanCell != null) {
+        // HBASE-16931, restore the sequence id to avoid unexpectedly affecting scan order
+        CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
+      }
       // Log the progress of long running compactions every minute if
       // logging at DEBUG level
       if (LOG.isDebugEnabled()) {
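The hunks above follow a save/zero/restore pattern: the compactor remembers the last cell whose sequence id it zeroed, restores that id just before the writer clones its buffered cells in beforeShipped(), and restores it again after the batch loop for the reference the scanner may still hold. A minimal, dependency-free sketch of that pattern follows; MutableCell and ShippingSink are hypothetical stand-ins for HBase's Cell and ShipperListener-aware writer, not the real API.

import java.util.List;

// Hypothetical mutable cell carrying only the field relevant here.
final class MutableCell {
  long seqId;
  MutableCell(long seqId) { this.seqId = seqId; }
}

// Hypothetical sink; beforeShipped() clones the cells it still refers to,
// so those cells must carry correct sequence ids at that moment.
interface ShippingSink {
  void append(MutableCell c);
  void beforeShipped();
}

public class CleanSeqIdSketch {
  static void appendBatch(List<MutableCell> cells, ShippingSink writer,
      boolean cleanSeqId, long smallestReadPoint) {
    MutableCell lastCleanCell = null;
    long lastCleanCellSeqId = 0;
    for (MutableCell c : cells) {
      if (cleanSeqId && c.seqId <= smallestReadPoint) {
        // Remember the last zeroed cell and its original id.
        lastCleanCell = c;
        lastCleanCellSeqId = c.seqId;
        c.seqId = 0; // what gets persisted in the compacted file
      } else {
        lastCleanCell = null;
        lastCleanCellSeqId = 0;
      }
      writer.append(c);
    }
    if (lastCleanCell != null) {
      // Restore the id before the sink clones its buffered cells: the scanner
      // may still hold this very object and relies on the id for ordering.
      lastCleanCell.seqId = lastCleanCellSeqId;
    }
    writer.beforeShipped();
  }
}

Only the last appended cell can still be referenced at a flush point, which appears to be why the patch tracks a single cell and its id rather than every cleaned cell.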
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index fa630a21d4c..7b4e4ec11bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -47,11 +47,9 @@ import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
@@ -63,10 +61,13 @@ import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
-import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -96,6 +97,7 @@ public class TestCompaction {
   private int compactionThreshold;
   private byte[] secondRowBytes, thirdRowBytes;
   private static final long MAX_FILES_TO_COMPACT = 10;
+  private final byte[] FAMILY = Bytes.toBytes("cf");

   /** constructor */
   public TestCompaction() {
@@ -118,6 +120,15 @@ public class TestCompaction {
   @Before
   public void setUp() throws Exception {
     this.htd = UTIL.createTableDescriptor(name.getMethodName());
+    if (name.getMethodName().equals("testCompactionSeqId")) {
+      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
+      UTIL.getConfiguration().set(
+          DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
+          DummyCompactor.class.getName());
+      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+      hcd.setMaxVersions(65536);
+      this.htd.addFamily(hcd);
+    }
     this.r = UTIL.createLocalHRegion(htd, null, null);
   }

@@ -602,6 +613,72 @@ public class TestCompaction {
     cst.interruptIfNecessary();
   }

+  /**
+   * First write 10 cells (with different timestamps) to a qualifier and flush
+   * to hfile1, then write 10 cells (with different timestamps) to the same
+   * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the
+   * oldest cell (cell-B) in hfile2 have the same timestamp but different
+   * sequence ids, and will get scanned successively during compaction.
+   * <p/>
+   * We set compaction.kv.max to 10 so compaction will scan 10 versions each
+   * round; meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all
+   * 10 versions of hfile2 will be written out with their seqId cleaned (set to
+   * 0), including cell-B. When the scanner then reaches cell-A, this caused a
+   * scan out-of-order assertion error before HBASE-16931.
+   *
+   * @throws Exception
+   *           if an error occurs during the test
+   */
+  @Test
+  public void testCompactionSeqId() throws Exception {
+    final byte[] ROW = Bytes.toBytes("row");
+    final byte[] QUALIFIER = Bytes.toBytes("qualifier");
+
+    long timestamp = 10000;
+
+    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
+    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
+    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
+    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
+    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
+    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
+    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
+    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
+    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
+    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
+      r.put(put);
+    }
+    r.flush(true);
+
+    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
+    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
+    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
+    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
+    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
+    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
+    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
+    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
+    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
+    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
+    for (int i = 18; i > 8; i--) {
+      Put put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
+      r.put(put);
+    }
+    r.flush(true);
+    r.compact(true);
+  }
+
+  public static class DummyCompactor extends DefaultCompactor {
+    public DummyCompactor(Configuration conf, Store store) {
+      super(conf, store);
+      this.keepSeqIdPeriod = 0;
+    }
+  }
+
   private static StoreFile createFile() throws Exception {
     StoreFile sf = mock(StoreFile.class);
     when(sf.getPath()).thenReturn(new Path("file"));
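For reference, a short dependency-free sketch of why zeroing cell-B's sequence id mid-scan trips the assertion the test provokes. FakeCell and the comparator below are hypothetical stand-ins mimicking the tie-break rule implied by the scenario above (for otherwise identical keys: newer timestamp first, then higher sequence id first); they are not HBase's real Cell or CellComparator.

import java.util.Comparator;

// Hypothetical cell carrying only the two fields relevant here.
final class FakeCell {
  final long timestamp;
  long seqId;
  FakeCell(long timestamp, long seqId) { this.timestamp = timestamp; this.seqId = seqId; }
}

public class ScanOrderDemo {
  static final Comparator<FakeCell> ORDER = (a, b) -> {
    if (a.timestamp != b.timestamp) {
      return Long.compare(b.timestamp, a.timestamp); // newer timestamp first
    }
    return Long.compare(b.seqId, a.seqId); // higher sequence id first
  };

  public static void main(String[] args) {
    FakeCell cellB = new FakeCell(10009, 25); // oldest cell of hfile2, emitted first
    FakeCell cellA = new FakeCell(10009, 11); // latest cell of hfile1, emitted second

    // With both sequence ids intact, the emission order is consistent:
    System.out.println(ORDER.compare(cellB, cellA) < 0); // true

    // Before HBASE-16931, compaction zeroed cellB's id while the scanner was
    // still positioned between the two cells; the already-emitted cellB then
    // sorts AFTER cellA, i.e. the "scan out-of-order" condition:
    cellB.seqId = 0;
    System.out.println(ORDER.compare(cellB, cellA) > 0); // true
  }
}

The test forces this situation deterministically: keepSeqIdPeriod=0 in DummyCompactor guarantees every version in the younger hfile, including cell-B, gets its sequence id cleaned, and compaction.kv.max=10 makes the batch boundary fall exactly between cell-B and cell-A.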