HBASE-12698 Add mob cell count to the metadata of each mob file (Jingcheng Du)
This commit is contained in:
parent
9f1f8c3bc6
commit
917adbf0e5
|
@ -219,7 +219,8 @@ public class DefaultMobCompactor extends DefaultCompactor {
|
||||||
} while (hasMore);
|
} while (hasMore);
|
||||||
} finally {
|
} finally {
|
||||||
if (mobFileWriter != null) {
|
if (mobFileWriter != null) {
|
||||||
appendMetadataAndCloseWriter(mobFileWriter, fd, major);
|
mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
|
||||||
|
mobFileWriter.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(mobFileWriter!=null) {
|
if(mobFileWriter!=null) {
|
||||||
|
|
|
@ -196,7 +196,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
||||||
} while (hasMore);
|
} while (hasMore);
|
||||||
} finally {
|
} finally {
|
||||||
status.setStatus("Flushing mob file " + store + ": appending metadata");
|
status.setStatus("Flushing mob file " + store + ": appending metadata");
|
||||||
mobFileWriter.appendMetadata(cacheFlushId, false);
|
mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
|
||||||
status.setStatus("Flushing mob file " + store + ": closing flushed file");
|
status.setStatus("Flushing mob file " + store + ": closing flushed file");
|
||||||
mobFileWriter.close();
|
mobFileWriter.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,7 @@ public class MemStoreWrapper {
|
||||||
*/
|
*/
|
||||||
private void internalFlushCache(final MemStoreSnapshot snapshot)
|
private void internalFlushCache(final MemStoreSnapshot snapshot)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (snapshot.getSize() == 0) {
|
if (snapshot.getCellsCount() == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// generate the files into a temp directory.
|
// generate the files into a temp directory.
|
||||||
|
@ -135,18 +135,16 @@ public class MemStoreWrapper {
|
||||||
LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
|
LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
|
||||||
|
|
||||||
byte[] referenceValue = Bytes.toBytes(relativePath);
|
byte[] referenceValue = Bytes.toBytes(relativePath);
|
||||||
int keyValueCount = 0;
|
|
||||||
KeyValueScanner scanner = snapshot.getScanner();
|
KeyValueScanner scanner = snapshot.getScanner();
|
||||||
Cell cell = null;
|
Cell cell = null;
|
||||||
while (null != (cell = scanner.next())) {
|
while (null != (cell = scanner.next())) {
|
||||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||||
mobFileWriter.append(kv);
|
mobFileWriter.append(kv);
|
||||||
keyValueCount++;
|
|
||||||
}
|
}
|
||||||
scanner.close();
|
scanner.close();
|
||||||
// Write out the log sequence number that corresponds to this output
|
// Write out the log sequence number that corresponds to this output
|
||||||
// hfile. The hfile is current up to and including logCacheFlushId.
|
// hfile. The hfile is current up to and including logCacheFlushId.
|
||||||
mobFileWriter.appendMetadata(Long.MAX_VALUE, false);
|
mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
|
||||||
mobFileWriter.close();
|
mobFileWriter.close();
|
||||||
|
|
||||||
MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
|
MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
|
||||||
|
@ -164,9 +162,7 @@ public class MemStoreWrapper {
|
||||||
table.put(put);
|
table.put(put);
|
||||||
context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
|
context.getCounter(SweepCounter.RECORDS_UPDATED).increment(1);
|
||||||
}
|
}
|
||||||
if (keyValueCount > 0) {
|
table.flushCommits();
|
||||||
table.flushCommits();
|
|
||||||
}
|
|
||||||
scanner.close();
|
scanner.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -112,6 +112,9 @@ public class StoreFile {
|
||||||
/** Key for timestamp of earliest-put in metadata*/
|
/** Key for timestamp of earliest-put in metadata*/
|
||||||
public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
|
public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
|
||||||
|
|
||||||
|
/** Key for the number of mob cells in metadata*/
|
||||||
|
public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");
|
||||||
|
|
||||||
private final StoreFileInfo fileInfo;
|
private final StoreFileInfo fileInfo;
|
||||||
private final FileSystem fs;
|
private final FileSystem fs;
|
||||||
|
|
||||||
|
@ -779,6 +782,22 @@ public class StoreFile {
|
||||||
appendTrackedTimestampsToMetadata();
|
appendTrackedTimestampsToMetadata();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes meta data.
|
||||||
|
* Call before {@link #close()} since its written as meta data to this file.
|
||||||
|
* @param maxSequenceId Maximum sequence id.
|
||||||
|
* @param majorCompaction True if this file is product of a major compaction
|
||||||
|
* @param mobCellsCount The number of mob cells.
|
||||||
|
* @throws IOException problem writing to FS
|
||||||
|
*/
|
||||||
|
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
|
||||||
|
final long mobCellsCount) throws IOException {
|
||||||
|
writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
|
||||||
|
writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
|
||||||
|
writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
|
||||||
|
appendTrackedTimestampsToMetadata();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add TimestampRange and earliest put timestamp to Metadata
|
* Add TimestampRange and earliest put timestamp to Metadata
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -169,6 +170,8 @@ public class TestMobCompaction {
|
||||||
assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
|
assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
|
||||||
assertEquals("Before compaction: rows", compactionThreshold, countRows());
|
assertEquals("Before compaction: rows", compactionThreshold, countRows());
|
||||||
assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
|
assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
|
||||||
|
assertEquals("Before compaction: number of mob cells", compactionThreshold,
|
||||||
|
countMobCellsInMetadata());
|
||||||
// Change the threshold larger than the data size
|
// Change the threshold larger than the data size
|
||||||
region.getTableDesc().getFamily(COLUMN_FAMILY).setMobThreshold(500);
|
region.getTableDesc().getFamily(COLUMN_FAMILY).setMobThreshold(500);
|
||||||
region.initialize();
|
region.initialize();
|
||||||
|
@ -217,6 +220,8 @@ public class TestMobCompaction {
|
||||||
assertEquals("After compaction: rows", compactionThreshold, countRows());
|
assertEquals("After compaction: rows", compactionThreshold, countRows());
|
||||||
assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
|
assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
|
||||||
assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
|
assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
|
||||||
|
assertEquals("After compaction: number of mob cells", compactionThreshold,
|
||||||
|
countMobCellsInMetadata());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -290,6 +295,26 @@ public class TestMobCompaction {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long countMobCellsInMetadata() throws IOException {
|
||||||
|
long mobCellsCount = 0;
|
||||||
|
Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
|
||||||
|
hcd.getNameAsString());
|
||||||
|
Configuration copyOfConf = new Configuration(conf);
|
||||||
|
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
|
||||||
|
CacheConfig cacheConfig = new CacheConfig(copyOfConf);
|
||||||
|
if (fs.exists(mobDirPath)) {
|
||||||
|
FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
|
||||||
|
for (FileStatus file : files) {
|
||||||
|
StoreFile sf = new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE);
|
||||||
|
Map<byte[], byte[]> fileInfo = sf.createReader().loadFileInfo();
|
||||||
|
byte[] count = fileInfo.get(StoreFile.MOB_CELLS_COUNT);
|
||||||
|
assertTrue(count != null);
|
||||||
|
mobCellsCount += Bytes.toLong(count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return mobCellsCount;
|
||||||
|
}
|
||||||
|
|
||||||
private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
|
private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
|
||||||
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
|
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
|
||||||
p.setDurability(Durability.SKIP_WAL);
|
p.setDurability(Durability.SKIP_WAL);
|
||||||
|
|
Loading…
Reference in New Issue