diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java index d0c81759410..47dd0b18e71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java @@ -219,7 +219,8 @@ public class DefaultMobCompactor extends DefaultCompactor { } while (hasMore); } finally { if (mobFileWriter != null) { - appendMetadataAndCloseWriter(mobFileWriter, fd, major); + mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); + mobFileWriter.close(); } } if(mobFileWriter!=null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 194e8988cd3..00b3421b057 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -196,7 +196,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { } while (hasMore); } finally { status.setStatus("Flushing mob file " + store + ": appending metadata"); - mobFileWriter.appendMetadata(cacheFlushId, false); + mobFileWriter.appendMetadata(cacheFlushId, false, mobCount); status.setStatus("Flushing mob file " + store + ": closing flushed file"); mobFileWriter.close(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java index b0d4c9d8f23..d286b7259ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java @@ -122,7 +122,7 @@ public class MemStoreWrapper { */ private void internalFlushCache(final MemStoreSnapshot snapshot) throws IOException { - if (snapshot.getSize() == 0) { + if (snapshot.getCellsCount() == 0) { return; } // generate the files into a temp directory. @@ -135,18 +135,16 @@ public class MemStoreWrapper { LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString()); byte[] referenceValue = Bytes.toBytes(relativePath); - int keyValueCount = 0; KeyValueScanner scanner = snapshot.getScanner(); Cell cell = null; while (null != (cell = scanner.next())) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); mobFileWriter.append(kv); - keyValueCount++; } scanner.close(); // Write out the log sequence number that corresponds to this output // hfile. The hfile is current up to and including logCacheFlushId. - mobFileWriter.appendMetadata(Long.MAX_VALUE, false); + mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount()); mobFileWriter.close(); MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig); @@ -164,9 +162,7 @@ public class MemStoreWrapper { table.put(put); context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1); } - if (keyValueCount > 0) { - table.flushCommits(); - } + table.flushCommits(); scanner.close(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 13ded588be4..56ae51a169d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -112,6 +112,9 @@ public class StoreFile { /** Key for timestamp of earliest-put in metadata*/ public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS"); + /** Key for the number of mob cells in metadata*/ + public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT"); + private final StoreFileInfo fileInfo; private final FileSystem fs; @@ -779,6 +782,22 @@ public class StoreFile { appendTrackedTimestampsToMetadata(); } + /** + * Writes meta data. + * Call before {@link #close()} since its written as meta data to this file. + * @param maxSequenceId Maximum sequence id. + * @param majorCompaction True if this file is product of a major compaction + * @param mobCellsCount The number of mob cells. + * @throws IOException problem writing to FS + */ + public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, + final long mobCellsCount) throws IOException { + writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); + writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); + writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); + appendTrackedTimestampsToMetadata(); + } + /** * Add TimestampRange and earliest put timestamp to Metadata */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java index f44e5299e00..3234628eb5f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; @@ -169,6 +170,8 @@ public class TestMobCompaction { assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles()); assertEquals("Before compaction: rows", compactionThreshold, countRows()); assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows()); + assertEquals("Before compaction: number of mob cells", compactionThreshold, + countMobCellsInMetadata()); // Change the threshold larger than the data size region.getTableDesc().getFamily(COLUMN_FAMILY).setMobThreshold(500); region.initialize(); @@ -217,6 +220,8 @@ public class TestMobCompaction { assertEquals("After compaction: rows", compactionThreshold, countRows()); assertEquals("After compaction: mob rows", compactionThreshold, countMobRows()); assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles()); + assertEquals("After compaction: number of mob cells", compactionThreshold, + countMobCellsInMetadata()); } /** @@ -290,6 +295,26 @@ public class TestMobCompaction { return 0; } + private long countMobCellsInMetadata() throws IOException { + long mobCellsCount = 0; + Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()), + hcd.getNameAsString()); + Configuration copyOfConf = new Configuration(conf); + copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + CacheConfig cacheConfig = new CacheConfig(copyOfConf); + if (fs.exists(mobDirPath)) { + FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath); + for (FileStatus file : files) { + StoreFile sf = new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE); + Map fileInfo = sf.createReader().loadFileInfo(); + byte[] count = fileInfo.get(StoreFile.MOB_CELLS_COUNT); + assertTrue(count != null); + mobCellsCount += Bytes.toLong(count); + } + } + return mobCellsCount; + } + private Put createPut(int rowIdx, byte[] dummyData) throws IOException { Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx))); p.setDurability(Durability.SKIP_WAL);