HBASE-13836 Do not reset the mvcc for bulk loaded mob reference cells in reading. (Jingcheng)
This commit is contained in:
parent
ba348cf5a5
commit
26893aa451
|
@ -588,6 +588,7 @@ public class PartitionedMobCompactor extends MobCompactor {
|
|||
if (writer != null) {
|
||||
writer.appendMetadata(maxSeqId, false);
|
||||
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
|
||||
writer.appendFileInfo(StoreFile.SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
|
||||
try {
|
||||
writer.close();
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -164,6 +164,13 @@ public class StoreFile {
|
|||
*/
|
||||
private final BloomType cfBloomType;
|
||||
|
||||
/**
|
||||
* Key for skipping resetting sequence id in metadata.
|
||||
* For bulk loaded hfiles, the scanner resets the cell seqId with the latest one,
|
||||
* if this metadata is set as true, the reset is skipped.
|
||||
*/
|
||||
public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
|
||||
|
||||
/**
|
||||
* Constructor, loads a reader and it's indices, etc. May allocate a
|
||||
* substantial amount of ram depending on the underlying files (10-20MB?).
|
||||
|
@ -407,6 +414,12 @@ public class StoreFile {
|
|||
this.sequenceid += 1;
|
||||
}
|
||||
}
|
||||
// SKIP_RESET_SEQ_ID only works in bulk loaded file.
|
||||
// In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
|
||||
// loaded to hbase, these cells have the same seqIds with the old ones. We do not want
|
||||
// to reset new seqIds for them since this might make a mess of the visibility of cells that
|
||||
// have the same row key but different seqIds.
|
||||
this.reader.setSkipResetSeqId(isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)));
|
||||
this.reader.setBulkLoaded(true);
|
||||
}
|
||||
this.reader.setSequenceID(this.sequenceid);
|
||||
|
@ -536,6 +549,18 @@ public class StoreFile {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets whether to skip resetting the sequence id for cells.
|
||||
* @param skipResetSeqId The byte array of boolean.
|
||||
* @return Whether to skip resetting the sequence id.
|
||||
*/
|
||||
private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
|
||||
if (skipResetSeqId != null && skipResetSeqId.length == 1) {
|
||||
return Bytes.toBoolean(skipResetSeqId);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public static class WriterBuilder {
|
||||
private final Configuration conf;
|
||||
private final CacheConfig cacheConf;
|
||||
|
@ -1068,6 +1093,7 @@ public class StoreFile {
|
|||
private long deleteFamilyCnt = -1;
|
||||
private boolean bulkLoadResult = false;
|
||||
private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
|
||||
private boolean skipResetSeqId = true;
|
||||
|
||||
public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
|
||||
throws IOException {
|
||||
|
@ -1594,6 +1620,14 @@ public class StoreFile {
|
|||
public long getMaxTimestamp() {
|
||||
return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
|
||||
}
|
||||
|
||||
boolean isSkipResetSeqId() {
|
||||
return skipResetSeqId;
|
||||
}
|
||||
|
||||
void setSkipResetSeqId(boolean skipResetSeqId) {
|
||||
this.skipResetSeqId = skipResetSeqId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -208,7 +208,7 @@ public class StoreFileScanner implements KeyValueScanner {
|
|||
|
||||
protected void setCurrentCell(Cell newVal) throws IOException {
|
||||
this.cur = newVal;
|
||||
if (this.cur != null && this.reader.isBulkLoaded()) {
|
||||
if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
|
||||
CellUtil.setSequenceId(cur, this.reader.getSequenceID());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
|||
import java.security.Key;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.client.Connection;
|
|||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
|
@ -177,11 +179,11 @@ public class TestMobCompactor {
|
|||
TableName tableName = TableName.valueOf(tableNameAsString);
|
||||
HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
|
||||
hcd1.setMobEnabled(true);
|
||||
hcd1.setMobThreshold(0L);
|
||||
hcd1.setMobThreshold(5);
|
||||
hcd1.setMaxVersions(4);
|
||||
HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
|
||||
hcd2.setMobEnabled(true);
|
||||
hcd2.setMobThreshold(0L);
|
||||
hcd2.setMobThreshold(5);
|
||||
hcd2.setMaxVersions(4);
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addFamily(hcd1);
|
||||
|
@ -570,7 +572,7 @@ public class TestMobCompactor {
|
|||
// do the mob compaction
|
||||
admin.compactMob(tableName, hcd1.getName());
|
||||
|
||||
waitUntilCompactionFinished(tableName);
|
||||
waitUntilMobCompactionFinished(tableName);
|
||||
assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
|
||||
countMobRows(hTable));
|
||||
assertEquals("After compaction: mob cells count", regionNum
|
||||
|
@ -618,7 +620,7 @@ public class TestMobCompactor {
|
|||
// do the major mob compaction, it will force all files to compaction
|
||||
admin.majorCompactMob(tableName, hcd1.getName());
|
||||
|
||||
waitUntilCompactionFinished(tableName);
|
||||
waitUntilMobCompactionFinished(tableName);
|
||||
assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
|
||||
countMobRows(hTable));
|
||||
assertEquals("After compaction: mob cells count",
|
||||
|
@ -633,7 +635,58 @@ public class TestMobCompactor {
|
|||
countFiles(tableName, false, family2));
|
||||
}
|
||||
|
||||
private void waitUntilCompactionFinished(TableName tableName) throws IOException,
|
||||
@Test
|
||||
public void testScannerOnBulkLoadRefHFiles() throws Exception {
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
byte[] key0 = Bytes.toBytes("k0");
|
||||
byte[] key1 = Bytes.toBytes("k1");
|
||||
String value0 = "mobValue0";
|
||||
String value1 = "mobValue1";
|
||||
String newValue0 = "new";
|
||||
Put put0 = new Put(key0);
|
||||
put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value0));
|
||||
loadData(admin, bufMut, tableName, new Put[] { put0 });
|
||||
put0 = new Put(key0);
|
||||
put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue0));
|
||||
Put put1 = new Put(key1);
|
||||
put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value1));
|
||||
loadData(admin, bufMut, tableName, new Put[] { put0, put1 });
|
||||
// read the latest cell of key0.
|
||||
Get get = new Get(key0);
|
||||
Result result = hTable.get(get);
|
||||
Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
||||
assertEquals("Before compaction: mob value of k0", "new",
|
||||
Bytes.toString(CellUtil.cloneValue(cell)));
|
||||
admin.majorCompactMob(tableName, hcd1.getName());
|
||||
waitUntilMobCompactionFinished(tableName);
|
||||
// read the latest cell of key0, the cell seqId in bulk loaded file is not reset in the
|
||||
// scanner. The cell that has "new" value is still visible.
|
||||
result = hTable.get(get);
|
||||
cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
||||
assertEquals("After compaction: mob value of k0", "new",
|
||||
Bytes.toString(CellUtil.cloneValue(cell)));
|
||||
// read the ref cell, not read further to the mob cell.
|
||||
get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
|
||||
result = hTable.get(get);
|
||||
cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
|
||||
// the ref name is the new file
|
||||
Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
|
||||
tableName), hcd1.getNameAsString());
|
||||
List<Path> paths = new ArrayList<Path>();
|
||||
if (fs.exists(mobFamilyPath)) {
|
||||
FileStatus[] files = fs.listStatus(mobFamilyPath);
|
||||
for (FileStatus file : files) {
|
||||
if (!StoreFileInfo.isDelFile(file.getPath())) {
|
||||
paths.add(file.getPath());
|
||||
}
|
||||
}
|
||||
}
|
||||
assertEquals("After compaction: number of mob files:", 1, paths.size());
|
||||
assertEquals("After compaction: mob file name:", MobUtils.getMobFileName(cell), paths.get(0)
|
||||
.getName());
|
||||
}
|
||||
|
||||
private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
|
||||
InterruptedException {
|
||||
long finished = EnvironmentEdgeManager.currentTime() + 60000;
|
||||
CompactionState state = admin.getMobCompactionState(tableName);
|
||||
|
@ -804,6 +857,13 @@ public class TestMobCompactor {
|
|||
}
|
||||
}
|
||||
|
||||
private void loadData(Admin admin, BufferedMutator table, TableName tableName, Put[] puts)
|
||||
throws IOException {
|
||||
table.mutate(Arrays.asList(puts));
|
||||
table.flush();
|
||||
admin.flush(tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* delete the row, family and cell to create the del file
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue