HBASE-11646 Handle the MOB in compaction (Jingcheng Du)

This commit is contained in:
Jonathan M Hsieh 2014-09-18 07:03:21 -07:00
parent 7cd71d1db6
commit 7dbd828a90
8 changed files with 678 additions and 15 deletions

View File

@ -0,0 +1,233 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Compact passed set of files in the mob-enabled column family.
*/
@InterfaceAudience.Private
public class DefaultMobCompactor extends DefaultCompactor {
private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class);
private long mobSizeThreshold;
private HMobStore mobStore;
public DefaultMobCompactor(Configuration conf, Store store) {
super(conf, store);
// The mob cells reside in the mob-enabled column family which is held by HMobStore.
// During the compaction, the compactor reads the cells from the mob files and
// probably creates new mob files. All of these operations are included in HMobStore,
// so we need to cast the Store to HMobStore.
if (!(store instanceof HMobStore)) {
throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
}
mobStore = (HMobStore) store;
mobSizeThreshold = MobUtils.getMobThreshold(store.getFamily());
}
/**
* Creates a writer for a new file in a temporary directory.
* @param fd The file details.
* @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException
*/
@Override
protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
// make this writer with tags always because of possible new cells with tags.
StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
return writer;
}
/**
* Performs compaction on a column family with the mob flag enabled.
* This is for when the mob threshold size has changed or if the mob
* column family mode has been toggled via an alter table statement.
* Compacts the files by the following rules.
* 1. If the cell has a mob reference tag, the cell's value is the path of the mob file.
* <ol>
* <li>
* If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
* directly copy the (with mob tag) cell into the new store file.
* </li>
* <li>
* Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into
* the new store file.
* </li>
* </ol>
* 2. If the cell doesn't have a reference tag.
* <ol>
* <li>
* If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
* write this cell to a mob file, and write the path of this mob file to the store file.
* </li>
* <li>
* Otherwise, directly write this cell into the store file.
* </li>
* </ol>
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param major Is a major compaction.
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
List<Cell> cells = new ArrayList<Cell>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
int closeCheckInterval = HStore.getCloseCheckInterval();
boolean hasMore;
Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
byte[] fileName = null;
StoreFile.Writer mobFileWriter = null;
long mobCells = 0;
Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
.getName());
try {
try {
// If the mob file writer could not be created, directly write the cell to the store file.
mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
} catch (IOException e) {
LOG.error(
"Fail to create mob writer, "
+ "we will continue the compaction by writing MOB cells directly in store files",
e);
}
do {
hasMore = scanner.next(cells, compactionKVMax);
// output to writer:
for (Cell c : cells) {
// TODO remove the KeyValueUtil.ensureKeyValue before merging back to trunk.
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
resetSeqId(smallestReadPoint, cleanSeqId, kv);
if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
// If the mob file writer is null or the kv type is not put, directly write the cell
// to the store file.
writer.append(kv);
} else if (MobUtils.isMobReferenceCell(kv)) {
if (MobUtils.isValidMobRefCellValue(kv)) {
int size = MobUtils.getMobValueLength(kv);
if (size > mobSizeThreshold) {
// If the value size is larger than the threshold, it's regarded as a mob. Since
// its value is already in the mob file, directly write this cell to the store file
writer.append(kv);
} else {
// If the value is not larger than the threshold, it's not regarded a mob. Retrieve
// the mob cell from the mob file, and write it back to the store file.
Cell cell = mobStore.resolve(kv, false);
if (cell.getValueLength() != 0) {
// put the mob data back to the store file
KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell);
mobKv.setSequenceId(kv.getSequenceId());
writer.append(mobKv);
} else {
// If the value of a file is empty, there might be issues when retrieving,
// directly write the cell to the store file, and leave it to be handled by the
// next compaction.
writer.append(kv);
}
}
} else {
LOG.warn("The value format of the KeyValue " + kv
+ " is wrong, its length is less than " + Bytes.SIZEOF_INT);
writer.append(kv);
}
} else if (kv.getValueLength() <= mobSizeThreshold) {
// If the value size of a cell is not larger than the threshold, directly write it to
// the store file.
writer.append(kv);
} else {
// If the value size of a cell is larger than the threshold, it's regarded as a mob,
// write this cell to a mob file, and write the path to the store file.
mobCells++;
// append the original keyValue in the mob file.
mobFileWriter.append(kv);
KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
// write the cell whose value is the path of a mob file to the store file.
writer.append(reference);
}
++progress.currentCompactedKVs;
// check periodically to see if a system stop is requested
if (closeCheckInterval > 0) {
bytesWritten += kv.getLength();
if (bytesWritten > closeCheckInterval) {
bytesWritten = 0;
if (!store.areWritesEnabled()) {
progress.cancel();
return false;
}
}
}
}
cells.clear();
} while (hasMore);
} finally {
if (mobFileWriter != null) {
appendMetadataAndCloseWriter(mobFileWriter, fd, major);
}
}
if(mobFileWriter!=null) {
if (mobCells > 0) {
// If the mob file is not empty, commit it.
mobStore.commitFile(mobFileWriter.getPath(), path);
} else {
try {
// If the mob file is empty, delete it instead of committing.
store.getFileSystem().delete(mobFileWriter.getPath(), true);
} catch (IOException e) {
LOG.error("Fail to delete the temp mob file", e);
}
}
}
progress.complete();
return true;
}
}

View File

@ -37,4 +37,12 @@ public class MobStoreEngine extends DefaultStoreEngine {
// specific compactor and policy when that is implemented.
storeFlusher = new DefaultMobStoreFlusher(conf, store);
}
/**
* Creates the DefaultMobCompactor.
*/
@Override
protected void createCompactor(Configuration conf, Store store) throws IOException {
compactor = new DefaultMobCompactor(conf, store);
}
}

View File

@ -260,4 +260,44 @@ public class MobUtils {
reference.setSequenceId(kv.getSequenceId());
return reference;
}
/**
* Indicates whether the current mob ref cell has a valid value.
* A mob ref cell has a mob reference tag.
* The value of a mob ref cell consists of two parts, real mob value length and mob file name.
* The real mob value length takes 4 bytes.
* The remaining part is the mob file name.
* @param cell The mob ref cell.
* @return True if the cell has a valid value.
*/
public static boolean isValidMobRefCellValue(Cell cell) {
return cell.getValueLength() > Bytes.SIZEOF_INT;
}
/**
* Gets the mob value length from the mob ref cell.
* A mob ref cell has a mob reference tag.
* The value of a mob ref cell consists of two parts, real mob value length and mob file name.
* The real mob value length takes 4 bytes.
* The remaining part is the mob file name.
* @param cell The mob ref cell.
* @return The real mob value length.
*/
public static int getMobValueLength(Cell cell) {
return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
}
/**
* Gets the mob file name from the mob ref cell.
* A mob ref cell has a mob reference tag.
* The value of a mob ref cell consists of two parts, real mob value length and mob file name.
* The real mob value length takes 4 bytes.
* The remaining part is the mob file name.
* @param cell The mob ref cell.
* @return The mob file name.
*/
public static String getMobFileName(Cell cell) {
return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
cell.getValueLength() - Bytes.SIZEOF_INT);
}
}

View File

@ -219,9 +219,8 @@ public class HMobStore extends HStore {
*/
public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
Cell result = null;
if (reference.getValueLength() > Bytes.SIZEOF_INT) {
String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset()
+ Bytes.SIZEOF_INT, reference.getValueLength() - Bytes.SIZEOF_INT);
if (MobUtils.isValidMobRefCellValue(reference)) {
String fileName = MobUtils.getMobFileName(reference);
Path targetPath = new Path(mobFamilyPath, fileName);
MobFile file = null;
try {

View File

@ -43,7 +43,9 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils;
/**
@ -57,7 +59,7 @@ public abstract class Compactor {
protected Configuration conf;
protected Store store;
private int compactionKVMax;
protected int compactionKVMax;
protected Compression.Algorithm compactionCompression;
/** specify how many days to keep MVCC values during major compaction **/
@ -92,6 +94,8 @@ public abstract class Compactor {
public long maxKeyCount = 0;
/** Earliest put timestamp if major compaction */
public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
/** Latest put timestamp */
public long latestPutTs = HConstants.LATEST_TIMESTAMP;
/** The last key in the files we're compacting. */
public long maxSeqId = 0;
/** Latest memstore read point found in any of the involved files */
@ -158,6 +162,14 @@ public abstract class Compactor {
fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
}
}
tmp = fileInfo.get(StoreFile.TIMERANGE_KEY);
TimeRangeTracker trt = new TimeRangeTracker();
if (tmp == null) {
fd.latestPutTs = HConstants.LATEST_TIMESTAMP;
} else {
Writables.copyWritable(tmp, trt);
fd.latestPutTs = trt.getMaximumTimestamp();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Compacting " + file +
", keycount=" + keyCount +
@ -216,14 +228,16 @@ public abstract class Compactor {
/**
* Performs the compaction.
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param major Is a major compaction.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(InternalScanner scanner,
CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
@ -241,9 +255,7 @@ public abstract class Compactor {
// output to writer:
for (Cell c : kvs) {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
CellUtil.setSequenceId(kv, 0);
}
resetSeqId(smallestReadPoint, cleanSeqId, kv);
writer.append(kv);
++progress.currentCompactedKVs;
progress.totalCompactedSize += kv.getLength();
@ -309,4 +321,29 @@ public abstract class Compactor {
return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
/**
* Resets the sequence id.
* @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
* @param cleanSeqId Should clean the sequence id.
* @param kv The current KeyValue.
*/
protected void resetSeqId(long smallestReadPoint, boolean cleanSeqId, KeyValue kv) {
if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
kv.setSequenceId(0);
}
}
/**
* Appends the metadata and closes the writer.
* @param writer The current store writer.
* @param fd The file details.
* @param isMajor Is a major compaction.
* @throws IOException
*/
protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
boolean isMajor) throws IOException {
writer.appendMetadata(fd.maxSeqId, isMajor);
writer.close();
}
}

View File

@ -76,9 +76,9 @@ public class DefaultCompactor extends Compactor {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
writer = createTmpWriter(fd, smallestReadPoint);
boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
request.isAllFiles());
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
@ -94,14 +94,27 @@ public class DefaultCompactor extends Compactor {
}
} finally {
if (writer != null) {
writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
writer.close();
appendMetadataAndCloseWriter(writer, fd, request.isAllFiles());
newFiles.add(writer.getPath());
}
}
return newFiles;
}
/**
* Creates a writer for a new file in a temporary directory.
* @param fd The file details.
* @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException
*/
protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
throws IOException {
StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
return writer;
}
/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest)};

View File

