diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index a21f7de1470..8613a3534d3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -28,4 +28,8 @@ public final class TagType {
public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
+
+ // mob tags
+ public static final byte MOB_REFERENCE_TAG_TYPE = (byte) 5;
+ public static final byte MOB_TABLE_NAME_TAG_TYPE = (byte) 6;
}
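
A minimal sketch (not part of the patch) of how the two new tag types can be used with the existing Tag API in hbase-common; the table name is illustrative only.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.util.Bytes;

public class MobTagExample {
  public static void main(String[] args) {
    // A marker tag: only the tag type matters, so the value can stay empty.
    Tag mobRefTag = new Tag(TagType.MOB_REFERENCE_TAG_TYPE, HConstants.EMPTY_BYTE_ARRAY);
    // A tag carrying the name of the table the mob file was flushed from.
    Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes("example_table"));

    System.out.println("reference tag type = " + mobRefTag.getType()
        + ", table name tag type = " + tableNameTag.getType());
  }
}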
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index c724c27cbf9..872bbc5f07b 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1454,4 +1454,33 @@ possible configurations would overwhelm and obscure the important.
    <name>hbase.http.staticuser.user</name>
    <value>dr.stack</value>
  </property>
+  <property>
+    <name>hbase.mob.file.cache.size</name>
+    <value>1000</value>
+    <description>
+      Number of opened file handlers to cache.
+      A larger value will benefit reads by providing more file handlers per mob
+      file cache and would reduce frequent file opening and closing.
+      However, if this is set too high, it could lead to a "too many opened file
+      handlers" error.
+      The default value is 1000.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.cache.evict.period</name>
+    <value>3600</value>
+    <description>
+      The amount of time in seconds before the mob cache evicts cached mob files.
+      The default value is 3600 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.mob.cache.evict.remain.ratio</name>
+    <value>0.5f</value>
+    <description>
+      The ratio (between 0.0 and 1.0) of files that remain cached after an eviction
+      is triggered when the number of cached mob files exceeds hbase.mob.file.cache.size.
+      The default value is 0.5f.
+    </description>
+  </property>
  </configuration>
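
As a hedged illustration of how these defaults line up with the code added later in this patch (MobConstants/MobFileCache), the settings can be read back like this; the literal keys match the property names above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MobCacheSettings {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // The same keys and defaults that MobFileCache uses when it is constructed.
    int cacheSize = conf.getInt("hbase.mob.file.cache.size", 1000);
    long evictPeriodSeconds = conf.getLong("hbase.mob.cache.evict.period", 3600L);
    float evictRemainRatio = conf.getFloat("hbase.mob.cache.evict.remain.ratio", 0.5f);
    System.out.println("cache size=" + cacheSize
        + ", evict period=" + evictPeriodSeconds + "s"
        + ", remain ratio=" + evictRemainRatio);
  }
}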
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
index 730da734e92..b5cdae9e725 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -43,6 +45,8 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
@InterfaceAudience.Private
public class DeleteTableHandler extends TableEventHandler {
@@ -152,10 +156,36 @@ public class DeleteTableHandler extends TableEventHandler {
tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName()));
}
+ // Archive the mob data if there is a mob-enabled column family
+ HColumnDescriptor[] hcds = hTableDescriptor.getColumnFamilies();
+ boolean hasMob = false;
+ for (HColumnDescriptor hcd : hcds) {
+ if (MobUtils.isMobFamily(hcd)) {
+ hasMob = true;
+ break;
+ }
+ }
+ Path mobTableDir = null;
+ if (hasMob) {
+ // Archive mob data
+ mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME),
+ tableName);
+ Path regionDir =
+ new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
+ if (fs.exists(regionDir)) {
+ HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
+ }
+ }
// 4. Delete table directory from FS (temp directory)
if (!fs.delete(tempTableDir, true)) {
LOG.error("Couldn't delete " + tempTableDir);
}
+ // Delete the table directory where the mob files are saved
+ if (hasMob && mobTableDir != null && fs.exists(mobTableDir)) {
+ if (!fs.delete(mobTableDir, true)) {
+ LOG.error("Couldn't delete " + mobTableDir);
+ }
+ }
LOG.debug("Table '" + tableName + "' archived!");
}
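
For context, a column family becomes mob-enabled through the descriptor values defined in MobConstants later in this patch. A minimal sketch (table and family names are illustrative) of the setup that makes the new hasMob branch above take effect:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;

public class MobFamilyExample {
  public static void main(String[] args) {
    HColumnDescriptor hcd = new HColumnDescriptor("f");
    // Flag the family as a mob family and set a 100KB threshold.
    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(true));
    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(100L * 1024));

    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("example_table"));
    htd.addFamily(hcd);

    // DeleteTableHandler archives and removes the table's mob directory only when
    // at least one family reports true here.
    System.out.println("mob family? " + MobUtils.isMobFamily(hcd));
  }
}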
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
new file mode 100644
index 00000000000..457eb6c50ed
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+/**
+ * Cached mob file.
+ */
+@InterfaceAudience.Private
+public class CachedMobFile extends MobFile implements Comparable<CachedMobFile> {
+
+ private long accessCount;
+ private AtomicLong referenceCount = new AtomicLong(0);
+
+ public CachedMobFile(StoreFile sf) {
+ super(sf);
+ }
+
+ public static CachedMobFile create(FileSystem fs, Path path, Configuration conf,
+ CacheConfig cacheConf) throws IOException {
+ StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+ return new CachedMobFile(sf);
+ }
+
+ public void access(long accessCount) {
+ this.accessCount = accessCount;
+ }
+
+ public int compareTo(CachedMobFile that) {
+ if (this.accessCount == that.accessCount) return 0;
+ return this.accessCount < that.accessCount ? 1 : -1;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof CachedMobFile)) {
+ return false;
+ }
+ return compareTo((CachedMobFile) obj) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)(accessCount ^ (accessCount >>> 32));
+ }
+
+ /**
+ * Opens the mob file if it is not already open, and increases the reference count.
+ * It's not thread-safe. Use MobFileCache.openFile() instead.
+ * The underlying reader is opened only once, no matter how many times this open() method
+ * is invoked.
+ * The reference count records how many callers hold this reader. When the count drops to
+ * 0, the reader is closed.
+ */
+ @Override
+ public void open() throws IOException {
+ super.open();
+ referenceCount.incrementAndGet();
+ }
+
+ /**
+ * Decreases the reference count of the underlying reader for the mob file.
+ * It's not thread-safe. Use MobFileCache.closeFile() instead.
+ * The underlying reader isn't closed until the reference count reaches 0.
+ */
+ @Override
+ public void close() throws IOException {
+ long refs = referenceCount.decrementAndGet();
+ if (refs == 0) {
+ super.close();
+ }
+ }
+
+ /**
+ * Gets the reference count of the current mob file.
+ * Internal usage only, currently it's for testing.
+ * @return The reference count of the current mob file.
+ */
+ public long getReferenceCount() {
+ return this.referenceCount.longValue();
+ }
+}
\ No newline at end of file
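
A minimal sketch (not part of the patch) of the reference-counting contract documented above; the file system, path, and cache configuration are placeholders a caller would already have:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mob.CachedMobFile;

public class CachedMobFileUsage {
  // Demonstrates that the underlying reader is opened once and closed only when
  // the last reference is released.
  static void useTwice(FileSystem fs, Path mobFilePath, Configuration conf,
      CacheConfig cacheConf) throws Exception {
    CachedMobFile file = CachedMobFile.create(fs, mobFilePath, conf, cacheConf);
    file.open();                                   // reference count becomes 1, reader opened
    file.open();                                   // reference count becomes 2, reader reused
    System.out.println(file.getReferenceCount());  // prints 2
    file.close();                                  // reference count 1, reader stays open
    file.close();                                  // reference count 0, reader closed
  }
}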
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
new file mode 100644
index 00000000000..4a036a70e1f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -0,0 +1,217 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.HMobStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * An implementation of the StoreFlusher that extends DefaultStoreFlusher.
+ * If the store is not a mob store, the flusher flushes the MemStore in the same way as
+ * DefaultStoreFlusher.
+ * If the store is a mob store, the flusher flushes the MemStore into two places:
+ * the store files of HBase and the mob files.
+ *
+ * - Cells that are not of PUT type or that carry the delete mark are directly flushed to
+ * HBase.
+ * - If the size of a cell value is larger than the threshold, the cell is flushed to a mob
+ * file, and another cell holding the name of this file is flushed to HBase.
+ * - If the size of a cell value is smaller than or equal to the threshold, the cell is
+ * flushed to HBase directly.
+ */
+@InterfaceAudience.Private
+public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
+
+ private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class);
+ private final Object flushLock = new Object();
+ private long mobCellValueSizeThreshold = 0;
+ private Path targetPath;
+ private HMobStore mobStore;
+
+ public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException {
+ super(conf, store);
+ mobCellValueSizeThreshold = MobUtils.getMobThreshold(store.getFamily());
+ this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
+ store.getColumnFamilyName());
+ if (!this.store.getFileSystem().exists(targetPath)) {
+ this.store.getFileSystem().mkdirs(targetPath);
+ }
+ this.mobStore = (HMobStore) store;
+ }
+
+ /**
+ * Flushes the snapshot of the MemStore.
+ * If this store is not a mob store, the cells in the snapshot are flushed to store files
+ * of HBase.
+ * If the store is a mob one, the flusher flushes the MemStore into two places:
+ * the store files of HBase and the mob files.
+ *
+ * - Cells that are not of PUT type or that carry the delete mark are directly flushed to
+ * HBase.
+ * - If the size of a cell value is larger than the threshold, the cell is flushed to a mob
+ * file, and another cell holding the name of this file is flushed to HBase.
+ * - If the size of a cell value is smaller than or equal to the threshold, the cell is
+ * flushed to HBase directly.
+ */
+ @Override
+ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
+ MonitoredTask status) throws IOException {
+ ArrayList<Path> result = new ArrayList<Path>();
+ int cellsCount = snapshot.getCellsCount();
+ if (cellsCount == 0) return result; // don't flush if there are no entries
+
+ // Use a store scanner to find which rows to flush.
+ long smallestReadPoint = store.getSmallestReadPoint();
+ InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
+ if (scanner == null) {
+ return result; // NULL scanner returned from coprocessor hooks means skip normal processing
+ }
+ StoreFile.Writer writer;
+ try {
+ // TODO: We can fail in the below block before we complete adding this flush to
+ // list of store files. Add cleanup of anything put on filesystem if we fail.
+ synchronized (flushLock) {
+ status.setStatus("Flushing " + store + ": creating writer");
+ // Write the map out to the disk
+ writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
+ false, true, true);
+ writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
+ try {
+ // It's a mob store, flush the cells in a mob way. This is where flushing differs
+ // between a normal store and a mob store.
+ performMobFlush(snapshot, cacheFlushId, scanner, writer, status);
+ } finally {
+ finalizeWriter(writer, cacheFlushId, status);
+ }
+ }
+ } finally {
+ scanner.close();
+ }
+ LOG.info("Flushed, sequenceid=" + cacheFlushId + ", memsize="
+ + snapshot.getSize() + ", hasBloomFilter=" + writer.hasGeneralBloom()
+ + ", into tmp file " + writer.getPath());
+ result.add(writer.getPath());
+ return result;
+ }
+
+ /**
+ * Flushes the cells in the mob store.
+ * In the mob store, cells of PUT type may or may not carry mob tags.
+ * - If a cell does not have a mob tag, where it is flushed depends on the value length.
+ * If the length is larger than the threshold, the cell is flushed to a mob file and a
+ * reference cell is flushed to a store file in HBase. Otherwise, the cell is flushed
+ * to a store file in HBase directly.
+ * - If a cell has a mob tag, its value is a mob file name, and it is flushed to a store
+ * file in HBase directly.
+ *
+ * @param snapshot Memstore snapshot.
+ * @param cacheFlushId Log cache flush sequence number.
+ * @param scanner The scanner of memstore snapshot.
+ * @param writer The store file writer.
+ * @param status Task that represents the flush operation and may be updated with status.
+ * @throws IOException
+ */
+ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
+ InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException {
+ StoreFile.Writer mobFileWriter = null;
+ int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
+ HConstants.COMPACTION_KV_MAX_DEFAULT);
+ long mobKVCount = 0;
+ long time = snapshot.getTimeRangeTracker().getMaximumTimestamp();
+ mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
+ store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
+ // The mob file is committed to the mob family directory
+ // ({HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{cfName});
+ // only the file name is recorded in the reference cell.
+ byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
+ try {
+ Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
+ .getName());
+ List<Cell> cells = new ArrayList<Cell>();
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(cells, compactionKVMax);
+ if (!cells.isEmpty()) {
+ for (Cell c : cells) {
+ // If we know that this KV is going to be included always, then let us
+ // set its memstoreTS to 0. This will help us save space when writing to
+ // disk.
+ KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+ if (kv.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(kv)
+ || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
+ writer.append(kv);
+ } else {
+ // append the original keyValue in the mob file.
+ mobFileWriter.append(kv);
+ mobKVCount++;
+
+ // append the tags to the KeyValue.
+ // The key is same, the value is the filename of the mob file
+ KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
+ writer.append(reference);
+ }
+ }
+ cells.clear();
+ }
+ } while (hasMore);
+ } finally {
+ status.setStatus("Flushing mob file " + store + ": appending metadata");
+ mobFileWriter.appendMetadata(cacheFlushId, false);
+ status.setStatus("Flushing mob file " + store + ": closing flushed file");
+ mobFileWriter.close();
+ }
+
+ if (mobKVCount > 0) {
+ // commit the mob file from temp folder to target folder.
+ // If the mob file is committed successfully but the store file is not,
+ // the committed mob file will be handled by the sweep tool as an unused
+ // file.
+ mobStore.commitFile(mobFileWriter.getPath(), targetPath);
+ } else {
+ try {
+ // If the mob file is empty, delete it instead of committing.
+ store.getFileSystem().delete(mobFileWriter.getPath(), true);
+ } catch (IOException e) {
+ LOG.error("Fail to delete the temp mob file", e);
+ }
+ }
+ }
+}
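
The reference cell written by performMobFlush stores the original value length followed by the mob file name (see MobUtils.createMobRefKeyValue later in this patch). A minimal sketch of decoding such a value, assuming a reference cell obtained from a raw mob scan:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class MobReferenceValue {
  // Splits a mob reference value into the original value length and the mob file name,
  // mirroring how createMobRefKeyValue builds it:
  // Bytes.add(Bytes.toBytes(valueLength), fileName).
  static void decode(Cell referenceCell) {
    byte[] value = CellUtil.cloneValue(referenceCell);
    long originalValueLength = Bytes.toLong(value, 0, Bytes.SIZEOF_LONG);
    String mobFileName = Bytes.toString(value, Bytes.SIZEOF_LONG,
        value.length - Bytes.SIZEOF_LONG);
    System.out.println("value length=" + originalValueLength + ", file=" + mobFileName);
  }
}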
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java
new file mode 100644
index 00000000000..b16001085f8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobCacheConfig.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+
+/**
+ * The cache configuration for mob files.
+ */
+@InterfaceAudience.Private
+public class MobCacheConfig extends CacheConfig {
+
+ private static MobFileCache mobFileCache;
+
+ public MobCacheConfig(Configuration conf, HColumnDescriptor family) {
+ super(conf, family);
+ instantiateMobFileCache(conf);
+ }
+
+ /**
+ * Instantiates the MobFileCache.
+ * @param conf The current configuration.
+ */
+ public static synchronized void instantiateMobFileCache(Configuration conf) {
+ if (mobFileCache == null) {
+ mobFileCache = new MobFileCache(conf);
+ }
+ }
+
+ /**
+ * Gets the MobFileCache.
+ * @return The MobFileCache.
+ */
+ public MobFileCache getMobFileCache() {
+ return mobFileCache;
+ }
+}
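
A minimal sketch (not part of the patch) showing that the MobFileCache behind MobCacheConfig is a process-wide singleton: every MobCacheConfig instance returns the same cache once it has been instantiated.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.mob.MobFileCache;

public class MobCacheConfigUsage {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    HColumnDescriptor family = new HColumnDescriptor("f");

    MobCacheConfig first = new MobCacheConfig(conf, family);
    MobCacheConfig second = new MobCacheConfig(conf, family);

    // The MobFileCache is a static singleton, so both configs share the same instance.
    MobFileCache cache = first.getMobFileCache();
    System.out.println("shared cache: " + (cache == second.getMobFileCache()));
  }
}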
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
new file mode 100644
index 00000000000..d2087224cc7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The constants used in mob.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MobConstants {
+
+ public static final byte[] IS_MOB = Bytes.toBytes("isMob");
+ public static final byte[] MOB_THRESHOLD = Bytes.toBytes("mobThreshold");
+ public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k
+
+ public static final String MOB_SCAN_RAW = "hbase.mob.scan.raw";
+ public static final String MOB_CACHE_BLOCKS = "hbase.mob.cache.blocks";
+
+ public static final String MOB_FILE_CACHE_SIZE_KEY = "hbase.mob.file.cache.size";
+ public static final int DEFAULT_MOB_FILE_CACHE_SIZE = 1000;
+
+ public static final String MOB_DIR_NAME = "mobdir";
+ public static final String MOB_REGION_NAME = ".mob";
+ public static final byte[] MOB_REGION_NAME_BYTES = Bytes.toBytes(MOB_REGION_NAME);
+
+ public static final String MOB_CACHE_EVICT_PERIOD = "hbase.mob.cache.evict.period";
+ public static final String MOB_CACHE_EVICT_REMAIN_RATIO = "hbase.mob.cache.evict.remain.ratio";
+ public static final Tag MOB_REF_TAG = new Tag(TagType.MOB_REFERENCE_TAG_TYPE,
+ HConstants.EMPTY_BYTE_ARRAY);
+
+ public static final float DEFAULT_EVICT_REMAIN_RATIO = 0.5f;
+ public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
+
+ public final static String TEMP_DIR_NAME = ".tmp";
+ private MobConstants() {
+
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
new file mode 100644
index 00000000000..a120057c89a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+
+/**
+ * The mob file.
+ */
+@InterfaceAudience.Private
+public class MobFile {
+
+ private StoreFile sf;
+
+ // internal use only for sub classes
+ protected MobFile() {
+ }
+
+ protected MobFile(StoreFile sf) {
+ this.sf = sf;
+ }
+
+ /**
+ * Internal use only. This is used by the sweeper.
+ *
+ * @return The store file scanner.
+ * @throws IOException
+ */
+ public StoreFileScanner getScanner() throws IOException {
+ List<StoreFile> sfs = new ArrayList<StoreFile>();
+ sfs.add(sf);
+ List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
+ false, null, sf.getMaxMemstoreTS());
+
+ return sfScanners.get(0);
+ }
+
+ /**
+ * Reads a cell from the mob file.
+ * @param search The cell to be searched for in the mob file.
+ * @param cacheMobBlocks Whether the scanner should cache blocks.
+ * @return The cell in the mob file.
+ * @throws IOException
+ */
+ public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
+ Cell result = null;
+ StoreFileScanner scanner = null;
+ List<StoreFile> sfs = new ArrayList<StoreFile>();
+ sfs.add(sf);
+ try {
+ List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
+ cacheMobBlocks, true, false, null, sf.getMaxMemstoreTS());
+ if (!sfScanners.isEmpty()) {
+ scanner = sfScanners.get(0);
+ if (scanner.seek(search)) {
+ result = scanner.peek();
+ }
+ }
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Gets the file name.
+ * @return The file name.
+ */
+ public String getFileName() {
+ return sf.getPath().getName();
+ }
+
+ /**
+ * Opens the underlying reader.
+ * It's not thread-safe. Use MobFileCache.openFile() instead.
+ * @throws IOException
+ */
+ public void open() throws IOException {
+ if (sf.getReader() == null) {
+ sf.createReader();
+ }
+ }
+
+ /**
+ * Closes the underlying reader, but does not evict blocks belonging to this file.
+ * It's not thread-safe. Use MobFileCache.closeFile() instead.
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ if (sf != null) {
+ sf.closeReader(false);
+ sf = null;
+ }
+ }
+
+ /**
+ * Creates an instance of the MobFile.
+ * @param fs The file system.
+ * @param path The path of the underlying StoreFile.
+ * @param conf The configuration.
+ * @param cacheConf The CacheConfig.
+ * @return An instance of the MobFile.
+ * @throws IOException
+ */
+ public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf)
+ throws IOException {
+ StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+ return new MobFile(sf);
+ }
+}
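
A minimal sketch (not part of the patch) of reading one cell back from a mob file with this API; the row and path are placeholders, and a real caller would normally go through MobFileCache rather than opening the file directly:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mob.MobFile;

public class MobFileReadExample {
  static Cell readOneCell(FileSystem fs, Path mobFilePath, Configuration conf,
      CacheConfig cacheConf, byte[] row) throws Exception {
    MobFile file = MobFile.create(fs, mobFilePath, conf, cacheConf);
    file.open();
    try {
      // Seek with a "first on row" key and let the mob file return the matching cell.
      KeyValue search = KeyValueUtil.createFirstOnRow(row);
      return file.readCell(search, false /* don't cache blocks */);
    } finally {
      file.close();
    }
  }
}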
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
new file mode 100644
index 00000000000..97530b1d8d3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.IdLock;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The cache for mob files.
+ * This cache doesn't cache the mob file blocks. It only caches references to mob files,
+ * so that the files don't have to be opened and closed all the time. The references are
+ * simply kept open until they are evicted.
+ */
+@InterfaceAudience.Private
+public class MobFileCache {
+
+ private static final Log LOG = LogFactory.getLog(MobFileCache.class);
+
+ /*
+ * Eviction and statistics thread. Periodically run to print the statistics and
+ * evict the lru cached mob files when the count of the cached files is larger
+ * than the threshold.
+ */
+ static class EvictionThread extends Thread {
+ MobFileCache lru;
+
+ public EvictionThread(MobFileCache lru) {
+ super("MobFileCache.EvictionThread");
+ setDaemon(true);
+ this.lru = lru;
+ }
+
+ @Override
+ public void run() {
+ lru.evict();
+ }
+ }
+
+ // a ConcurrentHashMap, accesses to this map are synchronized.
+ private Map<String, CachedMobFile> map = null;
+ // caches access count
+ private final AtomicLong count;
+ private long lastAccess;
+ private final AtomicLong miss;
+ private long lastMiss;
+
+ // a lock to sync the evict to guarantee the eviction occurs in sequence.
+ // the method evictFile is not sync by this lock, the ConcurrentHashMap does the sync there.
+ private final ReentrantLock evictionLock = new ReentrantLock(true);
+
+ //stripes lock on each mob file based on its hash. Sync the openFile/closeFile operations.
+ private final IdLock keyLock = new IdLock();
+
+ private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setNameFormat("MobFileCache #%d").setDaemon(true).build());
+ private final Configuration conf;
+
+ // the count of the cached references to mob files
+ private final int mobFileMaxCacheSize;
+ private final boolean isCacheEnabled;
+ private float evictRemainRatio;
+
+ public MobFileCache(Configuration conf) {
+ this.conf = conf;
+ this.mobFileMaxCacheSize = conf.getInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY,
+ MobConstants.DEFAULT_MOB_FILE_CACHE_SIZE);
+ isCacheEnabled = (mobFileMaxCacheSize > 0);
+ map = new ConcurrentHashMap<String, CachedMobFile>(mobFileMaxCacheSize);
+ this.count = new AtomicLong(0);
+ this.miss = new AtomicLong(0);
+ this.lastAccess = 0;
+ this.lastMiss = 0;
+ if (isCacheEnabled) {
+ long period = conf.getLong(MobConstants.MOB_CACHE_EVICT_PERIOD,
+ MobConstants.DEFAULT_MOB_CACHE_EVICT_PERIOD); // in seconds
+ evictRemainRatio = conf.getFloat(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO,
+ MobConstants.DEFAULT_EVICT_REMAIN_RATIO);
+ if (evictRemainRatio < 0.0) {
+ evictRemainRatio = 0.0f;
+ LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is less than 0.0, 0.0 is used.");
+ } else if (evictRemainRatio > 1.0) {
+ evictRemainRatio = 1.0f;
+ LOG.warn(MobConstants.MOB_CACHE_EVICT_REMAIN_RATIO + " is larger than 1.0, 1.0 is used.");
+ }
+ this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period,
+ TimeUnit.SECONDS);
+ }
+ LOG.info("MobFileCache is initialized, and the cache size is " + mobFileMaxCacheSize);
+ }
+
+ /**
+ * Evicts the lru cached mob files when the count of the cached files is larger
+ * than the threshold.
+ */
+ public void evict() {
+ if (isCacheEnabled) {
+ // Ensure only one eviction at a time
+ if (!evictionLock.tryLock()) {
+ return;
+ }
+ printStatistics();
+ List<CachedMobFile> evictedFiles = new ArrayList<CachedMobFile>();
+ try {
+ if (map.size() <= mobFileMaxCacheSize) {
+ return;
+ }
+ List<CachedMobFile> files = new ArrayList<CachedMobFile>(map.values());
+ Collections.sort(files);
+ int start = (int) (mobFileMaxCacheSize * evictRemainRatio);
+ if (start >= 0) {
+ for (int i = start; i < files.size(); i++) {
+ String name = files.get(i).getFileName();
+ CachedMobFile evictedFile = map.remove(name);
+ if (evictedFile != null) {
+ evictedFiles.add(evictedFile);
+ }
+ }
+ }
+ } finally {
+ evictionLock.unlock();
+ }
+ // EvictionLock is released. Close the evicted files one by one.
+ // The closes are sync in the closeFile method.
+ for (CachedMobFile evictedFile : evictedFiles) {
+ closeFile(evictedFile);
+ }
+ }
+ }
+
+ /**
+ * Evicts the cached file by the name.
+ * @param fileName The name of a cached file.
+ */
+ public void evictFile(String fileName) {
+ if (isCacheEnabled) {
+ IdLock.Entry lockEntry = null;
+ try {
+ // obtains the lock to close the cached file.
+ lockEntry = keyLock.getLockEntry(fileName.hashCode());
+ CachedMobFile evictedFile = map.remove(fileName);
+ if (evictedFile != null) {
+ evictedFile.close();
+ }
+ } catch (IOException e) {
+ LOG.error("Fail to evict the file " + fileName, e);
+ } finally {
+ if (lockEntry != null) {
+ keyLock.releaseLockEntry(lockEntry);
+ }
+ }
+ }
+ }
+
+ /**
+ * Opens a mob file.
+ * @param fs The current file system.
+ * @param path The file path.
+ * @param cacheConf The current MobCacheConfig
+ * @return An opened mob file.
+ * @throws IOException
+ */
+ public MobFile openFile(FileSystem fs, Path path, MobCacheConfig cacheConf) throws IOException {
+ if (!isCacheEnabled) {
+ return MobFile.create(fs, path, conf, cacheConf);
+ } else {
+ String fileName = path.getName();
+ CachedMobFile cached = map.get(fileName);
+ IdLock.Entry lockEntry = keyLock.getLockEntry(fileName.hashCode());
+ try {
+ if (cached == null) {
+ cached = map.get(fileName);
+ if (cached == null) {
+ if (map.size() > mobFileMaxCacheSize) {
+ evict();
+ }
+ cached = CachedMobFile.create(fs, path, conf, cacheConf);
+ cached.open();
+ map.put(fileName, cached);
+ miss.incrementAndGet();
+ }
+ }
+ cached.open();
+ cached.access(count.incrementAndGet());
+ } finally {
+ keyLock.releaseLockEntry(lockEntry);
+ }
+ return cached;
+ }
+ }
+
+ /**
+ * Closes a mob file.
+ * @param file The mob file that needs to be closed.
+ */
+ public void closeFile(MobFile file) {
+ IdLock.Entry lockEntry = null;
+ try {
+ if (!isCacheEnabled) {
+ file.close();
+ } else {
+ lockEntry = keyLock.getLockEntry(file.getFileName().hashCode());
+ file.close();
+ }
+ } catch (IOException e) {
+ LOG.error("MobFileCache, Exception happen during close " + file.getFileName(), e);
+ } finally {
+ if (lockEntry != null) {
+ keyLock.releaseLockEntry(lockEntry);
+ }
+ }
+ }
+
+ /**
+ * Gets the count of cached mob files.
+ * @return The count of the cached mob files.
+ */
+ public int getCacheSize() {
+ return map == null ? 0 : map.size();
+ }
+
+ /**
+ * Prints the statistics.
+ */
+ public void printStatistics() {
+ long access = count.get() - lastAccess;
+ long missed = miss.get() - lastMiss;
+ // guard against division by zero when there has been no access since the last report
+ long hitRate = access == 0 ? 0 : (access - missed) * 100 / access;
+ LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
+ + (access - missed) + ", hit rate: " + hitRate + "%");
+ lastAccess += access;
+ lastMiss += missed;
+ }
+
+}
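
A minimal sketch (not part of the patch) of the open/close discipline the cache expects from callers; all objects passed in are placeholders:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.mob.MobFile;
import org.apache.hadoop.hbase.mob.MobFileCache;

public class MobFileCacheUsage {
  static Cell readThroughCache(FileSystem fs, Path mobFilePath,
      MobCacheConfig mobCacheConfig, Cell search) throws Exception {
    MobFileCache cache = mobCacheConfig.getMobFileCache();
    // openFile either reuses a cached, already-open reader or opens and caches a new one.
    MobFile file = cache.openFile(fs, mobFilePath, mobCacheConfig);
    try {
      return file.readCell(search, true /* cache blocks for this read */);
    } finally {
      // closeFile only decrements the reference count; the reader stays cached.
      cache.closeFile(file);
    }
  }
}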
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
new file mode 100644
index 00000000000..937e965c596
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileName.java
@@ -0,0 +1,169 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.MD5Hash;
+
+/**
+ * The mob file name.
+ * It consists of the md5 of a start key, a date, and a uuid.
+ * It looks like md5(start) + date + uuid.
+ *
+ * - 0-31 characters: md5 hex string of a start key. Since the length of the start key is not
+ * fixed, the md5, which has a fixed length, is used instead.
+ * - 32-39 characters: a date string in the format yyyymmdd. The date is the latest timestamp
+ * of the cells in this file.
+ * - the remaining characters: the uuid.
+ *
+ * Using the md5 hex string of the start key as the prefix of the file name groups files with
+ * the same start key and distinguishes them from files with other start keys.
+ * Cells that come from different regions might end up in the same mob file after a region
+ * split; this is allowed.
+ * The file name carries the latest timestamp of the cells so that expired mob files can be
+ * cleaned up by TTL easily. If this timestamp is older than the TTL, the file is regarded as
+ * expired.
+ */
+@InterfaceAudience.Private
+public class MobFileName {
+
+ private final String date;
+ private final String startKey;
+ private final String uuid;
+ private final String fileName;
+
+ /**
+ * @param startKey
+ * The start key.
+ * @param date
+ * The string of the latest timestamp of cells in this file, the format is yyyymmdd.
+ * @param uuid
+ * The uuid
+ */
+ private MobFileName(byte[] startKey, String date, String uuid) {
+ this.startKey = MD5Hash.getMD5AsHex(startKey, 0, startKey.length);
+ this.uuid = uuid;
+ this.date = date;
+ this.fileName = this.startKey + date + uuid;
+ }
+
+ /**
+ * @param startKey
+ * The md5 hex string of the start key.
+ * @param date
+ * The string of the latest timestamp of cells in this file, the format is yyyymmdd.
+ * @param uuid
+ * The uuid
+ */
+ private MobFileName(String startKey, String date, String uuid) {
+ this.startKey = startKey;
+ this.uuid = uuid;
+ this.date = date;
+ this.fileName = this.startKey + date + uuid;
+ }
+
+ /**
+ * Creates an instance of MobFileName
+ *
+ * @param startKey
+ * The start key.
+ * @param date
+ * The string of the latest timestamp of cells in this file, the format is yyyymmdd.
+ * @param uuid The uuid.
+ * @return An instance of a MobFileName.
+ */
+ public static MobFileName create(byte[] startKey, String date, String uuid) {
+ return new MobFileName(startKey, date, uuid);
+ }
+
+ /**
+ * Creates an instance of MobFileName
+ *
+ * @param startKey
+ * The md5 hex string of the start key.
+ * @param date
+ * The string of the latest timestamp of cells in this file, the format is yyyymmdd.
+ * @param uuid The uuid.
+ * @return An instance of a MobFileName.
+ */
+ public static MobFileName create(String startKey, String date, String uuid) {
+ return new MobFileName(startKey, date, uuid);
+ }
+
+ /**
+ * Creates an instance of MobFileName.
+ * @param fileName The string format of a file name.
+ * @return An instance of a MobFileName.
+ */
+ public static MobFileName create(String fileName) {
+ // The format of a file name is md5HexString(0-31bytes) + date(32-39bytes) + UUID
+ // The date format is yyyyMMdd
+ String startKey = fileName.substring(0, 32);
+ String date = fileName.substring(32, 40);
+ String uuid = fileName.substring(40);
+ return new MobFileName(startKey, date, uuid);
+ }
+
+ /**
+ * Gets the hex string of the md5 for a start key.
+ * @return The hex string of the md5 for a start key.
+ */
+ public String getStartKey() {
+ return startKey;
+ }
+
+ /**
+ * Gets the date string. Its format is yyyymmdd.
+ * @return The date string.
+ */
+ public String getDate() {
+ return this.date;
+ }
+
+ @Override
+ public int hashCode() {
+ StringBuilder builder = new StringBuilder();
+ builder.append(startKey);
+ builder.append(date);
+ builder.append(uuid);
+ return builder.toString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object anObject) {
+ if (this == anObject) {
+ return true;
+ }
+ if (anObject instanceof MobFileName) {
+ MobFileName another = (MobFileName) anObject;
+ if (this.startKey.equals(another.startKey) && this.date.equals(another.date)
+ && this.uuid.equals(another.uuid)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Gets the file name.
+ * @return The file name.
+ */
+ public String getFileName() {
+ return this.fileName;
+ }
+}
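
A minimal sketch (not part of the patch) of building a mob file name and parsing it back; the dash-stripped random UUID is just one reasonable choice for the uuid part:

import java.util.Date;
import java.util.UUID;

import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;

public class MobFileNameExample {
  public static void main(String[] args) {
    byte[] startKey = Bytes.toBytes("row-0001");
    String date = MobUtils.formatDate(new Date());          // yyyymmdd
    String uuid = UUID.randomUUID().toString().replaceAll("-", "");

    MobFileName name = MobFileName.create(startKey, date, uuid);
    String fileName = name.getFileName();                   // md5(startKey) + date + uuid

    // Parsing the plain string gives back an equal MobFileName.
    MobFileName parsed = MobFileName.create(fileName);
    System.out.println("round trip ok: " + parsed.equals(name)
        + ", date=" + parsed.getDate());
  }
}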
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
new file mode 100644
index 00000000000..d5e6f2ef215
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * MobStoreEngine creates the mob-specific store flusher. The compactor and compaction
+ * policy from DefaultStoreEngine are reused.
+ */
+@InterfaceAudience.Private
+public class MobStoreEngine extends DefaultStoreEngine {
+
+ @Override
+ protected void createStoreFlusher(Configuration conf, Store store) throws IOException {
+ // When using MOB, we use DefaultMobStoreFlusher always
+ // Just use the compactor and compaction policy as those in DefaultStoreEngine. We can have MOB
+ // specific compactor and policy when that is implemented.
+ storeFlusher = new DefaultMobStoreFlusher(conf, store);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
new file mode 100644
index 00000000000..149eed41152
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -0,0 +1,263 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * The mob utilities
+ */
+@InterfaceAudience.Private
+public class MobUtils {
+
+ private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
+ new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() {
+ return new SimpleDateFormat("yyyyMMdd");
+ }
+ };
+
+ /**
+ * Indicates whether the column family is a mob one.
+ * @param hcd The descriptor of a column family.
+ * @return True if this column family is a mob one, false if it's not.
+ */
+ public static boolean isMobFamily(HColumnDescriptor hcd) {
+ byte[] isMob = hcd.getValue(MobConstants.IS_MOB);
+ return isMob != null && isMob.length == 1 && Bytes.toBoolean(isMob);
+ }
+
+ /**
+ * Gets the mob threshold.
+ * If the size of a cell value is larger than this threshold, it's regarded as a mob.
+ * The default threshold is 100KB (1024 * 100 bytes).
+ * @param hcd The descriptor of a column family.
+ * @return The threshold.
+ */
+ public static long getMobThreshold(HColumnDescriptor hcd) {
+ byte[] threshold = hcd.getValue(MobConstants.MOB_THRESHOLD);
+ return threshold != null && threshold.length == Bytes.SIZEOF_LONG ? Bytes.toLong(threshold)
+ : MobConstants.DEFAULT_MOB_THRESHOLD;
+ }
+
+ /**
+ * Formats a date to a string.
+ * @param date The date.
+ * @return The string format of the date, it's yyyymmdd.
+ */
+ public static String formatDate(Date date) {
+ return LOCAL_FORMAT.get().format(date);
+ }
+
+ /**
+ * Parses the string to a date.
+ * @param dateString The string format of a date, it's yyyymmdd.
+ * @return A date.
+ * @throws ParseException
+ */
+ public static Date parseDate(String dateString) throws ParseException {
+ return LOCAL_FORMAT.get().parse(dateString);
+ }
+
+ /**
+ * Whether the current cell is a mob reference cell.
+ * @param cell The current cell.
+ * @return True if the cell has a mob reference tag, false if it doesn't.
+ */
+ public static boolean isMobReferenceCell(Cell cell) {
+ if (cell.getTagsLength() > 0) {
+ Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+ TagType.MOB_REFERENCE_TAG_TYPE);
+ return tag != null;
+ }
+ return false;
+ }
+
+ /**
+ * Whether the tag list has a mob reference tag.
+ * @param tags The tag list.
+ * @return True if the list has a mob reference tag, false if it doesn't.
+ */
+ public static boolean hasMobReferenceTag(List<Tag> tags) {
+ if (!tags.isEmpty()) {
+ for (Tag tag : tags) {
+ if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Indicates whether it's a raw scan.
+ * The information is set in the attribute "hbase.mob.scan.raw" of scan.
+ * For a mob cell, in a normal scan the scanner retrieves the mob cell from the mob file.
+ * In a raw scan, the scanner directly returns the reference cell stored in HBase without
+ * retrieving the one in the mob file.
+ * @param scan The current scan.
+ * @return True if it's a raw scan.
+ */
+ public static boolean isRawMobScan(Scan scan) {
+ byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
+ try {
+ return raw != null && Bytes.toBoolean(raw);
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Indicates whether the scan contains the information of caching blocks.
+ * The information is set in the attribute "hbase.mob.cache.blocks" of scan.
+ * @param scan The current scan.
+ * @return True when the Scan attribute specifies to cache the MOB blocks.
+ */
+ public static boolean isCacheMobBlocks(Scan scan) {
+ byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
+ try {
+ return cache != null && Bytes.toBoolean(cache);
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Sets the attribute of caching blocks in the scan.
+ *
+ * @param scan
+ * The current scan.
+ * @param cacheBlocks
+ * True to set the attribute so that the scanner with this scan caches blocks;
+ * false so that the scanner doesn't cache blocks for this scan.
+ */
+ public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
+ scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
+ }
+
+ /**
+ * Gets the root dir of the mob files.
+ * It's {HBASE_DIR}/mobdir.
+ * @param conf The current configuration.
+ * @return the root dir of the mob file.
+ */
+ public static Path getMobHome(Configuration conf) {
+ Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
+ return new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
+ }
+
+ /**
+ * Gets the region dir of the mob files.
+ * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
+ * @param conf The current configuration.
+ * @param tableName The current table name.
+ * @return The region dir of the mob files.
+ */
+ public static Path getMobRegionPath(Configuration conf, TableName tableName) {
+ Path tablePath = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+ HRegionInfo regionInfo = getMobRegionInfo(tableName);
+ return new Path(tablePath, regionInfo.getEncodedName());
+ }
+
+ /**
+ * Gets the family dir of the mob files.
+ * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+ * @param conf The current configuration.
+ * @param tableName The current table name.
+ * @param familyName The current family name.
+ * @return The family dir of the mob files.
+ */
+ public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
+ return new Path(getMobRegionPath(conf, tableName), familyName);
+ }
+
+ /**
+ * Gets the family dir of the mob files.
+ * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
+ * @param regionPath The path of mob region which is a dummy one.
+ * @param familyName The current family name.
+ * @return The family dir of the mob files.
+ */
+ public static Path getMobFamilyPath(Path regionPath, String familyName) {
+ return new Path(regionPath, familyName);
+ }
+
+ /**
+ * Gets the HRegionInfo of the mob files.
+ * This is a dummy region. The mob files are not saved in a region in HBase.
+ * This is only used in mob snapshot. It's internally used only.
+ * @param tableName
+ * @return A dummy mob region info.
+ */
+ public static HRegionInfo getMobRegionInfo(TableName tableName) {
+ HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES,
+ HConstants.EMPTY_END_ROW, false, 0);
+ return info;
+ }
+
+ /**
+ * Creates a mob reference KeyValue.
+ * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
+ * @param kv The original KeyValue.
+ * @param fileName The mob file name where the mob reference KeyValue is written.
+ * @param tableNameTag The tag of the current table name. It's very important in
+ * cloning the snapshot.
+ * @return The mob reference KeyValue.
+ */
+ public static KeyValue createMobRefKeyValue(KeyValue kv, byte[] fileName, Tag tableNameTag) {
+ // Append the tags to the KeyValue.
+ // The key is same, the value is the filename of the mob file
+ List<Tag> existingTags = Tag.asList(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
+ existingTags.add(MobConstants.MOB_REF_TAG);
+ // Add the tag of the source table name, this table is where this mob file is flushed
+ // from.
+ // It's very useful in cloning the snapshot. When reading from the cloning table, we need to
+ // find the original mob files by this table name. For details please see cloning
+ // snapshot for mob files.
+ existingTags.add(tableNameTag);
+ long valueLength = kv.getValueLength();
+ byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
+ KeyValue reference = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+ kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
+ kv.getQualifierOffset(), kv.getQualifierLength(), kv.getTimestamp(), KeyValue.Type.Put,
+ refValue, 0, refValue.length, existingTags);
+ reference.setSequenceId(kv.getSequenceId());
+ return reference;
+ }
+}
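
A minimal sketch (not part of the patch) of the two scan attributes MobUtils understands: the raw-scan attribute returns the reference cells as stored in HBase, and the cache-blocks attribute controls block caching for the extra mob-file reads.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;

public class MobScanAttributes {
  public static void main(String[] args) {
    Scan scan = new Scan();

    // Ask for the raw reference cells instead of resolving them from mob files.
    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
    // Cache the blocks read from mob files for this scan.
    MobUtils.setCacheMobBlocks(scan, true);

    System.out.println("raw mob scan? " + MobUtils.isRawMobScan(scan)
        + ", cache mob blocks? " + MobUtils.isCacheMobBlocks(scan));
  }
}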
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index f2a0b062e2c..4afa80c6694 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -63,6 +63,12 @@ public class DefaultStoreEngine extends StoreEngine<
protected void createComponents(
Configuration conf, Store store, KVComparator kvComparator) throws IOException {
storeFileManager = new DefaultStoreFileManager(kvComparator, conf);
+ createCompactor(conf, store);
+ createCompactionPolicy(conf, store);
+ createStoreFlusher(conf, store);
+ }
+
+ protected void createCompactor(Configuration conf, Store store) throws IOException {
String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName());
try {
compactor = ReflectionUtils.instantiateWithCustomCtor(className,
@@ -70,7 +76,10 @@ public class DefaultStoreEngine extends StoreEngine<
} catch (Exception e) {
throw new IOException("Unable to load configured compactor '" + className + "'", e);
}
- className = conf.get(
+ }
+
+ protected void createCompactionPolicy(Configuration conf, Store store) throws IOException {
+ String className = conf.get(
DEFAULT_COMPACTION_POLICY_CLASS_KEY, DEFAULT_COMPACTION_POLICY_CLASS.getName());
try {
compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className,
@@ -79,7 +88,10 @@ public class DefaultStoreEngine extends StoreEngine<
} catch (Exception e) {
throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
}
- className = conf.get(
+ }
+
+ protected void createStoreFlusher(Configuration conf, Store store) throws IOException {
+ String className = conf.get(
DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
try {
storeFlusher = ReflectionUtils.instantiateWithCustomCtor(className,
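
With the compactor, compaction policy, and store flusher creation split into separate hooks, a subclass can replace a single component without copying the rest; MobStoreEngine above does exactly that for the flusher. A hypothetical subclass (assuming ExploringCompactionPolicy's (Configuration, StoreConfigInformation) constructor) that swaps only the compaction policy:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;

// Hypothetical subclass, not part of the patch: only the compaction-policy hook is
// overridden; the compactor and store flusher keep the DefaultStoreEngine behavior.
public class ExampleStoreEngine extends DefaultStoreEngine {
  @Override
  protected void createCompactionPolicy(Configuration conf, Store store) throws IOException {
    // Store extends StoreConfigInformation, so it can be passed to the policy directly.
    compactionPolicy = new ExploringCompactionPolicy(conf, store);
  }
}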
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
new file mode 100644
index 00000000000..6b4e4504628
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -0,0 +1,268 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobCacheConfig;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFile;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobStoreEngine;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The store implementation that saves MOBs (medium-sized objects); it extends HStore.
+ * When the descriptor of a column family has the "is_mob" value set, the column family
+ * is a mob one. When an HRegion instantiates a store for such a column family, an
+ * HMobStore is created.
+ * HMobStore is almost the same as HStore except that it uses different types of scanners.
+ * In getScanner, a MobStoreScanner or a ReversedMobStoreScanner is returned. In these
+ * scanners, an additional seek into the mob files is performed after the seek in HBase
+ * is done.
+ */
+@InterfaceAudience.Private
+public class HMobStore extends HStore {
+
+ private MobCacheConfig mobCacheConfig;
+ private Path homePath;
+ private Path mobFamilyPath;
+
+ public HMobStore(final HRegion region, final HColumnDescriptor family,
+ final Configuration confParam) throws IOException {
+ super(region, family, confParam);
+ this.mobCacheConfig = (MobCacheConfig) cacheConf;
+ this.homePath = MobUtils.getMobHome(conf);
+ this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
+ family.getNameAsString());
+ }
+
+ /**
+ * Creates the mob cache config.
+ */
+ @Override
+ protected void createCacheConf(HColumnDescriptor family) {
+ cacheConf = new MobCacheConfig(conf, family);
+ }
+
+ /**
+ * Gets the MobStoreScanner or ReversedMobStoreScanner. In these scanners, an additional
+ * seek into the mob files is performed after the seek in HBase is done.
+ */
+ @Override
+ protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
+ long readPt, KeyValueScanner scanner) throws IOException {
+ if (scanner == null) {
+ scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
+ targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+ }
+ return scanner;
+ }
+
+ /**
+ * Creates the mob store engine.
+ */
+ @Override
+ protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
+ KVComparator kvComparator) throws IOException {
+ MobStoreEngine engine = new MobStoreEngine();
+ engine.createComponents(conf, store, kvComparator);
+ return engine;
+ }
+
+ /**
+ * Gets the temp directory.
+ * @return The temp directory.
+ */
+ private Path getTempDir() {
+ return new Path(homePath, MobConstants.TEMP_DIR_NAME);
+ }
+
+ /**
+ * Creates the writer for a mob file in the temp directory.
+ * @param date The latest date of the cells being flushed.
+ * @param maxKeyCount The key count.
+ * @param compression The compression algorithm.
+ * @param startKey The start key.
+ * @return The writer for the mob file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
+ Compression.Algorithm compression, byte[] startKey) throws IOException {
+ if (startKey == null) {
+ startKey = HConstants.EMPTY_START_ROW;
+ }
+ Path path = getTempDir();
+ return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
+ }
+
+ /**
+ * Creates the writer for a mob file under the given base path.
+ * @param date The date string, in the yyyymmdd format.
+ * @param basePath The basic path for a temp directory.
+ * @param maxKeyCount The key count.
+ * @param compression The compression algorithm.
+ * @param startKey The start key.
+ * @return The writer for the mob file.
+ * @throws IOException
+ */
+ public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
+ Compression.Algorithm compression, byte[] startKey) throws IOException {
+ MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
+ .toString().replaceAll("-", ""));
+ final CacheConfig writerCacheConf = mobCacheConfig;
+ HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+ .withIncludesMvcc(false).withIncludesTags(true)
+ .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+ .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+ .withBlockSize(getFamily().getBlocksize())
+ .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()).build();
+
+ StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
+ .withFilePath(new Path(basePath, mobFileName.getFileName()))
+ .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+ .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+ return w;
+ }
+
+ /**
+ * Commits the mob file.
+ * @param sourceFile The source file.
+ * @param targetPath The directory path where the source file is renamed to.
+ * @throws IOException
+ */
+ public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
+ if (sourceFile == null) {
+ return;
+ }
+ Path dstPath = new Path(targetPath, sourceFile.getName());
+ validateMobFile(sourceFile);
+ String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
+ LOG.info(msg);
+ Path parent = dstPath.getParent();
+ if (!region.getFilesystem().exists(parent)) {
+ region.getFilesystem().mkdirs(parent);
+ }
+ if (!region.getFilesystem().rename(sourceFile, dstPath)) {
+ throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
+ }
+ }
+
+ /**
+ * Validates a mob file by opening and closing it.
+ *
+ * @param path the path to the mob file
+ */
+ private void validateMobFile(Path path) throws IOException {
+ StoreFile storeFile = null;
+ try {
+ storeFile =
+ new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
+ storeFile.createReader();
+ } catch (IOException e) {
+ LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
+ throw e;
+ } finally {
+ if (storeFile != null) {
+ storeFile.closeReader(false);
+ }
+ }
+ }
+
+ /**
+ * Reads the cell from the mob file.
+ * @param reference The reference cell read from HBase; its value contains the name of a mob file.
+ * @param cacheBlocks Whether the scanner should cache blocks.
+ * @return The cell found in the mob file.
+ * @throws IOException
+ */
+ public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
+ Cell result = null;
+ if (reference.getValueLength() > Bytes.SIZEOF_LONG) {
+ String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset()
+ + Bytes.SIZEOF_LONG, reference.getValueLength() - Bytes.SIZEOF_LONG);
+ Path targetPath = new Path(mobFamilyPath, fileName);
+ MobFile file = null;
+ try {
+ file = mobCacheConfig.getMobFileCache().openFile(region.getFilesystem(), targetPath,
+ mobCacheConfig);
+ result = file.readCell(reference, cacheBlocks);
+ } catch (IOException e) {
+ LOG.error("Fail to open/read the mob file " + targetPath.toString(), e);
+ } catch (NullPointerException e) {
+ // When delete the file during the scan, the hdfs getBlockRange will
+ // throw NullPointerException, catch it and manage it.
+ LOG.error("Fail to read the mob file " + targetPath.toString()
+ + " since it's already deleted", e);
+ } finally {
+ if (file != null) {
+ mobCacheConfig.getMobFileCache().closeFile(file);
+ }
+ }
+ } else {
+ LOG.warn("Invalid reference to mob, " + reference.getValueLength() + " bytes is too short");
+ }
+
+ if (result == null) {
+ LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
+ + "qualifier,timestamp,type and tags but with an empty value to return.");
+ result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
+ reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
+ reference.getFamilyLength(), reference.getQualifierArray(),
+ reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
+ Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
+ 0, 0, reference.getTagsArray(), reference.getTagsOffset(),
+ reference.getTagsLength());
+ }
+ return result;
+ }
+
+ /**
+ * Gets the mob family directory path.
+ * @return The mob family directory path.
+ */
+ public Path getPath() {
+ return mobFamilyPath;
+ }
+}
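[editor note] For readers of resolve() above: the reference cell's value is treated as an 8-byte prefix (Bytes.SIZEOF_LONG, presumably the original value length) followed by the mob file name, which is then looked up under the mob family directory. A small sketch mirroring the offsets used above; MobReferenceDecoder is an illustrative helper, not part of the patch:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.Bytes;

public final class MobReferenceDecoder {
  private MobReferenceDecoder() {}

  // Extracts the mob file name from a reference cell, as HMobStore.resolve() does:
  // the value is assumed to be an 8-byte prefix followed by the file name.
  public static String fileNameOf(Cell reference) {
    if (reference.getValueLength() <= Bytes.SIZEOF_LONG) {
      throw new IllegalArgumentException("Not a valid mob reference value");
    }
    return Bytes.toString(reference.getValueArray(),
        reference.getValueOffset() + Bytes.SIZEOF_LONG,
        reference.getValueLength() - Bytes.SIZEOF_LONG);
  }
}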
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b879e8a73f6..c87c12bfb8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -3552,6 +3553,9 @@ public class HRegion implements HeapSize { // , Writable{
}
protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
+ if (MobUtils.isMobFamily(family)) {
+ return new HMobStore(this, family, this.conf);
+ }
return new HStore(this, family, this.conf);
}
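[editor note] MobUtils.isMobFamily() keys off the column family descriptor, so instantiateHStore() only builds an HMobStore for families flagged as MOB. The tests later in this patch flag a family via MobConstants.IS_MOB and MobConstants.MOB_THRESHOLD; a minimal sketch of such a descriptor follows (table name and threshold value are illustrative, and the threshold unit is presumably bytes):

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.util.Bytes;

public class MobTableExample {
  public static HTableDescriptor mobTable() {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("demo"));
    HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
    // Flag the family as MOB; values above the threshold are written to mob files,
    // and HRegion.instantiateHStore() creates an HMobStore for this family.
    hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
    hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(100L));
    desc.addFamily(hcd);
    return desc;
  }
}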
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 8d20d7b445b..45edf7f5d2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
@@ -132,11 +133,11 @@ public class HStore implements Store {
protected final MemStore memstore;
// This stores directory in the filesystem.
- private final HRegion region;
+ protected final HRegion region;
private final HColumnDescriptor family;
private final HRegionFileSystem fs;
- private final Configuration conf;
- private final CacheConfig cacheConf;
+ protected final Configuration conf;
+ protected CacheConfig cacheConf;
private long lastCompactSize = 0;
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
@@ -244,7 +245,7 @@ public class HStore implements Store {
this.offPeakHours = OffPeakHours.getInstance(conf);
// Setting up cache configuration for this family
- this.cacheConf = new CacheConfig(conf, family);
+ createCacheConf(family);
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
@@ -263,7 +264,7 @@ public class HStore implements Store {
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
}
- this.storeEngine = StoreEngine.create(this, this.conf, this.comparator);
+ this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
@@ -337,6 +338,27 @@ public class HStore implements Store {
}
}
+ /**
+ * Creates the cache config.
+ * @param family The current column family.
+ */
+ protected void createCacheConf(final HColumnDescriptor family) {
+ this.cacheConf = new CacheConfig(conf, family);
+ }
+
+ /**
+ * Creates the store engine configured for the given Store.
+ * @param store The store. An unfortunate dependency needed due to it
+ * being passed to coprocessors via the compactor.
+ * @param conf Store configuration.
+ * @param kvComparator KVComparator for storeFileManager.
+ * @return StoreEngine to use.
+ */
+ protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
+ KVComparator kvComparator) throws IOException {
+ return StoreEngine.create(store, conf, comparator);
+ }
+
/**
* @param family
* @return TTL in seconds of the specified family
@@ -1886,17 +1908,23 @@ public class HStore implements Store {
if (this.getCoprocessorHost() != null) {
scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
}
- if (scanner == null) {
- scanner = scan.isReversed() ? new ReversedStoreScanner(this,
- getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
- getScanInfo(), scan, targetCols, readPt);
- }
+ scanner = createScanner(scan, targetCols, readPt, scanner);
return scanner;
} finally {
lock.readLock().unlock();
}
}
+ protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
+ long readPt, KeyValueScanner scanner) throws IOException {
+ if (scanner == null) {
+ scanner = scan.isReversed() ? new ReversedStoreScanner(this,
+ getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
+ getScanInfo(), scan, targetCols, readPt);
+ }
+ return scanner;
+ }
+
@Override
public String toString() {
return this.getColumnFamilyName();
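[editor note] The new createCacheConf/createStoreEngine/createScanner hooks above turn HStore into a template that HMobStore specializes. The createScanner contract in particular is easy to miss: the scanner argument is non-null when a coprocessor preStoreScannerOpen hook already built one, and an override must honour it. A minimal pass-through sketch under that assumption; ExampleStore is a hypothetical subclass, not part of the patch:

import java.io.IOException;
import java.util.NavigableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;

public class ExampleStore extends HStore {
  public ExampleStore(HRegion region, HColumnDescriptor family, Configuration conf)
      throws IOException {
    super(region, family, conf);
  }

  @Override
  protected KeyValueScanner createScanner(Scan scan, NavigableSet<byte[]> targetCols,
      long readPt, KeyValueScanner scanner) throws IOException {
    // A coprocessor may have supplied the scanner already; keep it in that case.
    if (scanner != null) {
      return scanner;
    }
    // Otherwise build whatever scanner this store needs (here, the default one).
    return super.createScanner(scan, targetCols, readPt, null);
  }
}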
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
new file mode 100644
index 00000000000..b4bcbe7a029
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * Scanner that scans both the memstore and the MOB store. Coalesces the KeyValue stream
+ * into a List&lt;Cell&gt; for a single row.
+ *
+ */
+@InterfaceAudience.Private
+public class MobStoreScanner extends StoreScanner {
+
+ private boolean cacheMobBlocks = false;
+
+ public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+ final NavigableSet<byte[]> columns, long readPt) throws IOException {
+ super(store, scanInfo, scan, columns, readPt);
+ cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+ }
+
+ /**
+ * First reads the cells from HBase. If a cell is a reference cell (i.e. it carries the
+ * mob reference tag), the scanner seeks the cell in the mob file and uses the cell found
+ * there as the result.
+ */
+ @Override
+ public boolean next(List<Cell> outResult, int limit) throws IOException {
+ boolean result = super.next(outResult, limit);
+ if (!MobUtils.isRawMobScan(scan)) {
+ // retrieve the mob data
+ if (outResult.isEmpty()) {
+ return result;
+ }
+ HMobStore mobStore = (HMobStore) store;
+ for (int i = 0; i < outResult.size(); i++) {
+ Cell cell = outResult.get(i);
+ if (MobUtils.isMobReferenceCell(cell)) {
+ outResult.set(i, mobStore.resolve(cell, cacheMobBlocks));
+ }
+ }
+ }
+ return result;
+ }
+}
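[editor note] Both scanners consult MobUtils.isRawMobScan() and MobUtils.isCacheMobBlocks(), which read flags from the client Scan; MobUtils itself is not shown in this hunk, so the attribute keys in the sketch below are assumptions used only for illustration:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class MobScanExample {
  // Hypothetical attribute keys; the real keys live in the patch's MobConstants/MobUtils.
  private static final String RAW_MOB_SCAN_ATTR = "hbase.mob.scan.raw";
  private static final String CACHE_MOB_BLOCKS_ATTR = "hbase.mob.cache.blocks";

  public static Scan rawMobScan() {
    Scan scan = new Scan();
    // Return the raw reference cells instead of resolving them against the mob files.
    scan.setAttribute(RAW_MOB_SCAN_ATTR, Bytes.toBytes(Boolean.TRUE));
    // Do not cache mob file blocks for this scan.
    scan.setAttribute(CACHE_MOB_BLOCKS_ATTR, Bytes.toBytes(Boolean.FALSE));
    return scan;
  }
}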
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
new file mode 100644
index 00000000000..e384390b20a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+/**
+ * ReversedMobStoreScanner extends ReversedStoreScanner and supports reversed scanning
+ * of both the memstore and the MOB store.
+ *
+ */
+@InterfaceAudience.Private
+public class ReversedMobStoreScanner extends ReversedStoreScanner {
+
+ private boolean cacheMobBlocks = false;
+
+ ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+ long readPt) throws IOException {
+ super(store, scanInfo, scan, columns, readPt);
+ cacheMobBlocks = MobUtils.isCacheMobBlocks(scan);
+ }
+
+ /**
+ * First reads the cells from HBase. If a cell is a reference cell (i.e. it carries the
+ * mob reference tag), the scanner seeks the cell in the mob file and uses the cell found
+ * there as the result.
+ */
+ @Override
+ public boolean next(List<Cell> outResult, int limit) throws IOException {
+ boolean result = super.next(outResult, limit);
+ if (!MobUtils.isRawMobScan(scan)) {
+ // retrieve the mob data
+ if (outResult.isEmpty()) {
+ return result;
+ }
+ HMobStore mobStore = (HMobStore) store;
+ for (int i = 0; i < outResult.size(); i++) {
+ Cell cell = outResult.get(i);
+ if (MobUtils.isMobReferenceCell(cell)) {
+ outResult.set(i, mobStore.resolve(cell, cacheMobBlocks));
+ }
+ }
+ }
+ return result;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 27c64f0120b..13ded588be4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -915,7 +915,7 @@ public class StoreFile {
return this.writer.getPath();
}
- boolean hasGeneralBloom() {
+ public boolean hasGeneralBloom() {
return this.generalBloomFilterWriter != null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
new file mode 100644
index 00000000000..5e28cd96f5c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+
+public class MobTestUtil {
+ protected static final char FIRST_CHAR = 'a';
+ protected static final char LAST_CHAR = 'z';
+
+ protected static String generateRandomString(int demoLength) {
+ String base = "abcdefghijklmnopqrstuvwxyz";
+ Random random = new Random();
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < demoLength; i++) {
+ int number = random.nextInt(base.length());
+ sb.append(base.charAt(number));
+ }
+ return sb.toString();
+ }
+ protected static void writeStoreFile(final StoreFile.Writer writer, String caseName)
+ throws IOException {
+ writeStoreFile(writer, Bytes.toBytes(caseName), Bytes.toBytes(caseName));
+ }
+
+ /*
+ * Writes HStoreKey and ImmutableBytes data to passed writer and then closes
+ * it.
+ *
+ * @param writer
+ *
+ * @throws IOException
+ */
+ private static void writeStoreFile(final StoreFile.Writer writer, byte[] fam,
+ byte[] qualifier) throws IOException {
+ long now = System.currentTimeMillis();
+ try {
+ for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
+ for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
+ byte[] b = new byte[] { (byte) d, (byte) e };
+ writer.append(new KeyValue(b, fam, qualifier, now, b));
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ }
+
+ /**
+ * Compares two KeyValues only on their row, family, qualifier and value.
+ */
+ public static void assertKeyValuesEquals(KeyValue firstKeyValue,
+ KeyValue secondKeyValue) {
+ Assert.assertEquals(Bytes.toString(CellUtil.cloneRow(firstKeyValue)),
+ Bytes.toString(CellUtil.cloneRow(secondKeyValue)));
+ Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(firstKeyValue)),
+ Bytes.toString(CellUtil.cloneFamily(secondKeyValue)));
+ Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(firstKeyValue)),
+ Bytes.toString(CellUtil.cloneQualifier(secondKeyValue)));
+ Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(firstKeyValue)),
+ Bytes.toString(CellUtil.cloneValue(secondKeyValue)));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
new file mode 100644
index 00000000000..b39dd2ae11f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestCachedMobFile.java
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestCachedMobFile extends TestCase {
+ static final Log LOG = LogFactory.getLog(TestCachedMobFile.class);
+ private Configuration conf = HBaseConfiguration.create();
+ private CacheConfig cacheConf = new CacheConfig(conf);
+ private final String TABLE = "tableName";
+ private final String FAMILY = "familyName";
+ private final String FAMILY1 = "familyName1";
+ private final String FAMILY2 = "familyName2";
+ private final long EXPECTED_REFERENCE_ZERO = 0;
+ private final long EXPECTED_REFERENCE_ONE = 1;
+ private final long EXPECTED_REFERENCE_TWO = 2;
+
+ @Test
+ public void testOpenClose() throws Exception {
+ String caseName = getName();
+ FileSystem fs = FileSystem.get(conf);
+ Path testDir = FSUtils.getRootDir(conf);
+ Path outputDir = new Path(new Path(testDir, TABLE),
+ FAMILY);
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
+ StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+ .withOutputDir(outputDir).withFileContext(meta).build();
+ MobTestUtil.writeStoreFile(writer, caseName);
+ CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
+ Assert.assertEquals(EXPECTED_REFERENCE_ZERO, cachedMobFile.getReferenceCount());
+ cachedMobFile.open();
+ Assert.assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile.getReferenceCount());
+ cachedMobFile.open();
+ Assert.assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile.getReferenceCount());
+ cachedMobFile.close();
+ Assert.assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile.getReferenceCount());
+ cachedMobFile.close();
+ Assert.assertEquals(EXPECTED_REFERENCE_ZERO, cachedMobFile.getReferenceCount());
+ }
+
+ @Test
+ public void testCompare() throws Exception {
+ String caseName = getName();
+ FileSystem fs = FileSystem.get(conf);
+ Path testDir = FSUtils.getRootDir(conf);
+ Path outputDir1 = new Path(new Path(testDir, TABLE),
+ FAMILY1);
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+ StoreFile.Writer writer1 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+ .withOutputDir(outputDir1).withFileContext(meta).build();
+ MobTestUtil.writeStoreFile(writer1, caseName);
+ CachedMobFile cachedMobFile1 = CachedMobFile.create(fs, writer1.getPath(), conf, cacheConf);
+ Path outputDir2 = new Path(new Path(testDir, TABLE),
+ FAMILY2);
+ StoreFile.Writer writer2 = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+ .withOutputDir(outputDir2)
+ .withFileContext(meta)
+ .build();
+ MobTestUtil.writeStoreFile(writer2, caseName);
+ CachedMobFile cachedMobFile2 = CachedMobFile.create(fs, writer2.getPath(), conf, cacheConf);
+ cachedMobFile1.access(1);
+ cachedMobFile2.access(2);
+ Assert.assertEquals(cachedMobFile1.compareTo(cachedMobFile2), 1);
+ Assert.assertEquals(cachedMobFile2.compareTo(cachedMobFile1), -1);
+ Assert.assertEquals(cachedMobFile1.compareTo(cachedMobFile1), 0);
+ }
+
+ @Test
+ public void testReadKeyValue() throws Exception {
+ FileSystem fs = FileSystem.get(conf);
+ Path testDir = FSUtils.getRootDir(conf);
+ Path outputDir = new Path(new Path(testDir, TABLE), "familyname");
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+ StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+ .withOutputDir(outputDir).withFileContext(meta).build();
+ String caseName = getName();
+ MobTestUtil.writeStoreFile(writer, caseName);
+ CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
+ byte[] family = Bytes.toBytes(caseName);
+ byte[] qualify = Bytes.toBytes(caseName);
+ // Test the start key
+ byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
+ KeyValue expectedKey =
+ new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
+ KeyValue seekKey = expectedKey.createKeyOnly(false);
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
+ MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
+
+ // Test the end key
+ byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
+ expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
+ seekKey = expectedKey.createKeyOnly(false);
+ kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
+ MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
+
+ // Test the random key
+ byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
+ expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
+ seekKey = expectedKey.createKeyOnly(false);
+ kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
+ MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
+
+ // Test the key which is less than the start key
+ byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
+ expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
+ seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
+ kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
+ MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
+
+ // Test the key which is more than the end key
+ byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
+ seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
+ kv = KeyValueUtil.ensureKeyValue(cachedMobFile.readCell(seekKey, false));
+ Assert.assertNull(kv);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
new file mode 100644
index 00000000000..5d85718ee43
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
@@ -0,0 +1,193 @@
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestDefaultMobStoreFlusher {
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static byte [] row1 = Bytes.toBytes("row1");
+ private final static byte [] row2 = Bytes.toBytes("row2");
+ private final static byte [] family = Bytes.toBytes("family");
+ private final static byte [] qf1 = Bytes.toBytes("qf1");
+ private final static byte [] qf2 = Bytes.toBytes("qf2");
+ private final static byte [] value1 = Bytes.toBytes("value1");
+ private final static byte [] value2 = Bytes.toBytes("value2");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testFlushNonMobFile() throws InterruptedException {
+ String TN = "testFlushNonMobFile";
+ HTable table = null;
+ HBaseAdmin admin = null;
+
+ try {
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN));
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setMaxVersions(4);
+ desc.addFamily(hcd);
+
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ admin.createTable(desc);
+ table = new HTable(TEST_UTIL.getConfiguration(), TN);
+
+ //Put data
+ Put put0 = new Put(row1);
+ put0.add(family, qf1, 1, value1);
+ table.put(put0);
+
+ //Put more data
+ Put put1 = new Put(row2);
+ put1.add(family, qf2, 1, value2);
+ table.put(put1);
+
+ //Flush
+ table.flushCommits();
+ admin.flush(TN);
+
+ Scan scan = new Scan();
+ scan.addColumn(family, qf1);
+ scan.setMaxVersions(4);
+ ResultScanner scanner = table.getScanner(scan);
+
+ //Compare
+ Result result = scanner.next();
+ int size = 0;
+ while (result != null) {
+ size++;
+ List<Cell> cells = result.getColumnCells(family, qf1);
+ // Verify the cell size
+ Assert.assertEquals(1, cells.size());
+ // Verify the value
+ Assert.assertEquals(Bytes.toString(value1),
+ Bytes.toString(CellUtil.cloneValue(cells.get(0))));
+ result = scanner.next();
+ }
+ scanner.close();
+ Assert.assertEquals(1, size);
+ admin.close();
+ } catch (MasterNotRunningException e1) {
+ e1.printStackTrace();
+ } catch (ZooKeeperConnectionException e2) {
+ e2.printStackTrace();
+ } catch (IOException e3) {
+ e3.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testFlushMobFile() throws InterruptedException {
+ String TN = "testFlushMobFile";
+ HTable table = null;
+ HBaseAdmin admin = null;
+
+ try {
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(TN));
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+ hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+ hcd.setMaxVersions(4);
+ desc.addFamily(hcd);
+
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ admin.createTable(desc);
+ table = new HTable(TEST_UTIL.getConfiguration(), TN);
+
+ //put data
+ Put put0 = new Put(row1);
+ put0.add(family, qf1, 1, value1);
+ table.put(put0);
+
+ //put more data
+ Put put1 = new Put(row2);
+ put1.add(family, qf2, 1, value2);
+ table.put(put1);
+
+ //flush
+ table.flushCommits();
+ admin.flush(TN);
+
+ //Scan
+ Scan scan = new Scan();
+ scan.addColumn(family, qf1);
+ scan.setMaxVersions(4);
+ ResultScanner scanner = table.getScanner(scan);
+
+ //Compare
+ Result result = scanner.next();
+ int size = 0;
+ while (result != null) {
+ size++;
+ List<Cell> cells = result.getColumnCells(family, qf1);
+ // Verify the cell size
+ Assert.assertEquals(1, cells.size());
+ // Verify the value
+ Assert.assertEquals(Bytes.toString(value1),
+ Bytes.toString(CellUtil.cloneValue(cells.get(0))));
+ result = scanner.next();
+ }
+ scanner.close();
+ Assert.assertEquals(1, size);
+ admin.close();
+ } catch (MasterNotRunningException e1) {
+ e1.printStackTrace();
+ } catch (ZooKeeperConnectionException e2) {
+ e2.printStackTrace();
+ } catch (IOException e3) {
+ e3.printStackTrace();
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java
new file mode 100644
index 00000000000..ae10aad6204
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobDataBlockEncoding.java
@@ -0,0 +1,141 @@
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.mob;
+
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMobDataBlockEncoding {
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static byte [] row1 = Bytes.toBytes("row1");
+ private final static byte [] family = Bytes.toBytes("family");
+ private final static byte [] qf1 = Bytes.toBytes("qualifier1");
+ private final static byte [] qf2 = Bytes.toBytes("qualifier2");
+ protected final byte[] qf3 = Bytes.toBytes("qualifier3");
+ private static HTable table;
+ private static HBaseAdmin admin;
+ private static HColumnDescriptor hcd;
+ private static HTableDescriptor desc;
+ private static Random random = new Random();
+ private static long defaultThreshold = 10;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ public void setUp(long threshold, String TN, DataBlockEncoding encoding)
+ throws Exception {
+ desc = new HTableDescriptor(TableName.valueOf(TN));
+ hcd = new HColumnDescriptor(family);
+ hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+ hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(threshold));
+ hcd.setMaxVersions(4);
+ hcd.setDataBlockEncoding(encoding);
+ desc.addFamily(hcd);
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ admin.createTable(desc);
+ table = new HTable(TEST_UTIL.getConfiguration(), TN);
+ }
+
+ /**
+ * Generate the mob value.
+ *
+ * @param size the size of the value
+ * @return the mob value generated
+ */
+ private static byte[] generateMobValue(int size) {
+ byte[] mobVal = new byte[size];
+ random.nextBytes(mobVal);
+ return mobVal;
+ }
+
+ @Test
+ public void testDataBlockEncoding() throws Exception {
+ for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+ testDataBlockEncoding(encoding);
+ }
+ }
+
+ public void testDataBlockEncoding(DataBlockEncoding encoding) throws Exception {
+ String TN = "testDataBlockEncoding" + encoding;
+ setUp(defaultThreshold, TN, encoding);
+ long ts1 = System.currentTimeMillis();
+ long ts2 = ts1 + 1;
+ long ts3 = ts1 + 2;
+ byte[] value = generateMobValue((int) defaultThreshold + 1);
+
+ Put put1 = new Put(row1);
+ put1.add(family, qf1, ts3, value);
+ put1.add(family, qf2, ts2, value);
+ put1.add(family, qf3, ts1, value);
+ table.put(put1);
+
+ table.flushCommits();
+ admin.flush(TN);
+
+ Scan scan = new Scan();
+ scan.setMaxVersions(4);
+
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ List<Cell> cells = res.listCells();
+ for(Cell cell : cells) {
+ // Verify the value
+ Assert.assertEquals(Bytes.toString(value),
+ Bytes.toString(CellUtil.cloneValue(cell)));
+ count++;
+ }
+ }
+ results.close();
+ Assert.assertEquals(3, count);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
new file mode 100644
index 00000000000..f6511f71f25
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFile.java
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestMobFile extends TestCase {
+ static final Log LOG = LogFactory.getLog(TestMobFile.class);
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private Configuration conf = TEST_UTIL.getConfiguration();
+ private CacheConfig cacheConf = new CacheConfig(conf);
+ private final String TABLE = "tableName";
+ private final String FAMILY = "familyName";
+
+ @Test
+ public void testReadKeyValue() throws Exception {
+ FileSystem fs = FileSystem.get(conf);
+ Path testDir = FSUtils.getRootDir(conf);
+ Path outputDir = new Path(new Path(testDir, TABLE), FAMILY);
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
+ StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+ .withOutputDir(outputDir)
+ .withFileContext(meta)
+ .build();
+ String caseName = getName();
+ MobTestUtil.writeStoreFile(writer, caseName);
+
+ MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(),
+ conf, cacheConf, BloomType.NONE));
+ byte[] family = Bytes.toBytes(caseName);
+ byte[] qualify = Bytes.toBytes(caseName);
+
+ // Test the start key
+ byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
+ KeyValue expectedKey =
+ new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
+ KeyValue seekKey = expectedKey.createKeyOnly(false);
+ KeyValue kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
+ MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
+
+ // Test the end key
+ byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
+ expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
+ seekKey = expectedKey.createKeyOnly(false);
+ kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
+ MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
+
+ // Test the random key
+ byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
+ expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
+ seekKey = expectedKey.createKeyOnly(false);
+ kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
+ MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
+
+ // Test the key which is less than the start key
+ byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
+ expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
+ seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
+ kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
+ MobTestUtil.assertKeyValuesEquals(expectedKey, kv);
+
+ // Test the key which is more than the end key
+ byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
+ seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
+ kv = KeyValueUtil.ensureKeyValue(mobFile.readCell(seekKey, false));
+ assertNull(kv);
+ }
+
+ @Test
+ public void testGetScanner() throws Exception {
+ FileSystem fs = FileSystem.get(conf);
+ Path testDir = FSUtils.getRootDir(conf);
+ Path outputDir = new Path(new Path(testDir, TABLE), FAMILY);
+ HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
+ StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+ .withOutputDir(outputDir)
+ .withFileContext(meta)
+ .build();
+ MobTestUtil.writeStoreFile(writer, getName());
+
+ MobFile mobFile = new MobFile(new StoreFile(fs, writer.getPath(),
+ conf, cacheConf, BloomType.NONE));
+ assertNotNull(mobFile.getScanner());
+ assertTrue(mobFile.getScanner() instanceof StoreFileScanner);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java
new file mode 100644
index 00000000000..9478544cbe9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java
@@ -0,0 +1,206 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.Date;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HMobStore;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestMobFileCache extends TestCase {
+ static final Log LOG = LogFactory.getLog(TestMobFileCache.class);
+ private HBaseTestingUtility UTIL;
+ private HRegion region;
+ private Configuration conf;
+ private MobCacheConfig mobCacheConf;
+ private MobFileCache mobFileCache;
+ private Date currentDate = new Date();
+ private final String TEST_CACHE_SIZE = "2";
+ private final int EXPECTED_CACHE_SIZE_ZERO = 0;
+ private final int EXPECTED_CACHE_SIZE_ONE = 1;
+ private final int EXPECTED_CACHE_SIZE_TWO = 2;
+ private final int EXPECTED_CACHE_SIZE_THREE = 3;
+ private final long EXPECTED_REFERENCE_ONE = 1;
+ private final long EXPECTED_REFERENCE_TWO = 2;
+
+ private final String TABLE = "tableName";
+ private final String FAMILY1 = "family1";
+ private final String FAMILY2 = "family2";
+ private final String FAMILY3 = "family3";
+
+ private final byte[] ROW = Bytes.toBytes("row");
+ private final byte[] ROW2 = Bytes.toBytes("row2");
+ private final byte[] VALUE = Bytes.toBytes("value");
+ private final byte[] VALUE2 = Bytes.toBytes("value2");
+ private final byte[] QF1 = Bytes.toBytes("qf1");
+ private final byte[] QF2 = Bytes.toBytes("qf2");
+ private final byte[] QF3 = Bytes.toBytes("qf3");
+
+ @Override
+ public void setUp() throws Exception {
+ UTIL = HBaseTestingUtility.createLocalHTU();
+ conf = UTIL.getConfiguration();
+ HTableDescriptor htd = UTIL.createTableDescriptor("testMobFileCache");
+ HColumnDescriptor hcd1 = new HColumnDescriptor(FAMILY1);
+ hcd1.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+ hcd1.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
+ HColumnDescriptor hcd2 = new HColumnDescriptor(FAMILY2);
+ hcd2.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+ hcd2.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
+ HColumnDescriptor hcd3 = new HColumnDescriptor(FAMILY3);
+ hcd3.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+ hcd3.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
+ htd.addFamily(hcd1);
+ htd.addFamily(hcd2);
+ htd.addFamily(hcd3);
+ region = UTIL.createLocalHRegion(htd, null, null);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ region.close();
+ region.getFilesystem().delete(UTIL.getDataTestDir(), true);
+ }
+
+ /**
+ * Create the mob store file.
+ * @param family
+ */
+ private Path createMobStoreFile(String family) throws IOException {
+ return createMobStoreFile(HBaseConfiguration.create(), family);
+ }
+
+ /**
+ * Create the mob store file
+ * @param conf
+ * @param family
+ */
+ private Path createMobStoreFile(Configuration conf, String family) throws IOException {
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setMaxVersions(4);
+ hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+ mobCacheConf = new MobCacheConfig(conf, hcd);
+ return createMobStoreFile(conf, hcd);
+ }
+
+ /**
+ * Create the mob store file
+ * @param conf
+ * @param hcd
+ */
+ private Path createMobStoreFile(Configuration conf, HColumnDescriptor hcd)
+ throws IOException {
+ // Setting up a Store
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
+ htd.addFamily(hcd);
+ HMobStore mobStore = (HMobStore) region.getStore(hcd.getName());
+ KeyValue key1 = new KeyValue(ROW, hcd.getName(), QF1, 1, VALUE);
+ KeyValue key2 = new KeyValue(ROW, hcd.getName(), QF2, 1, VALUE);
+ KeyValue key3 = new KeyValue(ROW2, hcd.getName(), QF3, 1, VALUE2);
+ KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
+ int maxKeyCount = keys.length;
+ HRegionInfo regionInfo = new HRegionInfo();
+ StoreFile.Writer mobWriter = mobStore.createWriterInTmp(currentDate,
+ maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey());
+ Path mobFilePath = mobWriter.getPath();
+ String fileName = mobFilePath.getName();
+ mobWriter.append(key1);
+ mobWriter.append(key2);
+ mobWriter.append(key3);
+ mobWriter.close();
+ String targetPathName = MobUtils.formatDate(currentDate);
+ Path targetPath = new Path(mobStore.getPath(), targetPathName);
+ mobStore.commitFile(mobFilePath, targetPath);
+ return new Path(targetPath, fileName);
+ }
+
+ @Test
+ public void testMobFileCache() throws Exception {
+ FileSystem fs = FileSystem.get(conf);
+ conf.set(MobConstants.MOB_FILE_CACHE_SIZE_KEY, TEST_CACHE_SIZE);
+ mobFileCache = new MobFileCache(conf);
+ Path file1Path = createMobStoreFile(FAMILY1);
+ Path file2Path = createMobStoreFile(FAMILY2);
+ Path file3Path = createMobStoreFile(FAMILY3);
+
+ // Before any file is opened by the MobFileCache
+ assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize());
+ // Open one file by the MobFileCache
+ CachedMobFile cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(
+ fs, file1Path, mobCacheConf);
+ assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
+ assertNotNull(cachedMobFile1);
+ assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
+
+ // Eviction is also handled by a scheduled thread pool, whose check period defaults
+ // to 3600 seconds, so this explicit evict should acquire the lock most of the time.
+ mobFileCache.evict(); // Cache not full, evict it
+ assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
+ assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
+
+ mobFileCache.evictFile(file1Path.getName()); // Evict one file
+ assertEquals(EXPECTED_CACHE_SIZE_ZERO, mobFileCache.getCacheSize());
+ assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount());
+
+ cachedMobFile1.close(); // Close the cached mob file
+
+ // Reopen the three cached files
+ cachedMobFile1 = (CachedMobFile) mobFileCache.openFile(
+ fs, file1Path, mobCacheConf);
+ assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
+ CachedMobFile cachedMobFile2 = (CachedMobFile) mobFileCache.openFile(
+ fs, file2Path, mobCacheConf);
+ assertEquals(EXPECTED_CACHE_SIZE_TWO, mobFileCache.getCacheSize());
+ CachedMobFile cachedMobFile3 = (CachedMobFile) mobFileCache.openFile(
+ fs, file3Path, mobCacheConf);
+ // Before the evict, all three files are cached with two references each.
+ // Evicting the cache should release the two files accessed least recently (file 1 and file 2).
+ assertEquals(EXPECTED_CACHE_SIZE_THREE, mobFileCache.getCacheSize());
+ assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile1.getReferenceCount());
+ assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile2.getReferenceCount());
+ assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount());
+ mobFileCache.evict();
+ assertEquals(EXPECTED_CACHE_SIZE_ONE, mobFileCache.getCacheSize());
+ assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile1.getReferenceCount());
+ assertEquals(EXPECTED_REFERENCE_ONE, cachedMobFile2.getReferenceCount());
+ assertEquals(EXPECTED_REFERENCE_TWO, cachedMobFile3.getReferenceCount());
+ }
+}
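A note on the reference counts asserted in testMobFileCache(): opening a file through the cache yields two references, one held by the cache entry and one by the caller, eviction releases the cache's reference, and the caller's close() releases the last one. The sketch below, with invented names, only illustrates that counting scheme; it is not the patch's CachedMobFile implementation.

    import java.util.concurrent.atomic.AtomicInteger;

    // Invented illustration of the counting scheme the test asserts on.
    class RefCountedHandle {
      private final AtomicInteger refs = new AtomicInteger();

      void retain() { refs.incrementAndGet(); }   // taken by the cache and by each caller
      void release() { refs.decrementAndGet(); }  // dropped on eviction or on a caller's close()
      int referenceCount() { return refs.get(); }

      public static void main(String[] args) {
        RefCountedHandle file = new RefCountedHandle();
        file.retain();   // the cache's reference
        file.retain();   // the caller's reference -> 2, as asserted right after openFile()
        file.release();  // eviction drops the cache's reference -> 1
        file.release();  // caller close() -> 0, the underlying reader can now be closed
        System.out.println(file.referenceCount()); // prints 0
      }
    }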
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java
new file mode 100644
index 00000000000..9a6cf7f879f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileName.java
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.UUID;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MD5Hash;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestMobFileName extends TestCase {
+
+ private String uuid;
+ private Date date;
+ private String dateStr;
+ private byte[] startKey;
+
+ public void setUp() {
+ Random random = new Random();
+ uuid = UUID.randomUUID().toString().replaceAll("-", "");
+ date = new Date();
+ dateStr = MobUtils.formatDate(date);
+ startKey = Bytes.toBytes(random.nextInt());
+ }
+
+ @Test
+ public void testHashCode() {
+ assertEquals(MobFileName.create(startKey, dateStr, uuid).hashCode(),
+ MobFileName.create(startKey, dateStr, uuid).hashCode());
+ assertNotSame(MobFileName.create(startKey, dateStr, uuid),
+ MobFileName.create(startKey, dateStr, uuid));
+ }
+
+ @Test
+ public void testCreate() {
+ MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid);
+ assertEquals(mobFileName, MobFileName.create(mobFileName.getFileName()));
+ }
+
+ @Test
+ public void testGet() {
+ MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid);
+ assertEquals(MD5Hash.getMD5AsHex(startKey, 0, startKey.length), mobFileName.getStartKey());
+ assertEquals(dateStr, mobFileName.getDate());
+ assertEquals(mobFileName.getFileName(), MD5Hash.getMD5AsHex(startKey, 0, startKey.length)
+ + dateStr + uuid);
+ }
+
+ @Test
+ public void testEquals() {
+ MobFileName mobFileName = MobFileName.create(startKey, dateStr, uuid);
+ assertTrue(mobFileName.equals(mobFileName));
+ assertFalse(mobFileName.equals(this));
+ assertTrue(mobFileName.equals(MobFileName.create(startKey, dateStr, uuid)));
+ }
+}
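For orientation, testGet() above pins down the file-name layout: the name is the MD5 hex of the start key, followed by the date string from MobUtils.formatDate(), followed by the uuid. The tiny parser below mirrors that layout; the 32-character MD5 prefix is a general property of MD5 hex strings, but the 8-character date width is an assumption about formatDate()'s output, not something stated in this excerpt.

    // Hypothetical companion to testGet(): split a name built as md5Hex + date + uuid.
    final class MobFileNameParts {
      final String startKeyMd5Hex;
      final String date;
      final String uuid;

      MobFileNameParts(String fileName) {
        this.startKeyMd5Hex = fileName.substring(0, 32); // MD5 hex is always 32 characters
        this.date = fileName.substring(32, 40);          // assumes a yyyyMMdd-style date string
        this.uuid = fileName.substring(40);              // the remainder is the uuid
      }
    }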
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
new file mode 100644
index 00000000000..ed4b703486e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteMobTable.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestDeleteMobTable {
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static byte[] FAMILY = Bytes.toBytes("family");
+ private final static byte[] QF = Bytes.toBytes("qualifier");
+ private static Random random = new Random();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Generate the mob value.
+ *
+ * @param size
+ * the size of the value
+ * @return the mob value generated
+ */
+ private static byte[] generateMobValue(int size) {
+ byte[] mobVal = new byte[size];
+ random.nextBytes(mobVal);
+ return mobVal;
+ }
+
+ @Test
+ public void testDeleteMobTable() throws Exception {
+ byte[] tableName = Bytes.toBytes("testDeleteMobTable");
+ TableName tn = TableName.valueOf(tableName);
+ HTableDescriptor htd = new HTableDescriptor(tn);
+ HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+ hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+ hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(0L));
+ htd.addFamily(hcd);
+ HBaseAdmin admin = null;
+ HTable table = null;
+ try {
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ admin.createTable(htd);
+ table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+ byte[] value = generateMobValue(10);
+
+ byte[] row = Bytes.toBytes("row");
+ Put put = new Put(row);
+ put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
+ table.put(put);
+
+ table.flushCommits();
+ admin.flush(tableName);
+
+ // the mob file exists
+ Assert.assertEquals(1, countMobFiles(tn, hcd.getNameAsString()));
+ Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+ String fileName = assertHasOneMobRow(table, tn, hcd.getNameAsString());
+ Assert.assertFalse(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
+ Assert.assertTrue(mobTableDirExist(tn));
+ table.close();
+
+ admin.disableTable(tn);
+ admin.deleteTable(tn);
+
+ Assert.assertFalse(admin.tableExists(tn));
+ Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+ Assert.assertEquals(1, countArchiveMobFiles(tn, hcd.getNameAsString()));
+ Assert.assertTrue(mobArchiveExist(tn, hcd.getNameAsString(), fileName));
+ Assert.assertFalse(mobTableDirExist(tn));
+ } finally {
+ if (admin != null) {
+ admin.close();
+ }
+ }
+ }
+
+ @Test
+ public void testDeleteNonMobTable() throws Exception {
+ byte[] tableName = Bytes.toBytes("testDeleteNonMobTable");
+ TableName tn = TableName.valueOf(tableName);
+ HTableDescriptor htd = new HTableDescriptor(tn);
+ HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+ htd.addFamily(hcd);
+ HBaseAdmin admin = null;
+ HTable table = null;
+ try {
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ admin.createTable(htd);
+ table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+ byte[] value = generateMobValue(10);
+
+ byte[] row = Bytes.toBytes("row");
+ Put put = new Put(row);
+ put.add(FAMILY, QF, EnvironmentEdgeManager.currentTime(), value);
+ table.put(put);
+
+ table.flushCommits();
+ admin.flush(tableName);
+ table.close();
+
+ // the mob file doesn't exist
+ Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+ Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+ Assert.assertFalse(mobTableDirExist(tn));
+
+ admin.disableTable(tn);
+ admin.deleteTable(tn);
+
+ Assert.assertFalse(admin.tableExists(tn));
+ Assert.assertEquals(0, countMobFiles(tn, hcd.getNameAsString()));
+ Assert.assertEquals(0, countArchiveMobFiles(tn, hcd.getNameAsString()));
+ Assert.assertFalse(mobTableDirExist(tn));
+ } finally {
+ if (admin != null) {
+ admin.close();
+ }
+ }
+ }
+
+ private int countMobFiles(TableName tn, String familyName) throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path mobFileDir = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName);
+ if (fs.exists(mobFileDir)) {
+ return fs.listStatus(mobFileDir).length;
+ } else {
+ return 0;
+ }
+ }
+
+ private int countArchiveMobFiles(TableName tn, String familyName)
+ throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
+ MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
+ if (fs.exists(storePath)) {
+ return fs.listStatus(storePath).length;
+ } else {
+ return 0;
+ }
+ }
+
+ private boolean mobTableDirExist(TableName tn) throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path tableDir = FSUtils.getTableDir(MobUtils.getMobHome(TEST_UTIL.getConfiguration()), tn);
+ return fs.exists(tableDir);
+ }
+
+ private boolean mobArchiveExist(TableName tn, String familyName, String fileName)
+ throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path storePath = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), tn,
+ MobUtils.getMobRegionInfo(tn).getEncodedName(), familyName);
+ return fs.exists(new Path(storePath, fileName));
+ }
+
+ private String assertHasOneMobRow(HTable table, TableName tn, String familyName)
+ throws IOException {
+ Scan scan = new Scan();
+ scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+ ResultScanner rs = table.getScanner(scan);
+ Result r = rs.next();
+ Assert.assertNotNull(r);
+ byte[] value = r.getValue(FAMILY, QF);
+ String fileName = Bytes.toString(value, Bytes.SIZEOF_LONG, value.length - Bytes.SIZEOF_LONG);
+ Path filePath = new Path(
+ MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, familyName), fileName);
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Assert.assertTrue(fs.exists(filePath));
+ r = rs.next();
+ Assert.assertNull(r);
+ return fileName;
+ }
+}
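assertHasOneMobRow() relies on the layout of a mob reference cell: the first Bytes.SIZEOF_LONG (8) bytes carry the length of the original value and the remaining bytes carry the mob file name. A standalone decoder for that layout might look like the sketch below; it assumes the length prefix is big-endian, which matches Bytes.toBytes(long), and the class and method names are invented for illustration.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Invented helper mirroring the reference layout read back by assertHasOneMobRow():
    // [ 8-byte value length ][ mob file name bytes ].
    final class MobReferenceDecoder {
      static long originalValueLength(byte[] referenceValue) {
        return ByteBuffer.wrap(referenceValue, 0, 8).getLong(); // big-endian, like Bytes.toBytes(long)
      }

      static String mobFileName(byte[] referenceValue) {
        return new String(referenceValue, 8, referenceValue.length - 8, StandardCharsets.UTF_8);
      }
    }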
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
new file mode 100644
index 00000000000..c253611a677
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java
@@ -0,0 +1,471 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+@Category(MediumTests.class)
+public class TestHMobStore {
+ public static final Log LOG = LogFactory.getLog(TestHMobStore.class);
+ @Rule public TestName name = new TestName();
+
+ private HMobStore store;
+ private HRegion region;
+ private HColumnDescriptor hcd;
+ private FileSystem fs;
+ private byte [] table = Bytes.toBytes("table");
+ private byte [] family = Bytes.toBytes("family");
+ private byte [] row = Bytes.toBytes("row");
+ private byte [] row2 = Bytes.toBytes("row2");
+ private byte [] qf1 = Bytes.toBytes("qf1");
+ private byte [] qf2 = Bytes.toBytes("qf2");
+ private byte [] qf3 = Bytes.toBytes("qf3");
+ private byte [] qf4 = Bytes.toBytes("qf4");
+ private byte [] qf5 = Bytes.toBytes("qf5");
+ private byte [] qf6 = Bytes.toBytes("qf6");
+ private byte[] value = Bytes.toBytes("value");
+ private byte[] value2 = Bytes.toBytes("value2");
+ private Path mobFilePath;
+ private Date currentDate = new Date();
+ private KeyValue seekKey1;
+ private KeyValue seekKey2;
+ private KeyValue seekKey3;
+ private NavigableSet<byte[]> qualifiers =
+ new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ private List<Cell> expected = new ArrayList<Cell>();
+ private long id = System.currentTimeMillis();
+ private Get get = new Get(row);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final String DIR = TEST_UTIL.getDataTestDir("TestHMobStore").toString();
+
+ /**
+ * Setup
+ * @throws Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ qualifiers.add(qf1);
+ qualifiers.add(qf3);
+ qualifiers.add(qf5);
+
+ Iterator<byte[]> iter = qualifiers.iterator();
+ while (iter.hasNext()) {
+ byte[] next = iter.next();
+ expected.add(new KeyValue(row, family, next, 1, value));
+ get.addColumn(family, next);
+ get.setMaxVersions(); // all versions.
+ }
+ }
+
+ private void init(String methodName, Configuration conf, boolean testStore)
+ throws IOException {
+ hcd = new HColumnDescriptor(family);
+ hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+ hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(3L));
+ hcd.setMaxVersions(4);
+ init(methodName, conf, hcd, testStore);
+ }
+
+ private void init(String methodName, Configuration conf,
+ HColumnDescriptor hcd, boolean testStore) throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+ init(methodName, conf, htd, hcd, testStore);
+ }
+
+ private void init(String methodName, Configuration conf, HTableDescriptor htd,
+ HColumnDescriptor hcd, boolean testStore) throws IOException {
+ // Setting up the Region and Store
+ Path basedir = new Path(DIR+methodName);
+ Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
+ String logName = "logs";
+ Path logdir = new Path(basedir, logName);
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(logdir, true);
+
+ htd.addFamily(hcd);
+ HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
+ HLog hlog = HLogFactory.createHLog(fs, basedir, logName, conf);
+ region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
+ store = new HMobStore(region, hcd, conf);
+ if(testStore) {
+ init(conf, hcd);
+ }
+ }
+
+ private void init(Configuration conf, HColumnDescriptor hcd)
+ throws IOException {
+ Path basedir = FSUtils.getRootDir(conf);
+ fs = FileSystem.get(conf);
+ Path homePath = new Path(basedir, Bytes.toString(family) + Path.SEPARATOR
+ + Bytes.toString(family));
+ fs.mkdirs(homePath);
+
+ KeyValue key1 = new KeyValue(row, family, qf1, 1, value);
+ KeyValue key2 = new KeyValue(row, family, qf2, 1, value);
+ KeyValue key3 = new KeyValue(row2, family, qf3, 1, value2);
+ KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
+ int maxKeyCount = keys.length;
+ StoreFile.Writer mobWriter = store.createWriterInTmp(currentDate,
+ maxKeyCount, hcd.getCompactionCompression(), region.getStartKey());
+ mobFilePath = mobWriter.getPath();
+
+ mobWriter.append(key1);
+ mobWriter.append(key2);
+ mobWriter.append(key3);
+ mobWriter.close();
+
+ long valueLength1 = key1.getValueLength();
+ long valueLength2 = key2.getValueLength();
+ long valueLength3 = key3.getValueLength();
+
+ String targetPathName = MobUtils.formatDate(currentDate);
+ byte[] referenceValue =
+ Bytes.toBytes(targetPathName + Path.SEPARATOR
+ + mobFilePath.getName());
+ byte[] newReferenceValue1 = Bytes.add(Bytes.toBytes(valueLength1), referenceValue);
+ byte[] newReferenceValue2 = Bytes.add(Bytes.toBytes(valueLength2), referenceValue);
+ byte[] newReferenceValue3 = Bytes.add(Bytes.toBytes(valueLength3), referenceValue);
+ seekKey1 = new KeyValue(row, family, qf1, Long.MAX_VALUE, newReferenceValue1);
+ seekKey2 = new KeyValue(row, family, qf2, Long.MAX_VALUE, newReferenceValue2);
+ seekKey3 = new KeyValue(row2, family, qf3, Long.MAX_VALUE, newReferenceValue3);
+ }
+
+ /**
+ * Getting data from memstore
+ * @throws IOException
+ */
+ @Test
+ public void testGetFromMemStore() throws IOException {
+ final Configuration conf = HBaseConfiguration.create();
+ init(name.getMethodName(), conf, false);
+
+ //Put data in memstore
+ this.store.add(new KeyValue(row, family, qf1, 1, value));
+ this.store.add(new KeyValue(row, family, qf2, 1, value));
+ this.store.add(new KeyValue(row, family, qf3, 1, value));
+ this.store.add(new KeyValue(row, family, qf4, 1, value));
+ this.store.add(new KeyValue(row, family, qf5, 1, value));
+ this.store.add(new KeyValue(row, family, qf6, 1, value));
+
+ Scan scan = new Scan(get);
+ InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+ scan.getFamilyMap().get(store.getFamily().getName()),
+ 0);
+
+ List<Cell> results = new ArrayList<Cell>();
+ scanner.next(results);
+ Collections.sort(results, KeyValue.COMPARATOR);
+ scanner.close();
+
+ //Compare
+ Assert.assertEquals(expected.size(), results.size());
+ for (int i = 0; i < expected.size(); i++) {
+ // Verify the values
+ Assert.assertEquals(expected.get(i), results.get(i));
+ }
+ }
+
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ List<Cell> cells = res.listCells();
+ for(Cell cell : cells) {
+ // Verify the value
+ Assert.assertEquals(Bytes.toString(value),
+ Bytes.toString(CellUtil.cloneValue(cell)));
+ count++;
+ }
+ }
+ results.close();
+ Assert.assertEquals(3, count);
+ }
+
+ public void testGetFromMemStore(boolean reversed) throws Exception {
+ String TN = "testGetFromMemStore" + reversed;
+ setUp(defaultThreshold, TN);
+ long ts1 = System.currentTimeMillis();
+ long ts2 = ts1 + 1;
+ long ts3 = ts1 + 2;
+ byte[] value = generateMobValue((int) defaultThreshold + 1);
+
+ Put put1 = new Put(row1);
+ put1.add(family, qf1, ts3, value);
+ put1.add(family, qf2, ts2, value);
+ put1.add(family, qf3, ts1, value);
+ table.put(put1);
+
+ Scan scan = new Scan();
+ setScan(scan, reversed, false);
+
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ List<Cell> cells = res.listCells();
+ for(Cell cell : cells) {
+ // Verify the value
+ Assert.assertEquals(Bytes.toString(value),
+ Bytes.toString(CellUtil.cloneValue(cell)));
+ count++;
+ }
+ }
+ results.close();
+ Assert.assertEquals(3, count);
+ }
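The setScan(scan, reversed, isReference) helper used by these scanner tests is not part of this excerpt. Judging from how TestDeleteMobTable configures its scan, a plausible sketch is the one below: it flips the reversed flag and, when raw mob references are wanted, sets the MOB_SCAN_RAW attribute. Treat it as a guess, not the patch's actual helper.

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mob.MobConstants;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical reconstruction of the missing setScan() helper.
    final class ScanSetup {
      static void setScan(Scan scan, boolean reversed, boolean isReference) {
        scan.setReversed(reversed); // direction toggled by the "reversed" test variants
        if (isReference) {
          // Ask for the raw mob reference cells instead of the resolved values,
          // the same attribute TestDeleteMobTable sets on its scan.
          scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
        }
      }
    }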
+
+ public void testGetReferences(boolean reversed) throws Exception {
+ String TN = "testGetReferences" + reversed;
+ setUp(defaultThreshold, TN);
+ long ts1 = System.currentTimeMillis();
+ long ts2 = ts1 + 1;
+ long ts3 = ts1 + 2;
+ byte[] value = generateMobValue((int) defaultThreshold + 1);
+
+ Put put1 = new Put(row1);
+ put1.add(family, qf1, ts3, value);
+ put1.add(family, qf2, ts2, value);
+ put1.add(family, qf3, ts1, value);
+ table.put(put1);
+
+ table.flushCommits();
+ admin.flush(TN);
+
+ Scan scan = new Scan();
+ setScan(scan, reversed, true);
+
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ List<Cell> cells = res.listCells();
+ for(Cell cell : cells) {
+ // Verify the value
+ assertIsMobReference(cell, row1, family, value, TN);
+ count++;
+ }
+ }
+ results.close();
+ Assert.assertEquals(3, count);
+ }
+
+ public void testMobThreshold(boolean reversed) throws Exception {
+ String TN = "testMobThreshold" + reversed;
+ setUp(defaultThreshold, TN);
+ byte [] valueLess = generateMobValue((int)defaultThreshold-1);
+ byte [] valueEqual = generateMobValue((int)defaultThreshold);
+ byte [] valueGreater = generateMobValue((int)defaultThreshold+1);
+ long ts1 = System.currentTimeMillis();
+ long ts2 = ts1 + 1;
+ long ts3 = ts1 + 2;
+
+ Put put1 = new Put(row1);
+ put1.add(family, qf1, ts3, valueLess);
+ put1.add(family, qf2, ts2, valueEqual);
+ put1.add(family, qf3, ts1, valueGreater);
+ table.put(put1);
+
+ table.flushCommits();
+ admin.flush(TN);
+
+ Scan scan = new Scan();
+ setScan(scan, reversed, true);
+
+ Cell cellLess = null;
+ Cell cellEqual = null;
+ Cell cellGreater = null;
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ List<Cell> cells = res.listCells();
+ for(Cell cell : cells) {
+ // Verify the value
+ String qf = Bytes.toString(CellUtil.cloneQualifier(cell));
+ if(qf.equals(Bytes.toString(qf1))) {
+ cellLess = cell;
+ }
+ if(qf.equals(Bytes.toString(qf2))) {
+ cellEqual = cell;
+ }
+ if(qf.equals(Bytes.toString(qf3))) {
+ cellGreater = cell;
+ }
+ count++;
+ }
+ }
+ Assert.assertEquals(3, count);
+ assertNotMobReference(cellLess, row1, family, valueLess);
+ assertNotMobReference(cellEqual, row1, family, valueEqual);
+ assertIsMobReference(cellGreater, row1, family, valueGreater, TN);
+ results.close();
+ }
+
+ /**
+ * Assert that the value is not stored as a mob reference.
+ */
+ private static void assertNotMobReference(Cell cell, byte[] row, byte[] family,
+ byte[] value) throws IOException {
+ Assert.assertEquals(Bytes.toString(row),
+ Bytes.toString(CellUtil.cloneRow(cell)));
+ Assert.assertEquals(Bytes.toString(family),
+ Bytes.toString(CellUtil.cloneFamily(cell)));
+ Assert.assertTrue(Bytes.toString(value).equals(
+ Bytes.toString(CellUtil.cloneValue(cell))));
+ }
+
+ /**
+ * Assert that the value is stored as a mob reference.
+ */
+ private static void assertIsMobReference(Cell cell, byte[] row, byte[] family,
+ byte[] value, String TN) throws IOException {
+ Assert.assertEquals(Bytes.toString(row),
+ Bytes.toString(CellUtil.cloneRow(cell)));
+ Assert.assertEquals(Bytes.toString(family),
+ Bytes.toString(CellUtil.cloneFamily(cell)));
+ Assert.assertFalse(Bytes.toString(value).equals(
+ Bytes.toString(CellUtil.cloneValue(cell))));
+ byte[] referenceValue = CellUtil.cloneValue(cell);
+ String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_LONG,
+ referenceValue.length - Bytes.SIZEOF_LONG);
+ Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+ TableName.valueOf(TN)), hcd.getNameAsString());
+ Path targetPath = new Path(mobFamilyPath, fileName);
+ FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+ Assert.assertTrue(fs.exists(targetPath));
+ }
+}