From 7dbd828a90e36c3b94fcadf659ce047a1ddbce0e Mon Sep 17 00:00:00 2001 From: Jonathan M Hsieh Date: Thu, 18 Sep 2014 07:03:21 -0700 Subject: [PATCH] HBASE-11646 Handle the MOB in compaction (Jingcheng Du) --- .../hadoop/hbase/mob/DefaultMobCompactor.java | 233 ++++++++++++ .../hadoop/hbase/mob/MobStoreEngine.java | 8 + .../org/apache/hadoop/hbase/mob/MobUtils.java | 40 +++ .../hadoop/hbase/regionserver/HMobStore.java | 5 +- .../regionserver/compactions/Compactor.java | 49 ++- .../compactions/DefaultCompactor.java | 23 +- .../compactions/StripeCompactor.java | 3 +- .../hbase/regionserver/TestMobCompaction.java | 332 ++++++++++++++++++ 8 files changed, 678 insertions(+), 15 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java new file mode 100644 index 00000000000..5f13502d2e5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.regionserver.HMobStore; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Compact passed set of files in the mob-enabled column family. 
+ */
+@InterfaceAudience.Private
+public class DefaultMobCompactor extends DefaultCompactor {
+
+  private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class);
+  private long mobSizeThreshold;
+  private HMobStore mobStore;
+  public DefaultMobCompactor(Configuration conf, Store store) {
+    super(conf, store);
+    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
+    // During the compaction, the compactor reads the cells from the mob files and
+    // possibly creates new mob files. All of these operations are handled by HMobStore,
+    // so we need to cast the Store to HMobStore.
+    if (!(store instanceof HMobStore)) {
+      throw new IllegalArgumentException("The store " + store + " is not an HMobStore");
+    }
+    mobStore = (HMobStore) store;
+    mobSizeThreshold = MobUtils.getMobThreshold(store.getFamily());
+  }
+
+  /**
+   * Creates a writer for a new file in a temporary directory.
+   * @param fd The file details.
+   * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+   * @return Writer for a new StoreFile in the tmp dir.
+   * @throws IOException
+   */
+  @Override
+  protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
+    // Always create this writer with tags, because the compaction may produce new cells
+    // that carry tags.
+    StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+        true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
+    return writer;
+  }
+
+  /**
+   * Performs compaction on a column family with the mob flag enabled.
+   * This is used when the mob threshold size has changed, or when the mob
+   * column family mode has been toggled via an alter table statement.
+   * Compacts the files by the following rules; a short example follows the lists.
+   * 1. If the cell has a mob reference tag, its value holds the length of the real mob value
+   * and the name of the mob file that stores it.
+   * <ol>
+   * <li>
+   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob;
+   * directly copy the cell (with the mob tag) into the new store file.
+   * </li>
+   * <li>
+   * Otherwise, retrieve the mob cell from the mob file, and write a copy of the cell into
+   * the new store file.
+   * </li>
+   * </ol>
+   * 2. If the cell doesn't have a reference tag.
+   * <ol>
+   * <li>
+   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob;
+   * write this cell to a mob file, and write a reference cell pointing to this mob file into
+   * the store file.
+   * </li>
+   * <li>
+   * Otherwise, directly write this cell into the store file.
+   * </li>
+   * </ol>
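+   * <p>
+   * For example (illustrative numbers only): with a mob threshold of 100 bytes, a put cell
+   * carrying a 300-byte value is written to the new mob file and replaced in the store file
+   * by a reference cell, while a put cell carrying a 50-byte value is written to the store
+   * file directly.
+   * </p>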
+   * @param fd File details
+   * @param scanner Where to read from.
+   * @param writer Where to write to.
+   * @param smallestReadPoint Smallest read point.
+   * @param cleanSeqId When true, remove the seqId (used to be mvcc) value which is
+   *          <= smallestReadPoint
+   * @param major Is a major compaction.
+   * @return Whether compaction ended; false if it was interrupted for any reason.
+   */
+  @Override
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+      long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
+    int bytesWritten = 0;
+    // Since scanner.next() can return 'false' but still be delivering data,
+    // we have to use a do/while loop.
+    List<Cell> cells = new ArrayList<Cell>();
+    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+    int closeCheckInterval = HStore.getCloseCheckInterval();
+    boolean hasMore;
+    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
+    byte[] fileName = null;
+    StoreFile.Writer mobFileWriter = null;
+    long mobCells = 0;
+    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
+        .getName());
+    try {
+      try {
+        // If the mob file writer cannot be created, directly write the cell to the store file.
+        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+            store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
+        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
+      } catch (IOException e) {
+        LOG.error("Failed to create the mob writer, "
+            + "we will continue the compaction by writing MOB cells directly in store files", e);
+      }
+      do {
+        hasMore = scanner.next(cells, compactionKVMax);
+        // output to writer:
+        for (Cell c : cells) {
+          // TODO remove the KeyValueUtil.ensureKeyValue before merging back to trunk.
+          KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+          resetSeqId(smallestReadPoint, cleanSeqId, kv);
+          if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
+            // If the mob file writer is null or the kv type is not put, directly write the cell
+            // to the store file.
+            writer.append(kv);
+          } else if (MobUtils.isMobReferenceCell(kv)) {
+            if (MobUtils.isValidMobRefCellValue(kv)) {
+              int size = MobUtils.getMobValueLength(kv);
+              if (size > mobSizeThreshold) {
+                // If the value size is larger than the threshold, it's regarded as a mob. Since
+                // its value is already in the mob file, directly write this cell to the store
+                // file.
+                writer.append(kv);
+              } else {
+                // If the value is not larger than the threshold, it's not regarded as a mob.
+                // Retrieve the mob cell from the mob file, and write it back to the store file.
+                Cell cell = mobStore.resolve(kv, false);
+                if (cell.getValueLength() != 0) {
+                  // put the mob data back to the store file
+                  KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell);
+                  mobKv.setSequenceId(kv.getSequenceId());
+                  writer.append(mobKv);
+                } else {
+                  // If the retrieved value is empty, there might have been issues when reading
+                  // the mob file. Directly write this cell to the store file, and leave it to be
+                  // handled by the next compaction.
+                  writer.append(kv);
+                }
+              }
+            } else {
+              LOG.warn("The value format of the KeyValue " + kv
+                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
+              writer.append(kv);
+            }
+          } else if (kv.getValueLength() <= mobSizeThreshold) {
+            // If the value size of a cell is not larger than the threshold, directly write it to
+            // the store file.
+ writer.append(kv); + } else { + // If the value size of a cell is larger than the threshold, it's regarded as a mob, + // write this cell to a mob file, and write the path to the store file. + mobCells++; + // append the original keyValue in the mob file. + mobFileWriter.append(kv); + KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag); + // write the cell whose value is the path of a mob file to the store file. + writer.append(reference); + } + ++progress.currentCompactedKVs; + + // check periodically to see if a system stop is requested + if (closeCheckInterval > 0) { + bytesWritten += kv.getLength(); + if (bytesWritten > closeCheckInterval) { + bytesWritten = 0; + if (!store.areWritesEnabled()) { + progress.cancel(); + return false; + } + } + } + } + cells.clear(); + } while (hasMore); + } finally { + if (mobFileWriter != null) { + appendMetadataAndCloseWriter(mobFileWriter, fd, major); + } + } + if(mobFileWriter!=null) { + if (mobCells > 0) { + // If the mob file is not empty, commit it. + mobStore.commitFile(mobFileWriter.getPath(), path); + } else { + try { + // If the mob file is empty, delete it instead of committing. + store.getFileSystem().delete(mobFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Fail to delete the temp mob file", e); + } + } + } + progress.complete(); + return true; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java index d5e6f2ef215..2d5f1ad02e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java @@ -37,4 +37,12 @@ public class MobStoreEngine extends DefaultStoreEngine { // specific compactor and policy when that is implemented. storeFlusher = new DefaultMobStoreFlusher(conf, store); } + + /** + * Creates the DefaultMobCompactor. + */ + @Override + protected void createCompactor(Configuration conf, Store store) throws IOException { + compactor = new DefaultMobCompactor(conf, store); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index c106e0b08f1..e52d336b19e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -260,4 +260,44 @@ public class MobUtils { reference.setSequenceId(kv.getSequenceId()); return reference; } + + /** + * Indicates whether the current mob ref cell has a valid value. + * A mob ref cell has a mob reference tag. + * The value of a mob ref cell consists of two parts, real mob value length and mob file name. + * The real mob value length takes 4 bytes. + * The remaining part is the mob file name. + * @param cell The mob ref cell. + * @return True if the cell has a valid value. + */ + public static boolean isValidMobRefCellValue(Cell cell) { + return cell.getValueLength() > Bytes.SIZEOF_INT; + } + + /** + * Gets the mob value length from the mob ref cell. + * A mob ref cell has a mob reference tag. + * The value of a mob ref cell consists of two parts, real mob value length and mob file name. + * The real mob value length takes 4 bytes. + * The remaining part is the mob file name. + * @param cell The mob ref cell. + * @return The real mob value length. 
+ */ + public static int getMobValueLength(Cell cell) { + return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT); + } + + /** + * Gets the mob file name from the mob ref cell. + * A mob ref cell has a mob reference tag. + * The value of a mob ref cell consists of two parts, real mob value length and mob file name. + * The real mob value length takes 4 bytes. + * The remaining part is the mob file name. + * @param cell The mob ref cell. + * @return The mob file name. + */ + public static String getMobFileName(Cell cell) { + return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT, + cell.getValueLength() - Bytes.SIZEOF_INT); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 17d380255ad..9c6f34e6cac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -219,9 +219,8 @@ public class HMobStore extends HStore { */ public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException { Cell result = null; - if (reference.getValueLength() > Bytes.SIZEOF_INT) { - String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset() - + Bytes.SIZEOF_INT, reference.getValueLength() - Bytes.SIZEOF_INT); + if (MobUtils.isValidMobRefCellValue(reference)) { + String fileName = MobUtils.getMobFileName(reference); Path targetPath = new Path(mobFamilyPath, fileName); MobFile file = null; try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 2b053a6715d..13967c2a3d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -43,7 +43,9 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.util.StringUtils; /** @@ -57,7 +59,7 @@ public abstract class Compactor { protected Configuration conf; protected Store store; - private int compactionKVMax; + protected int compactionKVMax; protected Compression.Algorithm compactionCompression; /** specify how many days to keep MVCC values during major compaction **/ @@ -92,6 +94,8 @@ public abstract class Compactor { public long maxKeyCount = 0; /** Earliest put timestamp if major compaction */ public long earliestPutTs = HConstants.LATEST_TIMESTAMP; + /** Latest put timestamp */ + public long latestPutTs = HConstants.LATEST_TIMESTAMP; /** The last key in the files we're compacting. 
*/ public long maxSeqId = 0; /** Latest memstore read point found in any of the involved files */ @@ -158,6 +162,14 @@ public abstract class Compactor { fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs); } } + tmp = fileInfo.get(StoreFile.TIMERANGE_KEY); + TimeRangeTracker trt = new TimeRangeTracker(); + if (tmp == null) { + fd.latestPutTs = HConstants.LATEST_TIMESTAMP; + } else { + Writables.copyWritable(tmp, trt); + fd.latestPutTs = trt.getMaximumTimestamp(); + } if (LOG.isDebugEnabled()) { LOG.debug("Compacting " + file + ", keycount=" + keyCount + @@ -216,14 +228,16 @@ public abstract class Compactor { /** * Performs the compaction. + * @param fd File details * @param scanner Where to read from. * @param writer Where to write to. * @param smallestReadPoint Smallest read point. * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint + * @param major Is a major compaction. * @return Whether compaction ended; false if it was interrupted for some reason. */ - protected boolean performCompaction(InternalScanner scanner, - CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException { + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException { int bytesWritten = 0; // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. @@ -241,9 +255,7 @@ public abstract class Compactor { // output to writer: for (Cell c : kvs) { KeyValue kv = KeyValueUtil.ensureKeyValue(c); - if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) { - CellUtil.setSequenceId(kv, 0); - } + resetSeqId(smallestReadPoint, cleanSeqId, kv); writer.append(kv); ++progress.currentCompactedKVs; progress.totalCompactedSize += kv.getLength(); @@ -309,4 +321,29 @@ public abstract class Compactor { return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); } + + /** + * Resets the sequence id. + * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region. + * @param cleanSeqId Should clean the sequence id. + * @param kv The current KeyValue. + */ + protected void resetSeqId(long smallestReadPoint, boolean cleanSeqId, KeyValue kv) { + if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) { + kv.setSequenceId(0); + } + } + + /** + * Appends the metadata and closes the writer. + * @param writer The current store writer. + * @param fd The file details. + * @param isMajor Is a major compaction. 
+ * @throws IOException + */ + protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd, + boolean isMajor) throws IOException { + writer.appendMetadata(fd.maxSeqId, isMajor); + writer.close(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index d5b2b63064d..8056dd0fef1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -76,9 +76,9 @@ public class DefaultCompactor extends Compactor { smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); cleanSeqId = true; } - writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, - fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0); - boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId); + writer = createTmpWriter(fd, smallestReadPoint); + boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + request.isAllFiles()); if (!finished) { writer.close(); store.getFileSystem().delete(writer.getPath(), false); @@ -94,14 +94,27 @@ public class DefaultCompactor extends Compactor { } } finally { if (writer != null) { - writer.appendMetadata(fd.maxSeqId, request.isAllFiles()); - writer.close(); + appendMetadataAndCloseWriter(writer, fd, request.isAllFiles()); newFiles.add(writer.getPath()); } } return newFiles; } + /** + * Creates a writer for a new file in a temporary directory. + * @param fd The file details. + * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region. + * @return Writer for a new StoreFile in the tmp dir. + * @throws IOException + */ + protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint) + throws IOException { + StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, + true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0); + return writer; + } + /** * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to * {@link #compact(CompactionRequest)}; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 487ff462011..3109015e5b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -127,7 +127,8 @@ public class StripeCompactor extends Compactor { // It is ok here if storeScanner is null. StoreScanner storeScanner = (scanner instanceof StoreScanner) ? 
(StoreScanner)scanner : null; mw.init(storeScanner, factory, store.getComparator()); - finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId); + finished = performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId, + request.isMajor()); if (!finished) { throw new InterruptedIOException( "Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java new file mode 100644 index 00000000000..f8d6ce442a0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java @@ -0,0 +1,332 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Test mob compaction + */ +@Category(MediumTests.class) +public class TestMobCompaction { + @Rule + public TestName 
name = new TestName(); + static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName()); + private HBaseTestingUtility UTIL = null; + private Configuration conf = null; + + private HRegion region = null; + private HTableDescriptor htd = null; + private HColumnDescriptor hcd = null; + private long mobCellThreshold = 1000; + + private FileSystem fs; + + private static final byte[] COLUMN_FAMILY = fam1; + private final byte[] STARTROW = Bytes.toBytes(START_KEY); + private int compactionThreshold; + + private void init(long mobThreshold) throws Exception { + this.mobCellThreshold = mobThreshold; + + UTIL = HBaseTestingUtility.createLocalHTU(); + + conf = UTIL.getConfiguration(); + compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); + + htd = UTIL.createTableDescriptor(name.getMethodName()); + hcd = new HColumnDescriptor(COLUMN_FAMILY); + hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE)); + hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(mobThreshold)); + hcd.setMaxVersions(1); + htd.addFamily(hcd); + + region = UTIL.createLocalHRegion(htd, null, null); + fs = FileSystem.get(conf); + } + + @After + public void tearDown() throws Exception { + region.close(); + fs.delete(UTIL.getDataTestDir(), true); + } + + /** + * During compaction, cells smaller than the threshold won't be affected. + */ + @Test + public void testSmallerValue() throws Exception { + init(500); + byte[] dummyData = makeDummyData(300); // smaller than mob threshold + HRegionIncommon loader = new HRegionIncommon(region); + // one hfile per row + for (int i = 0; i < compactionThreshold; i++) { + Put p = createPut(i, dummyData); + loader.put(p); + loader.flushcache(); + } + assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles()); + assertEquals("Before compaction: mob file count", 0, countMobFiles()); + assertEquals("Before compaction: rows", compactionThreshold, countRows()); + assertEquals("Before compaction: mob rows", 0, countMobRows()); + + region.compactStores(); + + assertEquals("After compaction: store files", 1, countStoreFiles()); + assertEquals("After compaction: mob file count", 0, countMobFiles()); + assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles()); + assertEquals("After compaction: rows", compactionThreshold, countRows()); + assertEquals("After compaction: mob rows", 0, countMobRows()); + } + + /** + * During compaction, the mob threshold size is changed. 
+ */
+  @Test
+  public void testLargerValue() throws Exception {
+    init(200);
+    byte[] dummyData = makeDummyData(300); // larger than mob threshold
+    HRegionIncommon loader = new HRegionIncommon(region);
+    for (int i = 0; i < compactionThreshold; i++) {
+      Put p = createPut(i, dummyData);
+      loader.put(p);
+      loader.flushcache();
+    }
+    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
+    assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
+    assertEquals("Before compaction: rows", compactionThreshold, countRows());
+    assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
+    // Change the threshold to be larger than the data size
+    region.getTableDesc().getFamily(COLUMN_FAMILY).setValue(
+        MobConstants.MOB_THRESHOLD, Bytes.toBytes(500L));
+    region.initialize();
+    region.compactStores(true);
+    assertEquals("After compaction: store files", 1, countStoreFiles());
+    assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
+    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
+    assertEquals("After compaction: rows", compactionThreshold, countRows());
+    assertEquals("After compaction: mob rows", 0, countMobRows());
+  }
+
+  /**
+   * This test will first generate store files, then bulk load them and trigger the compaction.
+   * During the compaction, the cell values are larger than the threshold.
+   */
+  @Test
+  public void testMobCompactionWithBulkload() throws Exception {
+    // The following will produce store files with a value size of 600 bytes.
+    init(300);
+    byte[] dummyData = makeDummyData(600);
+
+    Path hbaseRootDir = FSUtils.getRootDir(conf);
+    Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
+    List<Pair<byte[], String>> hfiles = new ArrayList<Pair<byte[], String>>(1);
+    for (int i = 0; i < compactionThreshold; i++) {
+      Path hpath = new Path(basedir, "hfile" + i);
+      hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
+      createHFile(hpath, i, dummyData);
+    }
+
+    // The following will bulk load the above generated store files and compact,
+    // with 600 (value size) > 300 (threshold)
+    boolean result = region.bulkLoadHFiles(hfiles, true);
+    assertTrue("Bulkload result:", result);
+    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
+    assertEquals("Before compaction: mob file count", 0, countMobFiles());
+    assertEquals("Before compaction: rows", compactionThreshold, countRows());
+    assertEquals("Before compaction: mob rows", 0, countMobRows());
+    assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
+
+    region.compactStores();
+
+    assertEquals("After compaction: store files", 1, countStoreFiles());
+    assertEquals("After compaction: mob file count", 1, countMobFiles());
+    assertEquals("After compaction: rows", compactionThreshold, countRows());
+    assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
+    assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
+  }
+
+  private int countStoreFiles() throws IOException {
+    Store store = region.getStore(COLUMN_FAMILY);
+    return store.getStorefilesCount();
+  }
+
+  private int countMobFiles() throws IOException {
+    Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
+        hcd.getNameAsString());
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
+      return files.length;
+    }
+    return 0;
+  }
+
+  private Put createPut(int rowIdx, byte[] dummyData)
throws IOException {
+    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
+    p.setDurability(Durability.SKIP_WAL);
+    p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
+    return p;
+  }
+
+  /**
+   * Creates an HFile with one cell carrying the given dummy data.
+   */
+  private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
+    HFileContext meta = new HFileContextBuilder().build();
+    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
+        .withFileContext(meta).create();
+    long now = System.currentTimeMillis();
+    try {
+      KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
+          Bytes.toBytes("colX"), now, dummyData);
+      writer.append(kv);
+    } finally {
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+      writer.close();
+    }
+  }
+
+  private int countMobRows() throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    InternalScanner scanner = region.getScanner(scan);
+
+    int scannedCount = 0;
+    List<Cell> results = new ArrayList<Cell>();
+    boolean hasMore = scanner.next(results);
+    while (hasMore) {
+      for (Cell c : results) {
+        if (MobUtils.isMobReferenceCell(c)) {
+          scannedCount++;
+        }
+      }
+      hasMore = scanner.next(results);
+    }
+    scanner.close();
+
+    return scannedCount;
+  }
+
+  private int countRows() throws IOException {
+    Scan scan = new Scan();
+    // Retrieve the mob data when scanning
+    InternalScanner scanner = region.getScanner(scan);
+
+    int scannedCount = 0;
+    List<Cell> results = new ArrayList<Cell>();
+    boolean hasMore = scanner.next(results);
+    while (hasMore) {
+      scannedCount += results.size();
+      hasMore = scanner.next(results);
+    }
+    scanner.close();
+
+    return scannedCount;
+  }
+
+  private byte[] makeDummyData(int size) {
+    byte[] dummyData = new byte[size];
+    new Random().nextBytes(dummyData);
+    return dummyData;
+  }
+
+  private int countReferencedMobFiles() throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    InternalScanner scanner = region.getScanner(scan);
+
+    List<Cell> kvs = new ArrayList<Cell>();
+    boolean hasMore = true;
+    String fileName;
+    Set<String> files = new HashSet<String>();
+    do {
+      kvs.clear();
+      hasMore = scanner.next(kvs);
+      for (Cell c : kvs) {
+        KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+        if (!MobUtils.isMobReferenceCell(kv)) {
+          continue;
+        }
+        if (!MobUtils.isValidMobRefCellValue(kv)) {
+          continue;
+        }
+        int size = MobUtils.getMobValueLength(kv);
+        if (size <= mobCellThreshold) {
+          continue;
+        }
+        fileName = MobUtils.getMobFileName(kv);
+        if (fileName.isEmpty()) {
+          continue;
+        }
+        files.add(fileName);
+        Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
+            hcd.getNameAsString());
+        assertTrue(fs.exists(new Path(familyPath, fileName)));
+      }
+    } while (hasMore);
+
+    scanner.close();
+
+    return files.size();
+  }
+}
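Reviewer note: the mob reference cell value layout parsed by MobUtils.getMobValueLength and
MobUtils.getMobFileName above is a 4-byte length of the real mob value followed by the mob
file name. Below is a minimal, self-contained sketch of that encoding using only the JDK.
The class and method names are hypothetical illustrations, not part of this patch; it assumes
UTF-8 file names and relies on HBase's Bytes.toInt/toBytes being big-endian, which matches
ByteBuffer's default byte order.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MobRefValueSketch {

  // Hypothetical helper: encodes a mob ref cell value as described in the MobUtils javadoc,
  // i.e. the 4-byte length of the real mob value followed by the mob file name.
  static byte[] encode(int mobValueLength, String mobFileName) {
    byte[] name = mobFileName.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(4 + name.length)
        .putInt(mobValueLength) // big-endian, like Bytes.toBytes(int)
        .put(name)
        .array();
  }

  // Mirrors MobUtils.getMobValueLength: read the leading 4-byte int.
  static int decodeValueLength(byte[] value) {
    return ByteBuffer.wrap(value).getInt();
  }

  // Mirrors MobUtils.getMobFileName: the remainder of the value is the file name.
  static String decodeFileName(byte[] value) {
    return new String(value, 4, value.length - 4, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] value = encode(1048576, "exampleMobFileName");
    // isValidMobRefCellValue only requires the value to be longer than Bytes.SIZEOF_INT (4).
    System.out.println(value.length > 4);         // true
    System.out.println(decodeValueLength(value)); // 1048576
    System.out.println(decodeFileName(value));    // exampleMobFileName
  }
}

This also explains why isValidMobRefCellValue checks value length strictly greater than 4:
anything shorter cannot hold both the length prefix and a non-empty file name.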