@ -127,7 +127,8 @@ public class StripeCompactor extends Compactor {
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId);
finished = performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId,
request.isMajor());
if (!finished) {
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +

View File

@ -0,0 +1,332 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* Test mob compaction
*/
@Category(MediumTests.class)
public class TestMobCompaction {
@Rule
public TestName name = new TestName();
static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
private HBaseTestingUtility UTIL = null;
private Configuration conf = null;
private HRegion region = null;
private HTableDescriptor htd = null;
private HColumnDescriptor hcd = null;
private long mobCellThreshold = 1000;
private FileSystem fs;
private static final byte[] COLUMN_FAMILY = fam1;
private final byte[] STARTROW = Bytes.toBytes(START_KEY);
private int compactionThreshold;
private void init(long mobThreshold) throws Exception {
this.mobCellThreshold = mobThreshold;
UTIL = HBaseTestingUtility.createLocalHTU();
conf = UTIL.getConfiguration();
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
htd = UTIL.createTableDescriptor(name.getMethodName());
hcd = new HColumnDescriptor(COLUMN_FAMILY);
hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(mobThreshold));
hcd.setMaxVersions(1);
htd.addFamily(hcd);
region = UTIL.createLocalHRegion(htd, null, null);
fs = FileSystem.get(conf);
}
@After
public void tearDown() throws Exception {
region.close();
fs.delete(UTIL.getDataTestDir(), true);
}
/**
* During compaction, cells smaller than the threshold won't be affected.
*/
@Test
public void testSmallerValue() throws Exception {
init(500);
byte[] dummyData = makeDummyData(300); // smaller than mob threshold
HRegionIncommon loader = new HRegionIncommon(region);
// one hfile per row
for (int i = 0; i < compactionThreshold; i++) {
Put p = createPut(i, dummyData);
loader.put(p);
loader.flushcache();
}
assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
assertEquals("Before compaction: mob file count", 0, countMobFiles());
assertEquals("Before compaction: rows", compactionThreshold, countRows());
assertEquals("Before compaction: mob rows", 0, countMobRows());
region.compactStores();
assertEquals("After compaction: store files", 1, countStoreFiles());
assertEquals("After compaction: mob file count", 0, countMobFiles());
assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
assertEquals("After compaction: rows", compactionThreshold, countRows());
assertEquals("After compaction: mob rows", 0, countMobRows());
}
/**
* During compaction, the mob threshold size is changed.
*/
@Test
public void testLargerValue() throws Exception {
init(200);
byte[] dummyData = makeDummyData(300); // larger than mob threshold
HRegionIncommon loader = new HRegionIncommon(region);
for (int i = 0; i < compactionThreshold; i++) {
Put p = createPut(i, dummyData);
loader.put(p);
loader.flushcache();
}
assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
assertEquals("Before compaction: rows", compactionThreshold, countRows());
assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
// Change the threshold larger than the data size
region.getTableDesc().getFamily(COLUMN_FAMILY).setValue(
MobConstants.MOB_THRESHOLD, Bytes.toBytes(500L));
region.initialize();
region.compactStores(true);
assertEquals("After compaction: store files", 1, countStoreFiles());
assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
assertEquals("After compaction: rows", compactionThreshold, countRows());
assertEquals("After compaction: mob rows", 0, countMobRows());
}
/**
* This test will first generate store files, then bulk load them and trigger the compaction. When
* compaction, the cell value will be larger than the threshold.
*/
@Test
public void testMobCompactionWithBulkload() throws Exception {
// The following will produce store files of 600.
init(300);
byte[] dummyData = makeDummyData(600);
Path hbaseRootDir = FSUtils.getRootDir(conf);
Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
List<Pair<byte[], String>> hfiles = new ArrayList<Pair<byte[], String>>(1);
for (int i = 0; i < compactionThreshold; i++) {
Path hpath = new Path(basedir, "hfile" + i);
hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
createHFile(hpath, i, dummyData);
}
// The following will bulk load the above generated store files and compact, with 600(fileSize)
// > 300(threshold)
boolean result = region.bulkLoadHFiles(hfiles, true);
assertTrue("Bulkload result:", result);
assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
assertEquals("Before compaction: mob file count", 0, countMobFiles());
assertEquals("Before compaction: rows", compactionThreshold, countRows());
assertEquals("Before compaction: mob rows", 0, countMobRows());
assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
region.compactStores();
assertEquals("After compaction: store files", 1, countStoreFiles());
assertEquals("After compaction: mob file count:", 1, countMobFiles());
assertEquals("After compaction: rows", compactionThreshold, countRows());
assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
}
private int countStoreFiles() throws IOException {
Store store = region.getStore(COLUMN_FAMILY);
return store.getStorefilesCount();
}
private int countMobFiles() throws IOException {
Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
hcd.getNameAsString());
if (fs.exists(mobDirPath)) {
FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
return files.length;
}
return 0;
}
private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
p.setDurability(Durability.SKIP_WAL);
p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
return p;
}
/**
* Create an HFile with the given number of bytes
*/
private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
HFileContext meta = new HFileContextBuilder().build();
HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
.withFileContext(meta).create();
long now = System.currentTimeMillis();
try {
KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
Bytes.toBytes("colX"), now, dummyData);
writer.append(kv);
} finally {
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
writer.close();
}
}
private int countMobRows() throws IOException {
Scan scan = new Scan();
// Do not retrieve the mob data when scanning
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
InternalScanner scanner = region.getScanner(scan);
int scannedCount = 0;
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = scanner.next(results);
while (hasMore) {
for (Cell c : results) {
if (MobUtils.isMobReferenceCell(c)) {
scannedCount++;
}
}
hasMore = scanner.next(results);
}
scanner.close();
return scannedCount;
}
private int countRows() throws IOException {
Scan scan = new Scan();
// Do not retrieve the mob data when scanning
InternalScanner scanner = region.getScanner(scan);
int scannedCount = 0;
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = scanner.next(results);
while (hasMore) {
scannedCount += results.size();
hasMore = scanner.next(results);
}
scanner.close();
return scannedCount;
}
private byte[] makeDummyData(int size) {
byte[] dummyData = new byte[size];
new Random().nextBytes(dummyData);
return dummyData;
}
private int countReferencedMobFiles() throws IOException {
Scan scan = new Scan();
// Do not retrieve the mob data when scanning
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
InternalScanner scanner = region.getScanner(scan);
List<Cell> kvs = new ArrayList<Cell>();
boolean hasMore = true;
String fileName;
Set<String> files = new HashSet<String>();
do {
kvs.clear();
hasMore = scanner.next(kvs);
for (Cell c : kvs) {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
if (!MobUtils.isMobReferenceCell(kv)) {
continue;
}
if (!MobUtils.isValidMobRefCellValue(kv)) {
continue;
}
int size = MobUtils.getMobValueLength(kv);
if (size <= mobCellThreshold) {
continue;
}
fileName = MobUtils.getMobFileName(kv);
if (fileName.isEmpty()) {
continue;
}
files.add(fileName);
Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
hcd.getNameAsString());
assertTrue(fs.exists(new Path(familyPath, fileName)));
}
} while (hasMore);
scanner.close();
return files.size();
}
}