diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 065787e9dff..8cda746f445 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -588,6 +588,7 @@ public class PartitionedMobCompactor extends MobCompactor {
     if (writer != null) {
       writer.appendMetadata(maxSeqId, false);
       writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+      writer.appendFileInfo(StoreFile.SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
       try {
         writer.close();
       } catch (IOException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 7001acb1f22..009ba0dfbeb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -164,6 +164,13 @@ public class StoreFile {
    */
   private final BloomType cfBloomType;
 
+  /**
+   * Metadata key for skipping the reset of sequence ids.
+   * For bulk loaded hfiles, the scanner resets the cell seqId to the latest one;
+   * if this metadata is set to true, the reset is skipped.
+   */
+  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
+
   /**
    * Constructor, loads a reader and it's indices, etc. May allocate a
    * substantial amount of ram depending on the underlying files (10-20MB?).
@@ -407,6 +414,12 @@ public class StoreFile {
           this.sequenceid += 1;
         }
       }
+      // SKIP_RESET_SEQ_ID only applies to bulk loaded files.
+      // In mob compaction, the hfile whose cells contain the paths of new mob files is bulk
+      // loaded into hbase, and these cells keep the same seqIds as the old ones. We do not want
+      // to assign new seqIds to them, since that could break the visibility of cells that
+      // have the same row key but different seqIds.
+      this.reader.setSkipResetSeqId(isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)));
       this.reader.setBulkLoaded(true);
     }
     this.reader.setSequenceID(this.sequenceid);
@@ -536,6 +549,18 @@ public class StoreFile {
     return sb.toString();
   }
 
+  /**
+   * Gets whether to skip resetting the sequence id for cells.
+   * @param skipResetSeqId The boolean value encoded as a byte array.
+   * @return Whether to skip resetting the sequence id.
+   */
+  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
+    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
+      return Bytes.toBoolean(skipResetSeqId);
+    }
+    return false;
+  }
+
   public static class WriterBuilder {
     private final Configuration conf;
     private final CacheConfig cacheConf;
@@ -1068,6 +1093,7 @@ public class StoreFile {
     private long deleteFamilyCnt = -1;
     private boolean bulkLoadResult = false;
     private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
+    private boolean skipResetSeqId = true;
 
     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
         throws IOException {
@@ -1594,6 +1620,14 @@ public class StoreFile {
     public long getMaxTimestamp() {
       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
     }
+
+    boolean isSkipResetSeqId() {
+      return skipResetSeqId;
+    }
+
+    void setSkipResetSeqId(boolean skipResetSeqId) {
+      this.skipResetSeqId = skipResetSeqId;
+    }
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 42a378da10d..1111a6124a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -208,7 +208,7 @@ public class StoreFileScanner implements KeyValueScanner {
 
   protected void setCurrentCell(Cell newVal) throws IOException {
     this.cur = newVal;
-    if (this.cur != null && this.reader.isBulkLoaded()) {
+    if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
index d63bb95dac1..380ebac23f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.security.Key;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -177,11 +179,11 @@ public class TestMobCompactor {
     TableName tableName = TableName.valueOf(tableNameAsString);
     HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
     hcd1.setMobEnabled(true);
-    hcd1.setMobThreshold(0L);
+    hcd1.setMobThreshold(5);
     hcd1.setMaxVersions(4);
     HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
     hcd2.setMobEnabled(true);
-    hcd2.setMobThreshold(0L);
+    hcd2.setMobThreshold(5);
     hcd2.setMaxVersions(4);
     HTableDescriptor desc = new HTableDescriptor(tableName);
     desc.addFamily(hcd1);
@@ -570,7 +572,7 @@ public class TestMobCompactor {
 
     // do the mob compaction
     admin.compactMob(tableName, hcd1.getName());
-    waitUntilCompactionFinished(tableName);
+    waitUntilMobCompactionFinished(tableName);
     assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
       countMobRows(hTable));
     assertEquals("After compaction: mob cells count", regionNum
@@ -618,7 +620,7 @@ public class TestMobCompactor {
 
     // do the major mob compaction, it will force all files to compaction
     admin.majorCompactMob(tableName, hcd1.getName());
-    waitUntilCompactionFinished(tableName);
+    waitUntilMobCompactionFinished(tableName);
     assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
       countMobRows(hTable));
     assertEquals("After compaction: mob cells count",
@@ -633,7 +635,58 @@ public class TestMobCompactor {
       countFiles(tableName, false, family2));
   }
 
-  private void waitUntilCompactionFinished(TableName tableName) throws IOException,
+  @Test
+  public void testScannerOnBulkLoadRefHFiles() throws Exception {
+    long ts = EnvironmentEdgeManager.currentTime();
+    byte[] key0 = Bytes.toBytes("k0");
+    byte[] key1 = Bytes.toBytes("k1");
+    String value0 = "mobValue0";
+    String value1 = "mobValue1";
+    String newValue0 = "new";
+    Put put0 = new Put(key0);
+    put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value0));
+    loadData(admin, bufMut, tableName, new Put[] { put0 });
+    put0 = new Put(key0);
+    put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue0));
+    Put put1 = new Put(key1);
+    put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value1));
+    loadData(admin, bufMut, tableName, new Put[] { put0, put1 });
+    // read the latest cell of key0.
+    Get get = new Get(key0);
+    Result result = hTable.get(get);
+    Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
+    assertEquals("Before compaction: mob value of k0", "new",
+      Bytes.toString(CellUtil.cloneValue(cell)));
+    admin.majorCompactMob(tableName, hcd1.getName());
+    waitUntilMobCompactionFinished(tableName);
+    // read the latest cell of key0; the cell seqId in the bulk loaded file is not reset in the
+    // scanner, so the cell that has the "new" value is still visible.
+    result = hTable.get(get);
+    cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
+    assertEquals("After compaction: mob value of k0", "new",
+      Bytes.toString(CellUtil.cloneValue(cell)));
+    // read the ref cell only, without following it to the mob cell.
+    get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
+    result = hTable.get(get);
+    cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
+    // the ref name is the name of the new mob file
+    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+      tableName), hcd1.getNameAsString());
+    List<Path> paths = new ArrayList<>();
+    if (fs.exists(mobFamilyPath)) {
+      FileStatus[] files = fs.listStatus(mobFamilyPath);
+      for (FileStatus file : files) {
+        if (!StoreFileInfo.isDelFile(file.getPath())) {
+          paths.add(file.getPath());
+        }
+      }
+    }
+    assertEquals("After compaction: number of mob files:", 1, paths.size());
+    assertEquals("After compaction: mob file name:", MobUtils.getMobFileName(cell), paths.get(0)
+      .getName());
+  }
+
+  private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
       InterruptedException {
     long finished = EnvironmentEdgeManager.currentTime() + 60000;
     CompactionState state = admin.getMobCompactionState(tableName);
@@ -804,6 +857,13 @@ public class TestMobCompactor {
     }
   }
 
+  private void loadData(Admin admin, BufferedMutator table, TableName tableName, Put[] puts)
+      throws IOException {
+    table.mutate(Arrays.asList(puts));
+    table.flush();
+    admin.flush(tableName);
+  }
+
   /**
    * delete the row, family and cell to create the del file
    */
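
Note for reviewers: the behavioral core of this patch is a single predicate. The MOB compactor stamps SKIP_RESET_SEQ_ID into the file info of the hfile it bulk loads, and StoreFileScanner only overwrites a cell's sequence id with the bulk-load sequence id when that flag is absent or false. Below is a minimal, self-contained sketch of that decision in plain Java; it does not use the HBase APIs from this patch, and names such as FakeCell and the fileInfo map are illustrative stand-ins only.

import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of the SKIP_RESET_SEQ_ID decision introduced by this patch.
 * FakeCell and the fileInfo map are stand-ins, not HBase classes.
 */
public class SkipResetSeqIdSketch {

  /** Stand-in for a cell that carries its own sequence id. */
  static final class FakeCell {
    long seqId;
    FakeCell(long seqId) { this.seqId = seqId; }
  }

  /** Mirrors StoreFile.isSkipResetSeqId: decode the one-byte boolean, default to false. */
  static boolean isSkipResetSeqId(byte[] value) {
    return value != null && value.length == 1 && value[0] != 0;
  }

  /**
   * Mirrors the changed condition in StoreFileScanner.setCurrentCell: the bulk-load
   * sequence id is only stamped onto the cell when the skip flag is not set.
   */
  static void setCurrentCell(FakeCell cell, boolean bulkLoaded, boolean skipResetSeqId,
      long bulkLoadSeqId) {
    if (cell != null && bulkLoaded && !skipResetSeqId) {
      cell.seqId = bulkLoadSeqId;
    }
  }

  public static void main(String[] args) {
    // File info as written by PartitionedMobCompactor: Bytes.toBytes(true) is a single 0xFF byte.
    Map<String, byte[]> fileInfo = new HashMap<>();
    fileInfo.put("SKIP_RESET_SEQ_ID", new byte[] { (byte) 0xFF });

    boolean skip = isSkipResetSeqId(fileInfo.get("SKIP_RESET_SEQ_ID"));

    // A ref cell produced by MOB compaction keeps its original seqId (7 here), even though
    // the hfile that contains it is bulk loaded under a newer sequence id (42 here).
    FakeCell refCell = new FakeCell(7L);
    setCurrentCell(refCell, true, skip, 42L);
    System.out.println("seqId seen by the scanner: " + refCell.seqId); // prints 7, not 42
  }
}

Under the same assumptions, dropping the file-info entry (or writing a zero byte) makes isSkipResetSeqId return false, and the sketch falls back to the pre-patch behavior of resetting the seqId to the bulk-load value.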