HBASE-16931 Setting cell's seqId to zero in compaction flow might cause RS down.
Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
parent
a4d48b699f
commit
fa3cbd1d80
|
@ -77,7 +77,7 @@ public abstract class Compactor<T extends CellSink> {
|
|||
protected final Compression.Algorithm compactionCompression;
|
||||
|
||||
/** specify how many days to keep MVCC values during major compaction **/
|
||||
protected final int keepSeqIdPeriod;
|
||||
protected int keepSeqIdPeriod;
|
||||
|
||||
//TODO: depending on Store is not good but, realistically, all compactors currently do.
|
||||
Compactor(final Configuration conf, final Store store) {
|
||||
|
@ -419,9 +419,16 @@ public abstract class Compactor<T extends CellSink> {
|
|||
now = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
// output to writer:
|
||||
Cell lastCleanCell = null;
|
||||
long lastCleanCellSeqId = 0;
|
||||
for (Cell c : cells) {
|
||||
if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
|
||||
lastCleanCell = c;
|
||||
lastCleanCellSeqId = c.getSequenceId();
|
||||
CellUtil.setSequenceId(c, 0);
|
||||
} else {
|
||||
lastCleanCell = null;
|
||||
lastCleanCellSeqId = 0;
|
||||
}
|
||||
writer.append(c);
|
||||
int len = KeyValueUtil.length(c);
|
||||
|
@ -444,6 +451,12 @@ public abstract class Compactor<T extends CellSink> {
|
|||
}
|
||||
}
|
||||
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
|
||||
if (lastCleanCell != null) {
|
||||
// HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
|
||||
// ShipperListener will do a clone of the last cells it refer, so need to set back
|
||||
// sequence id before ShipperListener.beforeShipped
|
||||
CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
|
||||
}
|
||||
// Clone the cells that are in the writer so that they are freed of references,
|
||||
// if they are holding any.
|
||||
((ShipperListener)writer).beforeShipped();
|
||||
|
@ -457,6 +470,10 @@ public abstract class Compactor<T extends CellSink> {
|
|||
bytesWrittenProgressForShippedCall = 0;
|
||||
}
|
||||
}
|
||||
if (lastCleanCell != null) {
|
||||
// HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
|
||||
CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
|
||||
}
|
||||
// Log the progress of long running compactions every minute if
|
||||
// logging at DEBUG level
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -47,11 +47,9 @@ import org.apache.hadoop.hbase.ChoreService;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestCase;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
|
@ -63,10 +61,13 @@ import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
|||
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.junit.After;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
|
@ -96,6 +97,7 @@ public class TestCompaction {
|
|||
private int compactionThreshold;
|
||||
private byte[] secondRowBytes, thirdRowBytes;
|
||||
private static final long MAX_FILES_TO_COMPACT = 10;
|
||||
private final byte[] FAMILY = Bytes.toBytes("cf");
|
||||
|
||||
/** constructor */
|
||||
public TestCompaction() {
|
||||
|
@ -118,6 +120,15 @@ public class TestCompaction {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
this.htd = UTIL.createTableDescriptor(name.getMethodName());
|
||||
if (name.getMethodName().equals("testCompactionSeqId")) {
|
||||
UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
|
||||
UTIL.getConfiguration().set(
|
||||
DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
|
||||
DummyCompactor.class.getName());
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
|
||||
hcd.setMaxVersions(65536);
|
||||
this.htd.addFamily(hcd);
|
||||
}
|
||||
this.r = UTIL.createLocalHRegion(htd, null, null);
|
||||
}
|
||||
|
||||
|
@ -602,6 +613,72 @@ public class TestCompaction {
|
|||
cst.interruptIfNecessary();
|
||||
}
|
||||
|
||||
/**
|
||||
* Firstly write 10 cells (with different time stamp) to a qualifier and flush
|
||||
* to hfile1, then write 10 cells (with different time stamp) to the same
|
||||
* qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the
|
||||
* oldest cell (cell-B) in hfile2 are with the same time stamp but different
|
||||
* sequence id, and will get scanned successively during compaction.
|
||||
* <p/>
|
||||
* We set compaction.kv.max to 10 so compaction will scan 10 versions each
|
||||
* round, meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all
|
||||
* 10 versions of hfile2 will be written out with seqId cleaned (set to 0)
|
||||
* including cell-B, then when scanner goes to cell-A it will cause a scan
|
||||
* out-of-order assertion error before HBASE-16931
|
||||
*
|
||||
* @throws Exception
|
||||
* if error occurs during the test
|
||||
*/
|
||||
@Test
|
||||
public void testCompactionSeqId() throws Exception {
|
||||
final byte[] ROW = Bytes.toBytes("row");
|
||||
final byte[] QUALIFIER = Bytes.toBytes("qualifier");
|
||||
|
||||
long timestamp = 10000;
|
||||
|
||||
// row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
|
||||
// row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
|
||||
// row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
|
||||
// row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
|
||||
// row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
|
||||
// row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
|
||||
// row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
|
||||
// row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
|
||||
// row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
|
||||
// row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Put put = new Put(ROW);
|
||||
put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
|
||||
r.put(put);
|
||||
}
|
||||
r.flush(true);
|
||||
|
||||
// row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
|
||||
// row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
|
||||
// row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
|
||||
// row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
|
||||
// row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
|
||||
// row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
|
||||
// row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
|
||||
// row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
|
||||
// row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
|
||||
// row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
|
||||
for (int i = 18; i > 8; i--) {
|
||||
Put put = new Put(ROW);
|
||||
put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
|
||||
r.put(put);
|
||||
}
|
||||
r.flush(true);
|
||||
r.compact(true);
|
||||
}
|
||||
|
||||
public static class DummyCompactor extends DefaultCompactor {
|
||||
public DummyCompactor(Configuration conf, Store store) {
|
||||
super(conf, store);
|
||||
this.keepSeqIdPeriod = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private static StoreFile createFile() throws Exception {
|
||||
StoreFile sf = mock(StoreFile.class);
|
||||
when(sf.getPath()).thenReturn(new Path("file"));
|
||||
|
|
Loading…
Reference in New Issue