HBASE-13922 Do not reset mvcc in compactions for mob-enabled column.(Jingcheng Du)
This commit is contained in:
parent
ba4ba32b0d
commit
3f062ee236
|
@ -73,7 +73,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
|
protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
|
||||||
// make this writer with tags always because of possible new cells with tags.
|
// make this writer with tags always because of possible new cells with tags.
|
||||||
StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
|
StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
|
||||||
true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
|
true, true, true);
|
||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,9 +187,6 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
hasMore = compactionScanner.next(cells, scannerContext);
|
hasMore = compactionScanner.next(cells, scannerContext);
|
||||||
// output to writer:
|
// output to writer:
|
||||||
for (Cell c : cells) {
|
for (Cell c : cells) {
|
||||||
if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
|
|
||||||
CellUtil.setSequenceId(c, 0);
|
|
||||||
}
|
|
||||||
if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) {
|
if (compactionScanner.isOutputDeleteMarkers() && CellUtil.isDelete(c)) {
|
||||||
delFileWriter.append(c);
|
delFileWriter.append(c);
|
||||||
deleteMarkersCount++;
|
deleteMarkersCount++;
|
||||||
|
|
|
@ -145,11 +145,11 @@ public class TestMobCompactor {
|
||||||
tableName = TableName.valueOf(tableNameAsString);
|
tableName = TableName.valueOf(tableNameAsString);
|
||||||
hcd1 = new HColumnDescriptor(family1);
|
hcd1 = new HColumnDescriptor(family1);
|
||||||
hcd1.setMobEnabled(true);
|
hcd1.setMobEnabled(true);
|
||||||
hcd1.setMobThreshold(0L);
|
hcd1.setMobThreshold(5);
|
||||||
hcd1.setMaxVersions(4);
|
hcd1.setMaxVersions(4);
|
||||||
hcd2 = new HColumnDescriptor(family2);
|
hcd2 = new HColumnDescriptor(family2);
|
||||||
hcd2.setMobEnabled(true);
|
hcd2.setMobEnabled(true);
|
||||||
hcd2.setMobThreshold(0L);
|
hcd2.setMobThreshold(5);
|
||||||
hcd2.setMaxVersions(4);
|
hcd2.setMaxVersions(4);
|
||||||
desc = new HTableDescriptor(tableName);
|
desc = new HTableDescriptor(tableName);
|
||||||
desc.addFamily(hcd1);
|
desc.addFamily(hcd1);
|
||||||
|
@ -179,11 +179,11 @@ public class TestMobCompactor {
|
||||||
TableName tableName = TableName.valueOf(tableNameAsString);
|
TableName tableName = TableName.valueOf(tableNameAsString);
|
||||||
HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
|
HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
|
||||||
hcd1.setMobEnabled(true);
|
hcd1.setMobEnabled(true);
|
||||||
hcd1.setMobThreshold(5);
|
hcd1.setMobThreshold(0);
|
||||||
hcd1.setMaxVersions(4);
|
hcd1.setMaxVersions(4);
|
||||||
HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
|
HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
|
||||||
hcd2.setMobEnabled(true);
|
hcd2.setMobEnabled(true);
|
||||||
hcd2.setMobThreshold(5);
|
hcd2.setMobThreshold(0);
|
||||||
hcd2.setMaxVersions(4);
|
hcd2.setMaxVersions(4);
|
||||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||||
desc.addFamily(hcd1);
|
desc.addFamily(hcd1);
|
||||||
|
@ -655,7 +655,7 @@ public class TestMobCompactor {
|
||||||
Get get = new Get(key0);
|
Get get = new Get(key0);
|
||||||
Result result = hTable.get(get);
|
Result result = hTable.get(get);
|
||||||
Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
||||||
assertEquals("Before compaction: mob value of k0", "new",
|
assertEquals("Before compaction: mob value of k0", newValue0,
|
||||||
Bytes.toString(CellUtil.cloneValue(cell)));
|
Bytes.toString(CellUtil.cloneValue(cell)));
|
||||||
admin.majorCompactMob(tableName, hcd1.getName());
|
admin.majorCompactMob(tableName, hcd1.getName());
|
||||||
waitUntilMobCompactionFinished(tableName);
|
waitUntilMobCompactionFinished(tableName);
|
||||||
|
@ -663,9 +663,10 @@ public class TestMobCompactor {
|
||||||
// scanner. The cell that has "new" value is still visible.
|
// scanner. The cell that has "new" value is still visible.
|
||||||
result = hTable.get(get);
|
result = hTable.get(get);
|
||||||
cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
||||||
assertEquals("After compaction: mob value of k0", "new",
|
assertEquals("After compaction: mob value of k0", newValue0,
|
||||||
Bytes.toString(CellUtil.cloneValue(cell)));
|
Bytes.toString(CellUtil.cloneValue(cell)));
|
||||||
// read the ref cell, not read further to the mob cell.
|
// read the ref cell, not read further to the mob cell.
|
||||||
|
get = new Get(key1);
|
||||||
get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
|
get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
|
||||||
result = hTable.get(get);
|
result = hTable.get(get);
|
||||||
cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
||||||
|
@ -686,6 +687,47 @@ public class TestMobCompactor {
|
||||||
.getName());
|
.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScannerAfterCompactions() throws Exception {
|
||||||
|
long ts = EnvironmentEdgeManager.currentTime();
|
||||||
|
byte[] key0 = Bytes.toBytes("k0");
|
||||||
|
byte[] key1 = Bytes.toBytes("k1");
|
||||||
|
String value = "mobValue"; // larger than threshold
|
||||||
|
String newValue = "new";
|
||||||
|
Put put0 = new Put(key0);
|
||||||
|
put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value));
|
||||||
|
loadData(admin, bufMut, tableName, new Put[] { put0 });
|
||||||
|
Put put1 = new Put(key1);
|
||||||
|
put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value));
|
||||||
|
loadData(admin, bufMut, tableName, new Put[] { put1 });
|
||||||
|
put1 = new Put(key1);
|
||||||
|
put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue));
|
||||||
|
loadData(admin, bufMut, tableName, new Put[] { put1 }); // now two mob files
|
||||||
|
admin.majorCompact(tableName);
|
||||||
|
waitUntilCompactionFinished(tableName);
|
||||||
|
admin.majorCompactMob(tableName, hcd1.getName());
|
||||||
|
waitUntilMobCompactionFinished(tableName);
|
||||||
|
// read the latest cell of key1.
|
||||||
|
Get get = new Get(key1);
|
||||||
|
Result result = hTable.get(get);
|
||||||
|
Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
||||||
|
assertEquals("After compaction: mob value", "new", Bytes.toString(CellUtil.cloneValue(cell)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitUntilCompactionFinished(TableName tableName) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
long finished = EnvironmentEdgeManager.currentTime() + 60000;
|
||||||
|
CompactionState state = admin.getCompactionState(tableName);
|
||||||
|
while (EnvironmentEdgeManager.currentTime() < finished) {
|
||||||
|
if (state == CompactionState.NONE) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
state = admin.getCompactionState(tableName);
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
assertEquals(CompactionState.NONE, state);
|
||||||
|
}
|
||||||
|
|
||||||
private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
|
private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
long finished = EnvironmentEdgeManager.currentTime() + 60000;
|
long finished = EnvironmentEdgeManager.currentTime() + 60000;
|
||||||
|
|
Loading…
Reference in New Issue